Skip to content

Commit

Permalink
feat: dedicated io thread, kqueue support, and better syscall handling (
Browse files Browse the repository at this point in the history
#20)

feat: dedicated io scheduler and macOS kqueue support
fix: clear poll before waiting
feat: move io to separate thread
feat: broaden io ops types to fd
fix: include poll as a dep and fmt
try: poll with short timeout
feat: reuse buffer on the receiver end
chore: remove poll_idx
fix: if no open fds to wait for, don't wait
test: add net_test and make it pass
fix: properly Suspend actors on syscalls
  • Loading branch information
leostera committed Dec 22, 2023
1 parent eb02f01 commit 908aa2a
Show file tree
Hide file tree
Showing 20 changed files with 421 additions and 186 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
## Unreleased

* Redo packaging to expose a single public library: `riot`
* Fix issue with schedulers busy-waiting
* Introduce separate IO Schedulers to polling
* Switch to `poll` to support kqueue on macOS
* Reuse read-buffers on Io.read loops
* Add `Dashmap.iter` to iterate over a collection

## 0.0.5

Expand Down
28 changes: 19 additions & 9 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@
(package
(name riot)
(synopsis "An actor-model multi-core scheduler for OCaml 5")
(description "Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(description
"Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(depends
(ocaml (>= "5.1"))
(ptime (>= "1.1.0"))
(iomux (>= "0.3"))
(bigstringaf (>= "0.9.1"))
(uri (>= "4.4.0"))
(telemetry (>= "0.0.1"))
(ocaml
(>= "5.1"))
dune
(ptime
(>= "1.1.0"))
(poll
(>= "0.3.1"))
(bigstringaf
(>= "0.9.1"))
(uri
(>= "4.4.0"))
(telemetry
(>= "0.0.1"))
(mdx (and :with-test (>= "2.3.1")))
(odoc (and :with-doc (>= "2.2.2")))
)
(odoc
(and
:with-doc
(>= "2.2.2"))))
(tags
(multicore erlang actor "message-passing" processes)))
4 changes: 2 additions & 2 deletions riot.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ tags: ["multicore" "erlang" "actor" "message-passing" "processes"]
homepage: "https://github.com/leostera/riot"
bug-reports: "https://github.com/leostera/riot/issues"
depends: [
"dune" {>= "3.11"}
"ocaml" {>= "5.1"}
"dune" {>= "3.11"}
"ptime" {>= "1.1.0"}
"iomux" {>= "0.3"}
"poll" {>= "0.3.1"}
"bigstringaf" {>= "0.9.1"}
"uri" {>= "4.4.0"}
"telemetry" {>= "0.0.1"}
Expand Down
55 changes: 31 additions & 24 deletions riot/lib/net/socket.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ open Runtime
open Net
include Socket

module Logger = Logger.Make (struct
let namespace = [ "riot"; "net"; "socket" ]
end)

type listen_opts = {
reuse_addr : bool;
reuse_port : bool;
Expand All @@ -17,56 +21,59 @@ let default_listen_opts =
{ reuse_addr = true; reuse_port = true; backlog = 128; addr = Addr.loopback }

let close socket =
let sch = Scheduler.get_current_scheduler () in
let pool = Scheduler.Pool.get_pool () in
let this = self () in
Logger.trace (fun f ->
f "Process %a: Closing socket fd=%a" Pid.pp this Fd.pp socket);
Io.close sch.io_tbl socket
Io.close pool.io_scheduler.io_tbl socket

let listen ?(opts = default_listen_opts) ~port () =
let sch = Scheduler.get_current_scheduler () in
let pool = Scheduler.Pool.get_pool () in
let { reuse_addr; reuse_port; backlog; addr } = opts in
let addr = Addr.tcp addr port in
Logger.trace (fun f -> f "Listening on 0.0.0.0:%d" port);
Io.listen sch.io_tbl ~reuse_port ~reuse_addr ~backlog addr
Io.listen pool.io_scheduler.io_tbl ~reuse_port ~reuse_addr ~backlog addr

let rec connect addr =
let sch = Scheduler.get_current_scheduler () in
Logger.error (fun f -> f "Connecting to %a" Addr.pp addr);
match Io.connect sch.io_tbl addr with
| `Connected fd -> Ok fd
| `In_progress fd ->
let this = _get_proc (self ()) in
Io.register sch.io_tbl this `w fd;
syscall "connect" `w fd @@ fun socket -> Ok socket
let pool = Scheduler.Pool.get_pool () in
Logger.debug (fun f -> f "Connecting to %a" Addr.pp addr);
match Io.connect pool.io_scheduler.io_tbl addr with
| `Connected fd -> connected addr fd
| `In_progress fd -> in_progress addr fd
| `Abort reason -> Error (`Unix_error reason)
| `Retry ->
yield ();
connect addr

and in_progress addr fd = syscall "connect" `w fd @@ connected addr

and connected addr fd =
Logger.debug (fun f -> f "Connecting to %a via %a" Addr.pp addr Socket.pp fd);
Ok fd

let rec accept ?(timeout = Infinity) (socket : Socket.listen_socket) =
let sch = Scheduler.get_current_scheduler () in
match Io.accept sch.io_tbl socket with
let pool = Scheduler.Pool.get_pool () in
Log.debug (fun f -> f "Socket is Accepting client at fd=%a" Fd.pp socket);
match Io.accept pool.io_scheduler.io_tbl socket with
| exception Fd.(Already_closed _) -> Error `Closed
| `Abort reason -> Error (`Unix_error reason)
| `Retry -> syscall "accept" `r socket @@ accept ~timeout
| `Connected (socket, addr) -> Ok (socket, addr)
| `Connected (conn, addr) -> Ok (conn, addr)

let controlling_process _socket ~new_owner:_ = Ok ()

let rec receive ?(timeout = Infinity) ~len socket =
let bytes = Bytes.create len in
match Io.read socket bytes 0 len with
let rec receive ?(timeout = Infinity) ~buf socket =
let bytes = Bytes.create (Bigstringaf.length buf) in
match Io.read socket bytes 0 (Bytes.length bytes - 1) with
| exception Fd.(Already_closed _) -> Error `Closed
| `Abort reason -> Error (`Unix_error reason)
| `Retry -> syscall "read" `r socket @@ receive ~timeout ~len
| `Retry -> syscall "receive" `r socket @@ receive ~timeout ~buf
| `Read 0 -> Error `Closed
| `Read len ->
let data = Bigstringaf.create len in
Bigstringaf.blit_from_bytes bytes ~src_off:0 data ~dst_off:0 ~len;
Ok data
Bigstringaf.blit_from_bytes bytes ~src_off:0 buf ~dst_off:0 ~len;
Ok len

let rec send data socket =
let rec send ~data socket =
Logger.debug (fun f -> f "sending: %S" (Bigstringaf.to_string data));
let off = 0 in
let len = Bigstringaf.length data in
Expand All @@ -77,7 +84,7 @@ let rec send data socket =
| `Abort reason -> Error (`Unix_error reason)
| `Retry ->
Logger.debug (fun f -> f "retrying");
syscall "write" `w socket @@ send data
syscall "send" `w socket @@ send ~data
| `Wrote bytes ->
Logger.debug (fun f -> f "sent: %S" (Bigstringaf.to_string data));
Ok bytes
Expand Down
6 changes: 3 additions & 3 deletions riot/lib/net/socket.mli
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ val controlling_process : 'a -> new_owner:'b -> (unit, 'c) result

val receive :
?timeout:timeout ->
len:int ->
buf:Bigstringaf.t ->
Fd.t ->
(Bigstringaf.t, [> `Unix_error of Unix.error | `Closed ]) result
(int, [> `Unix_error of Unix.error | `Closed ]) result

val send :
Bigstringaf.t ->
data:Bigstringaf.t ->
Fd.t ->
(int, [> `Unix_error of Unix.error | `Closed ]) result

Expand Down
6 changes: 3 additions & 3 deletions riot/riot.mli
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,11 @@ module Net : sig

val receive :
?timeout:timeout ->
len:int ->
buf:Bigstringaf.t ->
stream_socket ->
(Bigstringaf.t, [> `Closed | `Timeout ]) result
(int, [> `Closed | `Timeout ]) result

val send : Bigstringaf.t -> stream_socket -> (int, [> `Closed ]) result
val send : data:Bigstringaf.t -> stream_socket -> (int, [> `Closed ]) result
val pp : Format.formatter -> _ socket -> unit

val pp_err :
Expand Down
9 changes: 9 additions & 0 deletions riot/runtime/core/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type t = {
(** the save queue is a temporary queue used for storing messages during a selective receive *)
links : Pid.t list Atomic.t;
monitors : Pid.t list Atomic.t;
ready_fds : Fd.t list Atomic.t;
}
(** The process descriptor. *)

Expand All @@ -56,6 +57,7 @@ let make sid fn =
save_queue = Mailbox.create ();
read_save_queue = false;
flags = default_flags ();
ready_fds = Atomic.make [];
}
in
proc
Expand Down Expand Up @@ -118,6 +120,13 @@ let is_finalized t = Atomic.get t.state = Finalized
let has_empty_mailbox t =
Mailbox.is_empty t.save_queue && Mailbox.is_empty t.mailbox

let has_ready_fds t = not (Atomic.get t.ready_fds = [])

let rec set_ready_fds t fds =
let last_fds = Atomic.get t.ready_fds in
if Atomic.compare_and_set t.ready_fds last_fds fds then ()
else set_ready_fds t fds

let has_messages t = not (has_empty_mailbox t)
let message_count t = Mailbox.size t.mailbox + Mailbox.size t.save_queue
let should_awake t = is_alive t && has_messages t
Expand Down
3 changes: 3 additions & 0 deletions riot/runtime/core/process.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type t = {
mutable read_save_queue : bool;
links : Pid.t list Atomic.t;
monitors : Pid.t list Atomic.t;
ready_fds : Fd.t list Atomic.t;
}

exception Process_reviving_is_forbidden of t
Expand All @@ -60,6 +61,8 @@ val is_running : t -> bool
val is_finalized : t -> bool
val has_empty_mailbox : t -> bool
val has_messages : t -> bool
val has_ready_fds : t -> bool
val set_ready_fds : t -> Fd.t list -> unit
val message_count : t -> int
val should_awake : t -> bool
val mark_as_awaiting_io : t -> string -> [ `r | `rw | `w ] -> Fd.t -> unit
Expand Down
2 changes: 1 addition & 1 deletion riot/runtime/net/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(package riot)
(name net)
(libraries core util log iomux uri unix telemetry bigstringaf))
(libraries core util log poll uri unix telemetry bigstringaf))
Loading

0 comments on commit 908aa2a

Please sign in to comment.