diff --git a/Makefile b/Makefile index 0a0d8084a..1bfe96dfc 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ test_posix: EIO_BACKEND=posix dune runtest dscheck: + dune exec -- ./lib_eio/tests/dscheck/test_condition.exe dune exec -- ./lib_eio/tests/dscheck/test_rcfd.exe dune exec -- ./lib_eio/tests/dscheck/test_sync.exe dune exec -- ./lib_eio/tests/dscheck/test_semaphore.exe diff --git a/README.md b/README.md index a57317070..6132a9b7e 100644 --- a/README.md +++ b/README.md @@ -37,26 +37,26 @@ Eio replaces existing concurrency libraries such as Lwt * [Time](#time) * [Multicore Support](#multicore-support) * [Synchronisation Tools](#synchronisation-tools) - * [Promises](#promises) - * [Example: Concurrent Cache](#example-concurrent-cache) - * [Streams](#streams) - * [Example: Worker Pool](#example-worker-pool) - * [Mutexes and Semaphores](#mutexes-and-semaphores) - * [Conditions](#conditions) - * [Example: Signal handlers](#example-signal-handlers) + * [Promises](#promises) + * [Example: Concurrent Cache](#example-concurrent-cache) + * [Streams](#streams) + * [Example: Worker Pool](#example-worker-pool) + * [Mutexes and Semaphores](#mutexes-and-semaphores) + * [Conditions](#conditions) + * [Example: Signal handlers](#example-signal-handlers) * [Design Note: Determinism](#design-note-determinism) * [Provider Interfaces](#provider-interfaces) * [Example Applications](#example-applications) * [Integrations](#integrations) - * [Async](#async) - * [Lwt](#lwt) - * [Unix and System Threads](#unix-and-system-threads) - * [Domainslib](#domainslib) - * [kcas](#kcas) + * [Async](#async) + * [Lwt](#lwt) + * [Unix and System Threads](#unix-and-system-threads) + * [Domainslib](#domainslib) + * [kcas](#kcas) * [Best Practices](#best-practices) - * [Switches](#switches-1) - * [Casting](#casting) - * [Passing env](#passing-env) + * [Switches](#switches-1) + * [Casting](#casting) + * [Passing env](#passing-env) * [Further Reading](#further-reading) @@ -1492,29 +1492,14 @@ to tell an application to reload its configuration file: ```ocaml let main ~config_changed = - while true do - Fiber.both - (fun () -> - (* First, we start waiting for SIGHUP. - This is so that if we get SIGHUP before we finish loading - the old configuration then we'll start again. *) - Eio.Condition.await_no_mutex config_changed; - traceln "Received SIGHUP"; - (* We could cancel the loading fiber now, in case it's still running, - but in this example we just wait for it to finish by itself. *) - ) - (fun () -> - traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ()); - load_config (); - traceln "Finished reading configuration"; - ) - done + Eio.Condition.loop_no_mutex config_changed (fun () -> + traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ()); + load_config (); + traceln "Finished reading configuration"; + None (* Keep waiting for futher changes *) + ) ``` -Unlike the cancellation case above, where we used `Fiber.first`, -here we use `Fiber.both` to wait until we have both read the previous version of the configuration -*and* received a request to reload, then we loop and read it again. - See the `examples/signals` directory for the full code. ## Design Note: Determinism diff --git a/examples/signals/main.ml b/examples/signals/main.ml index ebc74b4ab..e0f626411 100644 --- a/examples/signals/main.ml +++ b/examples/signals/main.ml @@ -8,23 +8,12 @@ let load_config () = (* $MDX part-begin=main *) let main ~config_changed = - while true do - Fiber.both - (fun () -> - (* First, we start waiting for SIGHUP. - This is so that if we get SIGHUP before we finish loading - the old configuration then we'll start again. *) - Eio.Condition.await_no_mutex config_changed; - traceln "Received SIGHUP"; - (* We could cancel the loading fiber now, in case it's still running, - but in this example we just wait for it to finish by itself. *) - ) - (fun () -> - traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ()); - load_config (); - traceln "Finished reading configuration"; - ) - done + Eio.Condition.loop_no_mutex config_changed (fun () -> + traceln "Reading configuration ('kill -SIGHUP %d' to reload)..." (Unix.getpid ()); + load_config (); + traceln "Finished reading configuration"; + None (* Keep waiting for futher changes *) + ) (* $MDX part-end *) let () = diff --git a/lib_eio/condition.ml b/lib_eio/condition.ml index bc737c937..92287f489 100644 --- a/lib_eio/condition.ml +++ b/lib_eio/condition.ml @@ -1,3 +1,8 @@ +(* Import these directly because we copy this file for the dscheck tests. *) +module Fiber_context = Eio__core.Private.Fiber_context +module Suspend = Eio__core.Private.Suspend +module Cancel = Eio__core.Cancel + type t = Broadcast.t let create () = Broadcast.create () @@ -34,3 +39,81 @@ let await t mutex = await_generic ~mutex t let await_no_mutex t = await_generic t let broadcast = Broadcast.resume_all + +type request = Broadcast.request option + +let register_immediate = Broadcast.suspend + +let cancel = function + | Some request -> Broadcast.cancel request + | None -> false + +let ensure_cancelled x = ignore (cancel x : bool) + +type state = + | Init + | Waiting of ((unit, exn) result -> unit) + | Done + +(* There main property want is that we don't suspend forever if a broadcast + happened after [fn] started, or if the fiber is cancelled. + + 1. We start in the Init state. + 2. If a broadcast happens here we move to Done. If we later try to suspend, we'll resume immediately. + 3. We run [fn]. If a broadcast happens during this we'll transition to Done as before. + 4. If [fn] raises or wants to stop normally, we return without suspending at all. + 5. Otherwise, we suspend the fiber. + 6. We try to transition from Init to Waiting. + If a broadcast transitioned to Done before this, we resume immediately. + If a broadcast transitions afterwards, [wake] will see the [enqueue] function and wake us. + Therefore, we can only sleep forever if a broadcast never happens after starting [fn]. + 7. If the fiber is cancelled before suspending, we raise on suspend. + If cancelled after suspending and before the request succeeds, we cancel the request and raise. + If cancelled after the request succeeds, [wake] will resume us. +*) +let rec loop_no_mutex t fn = + let state = Atomic.make Init in + let wake () = + match Atomic.exchange state Done with + | Init -> () (* Broadcast happened before we suspended; suspend will notice *) + | Waiting enqueue -> enqueue (Ok ()) + | Done -> assert false + in + let request = Broadcast.suspend t wake in + (* Note: to avoid memory leaks, make sure that [request] is finished in all cases. *) + match fn () with + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + ensure_cancelled request; + Printexc.raise_with_backtrace ex bt + | Some x -> + ensure_cancelled request; + x + | None -> + Suspend.enter_unchecked (fun ctx enqueue -> + match Fiber_context.get_error ctx with + | Some ex -> + ensure_cancelled request; + (* If a broadcast already happened, we still cancel. *) + enqueue (Error ex) + | None -> + let waiting = Waiting enqueue in + if Atomic.compare_and_set state Init waiting then ( + (* We were in Init, so [wake] hasn't yet done anything. + When it runs, it will resume us. + We're also not currently cancelled, because we checked above + and cancellations only come from the same thread. *) + Fiber_context.set_cancel_fn ctx (fun ex -> + if cancel request then ( + (* We could set the state to Done here, but there's no need; + we're not racing with anything now. [wake] never runs. *) + enqueue (Error ex) + ) (* else we already got resumed *) + ) + ) else ( + (* State is already Done, but [wake] couldn't wake us then + because we hadn't moved to [waiting]. Resume now. *) + enqueue (Ok ()) + ) + ); + loop_no_mutex t fn diff --git a/lib_eio/condition.mli b/lib_eio/condition.mli index 5ade6dffb..368eec88e 100644 --- a/lib_eio/condition.mli +++ b/lib_eio/condition.mli @@ -60,7 +60,44 @@ val await_no_mutex : t -> unit i.e. you know the condition is still false, and no notification of a change can be sent until [await_no_mutex] has finished suspending the fiber. *) +val loop_no_mutex : t -> (unit -> 'a option) -> 'a +(** [loop_no_mutex t update] runs [update ()] until it returns [Some x], then returns [x]. + + If [update ()] returns [None] then it waits until {!broadcast} is called before retrying. + If {!broadcast} is called while [update] is running, [update] runs again immediately. + + For example, if [broadcast config_changed] is performed after some configuration file is changed, then + you can ensure [load_config] will always eventually have seen the latest configuration like this: + + {[ + Fiber.fork_daemon ~sw (fun () -> + loop_no_mutex config_changed (fun () -> load_config (); None) + ) + ]} + + Note that, since there is no lock, [load_config] may see a half-written update if the configuration + is changed again before it finishes reading it, + so it should just log the error and wait to be called again. *) + val broadcast : t -> unit (** [broadcast t] wakes up any waiting fibers (by appending them to the run-queue to resume later). If no fibers are waiting, nothing happens. *) + +(** {2 Low-level API} + + This is intended only for integrating Eio with other IO libraries. *) + +type request + +val register_immediate : t -> (unit -> unit) -> request +(** [register_immediate t fn] will call [fn ()] the next time {!broadcast} is called. + + [fn] runs immediately from the caller's context, which might not be an Eio thread, or may be a signal handler, etc. + Therefore, care is needed here. This is typically used to send a wake-up event to some non-Eio library. *) + +val cancel : request -> bool +(** [cancel request] tries to cancel a request created with {!register_unsafe}. + + It returns [true] if the request was cancelled (the callback will never be called), + or [false] if the request was already complete (the callback has already been called). *) diff --git a/lib_eio/tests/dscheck/dune b/lib_eio/tests/dscheck/dune index e9fb4fd2b..91923c83a 100644 --- a/lib_eio/tests/dscheck/dune +++ b/lib_eio/tests/dscheck/dune @@ -3,9 +3,11 @@ (copy_files# (files ../../sem_state.ml)) (copy_files# (files ../../sync.ml)) (copy_files# (files ../../unix/rcfd.ml)) +(copy_files# (files ../../condition.ml)) +(copy_files# (files ../../core/broadcast.ml)) (executables - (names test_cells test_semaphore test_sync test_rcfd) + (names test_cells test_semaphore test_sync test_rcfd test_condition) (libraries dscheck optint fmt eio)) (rule @@ -27,3 +29,8 @@ (alias dscheck) (package eio) (action (run %{exe:test_semaphore.exe}))) + +(rule + (alias dscheck) + (package eio) + (action (run %{exe:test_condition.exe}))) diff --git a/lib_eio/tests/dscheck/eio_mutex.ml b/lib_eio/tests/dscheck/eio_mutex.ml new file mode 100644 index 000000000..c6d1b804c --- /dev/null +++ b/lib_eio/tests/dscheck/eio_mutex.ml @@ -0,0 +1,2 @@ +let lock _ = assert false +let unlock _ = assert false diff --git a/lib_eio/tests/dscheck/fake_sched.ml b/lib_eio/tests/dscheck/fake_sched.ml index d240df9c2..7e5a968d4 100644 --- a/lib_eio/tests/dscheck/fake_sched.ml +++ b/lib_eio/tests/dscheck/fake_sched.ml @@ -6,17 +6,16 @@ let run fn = | Ok x -> Effect.Deep.continue k x | Error x -> Effect.Deep.discontinue k x in - let ctx = ref None in + let fiber = lazy (Fiber_context.make_root ()) in Effect.Deep.try_with fn () { effc = fun (type a) (e : a Effect.t) : ((a, 'b) Effect.Deep.continuation -> 'b) option -> match e with | Eio.Private.Effects.Suspend fn -> Some (fun cont -> - assert (!ctx = None); - let c = Fiber_context.make_root () in - fn c (continue_result cont); - ctx := Some (Fiber_context.cancellation_context c) + fn (Lazy.force fiber) (continue_result cont); ) | _ -> None }; - !ctx + if Lazy.is_val fiber then + Some (Fiber_context.cancellation_context (Lazy.force fiber)) + else None diff --git a/lib_eio/tests/dscheck/fake_sched.mli b/lib_eio/tests/dscheck/fake_sched.mli index aebdd3da8..7c125eabf 100644 --- a/lib_eio/tests/dscheck/fake_sched.mli +++ b/lib_eio/tests/dscheck/fake_sched.mli @@ -1,8 +1,7 @@ val run : (unit -> unit) -> Eio.Cancel.t option (** [run fn] runs [fn ()] in a new fiber and returns its context so it can be cancelled. - [fn] may suspend at most once. - If it doesn't suspend then [run] returns [None] after it finishes. *) + Returns None if it never suspended. *) val cancel : Eio.Cancel.t -> unit (** [cancel ctx] cancels the context with a suitable dummy exception. *) diff --git a/lib_eio/tests/dscheck/test_condition.ml b/lib_eio/tests/dscheck/test_condition.ml new file mode 100644 index 000000000..ac9726bd0 --- /dev/null +++ b/lib_eio/tests/dscheck/test_condition.ml @@ -0,0 +1,46 @@ +let debug = false + +exception Abort + +module T = Condition + +(* [prod] threads increment a counter and notify a condition. + A consumer watches the condition and waits until it has seen + all of them. We check that the client always sees the final value. + If [cancel] is set, we also try to cancel the client and accept + that as success too. *) +let test ~prod ~cancel () = + let t = T.create () in + let sent = Atomic.make 0 in + for _ = 1 to prod do + Atomic.spawn (fun () -> + Atomic.incr sent; + T.broadcast t + ) + done; + let finished = ref false in + Atomic.spawn (fun () -> + let ctx = + Fake_sched.run @@ fun () -> + try + T.loop_no_mutex t (fun () -> + if Atomic.get sent = prod && not cancel then Some () + else None + ); + finished := true + with T.Cancel.Cancelled Abort -> + finished := true + in + if cancel then + Option.iter (fun c -> T.Cancel.cancel c Abort) ctx + ); + Atomic.final (fun () -> + Atomic.check (fun () -> !finished); + if debug then ( + Fmt.pr "%a@." Broadcast.dump t; + ); + ) + +let () = + Atomic.trace (test ~prod:2 ~cancel:false); + Atomic.trace (test ~prod:2 ~cancel:true) diff --git a/tests/condition.md b/tests/condition.md index 46ae1040b..8cb5fdfe1 100644 --- a/tests/condition.md +++ b/tests/condition.md @@ -237,3 +237,81 @@ Cancellation while waiting: +Forked fiber unlocking Exception: Failure "Simulated error". ``` + +### Looping + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let cond = Eio.Condition.create () in + let x = ref 0 in + let set v = + traceln "setting x=%d" v; + x := v; Eio.Condition.broadcast cond + in + Fiber.both + (fun () -> + Eio.Condition.loop_no_mutex cond (fun () -> + traceln "Checking x..."; + Fiber.yield (); + let seen = !x in + traceln "Saw x = %d" seen; + if seen = 3 then (traceln "Finished"; Some ()) + else None + ) + ) + (fun () -> + set 1; Fiber.yield (); + set 2; Fiber.yield (); + set 3; Fiber.yield (); + set 4; Fiber.yield (); + );; ++Checking x... ++setting x=1 ++Saw x = 1 ++setting x=2 ++Checking x... ++setting x=3 ++Saw x = 3 ++Finished ++setting x=4 +- : unit = () +``` + +Cancelling: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let cond = Eio.Condition.create () in + Fiber.both + (fun () -> Eio.Condition.loop_no_mutex cond (fun () -> traceln "Checking"; None)) + (fun () -> failwith "Simulated error");; ++Checking +Exception: Failure "Simulated error". +``` + +Cancelling after succeeding: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let cond = Eio.Condition.create () in + Fiber.both + (fun () -> Eio.Condition.loop_no_mutex cond (fun () -> traceln "Checking"; None)) + (fun () -> + traceln "Broadcasting"; + Eio.Condition.broadcast cond; + failwith "Simulated error" + );; ++Checking ++Broadcasting ++Checking +Exception: Failure "Simulated error". +``` + +User function raises: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let cond = Eio.Condition.create () in + Eio.Condition.loop_no_mutex cond (fun () -> Fiber.yield (); failwith "Simulated failure");; +Exception: Failure "Simulated failure". +```