diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index fc7e072c9..daf97efdc 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -7,7 +7,6 @@ module Private = struct module Suspend = Suspend module Cells = Cells module Broadcast = Broadcast - module Waiters = Waiters module Ctf = Ctf module Fiber_context = Cancel.Fiber_context module Debug = Debug diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index b69ec3b0c..830680a86 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -706,51 +706,6 @@ module Private : sig because you need to unlock a mutex if cancelled). *) end - (** A queue of fibers waiting for an event. *) - module Waiters : sig - type 'a t - (* A queue of fibers waiting for something. - Note: an [_ t] is not thread-safe itself. - To use share it between domains, the user is responsible for wrapping it in a mutex. *) - - val create : unit -> 'a t - - val wake_all : 'a t -> 'a -> unit - (** [wake_all t] calls (and removes) all the functions waiting on [t]. - If [t] is shared between domains, the caller must hold the mutex while calling this. *) - - val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty] - (** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue. - If [t] is shared between domains, the caller must hold the mutex while calling this. *) - - val is_empty : 'a t -> bool - (** [is_empty t] checks whether there are any functions waiting on [t]. - If [t] is shared between domains, the caller must hold the mutex while calling this, - and the result is valid until the mutex is released. *) - - val await : - mutex:Mutex.t option -> - 'a t -> Ctf.id -> 'a - (** [await ~mutex t id] suspends the current fiber and adds its continuation to [t]. - When the waiter is woken, the fiber is resumed and returns the result. - If [t] can be used from multiple domains: - - [mutex] must be set to the mutex to use to unlock it. - - [mutex] must be already held when calling this function, which will unlock it before blocking. - When [await] returns, [mutex] will have been unlocked. - @raise Cancel.Cancelled if the fiber's context is cancelled *) - - val await_internal : - mutex:Mutex.t option -> - 'a t -> Ctf.id -> Fiber_context.t -> - (('a, exn) result -> unit) -> unit - (** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber. - This also allows wrapping the [enqueue] function. - Calls [enqueue (Error (Cancelled _))] if cancelled. - Note: [enqueue] is called from the triggering domain, - which is currently calling {!wake_one} or {!wake_all} - and must therefore be holding [mutex]. *) - end - module Debug : sig val traceln : ?__POS__:string * int * int * int -> diff --git a/lib_eio/core/waiters.mli b/lib_eio/core/waiters.mli deleted file mode 100644 index 1197439a4..000000000 --- a/lib_eio/core/waiters.mli +++ /dev/null @@ -1,20 +0,0 @@ -(* See [eio__core.mli] for details. *) - -type 'a t - -val create : unit -> 'a t - -val wake_all : 'a t -> 'a -> unit - -val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty] - -val is_empty : 'a t -> bool - -val await : - mutex:Mutex.t option -> - 'a t -> Ctf.id -> 'a - -val await_internal : - mutex:Mutex.t option -> - 'a t -> Ctf.id -> Cancel.fiber_context -> - (('a, exn) result -> unit) -> unit diff --git a/lib_eio/core/hook.ml b/lib_eio/hook.ml similarity index 100% rename from lib_eio/core/hook.ml rename to lib_eio/hook.ml diff --git a/lib_eio/core/waiters.ml b/lib_eio/waiters.ml similarity index 89% rename from lib_eio/core/waiters.ml rename to lib_eio/waiters.ml index a73201fef..c0cbd4624 100644 --- a/lib_eio/core/waiters.ml +++ b/lib_eio/waiters.ml @@ -38,8 +38,8 @@ let rec wake_one t v = let is_empty = Lwt_dllist.is_empty -let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue = - match Cancel.Fiber_context.get_error ctx with +let await_internal ~mutex (t:'a t) id ctx enqueue = + match Fiber_context.get_error ctx with | Some ex -> Option.iter Mutex.unlock mutex; enqueue (Error ex) @@ -47,7 +47,7 @@ let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue = let resolved_waiter = ref Hook.null in let finished = Atomic.make false in let enqueue x = - Ctf.note_read ~reader:id ctx.tid; + Ctf.note_read ~reader:id (Fiber_context.tid ctx); enqueue x in let cancel ex = @@ -56,7 +56,7 @@ let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue = enqueue (Error ex) ) in - Cancel.Fiber_context.set_cancel_fn ctx cancel; + Fiber_context.set_cancel_fn ctx cancel; let waiter = { enqueue; finished } in match mutex with | None -> diff --git a/lib_eio/waiters.mli b/lib_eio/waiters.mli new file mode 100644 index 000000000..724cf96e7 --- /dev/null +++ b/lib_eio/waiters.mli @@ -0,0 +1,42 @@ +(** A queue of fibers waiting for an event. *) +type 'a t +(* A queue of fibers waiting for something. + Note: an [_ t] is not thread-safe itself. + To use share it between domains, the user is responsible for wrapping it in a mutex. *) + +val create : unit -> 'a t + +val wake_all : 'a t -> 'a -> unit +(** [wake_all t] calls (and removes) all the functions waiting on [t]. + If [t] is shared between domains, the caller must hold the mutex while calling this. *) + +val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty] +(** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue. + If [t] is shared between domains, the caller must hold the mutex while calling this. *) + +val is_empty : 'a t -> bool +(** [is_empty t] checks whether there are any functions waiting on [t]. + If [t] is shared between domains, the caller must hold the mutex while calling this, + and the result is valid until the mutex is released. *) + +val await : + mutex:Mutex.t option -> + 'a t -> Ctf.id -> 'a +(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t]. + When the waiter is woken, the fiber is resumed and returns the result. + If [t] can be used from multiple domains: + - [mutex] must be set to the mutex to use to unlock it. + - [mutex] must be already held when calling this function, which will unlock it before blocking. + When [await] returns, [mutex] will have been unlocked. + @raise Cancel.Cancelled if the fiber's context is cancelled *) + +val await_internal : + mutex:Mutex.t option -> + 'a t -> Ctf.id -> Fiber_context.t -> + (('a, exn) result -> unit) -> unit +(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber. + This also allows wrapping the [enqueue] function. + Calls [enqueue (Error (Cancelled _))] if cancelled. + Note: [enqueue] is called from the triggering domain, + which is currently calling {!wake_one} or {!wake_all} + and must therefore be holding [mutex]. *)