Skip to content

Commit

Permalink
Merge pull request #563 from talex5/condition-extras
Browse files Browse the repository at this point in the history
Extend Condition API
  • Loading branch information
talex5 committed Jun 23, 2023
2 parents 6d78208 + 6d286e8 commit da958b6
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 62 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test_posix:
EIO_BACKEND=posix dune runtest

dscheck:
dune exec -- ./lib_eio/tests/dscheck/test_condition.exe
dune exec -- ./lib_eio/tests/dscheck/test_rcfd.exe
dune exec -- ./lib_eio/tests/dscheck/test_sync.exe
dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe
Expand Down
57 changes: 21 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,26 @@ Eio replaces existing concurrency libraries such as Lwt
* [Time](#time)
* [Multicore Support](#multicore-support)
* [Synchronisation Tools](#synchronisation-tools)
* [Promises](#promises)
* [Example: Concurrent Cache](#example-concurrent-cache)
* [Streams](#streams)
* [Example: Worker Pool](#example-worker-pool)
* [Mutexes and Semaphores](#mutexes-and-semaphores)
* [Conditions](#conditions)
* [Example: Signal handlers](#example-signal-handlers)
* [Promises](#promises)
* [Example: Concurrent Cache](#example-concurrent-cache)
* [Streams](#streams)
* [Example: Worker Pool](#example-worker-pool)
* [Mutexes and Semaphores](#mutexes-and-semaphores)
* [Conditions](#conditions)
* [Example: Signal handlers](#example-signal-handlers)
* [Design Note: Determinism](#design-note-determinism)
* [Provider Interfaces](#provider-interfaces)
* [Example Applications](#example-applications)
* [Integrations](#integrations)
* [Async](#async)
* [Lwt](#lwt)
* [Unix and System Threads](#unix-and-system-threads)
* [Domainslib](#domainslib)
* [kcas](#kcas)
* [Async](#async)
* [Lwt](#lwt)
* [Unix and System Threads](#unix-and-system-threads)
* [Domainslib](#domainslib)
* [kcas](#kcas)
* [Best Practices](#best-practices)
* [Switches](#switches-1)
* [Casting](#casting)
* [Passing env](#passing-env)
* [Switches](#switches-1)
* [Casting](#casting)
* [Passing env](#passing-env)
* [Further Reading](#further-reading)

<!-- vim-markdown-toc -->
Expand Down Expand Up @@ -1492,29 +1492,14 @@ to tell an application to reload its configuration file:
<!-- $MDX file=examples/signals/main.ml,part=main -->
```ocaml
let main ~config_changed =
while true do
Fiber.both
(fun () ->
(* First, we start waiting for SIGHUP.
This is so that if we get SIGHUP before we finish loading
the old configuration then we'll start again. *)
Eio.Condition.await_no_mutex config_changed;
traceln "Received SIGHUP";
(* We could cancel the loading fiber now, in case it's still running,
but in this example we just wait for it to finish by itself. *)
)
(fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
)
done
Eio.Condition.loop_no_mutex config_changed (fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
None (* Keep waiting for futher changes *)
)
```

Unlike the cancellation case above, where we used `Fiber.first`,
here we use `Fiber.both` to wait until we have both read the previous version of the configuration
*and* received a request to reload, then we loop and read it again.

See the `examples/signals` directory for the full code.

## Design Note: Determinism
Expand Down
23 changes: 6 additions & 17 deletions examples/signals/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,12 @@ let load_config () =

(* $MDX part-begin=main *)
let main ~config_changed =
while true do
Fiber.both
(fun () ->
(* First, we start waiting for SIGHUP.
This is so that if we get SIGHUP before we finish loading
the old configuration then we'll start again. *)
Eio.Condition.await_no_mutex config_changed;
traceln "Received SIGHUP";
(* We could cancel the loading fiber now, in case it's still running,
but in this example we just wait for it to finish by itself. *)
)
(fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
)
done
Eio.Condition.loop_no_mutex config_changed (fun () ->
traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ());
load_config ();
traceln "Finished reading configuration";
None (* Keep waiting for futher changes *)
)
(* $MDX part-end *)

let () =
Expand Down
83 changes: 83 additions & 0 deletions lib_eio/condition.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
module Cancel = Eio__core.Cancel

type t = Broadcast.t

let create () = Broadcast.create ()
Expand Down Expand Up @@ -34,3 +39,81 @@ let await t mutex = await_generic ~mutex t
let await_no_mutex t = await_generic t

let broadcast = Broadcast.resume_all

type request = Broadcast.request option

let register_immediate = Broadcast.suspend

let cancel = function
| Some request -> Broadcast.cancel request
| None -> false

let ensure_cancelled x = ignore (cancel x : bool)

type state =
| Init
| Waiting of ((unit, exn) result -> unit)
| Done

(* There main property want is that we don't suspend forever if a broadcast
happened after [fn] started, or if the fiber is cancelled.
1. We start in the Init state.
2. If a broadcast happens here we move to Done. If we later try to suspend, we'll resume immediately.
3. We run [fn]. If a broadcast happens during this we'll transition to Done as before.
4. If [fn] raises or wants to stop normally, we return without suspending at all.
5. Otherwise, we suspend the fiber.
6. We try to transition from Init to Waiting.
If a broadcast transitioned to Done before this, we resume immediately.
If a broadcast transitions afterwards, [wake] will see the [enqueue] function and wake us.
Therefore, we can only sleep forever if a broadcast never happens after starting [fn].
7. If the fiber is cancelled before suspending, we raise on suspend.
If cancelled after suspending and before the request succeeds, we cancel the request and raise.
If cancelled after the request succeeds, [wake] will resume us.
*)
let rec loop_no_mutex t fn =
let state = Atomic.make Init in
let wake () =
match Atomic.exchange state Done with
| Init -> () (* Broadcast happened before we suspended; suspend will notice *)
| Waiting enqueue -> enqueue (Ok ())
| Done -> assert false
in
let request = Broadcast.suspend t wake in
(* Note: to avoid memory leaks, make sure that [request] is finished in all cases. *)
match fn () with
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
ensure_cancelled request;
Printexc.raise_with_backtrace ex bt
| Some x ->
ensure_cancelled request;
x
| None ->
Suspend.enter_unchecked (fun ctx enqueue ->
match Fiber_context.get_error ctx with
| Some ex ->
ensure_cancelled request;
(* If a broadcast already happened, we still cancel. *)
enqueue (Error ex)
| None ->
let waiting = Waiting enqueue in
if Atomic.compare_and_set state Init waiting then (
(* We were in Init, so [wake] hasn't yet done anything.
When it runs, it will resume us.
We're also not currently cancelled, because we checked above
and cancellations only come from the same thread. *)
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel request then (
(* We could set the state to Done here, but there's no need;
we're not racing with anything now. [wake] never runs. *)
enqueue (Error ex)
) (* else we already got resumed *)
)
) else (
(* State is already Done, but [wake] couldn't wake us then
because we hadn't moved to [waiting]. Resume now. *)
enqueue (Ok ())
)
);
loop_no_mutex t fn
37 changes: 37 additions & 0 deletions lib_eio/condition.mli
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,44 @@ val await_no_mutex : t -> unit
i.e. you know the condition is still false, and no notification of a change can be sent
until [await_no_mutex] has finished suspending the fiber. *)

val loop_no_mutex : t -> (unit -> 'a option) -> 'a
(** [loop_no_mutex t update] runs [update ()] until it returns [Some x], then returns [x].
If [update ()] returns [None] then it waits until {!broadcast} is called before retrying.
If {!broadcast} is called while [update] is running, [update] runs again immediately.
For example, if [broadcast config_changed] is performed after some configuration file is changed, then
you can ensure [load_config] will always eventually have seen the latest configuration like this:
{[
Fiber.fork_daemon ~sw (fun () ->
loop_no_mutex config_changed (fun () -> load_config (); None)
)
]}
Note that, since there is no lock, [load_config] may see a half-written update if the configuration
is changed again before it finishes reading it,
so it should just log the error and wait to be called again. *)

val broadcast : t -> unit
(** [broadcast t] wakes up any waiting fibers (by appending them to the run-queue to resume later).
If no fibers are waiting, nothing happens. *)

(** {2 Low-level API}
This is intended only for integrating Eio with other IO libraries. *)

type request

val register_immediate : t -> (unit -> unit) -> request
(** [register_immediate t fn] will call [fn ()] the next time {!broadcast} is called.
[fn] runs immediately from the caller's context, which might not be an Eio thread, or may be a signal handler, etc.
Therefore, care is needed here. This is typically used to send a wake-up event to some non-Eio library. *)

val cancel : request -> bool
(** [cancel request] tries to cancel a request created with {!register_unsafe}.
It returns [true] if the request was cancelled (the callback will never be called),
or [false] if the request was already complete (the callback has already been called). *)
9 changes: 8 additions & 1 deletion lib_eio/tests/dscheck/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
(copy_files# (files ../../sem_state.ml))
(copy_files# (files ../../sync.ml))
(copy_files# (files ../../unix/rcfd.ml))
(copy_files# (files ../../condition.ml))
(copy_files# (files ../../core/broadcast.ml))

(executables
(names test_cells test_semaphore test_sync test_rcfd)
(names test_cells test_semaphore test_sync test_rcfd test_condition)
(libraries dscheck optint fmt eio))

(rule
Expand All @@ -27,3 +29,8 @@
(alias dscheck)
(package eio)
(action (run %{exe:test_semaphore.exe})))

(rule
(alias dscheck)
(package eio)
(action (run %{exe:test_condition.exe})))
2 changes: 2 additions & 0 deletions lib_eio/tests/dscheck/eio_mutex.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
let lock _ = assert false
let unlock _ = assert false
11 changes: 5 additions & 6 deletions lib_eio/tests/dscheck/fake_sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ let run fn =
| Ok x -> Effect.Deep.continue k x
| Error x -> Effect.Deep.discontinue k x
in
let ctx = ref None in
let fiber = lazy (Fiber_context.make_root ()) in
Effect.Deep.try_with fn ()
{ effc = fun (type a) (e : a Effect.t) : ((a, 'b) Effect.Deep.continuation -> 'b) option ->
match e with
| Eio.Private.Effects.Suspend fn ->
Some (fun cont ->
assert (!ctx = None);
let c = Fiber_context.make_root () in
fn c (continue_result cont);
ctx := Some (Fiber_context.cancellation_context c)
fn (Lazy.force fiber) (continue_result cont);
)
| _ -> None
};
!ctx
if Lazy.is_val fiber then
Some (Fiber_context.cancellation_context (Lazy.force fiber))
else None
3 changes: 1 addition & 2 deletions lib_eio/tests/dscheck/fake_sched.mli
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
val run : (unit -> unit) -> Eio.Cancel.t option
(** [run fn] runs [fn ()] in a new fiber and returns its context so it can be cancelled.
[fn] may suspend at most once.
If it doesn't suspend then [run] returns [None] after it finishes. *)
Returns None if it never suspended. *)

val cancel : Eio.Cancel.t -> unit
(** [cancel ctx] cancels the context with a suitable dummy exception. *)
46 changes: 46 additions & 0 deletions lib_eio/tests/dscheck/test_condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
let debug = false

exception Abort

module T = Condition

(* [prod] threads increment a counter and notify a condition.
A consumer watches the condition and waits until it has seen
all of them. We check that the client always sees the final value.
If [cancel] is set, we also try to cancel the client and accept
that as success too. *)
let test ~prod ~cancel () =
let t = T.create () in
let sent = Atomic.make 0 in
for _ = 1 to prod do
Atomic.spawn (fun () ->
Atomic.incr sent;
T.broadcast t
)
done;
let finished = ref false in
Atomic.spawn (fun () ->
let ctx =
Fake_sched.run @@ fun () ->
try
T.loop_no_mutex t (fun () ->
if Atomic.get sent = prod && not cancel then Some ()
else None
);
finished := true
with T.Cancel.Cancelled Abort ->
finished := true
in
if cancel then
Option.iter (fun c -> T.Cancel.cancel c Abort) ctx
);
Atomic.final (fun () ->
Atomic.check (fun () -> !finished);
if debug then (
Fmt.pr "%a@." Broadcast.dump t;
);
)

let () =
Atomic.trace (test ~prod:2 ~cancel:false);
Atomic.trace (test ~prod:2 ~cancel:true)
Loading

0 comments on commit da958b6

Please sign in to comment.