From 13831aba043fd97e8272797d7b33467b45bdbfc5 Mon Sep 17 00:00:00 2001 From: Omniscient Date: Sat, 16 Mar 2024 23:12:40 +1100 Subject: [PATCH 1/9] type signature thats it :P --- riot/riot.mli | 3 +++ 1 file changed, 3 insertions(+) diff --git a/riot/riot.mli b/riot/riot.mli index b21c0b3..f1e9607 100644 --- a/riot/riot.mli +++ b/riot/riot.mli @@ -232,6 +232,9 @@ val spawn_pinned : (unit -> unit) -> Pid.t val spawn_link : (unit -> unit) -> Pid.t (** Spawns a new process and links it to the current process before returning. *) +val spawn_blocking : (unit -> unit) -> Pid.t +(** Spawns a new isolated process that does not yield to the Riot scheduler. *) + exception Name_already_registered of string * Pid.t val register : string -> Pid.t -> unit From 53f84fa99951c86d501a55f8bf87e43886d4ffbb Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Sat, 13 Apr 2024 12:15:44 +1000 Subject: [PATCH 2/9] split spawn scheduler function out of closure inside Pool.make --- riot/runtime/scheduler/scheduler.ml | 43 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index 2c7384a..046e407 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -535,6 +535,23 @@ module Pool = struct sockets and handle that as a regular value rather than as a signal. *) Sys.set_signal Sys.sigpipe Sys.Signal_ignore + let spawn_scheduler_on_pool pool (scheduler : t) : unit Domain.t = + Stdlib.Domain.spawn (fun () -> + setup (); + set_pool pool; + Scheduler.set_current_scheduler scheduler; + try + Scheduler.run pool scheduler (); + Log.trace (fun f -> + f "<<< shutting down scheduler #%a" Scheduler_uid.pp scheduler.uid) + with exn -> + Log.error (fun f -> + f "Scheduler.run exception: %s due to: %s%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string + (Printexc.get_raw_backtrace ()))); + shutdown pool 1) + let make ?(rnd = Random.State.make_self_init ()) ~domains ~main () = setup (); @@ -553,25 +570,9 @@ module Pool = struct registry = Proc_registry.create (); } in - let spawn (scheduler : t) = - Stdlib.Domain.spawn (fun () -> - setup (); - set_pool pool; - Scheduler.set_current_scheduler scheduler; - try - Scheduler.run pool scheduler (); - Log.trace (fun f -> - f "<<< shutting down scheduler #%a" Scheduler_uid.pp - scheduler.uid) - with exn -> - Log.error (fun f -> - f "Scheduler.run exception: %s due to: %s%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string - (Printexc.get_raw_backtrace ()))); - shutdown pool 1) - in - Log.debug (fun f -> f "Created %d schedulers" (List.length schedulers)); + Log.debug (fun f -> + f "Created %d schedulers including the main scheduler" + (List.length schedulers)); let io_thread = Stdlib.Domain.spawn (fun () -> @@ -585,6 +586,8 @@ module Pool = struct shutdown pool 2) in - let scheduler_threads = List.map spawn schedulers in + let scheduler_threads = + List.map (spawn_scheduler_on_pool pool) schedulers + in (pool, io_thread :: scheduler_threads) end From 3e29ed7dac0eb0680987dd3ed9e8131f6746fbe1 Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Sat, 13 Apr 2024 12:16:09 +1000 Subject: [PATCH 3/9] implement spawn_blocking to create a new Scheduler with its own Domain --- riot/runtime/import.ml | 5 +++++ riot/runtime/scheduler/scheduler.ml | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/riot/runtime/import.ml b/riot/runtime/import.ml index c1c5e40..babf923 100644 --- a/riot/runtime/import.ml +++ b/riot/runtime/import.ml @@ -139,6 +139,11 @@ let spawn_pinned fn = let spawn_link fn = _spawn ~do_link:true fn +let spawn_blocking fn = + let pool = _get_pool () in + let blocking_scheduler = Scheduler.Pool.spawn_blocking pool in + _spawn ~do_link:false ~scheduler:blocking_scheduler fn + let monitor pid = let pool = _get_pool () in let this = _get_proc (self ()) in diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index 046e407..3414816 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -590,4 +590,9 @@ module Pool = struct List.map (spawn_scheduler_on_pool pool) schedulers in (pool, io_thread :: scheduler_threads) + + (** Creates a new blocking scheduler in the pool *) + let spawn_blocking ?(rnd = Random.State.make_self_init ()) _pool = + let new_scheduler = Scheduler.make ~rnd () in + new_scheduler end From 9c91300fdb34ff8388f7b39d38684521f63037ad Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Sat, 13 Apr 2024 23:42:40 +1000 Subject: [PATCH 4/9] wip: create test --- test/dune | 6 ++++++ test/process_blocking_test.ml | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 test/process_blocking_test.ml diff --git a/test/dune b/test/dune index dff1e47..c0a60fc 100644 --- a/test/dune +++ b/test/dune @@ -115,6 +115,12 @@ (modules link_processes_test) (libraries riot)) +(test + (package riot) + (name process_blocking_test) + (modules process_blocking_test) + (libraries riot)) + (test (package riot) (name process_registration_test) diff --git a/test/process_blocking_test.ml b/test/process_blocking_test.ml new file mode 100644 index 0000000..cb7a214 --- /dev/null +++ b/test/process_blocking_test.ml @@ -0,0 +1,32 @@ +[@@@warning "-8"] + +open Riot + +type Message.t += Hello_world + +let factorial n = + let rec aux n acc = + match n with + | 1 -> acc + | x -> aux (n-1) (acc * x) + in + aux n 1 + +let busy_worker + +let () = + Riot.run @@ fun () -> + (* Runtime.set_log_level (Some Trace); *) + let pid = + spawn (fun () -> + let selector msg = + match msg with Hello_world -> `select `hello_world | _ -> `skip + in + match receive ~selector () with + | `hello_world -> + Logger.info (fun f -> f "hello world from %a!" Pid.pp (self ())); + shutdown ()) + in + send pid Hello_world; + let a = factorial 30 in + print_int a; From 558fbd24bcfa226c02591543736dc5b219bb0da8 Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Tue, 14 May 2024 10:27:04 +1000 Subject: [PATCH 5/9] flesh out the example in the test --- test/dune | 10 +++--- test/process_blocking_test.ml | 57 +++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/test/dune b/test/dune index c0a60fc..0351067 100644 --- a/test/dune +++ b/test/dune @@ -121,11 +121,11 @@ (modules process_blocking_test) (libraries riot)) -(test - (package riot) - (name process_registration_test) - (modules process_registration_test) - (libraries riot)) +; (test +; (package riot) +; (name process_registration_test) +; (modules process_registration_test) +; (libraries riot)) ; (test ; (package riot) diff --git a/test/process_blocking_test.ml b/test/process_blocking_test.ml index cb7a214..f3e946d 100644 --- a/test/process_blocking_test.ml +++ b/test/process_blocking_test.ml @@ -2,31 +2,48 @@ open Riot -type Message.t += Hello_world +type Message.t += AnswerToAllTheWorldsProblems of int +type Message.t += CountdownFinished let factorial n = let rec aux n acc = - match n with - | 1 -> acc - | x -> aux (n-1) (acc * x) + Logger.info (fun f -> f "Factorial %d" n); + match n with 1 -> acc | x -> aux (n - 1) (acc * x) in aux n 1 -let busy_worker +let busy_worker recipient_pid () = + let number = factorial 5 in + send recipient_pid (AnswerToAllTheWorldsProblems number) + +let rec countdown_worker recipient_pid n = + Logger.info (fun f -> f "Countdown loop n = %d" n); + + if n = 0 then send recipient_pid CountdownFinished + else ( + yield (); + countdown_worker recipient_pid (n - 1)) + +let rec wait_for_answer () = + print_endline "RECV"; + match receive_any () with + | AnswerToAllTheWorldsProblems n -> + Printf.printf + "Got the answer!\n\ + \ The answer to all the worlds problems has been calculated to be %d\n" + n + | _ -> wait_for_answer () let () = - Riot.run @@ fun () -> - (* Runtime.set_log_level (Some Trace); *) - let pid = - spawn (fun () -> - let selector msg = - match msg with Hello_world -> `select `hello_world | _ -> `skip - in - match receive ~selector () with - | `hello_world -> - Logger.info (fun f -> f "hello world from %a!" Pid.pp (self ())); - shutdown ()) - in - send pid Hello_world; - let a = factorial 30 in - print_int a; + print_endline "yooo"; + Riot.run ~workers:0 @@ fun () -> + Runtime.set_log_level (Some Trace); + + let main_pid = self () in + + let pid_waiting = spawn wait_for_answer in + let _factorial_answer_pid = spawn_blocking (busy_worker pid_waiting) in + let _countdown_pid = spawn (fun () -> countdown_worker main_pid 1000) in + wait_pids [ pid_waiting ]; + flush_all (); + shutdown () From 5bc2d908dbc0f21e69d6e6015b99e872f83ad61b Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Tue, 14 May 2024 11:02:59 +1000 Subject: [PATCH 6/9] add blocking process to the run queue --- riot/runtime/import.ml | 22 ++++++++++++++++++++-- riot/runtime/scheduler/scheduler.ml | 9 +++++++-- test/dune | 10 +++++----- test/process_blocking_test.ml | 26 +++++++++++++++----------- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/riot/runtime/import.ml b/riot/runtime/import.ml index babf923..dbd548f 100644 --- a/riot/runtime/import.ml +++ b/riot/runtime/import.ml @@ -141,8 +141,26 @@ let spawn_link fn = _spawn ~do_link:true fn let spawn_blocking fn = let pool = _get_pool () in - let blocking_scheduler = Scheduler.Pool.spawn_blocking pool in - _spawn ~do_link:false ~scheduler:blocking_scheduler fn + let blocking_scheduler = Scheduler.Pool.spawn_blocking_scheduler pool in + let proc = + Process.make blocking_scheduler.uid (fun () -> + try + fn (); + Normal + with + | Proc_state.Unwind -> Normal + | exn -> + Log.error (fun f -> + f "Process %a died with unhandled exception %s:\n%s" Pid.pp + (self ()) (Printexc.to_string exn) + (Printexc.get_backtrace ())); + + Exception exn) + in + Scheduler.Pool.register_process pool proc; + let _ = Scheduler.kickstart_blocking_process pool blocking_scheduler proc in + proc.pid +(* _spawn ~do_link:false ~scheduler:blocking_scheduler fn *) let monitor pid = let pool = _get_pool () in diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index 3414816..bc864fb 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -95,6 +95,10 @@ module Scheduler = struct add_to_run_queue sch proc) pool.schedulers + let kickstart_blocking_process pool sch (proc : Process.t) = + add_to_run_queue sch proc; + pool.schedulers + let handle_receive k pool sch (proc : Process.t) ~(ref : 'a Ref.t option) ~timeout ~selector = Trace.handle_receive_span @@ fun () -> @@ -571,7 +575,7 @@ module Pool = struct } in Log.debug (fun f -> - f "Created %d schedulers including the main scheduler" + f "Created %d schedulers excluding the main scheduler" (List.length schedulers)); let io_thread = @@ -592,7 +596,8 @@ module Pool = struct (pool, io_thread :: scheduler_threads) (** Creates a new blocking scheduler in the pool *) - let spawn_blocking ?(rnd = Random.State.make_self_init ()) _pool = + let spawn_blocking_scheduler ?(rnd = Random.State.make_self_init ()) pool = let new_scheduler = Scheduler.make ~rnd () in + let _domain = spawn_scheduler_on_pool pool new_scheduler in new_scheduler end diff --git a/test/dune b/test/dune index 0351067..c0a60fc 100644 --- a/test/dune +++ b/test/dune @@ -121,11 +121,11 @@ (modules process_blocking_test) (libraries riot)) -; (test -; (package riot) -; (name process_registration_test) -; (modules process_registration_test) -; (libraries riot)) +(test + (package riot) + (name process_registration_test) + (modules process_registration_test) + (libraries riot)) ; (test ; (package riot) diff --git a/test/process_blocking_test.ml b/test/process_blocking_test.ml index f3e946d..66acaa1 100644 --- a/test/process_blocking_test.ml +++ b/test/process_blocking_test.ml @@ -3,29 +3,31 @@ open Riot type Message.t += AnswerToAllTheWorldsProblems of int -type Message.t += CountdownFinished + +(* Factorial is too fast so just a little function that eats some more CPU time*) +let rec block_longer n = if n == 0 then () else block_longer (n - 1) let factorial n = let rec aux n acc = Logger.info (fun f -> f "Factorial %d" n); + block_longer 100000; match n with 1 -> acc | x -> aux (n - 1) (acc * x) in aux n 1 let busy_worker recipient_pid () = - let number = factorial 5 in + let number = factorial 30 in send recipient_pid (AnswerToAllTheWorldsProblems number) -let rec countdown_worker recipient_pid n = +let rec countdown_worker n = Logger.info (fun f -> f "Countdown loop n = %d" n); - if n = 0 then send recipient_pid CountdownFinished + if n = 0 then () else ( yield (); - countdown_worker recipient_pid (n - 1)) + countdown_worker (n - 1)) let rec wait_for_answer () = - print_endline "RECV"; match receive_any () with | AnswerToAllTheWorldsProblems n -> Printf.printf @@ -35,15 +37,17 @@ let rec wait_for_answer () = | _ -> wait_for_answer () let () = - print_endline "yooo"; - Riot.run ~workers:0 @@ fun () -> Runtime.set_log_level (Some Trace); + print_endline "Test spawn_blocking"; + Riot.run ~workers:0 @@ fun () -> + let _ = Logger.start () |> Result.get_ok in + Logger.set_log_level (Some Debug); - let main_pid = self () in - + (* let main_pid = self () in *) let pid_waiting = spawn wait_for_answer in + + let _countdown_pid = spawn (fun () -> countdown_worker 100) in let _factorial_answer_pid = spawn_blocking (busy_worker pid_waiting) in - let _countdown_pid = spawn (fun () -> countdown_worker main_pid 1000) in wait_pids [ pid_waiting ]; flush_all (); shutdown () From 8520c2aa696acbdd1377a4eac49ef7d739c3832b Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Tue, 14 May 2024 13:00:09 +1000 Subject: [PATCH 7/9] minor change --- riot/runtime/import.ml | 3 +++ test/process_blocking_test.ml | 8 +++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/riot/runtime/import.ml b/riot/runtime/import.ml index dbd548f..7ece1de 100644 --- a/riot/runtime/import.ml +++ b/riot/runtime/import.ml @@ -141,7 +141,10 @@ let spawn_link fn = _spawn ~do_link:true fn let spawn_blocking fn = let pool = _get_pool () in + (* Create a scheduler *) let blocking_scheduler = Scheduler.Pool.spawn_blocking_scheduler pool in + + (* Start the process *) let proc = Process.make blocking_scheduler.uid (fun () -> try diff --git a/test/process_blocking_test.ml b/test/process_blocking_test.ml index 66acaa1..2579dbd 100644 --- a/test/process_blocking_test.ml +++ b/test/process_blocking_test.ml @@ -31,19 +31,17 @@ let rec wait_for_answer () = match receive_any () with | AnswerToAllTheWorldsProblems n -> Printf.printf - "Got the answer!\n\ - \ The answer to all the worlds problems has been calculated to be %d\n" + "Got the answer!\nThe answer to all the worlds problems has been calculated to be %d\n" n | _ -> wait_for_answer () let () = - Runtime.set_log_level (Some Trace); + (* Runtime.set_log_level (Some Trace); *) print_endline "Test spawn_blocking"; Riot.run ~workers:0 @@ fun () -> let _ = Logger.start () |> Result.get_ok in - Logger.set_log_level (Some Debug); + Logger.set_log_level (Some Info); - (* let main_pid = self () in *) let pid_waiting = spawn wait_for_answer in let _countdown_pid = spawn (fun () -> countdown_worker 100) in From b579cc562f2438b2916738eb8dff634e66c6e17e Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Wed, 15 May 2024 15:50:20 +1000 Subject: [PATCH 8/9] try using a flag on processes to mark a scheduler to be stopped when process completes --- riot/lib/process.ml | 7 ++- riot/riot.mli | 1 + riot/runtime/core/process.ml | 17 ++++++- riot/runtime/import.ml | 7 ++- riot/runtime/scheduler/scheduler.ml | 70 ++++++++++++++++++++++++++++- test/process_blocking_test.ml | 5 ++- 6 files changed, 98 insertions(+), 9 deletions(-) diff --git a/riot/lib/process.ml b/riot/lib/process.ml index 3ac10fd..b41289e 100644 --- a/riot/lib/process.ml +++ b/riot/lib/process.ml @@ -7,12 +7,17 @@ end) type t = P.t type priority = P.priority = High | Normal | Low -type process_flag = P.process_flag = Trap_exit of bool | Priority of priority + +type process_flag = P.process_flag = + | Trap_exit of bool + | Priority of priority + | IsBlockingProc of bool let pp_flag fmt t = match t with | Trap_exit b -> Format.fprintf fmt "trap_exit <- %b" b | Priority p -> Format.fprintf fmt "priority <- %s" (P.priority_to_string p) + | _ -> failwith "TODO" type exit_reason = P.exit_reason = | Normal diff --git a/riot/riot.mli b/riot/riot.mli index ec9fc8f..9644a0b 100644 --- a/riot/riot.mli +++ b/riot/riot.mli @@ -113,6 +113,7 @@ module Process : sig (** Processes with a [High] priority will be scheduled before processes with a [Normal] priority which will be scheduled before processes with a [Low] priority. *) + | IsBlockingProc of bool (* An [exit_reason] describes why a process finished. *) type exit_reason = diff --git a/riot/runtime/core/process.ml b/riot/runtime/core/process.ml index 798879a..e4eeaea 100644 --- a/riot/runtime/core/process.ml +++ b/riot/runtime/core/process.ml @@ -39,12 +39,20 @@ let priority_to_string = function type process_flags = { trap_exits : bool Atomic.t; priority : priority Atomic.t; + is_blocking_proc : bool Atomic.t; } -type process_flag = Trap_exit of bool | Priority of priority +type process_flag = + | Trap_exit of bool + | Priority of priority + | IsBlockingProc of bool let default_flags () = - { trap_exits = Atomic.make false; priority = Atomic.make Normal } + { + trap_exits = Atomic.make false; + priority = Atomic.make Normal; + is_blocking_proc = Atomic.make false; + } type t = { pid : Pid.t; @@ -169,6 +177,7 @@ let is_runnable t = Atomic.get t.state = Runnable let is_running t = Atomic.get t.state = Running let is_finalized t = Atomic.get t.state = Finalized let is_main t = Pid.equal (pid t) Pid.main +let is_blocking_proc t = Atomic.get t.flags.is_blocking_proc let has_empty_mailbox t = Mailbox.is_empty t.save_queue && Mailbox.is_empty t.mailbox @@ -274,6 +283,10 @@ let rec set_flag t flag = let old_flag = Atomic.get t.flags.priority in if Atomic.compare_and_set t.flags.priority old_flag p then () else set_flag t flag + | IsBlockingProc b -> + let old_flag = Atomic.get t.flags.is_blocking_proc in + if Atomic.compare_and_set t.flags.is_blocking_proc old_flag b then () + else set_flag t flag let set_cont t c = t.cont <- Some c let set_sid t sid = Atomic.set t.sid sid diff --git a/riot/runtime/import.ml b/riot/runtime/import.ml index 7ece1de..568bc58 100644 --- a/riot/runtime/import.ml +++ b/riot/runtime/import.ml @@ -146,7 +146,7 @@ let spawn_blocking fn = (* Start the process *) let proc = - Process.make blocking_scheduler.uid (fun () -> + Process.make blocking_scheduler.scheduler.uid (fun () -> try fn (); Normal @@ -160,8 +160,11 @@ let spawn_blocking fn = Exception exn) in + Process.set_flag proc (IsBlockingProc true); Scheduler.Pool.register_process pool proc; - let _ = Scheduler.kickstart_blocking_process pool blocking_scheduler proc in + let _ = + Scheduler.kickstart_blocking_process pool blocking_scheduler.scheduler proc + in proc.pid (* _spawn ~do_link:false ~scheduler:blocking_scheduler fn *) diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index bc864fb..e805100 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -12,6 +12,7 @@ type t = { idle_mutex : Mutex.t; idle_condition : Condition.t; currently_stealing : Mutex.t; + mutable stop : bool; } type io = { @@ -26,12 +27,15 @@ type io = { mutable calls_send : int; } +type blocking = { scheduler : t; domain : unit Domain.t } + type pool = { mutable stop : bool; mutable status : int; io_scheduler : io; schedulers : t list; processes : Proc_table.t; + blocking_schedulers : blocking list Atomic.t; mutable proc_count : int; registry : Proc_registry.t; } @@ -60,6 +64,7 @@ module Scheduler = struct idle_mutex = Mutex.create (); idle_condition = Condition.create (); currently_stealing = Mutex.create (); + stop = false; } let get_current_scheduler, (set_current_scheduler : t -> unit) = @@ -363,6 +368,11 @@ module Scheduler = struct awake_process pool linked_proc) linked_pids; + if Process.is_blocking_proc proc then ( + Log.debug (fun f -> f "Set scheduler.stop to true"); + sch.stop <- true) + else (); + Proc_queue.remove sch.run_queue proc; Proc_table.remove pool.processes proc.pid; Proc_registry.remove pool.registry proc.pid; @@ -458,6 +468,7 @@ module Scheduler = struct (try while true do if pool.stop then raise_notrace Exit; + if sch.stop then raise_notrace Exit; Mutex.lock sch.idle_mutex; while @@ -475,6 +486,57 @@ module Scheduler = struct Log.trace (fun f -> f "< exit worker loop") end +module Blocking_scheduler = struct + (* include Scheduler *) + type t = blocking + + let make sch domain = { scheduler = sch; domain } + + let rec add_to_pool pool blocking = + let dom_list = Atomic.get pool.blocking_schedulers in + if + Atomic.compare_and_set pool.blocking_schedulers dom_list + (blocking :: dom_list) + then () + else add_to_pool pool blocking + + let rec remove_from_pool pool blocking = + let cur = Atomic.get pool.blocking_schedulers in + let without_removee = List.filter (fun sch -> sch.domain != blocking.domain) cur in + if Atomic.compare_and_set pool.blocking_schedulers cur without_removee then + () + else remove_from_pool pool blocking + + (* let run pool (sch : t) () = + Log.trace (fun f -> f "> enter worker loop"); + let exception Exit in + (try + while true do + if pool.stop then raise_notrace Exit; + + Mutex.lock sch.idle_mutex; + while + (not pool.stop) + && Proc_queue.is_empty sch.run_queue + && not (Timer_wheel.can_tick sch.timers) + do + Condition.wait sch.idle_condition sch.idle_mutex + done; + Mutex.unlock sch.idle_mutex; + + Scheduler.run_loop pool sch + done + with Exit -> ()); + Log.trace (fun f -> f "< exit worker loop") *) + + (* Override the handle exit function *) + let handle_exit_blocking_proc pool sch proc reason = + Scheduler.handle_exit_proc pool sch.scheduler proc reason; + (* In addition to the above, we want to remove this scheduler thereby freeing up the thread/core *) + (* TODO: Remove domain from pool *) + remove_from_pool pool sch +end + include Scheduler module Io_scheduler = struct @@ -542,6 +604,7 @@ module Pool = struct let spawn_scheduler_on_pool pool (scheduler : t) : unit Domain.t = Stdlib.Domain.spawn (fun () -> setup (); + Stdlib.Domain.at_exit (fun () -> Log.warn (fun f -> f "Domain freed")); set_pool pool; Scheduler.set_current_scheduler scheduler; try @@ -571,6 +634,7 @@ module Pool = struct io_scheduler; schedulers = [ main ] @ schedulers; processes = Proc_table.create (); + blocking_schedulers = Atomic.make []; registry = Proc_registry.create (); } in @@ -598,6 +662,8 @@ module Pool = struct (** Creates a new blocking scheduler in the pool *) let spawn_blocking_scheduler ?(rnd = Random.State.make_self_init ()) pool = let new_scheduler = Scheduler.make ~rnd () in - let _domain = spawn_scheduler_on_pool pool new_scheduler in - new_scheduler + let domain = spawn_scheduler_on_pool pool new_scheduler in + let blocking_sch = Blocking_scheduler.make new_scheduler domain in + Blocking_scheduler.add_to_pool pool blocking_sch; + blocking_sch end diff --git a/test/process_blocking_test.ml b/test/process_blocking_test.ml index 2579dbd..b3fecee 100644 --- a/test/process_blocking_test.ml +++ b/test/process_blocking_test.ml @@ -31,12 +31,13 @@ let rec wait_for_answer () = match receive_any () with | AnswerToAllTheWorldsProblems n -> Printf.printf - "Got the answer!\nThe answer to all the worlds problems has been calculated to be %d\n" + "Got the answer!\n\ + The answer to all the worlds problems has been calculated to be %d\n" n | _ -> wait_for_answer () let () = - (* Runtime.set_log_level (Some Trace); *) + Runtime.set_log_level (Some Trace); print_endline "Test spawn_blocking"; Riot.run ~workers:0 @@ fun () -> let _ = Logger.start () |> Result.get_ok in From 8e749cd2f64353b085c4a83d14a2d5b065acbaa1 Mon Sep 17 00:00:00 2001 From: omnisci3nce <17525998+omnisci3nce@users.noreply.github.com> Date: Wed, 15 May 2024 15:53:45 +1000 Subject: [PATCH 9/9] remome commented out --- riot/runtime/scheduler/scheduler.ml | 29 +++-------------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index e805100..cc81b35 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -507,34 +507,11 @@ module Blocking_scheduler = struct () else remove_from_pool pool blocking - (* let run pool (sch : t) () = - Log.trace (fun f -> f "> enter worker loop"); - let exception Exit in - (try - while true do - if pool.stop then raise_notrace Exit; - - Mutex.lock sch.idle_mutex; - while - (not pool.stop) - && Proc_queue.is_empty sch.run_queue - && not (Timer_wheel.can_tick sch.timers) - do - Condition.wait sch.idle_condition sch.idle_mutex - done; - Mutex.unlock sch.idle_mutex; - - Scheduler.run_loop pool sch - done - with Exit -> ()); - Log.trace (fun f -> f "< exit worker loop") *) (* Override the handle exit function *) - let handle_exit_blocking_proc pool sch proc reason = - Scheduler.handle_exit_proc pool sch.scheduler proc reason; - (* In addition to the above, we want to remove this scheduler thereby freeing up the thread/core *) - (* TODO: Remove domain from pool *) - remove_from_pool pool sch + (* let handle_exit_blocking_proc pool sch proc reason = *) + (* Scheduler.handle_exit_proc pool sch.scheduler proc reason; *) + (* remove_from_pool pool sch *) end include Scheduler