Skip to content

Commit

Permalink
fix: include poll as a dep and fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
leostera committed Dec 21, 2023
1 parent f336208 commit 04c6d34
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 38 deletions.
31 changes: 20 additions & 11 deletions dune-project
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(lang dune 3.10)
(lang dune 3.11)

(name riot)

Expand All @@ -16,16 +16,25 @@
(package
(name riot)
(synopsis "An actor-model multi-core scheduler for OCaml 5")
(description "Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(description
"Riot is an actor-model multi-core scheduler for OCaml 5. It brings Erlang-style concurrency to the language, where lighweight process communicate via message passing")
(depends
(ocaml (>= "5.1"))
dune
(ptime (>= "1.1.0"))
(iomux (>= "0.3"))
(bigstringaf (>= "0.9.1"))
(uri (>= "4.4.0"))
(telemetry (>= "0.0.1"))
(odoc (and :with-doc (>= "2.2.2")))
)
(ocaml
(>= "5.1"))
dune
(ptime
(>= "1.1.0"))
(poll
(>= "0.3.1"))
(bigstringaf
(>= "0.9.1"))
(uri
(>= "4.4.0"))
(telemetry
(>= "0.0.1"))
(odoc
(and
:with-doc
(>= "2.2.2"))))
(tags
(multicore erlang actor "message-passing" processes)))
4 changes: 2 additions & 2 deletions riot.opam
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ homepage: "https://github.com/leostera/riot"
bug-reports: "https://github.com/leostera/riot/issues"
depends: [
"ocaml" {>= "5.1"}
"dune" {>= "3.10"}
"dune" {>= "3.11"}
"ptime" {>= "1.1.0"}
"iomux" {>= "0.3"}
"poll" {>= "0.3.1"}
"bigstringaf" {>= "0.9.1"}
"uri" {>= "4.4.0"}
"telemetry" {>= "0.0.1"}
Expand Down
47 changes: 23 additions & 24 deletions riot/runtime/scheduler/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type io = {

type pool = {
mutable stop : bool;
io_scheduler: io;
io_scheduler : io;
schedulers : t list;
processes : Proc_table.t;
registry : Proc_registry.t;
Expand All @@ -45,7 +45,7 @@ module Scheduler = struct
idle_condition = Condition.create ();
}

let get_current_scheduler, (set_current_scheduler: t -> unit) =
let get_current_scheduler, (set_current_scheduler : t -> unit) =
Thread_local.make ~name:"CURRENT_SCHEDULER"

let get_current_process_pid, set_current_process_pid =
Expand All @@ -60,7 +60,7 @@ module Scheduler = struct
let set_timer sch time mode fn =
Timer_wheel.make_timer sch.timers time mode fn

let add_to_run_queue (sch: t) (proc : Process.t) =
let add_to_run_queue (sch : t) (proc : Process.t) =
Mutex.protect sch.idle_mutex @@ fun () ->
Proc_set.remove sch.sleep_set proc;
Proc_queue.queue sch.run_queue proc;
Expand All @@ -72,7 +72,7 @@ module Scheduler = struct

let awake_process pool (proc : Process.t) =
List.iter
(fun (sch: t) ->
(fun (sch : t) ->
if Scheduler_uid.equal sch.uid proc.sid then add_to_run_queue sch proc)
pool.schedulers

Expand Down Expand Up @@ -106,7 +106,7 @@ module Scheduler = struct
in
go fuel

let handle_syscall k (pool) (_sch: t) (proc : Process.t) syscall mode fd =
let handle_syscall k pool (_sch : t) (proc : Process.t) syscall mode fd =
let open Proc_state in
Log.trace (fun f ->
let mode = match mode with `r -> "r" | `w -> "w" | `rw -> "rw" in
Expand All @@ -116,13 +116,14 @@ module Scheduler = struct
Process.mark_as_awaiting_io proc syscall mode fd;
k Yield

let perform pool (sch: t) (proc : Process.t) =
let perform pool (sch : t) (proc : Process.t) =
let open Proc_state in
let open Proc_effect in
let perform : type a b. (a, b) step_callback =
fun k eff ->
match eff with
| Syscall { name; mode; fd } -> handle_syscall k pool sch proc name mode fd
| Syscall { name; mode; fd } ->
handle_syscall k pool sch proc name mode fd
| Receive { ref } -> handle_receive k proc ref
| Yield ->
Log.trace (fun f ->
Expand All @@ -145,7 +146,7 @@ module Scheduler = struct
Log.debug (fun f -> f "Hibernated process %a" Pid.pp proc.pid);
Log.trace (fun f -> f "sleep_set: %d" (Proc_set.size sch.sleep_set)))

let handle_exit_proc pool (_sch: t) proc reason =
let handle_exit_proc pool (_sch : t) proc reason =
Io.unregister_process pool.io_scheduler.io_tbl proc;

Proc_registry.remove pool.registry (Process.pid proc);
Expand Down Expand Up @@ -195,7 +196,7 @@ module Scheduler = struct
awake_process pool linked_proc)
linked_pids

let handle_run_proc pool (sch: t) proc =
let handle_run_proc pool (sch : t) proc =
Log.trace (fun f -> f "Running process %a" Process.pp proc);
let exception Terminated_while_running of Process.exit_reason in
try
Expand Down Expand Up @@ -225,7 +226,7 @@ module Scheduler = struct
Log.trace (fun f -> f "Process %a finished" Pid.pp proc.pid);
add_to_run_queue sch proc

let step_process pool (sch: t) (proc : Process.t) =
let step_process pool (sch : t) (proc : Process.t) =
!Tracer.tracer_proc_run (sch.uid |> Scheduler_uid.to_int) proc;
match Process.state proc with
| Finalized -> failwith "finalized processes should never be stepped on"
Expand All @@ -234,10 +235,9 @@ module Scheduler = struct
| Exited reason -> handle_exit_proc pool sch proc reason
| Running | Runnable -> handle_run_proc pool sch proc


let tick_timers _pool (sch : t) = Timer_wheel.tick sch.timers

let run pool (sch: t) () =
let run pool (sch : t) () =
Log.trace (fun f -> f "> enter worker loop");
let exception Exit in
(try
Expand All @@ -248,7 +248,7 @@ module Scheduler = struct
while
(not pool.stop)
&& Proc_queue.is_empty sch.run_queue
&& (not (Timer_wheel.can_tick sch.timers))
&& not (Timer_wheel.can_tick sch.timers)
do
Condition.wait sch.idle_condition sch.idle_mutex
done;
Expand All @@ -271,7 +271,7 @@ end
include Scheduler

module Io_scheduler = struct
let make ~rnd () =
let make ~rnd () =
let uid = Uid.next () in
Log.debug (fun f -> f "Making Io_thread with id: %a" Uid.pp uid);
{
Expand All @@ -291,24 +291,23 @@ module Io_scheduler = struct
awake_process pool proc
| _ -> ()

let run pool io () =
let run pool io () =
Log.trace (fun f -> f "> enter io loop");
let exception Exit in
(try
while true do
if pool.stop then raise_notrace Exit;
poll_io pool io;
poll_io pool io
done
with Exit -> ());
Log.trace (fun f -> f "< exit worker loop")

end

module Pool = struct
let get_pool, set_pool = Thread_local.make ~name:"POOL"

let shutdown pool =
let rec wake_up_scheduler (sch: t) =
let rec wake_up_scheduler (sch : t) =
if Mutex.try_lock sch.idle_mutex then (
Condition.signal sch.idle_condition;
Mutex.unlock sch.idle_mutex)
Expand Down Expand Up @@ -342,7 +341,7 @@ module Pool = struct
registry = Proc_registry.create ();
}
in
let spawn (scheduler: t) =
let spawn (scheduler : t) =
Stdlib.Domain.spawn (fun () ->
set_pool pool;
Scheduler.set_current_scheduler scheduler;
Expand All @@ -361,18 +360,18 @@ module Pool = struct
in
Log.debug (fun f -> f "Created %d schedulers" (List.length schedulers));

let io_thread = Stdlib.Domain.spawn (fun () ->
try
Io_scheduler.run pool io_scheduler ();
let io_thread =
Stdlib.Domain.spawn (fun () ->
try Io_scheduler.run pool io_scheduler ()
with exn ->
Log.error (fun f ->
f "Io_scheduler.run exception: %s due to: %s%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string
(Printexc.get_raw_backtrace ())));
shutdown pool)
in
in

let scheduler_threads = List.map spawn schedulers in
(pool, io_thread :: scheduler_threads )
(pool, io_thread :: scheduler_threads)
end
2 changes: 1 addition & 1 deletion riot/runtime/scheduler/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type io = {

type pool = {
mutable stop : bool;
io_scheduler: io;
io_scheduler : io;
schedulers : t list;
processes : Proc_table.t;
registry : Proc_registry.t;
Expand Down

0 comments on commit 04c6d34

Please sign in to comment.