Skip to content

Commit

Permalink
eio_linux: refactor fixed buffer code
Browse files Browse the repository at this point in the history
Instead of having separate Alloc, Alloc_or_wait and Free effects,
the scheduler now provides a single Get effect to return itself,
and the actual work is now done in the calling fiber. This is cleaner,
and seems to be slightly faster too.

Note that `alloc_fixed_or_wait` is currently not cancellable (it wasn't
before either, but it's more obvious now).

It would be possible to use DLS to store the scheduler rather than using
an effect. However, the improvement in speed is minimal and there are
some complications with sys-threads, so probably better to wait for
OCaml to support thread-local-storage first.
  • Loading branch information
talex5 committed Sep 4, 2024
1 parent d47b5e2 commit cc2cd3d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 41 deletions.
1 change: 1 addition & 0 deletions lib_eio/core/eio__core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Private = struct
module Suspend = Suspend
module Cells = Cells
module Broadcast = Broadcast
module Single_waiter = Single_waiter
module Trace = Trace
module Fiber_context = Cancel.Fiber_context
module Debug = Debug
Expand Down
1 change: 1 addition & 0 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ module Private : sig

module Cells = Cells
module Broadcast = Broadcast
module Single_waiter = Single_waiter

(** Every fiber has an associated context. *)
module Fiber_context : sig
Expand Down
33 changes: 28 additions & 5 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,34 @@ let write ?file_offset:off fd buf len =
raise @@ Err.wrap (Uring.error_of_errno res) "write" ""
)

let alloc_fixed () = Effect.perform Sched.Alloc

let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait

let free_fixed buf = Effect.perform (Sched.Free buf)
let alloc_fixed () =
let s = Sched.get () in
match s.mem with
| None -> None
| Some mem ->
match Uring.Region.alloc mem with
| buf -> Some buf
| exception Uring.Region.No_space -> None

let alloc_fixed_or_wait () =
let s = Sched.get () in
match s.mem with
| None -> failwith "No fixed buffer available"
| Some mem ->
match Uring.Region.alloc mem with
| buf -> buf
| exception Uring.Region.No_space ->
let id = Eio.Private.Trace.mint_id () in
let trigger = Eio.Private.Single_waiter.create () in
Queue.push trigger s.mem_q;
(* todo: remove protect; but needs to remove from queue on cancel *)
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id

let free_fixed buf =
let s = Sched.get () in
match Queue.take_opt s.mem_q with
| None -> Uring.Region.free buf
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)

let splice src ~dst ~len =
Fd.use_exn "splice-src" src @@ fun src ->
Expand Down
42 changes: 6 additions & 36 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type t = {
uring: io_job Uring.t;
mem: Uring.Region.t option;
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
mem_q : Uring.Region.chunk Suspended.t Queue.t;
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;

(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
run_q : runnable Lf_queue.t;
Expand All @@ -74,9 +74,9 @@ type t = {
type _ Effect.t +=
| Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
| Cancel : io_job Uring.job -> unit Effect.t
| Alloc : Uring.Region.chunk option Effect.t
| Alloc_or_wait : Uring.Region.chunk Effect.t
| Free : Uring.Region.chunk -> unit Effect.t
| Get : t Effect.t

let get () = Effect.perform Get

let wake_buffer =
let b = Bytes.create 8 in
Expand Down Expand Up @@ -339,21 +339,6 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res =
| _, Exactly len -> Suspended.continue action len
| n, Upto _ -> Suspended.continue action n

let alloc_buf_or_wait st k =
match st.mem with
| None -> Suspended.discontinue k (Failure "No fixed buffer available")
| Some mem ->
match Uring.Region.alloc mem with
| buf -> Suspended.continue k buf
| exception Uring.Region.No_space ->
Queue.push k st.mem_q;
schedule st

let free_buf st buf =
match Queue.take_opt st.mem_q with
| None -> Uring.Region.free buf
| Some k -> enqueue_thread st k buf

let rec enqueue_poll_add fd poll_mask st action =
Trace.log "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Expand Down Expand Up @@ -411,8 +396,9 @@ let run ~extra_effects st main arg =
Fiber_context.destroy fiber;
Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
);
effc = fun (type a) (e : a Effect.t) ->
effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option ->
match e with
| Get -> Some (fun k -> continue k st)
| Enter fn -> Some (fun k ->
match Fiber_context.get_error fiber with
| Some e -> discontinue k e
Expand Down Expand Up @@ -467,22 +453,6 @@ let run ~extra_effects st main arg =
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
schedule st
)
| Alloc -> Some (fun k ->
match st.mem with
| None -> continue k None
| Some mem ->
match Uring.Region.alloc mem with
| buf -> continue k (Some buf)
| exception Uring.Region.No_space -> continue k None
)
| Alloc_or_wait -> Some (fun k ->
let k = { Suspended.k; fiber } in
alloc_buf_or_wait st k
)
| Free buf -> Some (fun k ->
free_buf st buf;
continue k ()
)
| e -> extra_effects.effc e
}
in
Expand Down

0 comments on commit cc2cd3d

Please sign in to comment.