Skip to content

Commit

Permalink
feat: dedicated io scheduler and macOS kqueue support
Browse files Browse the repository at this point in the history
fix: clear poll before waiting
feat: move io to separate thread
feat: broaden io ops types to fd
fix: include poll as a dep and fmt
try: poll with short timeout
feat: reuse buffer on the receiver end
chore: remove poll_idx
fix: if no open fds to wait for, don't wait
  • Loading branch information
leostera committed Dec 22, 2023
1 parent eb02f01 commit 473e688
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 120 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

## Unreleased

<<<<<<< HEAD
* Redo packaging to expose a single public library: `riot`
=======
* Fix issue with schedulers busy-waiting
* Switch to `poll` to support kqueue on macOS
>>>>>>> 64e925f (feat: io scheduler and kqueue support)
## 0.0.5

Expand Down
28 changes: 19 additions & 9 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@
(package
(name riot)
(synopsis "An actor-model multi-core scheduler for OCaml 5")
(description "Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(description
"Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(depends
(ocaml (>= "5.1"))
(ptime (>= "1.1.0"))
(iomux (>= "0.3"))
(bigstringaf (>= "0.9.1"))
(uri (>= "4.4.0"))
(telemetry (>= "0.0.1"))
(ocaml
(>= "5.1"))
dune
(ptime
(>= "1.1.0"))
(poll
(>= "0.3.1"))
(bigstringaf
(>= "0.9.1"))
(uri
(>= "4.4.0"))
(telemetry
(>= "0.0.1"))
(mdx (and :with-test (>= "2.3.1")))
(odoc (and :with-doc (>= "2.2.2")))
)
(odoc
(and
:with-doc
(>= "2.2.2"))))
(tags
(multicore erlang actor "message-passing" processes)))
4 changes: 2 additions & 2 deletions riot.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ tags: ["multicore" "erlang" "actor" "message-passing" "processes"]
homepage: "https://github.com/leostera/riot"
bug-reports: "https://github.com/leostera/riot/issues"
depends: [
"dune" {>= "3.11"}
"ocaml" {>= "5.1"}
"dune" {>= "3.11"}
"ptime" {>= "1.1.0"}
"iomux" {>= "0.3"}
"poll" {>= "0.3.1"}
"bigstringaf" {>= "0.9.1"}
"uri" {>= "4.4.0"}
"telemetry" {>= "0.0.1"}
Expand Down
31 changes: 15 additions & 16 deletions riot/lib/net/socket.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,53 @@ let default_listen_opts =
{ reuse_addr = true; reuse_port = true; backlog = 128; addr = Addr.loopback }

let close socket =
let sch = Scheduler.get_current_scheduler () in
let pool = Scheduler.Pool.get_pool () in
let this = self () in
Logger.trace (fun f ->
f "Process %a: Closing socket fd=%a" Pid.pp this Fd.pp socket);
Io.close sch.io_tbl socket
Io.close pool.io_scheduler.io_tbl socket

let listen ?(opts = default_listen_opts) ~port () =
let sch = Scheduler.get_current_scheduler () in
let pool = Scheduler.Pool.get_pool () in
let { reuse_addr; reuse_port; backlog; addr } = opts in
let addr = Addr.tcp addr port in
Logger.trace (fun f -> f "Listening on 0.0.0.0:%d" port);
Io.listen sch.io_tbl ~reuse_port ~reuse_addr ~backlog addr
Io.listen pool.io_scheduler.io_tbl ~reuse_port ~reuse_addr ~backlog addr

let rec connect addr =
let sch = Scheduler.get_current_scheduler () in
let pool = Scheduler.Pool.get_pool () in
Logger.error (fun f -> f "Connecting to %a" Addr.pp addr);
match Io.connect sch.io_tbl addr with
match Io.connect pool.io_scheduler.io_tbl addr with
| `Connected fd -> Ok fd
| `In_progress fd ->
let this = _get_proc (self ()) in
Io.register sch.io_tbl this `w fd;
Io.register pool.io_scheduler.io_tbl this `w fd;
syscall "connect" `w fd @@ fun socket -> Ok socket
| `Abort reason -> Error (`Unix_error reason)
| `Retry ->
yield ();
connect addr

let rec accept ?(timeout = Infinity) (socket : Socket.listen_socket) =
let sch = Scheduler.get_current_scheduler () in
match Io.accept sch.io_tbl socket with
let pool = Scheduler.Pool.get_pool () in
match Io.accept pool.io_scheduler.io_tbl socket with
| exception Fd.(Already_closed _) -> Error `Closed
| `Abort reason -> Error (`Unix_error reason)
| `Retry -> syscall "accept" `r socket @@ accept ~timeout
| `Connected (socket, addr) -> Ok (socket, addr)

let controlling_process _socket ~new_owner:_ = Ok ()

let rec receive ?(timeout = Infinity) ~len socket =
let bytes = Bytes.create len in
match Io.read socket bytes 0 len with
let rec receive ?(timeout = Infinity) ~buf socket =
let bytes = Bytes.create (Bigstringaf.length buf) in
match Io.read socket bytes 0 (Bytes.length bytes) with
| exception Fd.(Already_closed _) -> Error `Closed
| `Abort reason -> Error (`Unix_error reason)
| `Retry -> syscall "read" `r socket @@ receive ~timeout ~len
| `Retry -> syscall "read" `r socket @@ receive ~timeout ~buf
| `Read 0 -> Error `Closed
| `Read len ->
let data = Bigstringaf.create len in
Bigstringaf.blit_from_bytes bytes ~src_off:0 data ~dst_off:0 ~len;
Ok data
Bigstringaf.blit_from_bytes bytes ~src_off:0 buf ~dst_off:0 ~len;
Ok buf

let rec send data socket =
Logger.debug (fun f -> f "sending: %S" (Bigstringaf.to_string data));
Expand Down
2 changes: 1 addition & 1 deletion riot/lib/net/socket.mli
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ val controlling_process : 'a -> new_owner:'b -> (unit, 'c) result

val receive :
?timeout:timeout ->
len:int ->
buf:Bigstringaf.t ->
Fd.t ->
(Bigstringaf.t, [> `Unix_error of Unix.error | `Closed ]) result

Expand Down
2 changes: 1 addition & 1 deletion riot/riot.mli
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ module Net : sig

val receive :
?timeout:timeout ->
len:int ->
buf:Bigstringaf.t ->
stream_socket ->
(Bigstringaf.t, [> `Closed | `Timeout ]) result

Expand Down
2 changes: 1 addition & 1 deletion riot/runtime/net/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(package riot)
(name net)
(libraries core util log iomux uri unix telemetry bigstringaf))
(libraries core util log poll uri unix telemetry bigstringaf))
111 changes: 52 additions & 59 deletions riot/runtime/net/io.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
open Core
open Util
module Poll = Iomux.Poll
module Poll = Poll

type t = {
poll : Poll.t;
poll_timeout : Poll.ppoll_timeout;
poll_timeout : Poll.Timeout.t;
mutable poll_idx : int;
fds : (Fd.t, int) Dashmap.t;
procs : (Fd.t, Process.t * [ `r | `w | `rw ]) Dashmap.t;
Expand All @@ -21,8 +21,8 @@ type write = [ `Wrote of int | op ]

let create () =
{
poll = Poll.create ~maxfds:1024 ();
poll_timeout = Poll.Nanoseconds 10L;
poll = Poll.create ();
poll_timeout = Poll.Timeout.After 500L;
poll_idx = 0;
fds = Dashmap.create 1024;
procs = Dashmap.create 1024;
Expand All @@ -49,18 +49,18 @@ and pp_proc_table ppf tbl =
entries;
Format.fprintf ppf "]"

let mode_of_flags flags =
match (Poll.Flags.(mem flags pollin), Poll.Flags.(mem flags pollout)) with
| true, false -> Some `r
| false, true -> Some `w
| true, true -> Some `rw
| _, _ -> None

let flags_of_mode mode =
let event_of_mode mode =
match mode with
| `rw -> Poll.Flags.(pollin + pollout)
| `r -> Poll.Flags.(pollin)
| `w -> Poll.Flags.(pollout)
| `r -> Poll.Event.read
| `rw -> Poll.Event.read_write
| `w -> Poll.Event.write

let mode_of_event event =
match event with
| Poll.Event.{ writable = false; readable = true } -> Some `r
| Poll.Event.{ writable = true; readable = true } -> Some `rw
| Poll.Event.{ writable = true; readable = false } -> Some `w
| Poll.Event.{ writable = false; readable = false } -> None

(* NOTE(leostera): when we add a new Fd.t to our collection here, we need
to update the current poller so that it knows of it.
Expand All @@ -73,8 +73,8 @@ let add_fd t fd mode =
Log.trace (fun f ->
f "adding fd %d to poll slot %d" (Fd.to_int fd) t.poll_idx);
let unix_fd = Fd.get fd |> Option.get in
let flags = flags_of_mode mode in
Poll.set_index t.poll t.poll_idx unix_fd flags;
let flags = event_of_mode mode in
Poll.set t.poll unix_fd flags;
Dashmap.replace t.fds fd t.poll_idx;
t.poll_idx <- t.poll_idx + 1)

Expand All @@ -89,51 +89,44 @@ let unregister_process t proc =
Dashmap.remove_by t.procs this_proc

let gc t =
Dashmap.remove_by t.fds (fun (fd, idx) ->
let is_open = Fd.is_open fd in
if not is_open then Poll.invalidate_index t.poll idx;
is_open);
Dashmap.remove_by t.fds (fun (fd, _idx) -> Fd.is_open fd);
Dashmap.remove_by t.procs (fun (_fd, (proc, _)) -> Process.is_waiting_io proc)

let can_poll t = not (Dashmap.is_empty t.procs)

let poll t fn =
Poll.clear t.poll;
gc t;
let ready_count = Poll.ppoll_or_poll t.poll t.poll_idx t.poll_timeout in
Poll.iter_ready t.poll ready_count @@ fun _idx raw_fd fd_flags ->
match mode_of_flags fd_flags with
| None ->
Log.trace (fun f ->
let buf = Buffer.create 128 in
let fmt = Format.formatter_of_buffer buf in
if Poll.Flags.(mem fd_flags pollin) then Format.fprintf fmt "pollin,";
if Poll.Flags.(mem fd_flags pollout) then
Format.fprintf fmt "pollout,";
if Poll.Flags.(mem fd_flags pollerr) then
Format.fprintf fmt "pollerr,";
if Poll.Flags.(mem fd_flags pollhup) then
Format.fprintf fmt "pollhup,";
if Poll.Flags.(mem fd_flags pollnval) then
Format.fprintf fmt "pollnval,";
if Poll.Flags.(mem fd_flags pollpri) then Format.fprintf fmt "pollpri";
Format.fprintf fmt "%!";
f "io_poll(%d): unexpected flags: %s" (Obj.magic raw_fd)
(Buffer.contents buf))
| Some mode -> (
match
Dashmap.find_by t.fds (fun (fd, _idx) ->
match Fd.get fd with Some fd' -> fd' = raw_fd | _ -> false)
with
let should_wait = ref false in
Dashmap.iter t.procs (fun (fd, (_proc, mode)) ->
match Fd.get fd with
| None -> ()
| Some (fd, _idx) ->
let mode_and_flag (fd', (proc, mode')) =
Log.trace (fun f ->
f "io_poll(%a=%a,%a=%a): %a" Fd.pp fd' Fd.pp fd Fd.Mode.pp mode'
Fd.Mode.pp mode Process.pp proc);
Fd.equal fd fd' && Fd.Mode.equal mode' mode
in
Dashmap.find_all_by t.procs mode_and_flag
|> List.iter (fun (_fd, proc) -> fn proc))
| Some unix_fd ->
should_wait := true;
Poll.set t.poll unix_fd (event_of_mode mode));
if not !should_wait then ()
else
match Poll.wait t.poll t.poll_timeout with
| `Timeout -> ()
| `Ok ->
Poll.iter_ready t.poll ~f:(fun raw_fd event ->
match mode_of_event event with
| None -> ()
| Some mode -> (
match
Dashmap.find_by t.fds (fun (fd, _idx) ->
match Fd.get fd with Some fd' -> fd' = raw_fd | _ -> false)
with
| None -> ()
| Some (fd, _idx) ->
let mode_and_flag (fd', (proc, mode')) =
Log.trace (fun f ->
f "io_poll(%a=%a,%a=%a): %a" Fd.pp fd' Fd.pp fd
Fd.Mode.pp mode' Fd.Mode.pp mode Process.pp proc);
Fd.equal fd fd' && Fd.Mode.equal mode' mode
in
Dashmap.find_all_by t.procs mode_and_flag
|> List.iter (fun (_fd, proc) -> fn proc)))

(* sockets api *)
let socket sock_domain sock_type =
Expand Down Expand Up @@ -191,15 +184,15 @@ let accept (_t : t) (socket : Fd.t) : accept =
| exception Unix.(Unix_error ((EINTR | EAGAIN | EWOULDBLOCK), _, _)) -> `Retry
| exception Unix.(Unix_error (reason, _, _)) -> `Abort reason

let read (conn : Socket.stream_socket) buf off len : read =
Fd.use ~op_name:"read" conn @@ fun fd ->
let read (fd : Fd.t) buf off len : read =
Fd.use ~op_name:"read" fd @@ fun fd ->
match Unix.read fd buf off len with
| len -> `Read len
| exception Unix.(Unix_error ((EINTR | EAGAIN | EWOULDBLOCK), _, _)) -> `Retry
| exception Unix.(Unix_error (reason, _, _)) -> `Abort reason

let write (conn : Socket.stream_socket) buf off len : write =
Fd.use ~op_name:"write" conn @@ fun fd ->
let write (fd : Fd.t) buf off len : write =
Fd.use ~op_name:"write" fd @@ fun fd ->
match Unix.write fd buf off len with
| len -> `Wrote len
| exception Unix.(Unix_error ((EINTR | EAGAIN | EWOULDBLOCK), _, _)) -> `Retry
Expand Down
4 changes: 2 additions & 2 deletions riot/runtime/net/io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ val connect :
[> `Abort of Unix.error | `Connected of Fd.t | `In_progress of Fd.t | `Retry ]

val accept : t -> Fd.t -> accept
val read : Socket.stream_socket -> bytes -> int -> int -> read
val write : Socket.stream_socket -> bytes -> int -> int -> write
val read : Fd.t -> bytes -> int -> int -> read
val write : Fd.t -> bytes -> int -> int -> write
25 changes: 25 additions & 0 deletions riot/runtime/net/unix_readv.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include <caml/mlvalues.h>
#include <caml/memory.h>
#include <caml/alloc.h>
#include <caml/fail.h>
#include <caml/bigarray.h>
#include <sys/uio.h>
#include <unistd.h>

CAMLprim value riot_unix_readv(value fd, value iovs, value count) {
CAMLparam3(fd, iovs, count);

size_t len = Int_val(count);
struct iovec c_iovs[len];

for (int i = 0; i < len; i++) {
value iov = Field(iovs, i);
c_iovs[i].iov_base = Caml_ba_data_val(Field(iov, 0));
c_iovs[i].iov_len = Int_val(Field(iov, 1));
}

size_t ret = readv(Int_val(fd), c_iovs, len);
if (ret == -1) caml_failwith("readv failed");

CAMLreturn(Val_long(ret));
}
25 changes: 25 additions & 0 deletions riot/runtime/net/unix_writev.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include <caml/mlvalues.h>
#include <caml/memory.h>
#include <caml/alloc.h>
#include <caml/fail.h>
#include <caml/bigarray.h>
#include <sys/uio.h>
#include <unistd.h>

CAMLprim value riot_unix_writev(value fd, value iovs, value count) {
CAMLparam3(fd, iovs, count);

size_t len = Int_val(count);
struct iovec c_iovs[len];

for (int i = 0; i < len; i++) {
value iov = Field(iovs, i);
c_iovs[i].iov_base = Caml_ba_data_val(Field(iov, 0));
c_iovs[i].iov_len = Int_val(Field(iov, 1));
}

size_t ret = writev(Int_val(fd), c_iovs, len);
if (ret == -1) caml_failwith("writev failed");

CAMLreturn(Val_long(ret));
}
Loading

0 comments on commit 473e688

Please sign in to comment.