Skip to content

Commit

Permalink
Move Waiters and Hook out of Eio_core
Browse files Browse the repository at this point in the history
Nothing there needs them now.
  • Loading branch information
talex5 committed Jun 26, 2023
1 parent 35b93c0 commit d063513
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 70 deletions.
1 change: 0 additions & 1 deletion lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module Private = struct
module Suspend = Suspend
module Cells = Cells
module Broadcast = Broadcast
module Waiters = Waiters
module Ctf = Ctf
module Fiber_context = Cancel.Fiber_context
module Debug = Debug
Expand Down
45 changes: 0 additions & 45 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -706,51 +706,6 @@ module Private : sig
because you need to unlock a mutex if cancelled). *)
end

(** A queue of fibers waiting for an event. *)
module Waiters : sig
type 'a t
(* A queue of fibers waiting for something.
Note: an [_ t] is not thread-safe itself.
To use share it between domains, the user is responsible for wrapping it in a mutex. *)

val create : unit -> 'a t

val wake_all : 'a t -> 'a -> unit
(** [wake_all t] calls (and removes) all the functions waiting on [t].
If [t] is shared between domains, the caller must hold the mutex while calling this. *)

val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty]
(** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue.
If [t] is shared between domains, the caller must hold the mutex while calling this. *)

val is_empty : 'a t -> bool
(** [is_empty t] checks whether there are any functions waiting on [t].
If [t] is shared between domains, the caller must hold the mutex while calling this,
and the result is valid until the mutex is released. *)

val await :
mutex:Mutex.t option ->
'a t -> Ctf.id -> 'a
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
When the waiter is woken, the fiber is resumed and returns the result.
If [t] can be used from multiple domains:
- [mutex] must be set to the mutex to use to unlock it.
- [mutex] must be already held when calling this function, which will unlock it before blocking.
When [await] returns, [mutex] will have been unlocked.
@raise Cancel.Cancelled if the fiber's context is cancelled *)

val await_internal :
mutex:Mutex.t option ->
'a t -> Ctf.id -> Fiber_context.t ->
(('a, exn) result -> unit) -> unit
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
This also allows wrapping the [enqueue] function.
Calls [enqueue (Error (Cancelled _))] if cancelled.
Note: [enqueue] is called from the triggering domain,
which is currently calling {!wake_one} or {!wake_all}
and must therefore be holding [mutex]. *)
end

module Debug : sig
val traceln :
?__POS__:string * int * int * int ->
Expand Down
20 changes: 0 additions & 20 deletions lib_eio/core/waiters.mli

This file was deleted.

File renamed without changes.
8 changes: 4 additions & 4 deletions lib_eio/core/waiters.ml → lib_eio/waiters.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ let rec wake_one t v =

let is_empty = Lwt_dllist.is_empty

let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
match Cancel.Fiber_context.get_error ctx with
let await_internal ~mutex (t:'a t) id ctx enqueue =
match Fiber_context.get_error ctx with
| Some ex ->
Option.iter Mutex.unlock mutex;
enqueue (Error ex)
| None ->
let resolved_waiter = ref Hook.null in
let finished = Atomic.make false in
let enqueue x =
Ctf.note_read ~reader:id ctx.tid;
Ctf.note_read ~reader:id (Fiber_context.tid ctx);
enqueue x
in
let cancel ex =
Expand All @@ -56,7 +56,7 @@ let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
enqueue (Error ex)
)
in
Cancel.Fiber_context.set_cancel_fn ctx cancel;
Fiber_context.set_cancel_fn ctx cancel;
let waiter = { enqueue; finished } in
match mutex with
| None ->
Expand Down
42 changes: 42 additions & 0 deletions lib_eio/waiters.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
(** A queue of fibers waiting for an event. *)
type 'a t
(* A queue of fibers waiting for something.
Note: an [_ t] is not thread-safe itself.
To use share it between domains, the user is responsible for wrapping it in a mutex. *)

val create : unit -> 'a t

val wake_all : 'a t -> 'a -> unit
(** [wake_all t] calls (and removes) all the functions waiting on [t].
If [t] is shared between domains, the caller must hold the mutex while calling this. *)

val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty]
(** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue.
If [t] is shared between domains, the caller must hold the mutex while calling this. *)

val is_empty : 'a t -> bool
(** [is_empty t] checks whether there are any functions waiting on [t].
If [t] is shared between domains, the caller must hold the mutex while calling this,
and the result is valid until the mutex is released. *)

val await :
mutex:Mutex.t option ->
'a t -> Ctf.id -> 'a
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
When the waiter is woken, the fiber is resumed and returns the result.
If [t] can be used from multiple domains:
- [mutex] must be set to the mutex to use to unlock it.
- [mutex] must be already held when calling this function, which will unlock it before blocking.
When [await] returns, [mutex] will have been unlocked.
@raise Cancel.Cancelled if the fiber's context is cancelled *)

val await_internal :
mutex:Mutex.t option ->
'a t -> Ctf.id -> Fiber_context.t ->
(('a, exn) result -> unit) -> unit
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
This also allows wrapping the [enqueue] function.
Calls [enqueue (Error (Cancelled _))] if cancelled.
Note: [enqueue] is called from the triggering domain,
which is currently calling {!wake_one} or {!wake_all}
and must therefore be holding [mutex]. *)

0 comments on commit d063513

Please sign in to comment.