Skip to content

Commit

Permalink
eio_linux: split flow into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
talex5 committed May 10, 2024
1 parent bd2c92e commit 3c93b44
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 168 deletions.
182 changes: 14 additions & 168 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,82 +38,6 @@ let get_dir_fd_opt (Eio.Resource.T (t, ops)) =
| Some f -> Some (f t)
| None -> None

(* When copying between a source with an FD and a sink with an FD, we can share the chunk
and avoid copying. *)
let fast_copy src dst =
let fallback () =
(* No chunks available. Use regular memory instead. *)
let buf = Cstruct.create 4096 in
try
while true do
let got = Low_level.readv src [buf] in
Low_level.writev dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()
in
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in
try
while true do
let got = Low_level.read_upto src chunk chunk_size in
Low_level.write dst chunk got
done
with End_of_file -> ()

(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *)
let _fast_copy_try_splice src dst =
try
while true do
let _ : int = Low_level.splice src ~dst ~len:max_int in
()
done
with
| End_of_file -> ()
| Eio.Exn.Io (Eio.Exn.X Eio_unix.Unix_error ((EAGAIN | EINVAL), "splice", _), _) -> fast_copy src dst

(* XXX workaround for issue #319, PR #327 *)
let fast_copy_try_splice src dst = fast_copy src dst

let[@tail_mod_cons] rec list_take n = function
| [] -> []
| x :: xs ->
if n = 0 then []
else x :: list_take (n - 1) xs

let truncate_to_iomax xs =
if List.compare_length_with xs Uring.iov_max <= 0 then xs
else list_take Uring.iov_max xs

(* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
try
while true do rsb write done
with End_of_file -> ()

(* Copy by allocating a chunk from the pre-shared buffer and asking
the source to write into it. This used when the other methods
aren't available. *)
let fallback_copy (type src) (module Src : Eio.Flow.Pi.SOURCE with type t = src) src dst =
let fallback () =
(* No chunks available. Use regular memory instead. *)
let buf = Cstruct.create 4096 in
try
while true do
let got = Src.single_read src buf in
Low_level.writev dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()
in
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_cs = Uring.Region.to_cstruct chunk in
try
while true do
let got = Src.single_read src chunk_cs in
Low_level.write dst chunk got
done
with End_of_file -> ()

module Datagram_socket = struct
type tag = [`Generic | `Unix]
Expand Down Expand Up @@ -145,71 +69,6 @@ let datagram_handler = Eio_unix.Pi.datagram_handler (module Datagram_socket)
let datagram_socket fd =
Eio.Resource.T (fd, datagram_handler)

module Flow = struct
type tag = [`Generic | `Unix]

type t = Eio_unix.Fd.t

let fd t = t

let close = Eio_unix.Fd.close

let stat = Low_level.fstat

let single_read t buf =
Low_level.readv t [buf]

let pread t ~file_offset bufs =
Low_level.readv ~file_offset t bufs

let pwrite t ~file_offset bufs =
Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)

let read_methods = []

let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)

let copy t ~src =
match Eio_unix.Resource.fd_opt src with
| Some src -> fast_copy_try_splice src t
| None ->
let Eio.Resource.T (src, ops) = src in
let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
let rec aux = function
| Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src) t
| _ :: xs -> aux xs
| [] -> fallback_copy (module Src) src t
in
aux Src.read_methods

let shutdown t cmd =
Low_level.shutdown t @@ match cmd with
| `Receive -> Unix.SHUTDOWN_RECEIVE
| `Send -> Unix.SHUTDOWN_SEND
| `All -> Unix.SHUTDOWN_ALL

let send_msg t ~fds data =
Low_level.send_msg t ~fds data

let recv_msg_with_fds t ~sw ~max_fds data =
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds data in
n, fds

let seek = Low_level.lseek
let sync = Low_level.fsync
let truncate = Low_level.ftruncate
end

let flow_handler = Eio_unix.Pi.flow_handler (module Flow)

let flow fd =
let r = Eio.Resource.T (fd, flow_handler) in
(r : [`Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :>
[< `Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r)

let source fd = (flow fd :> _ Eio_unix.source)
let sink fd = (flow fd :> _ Eio_unix.sink)

module Listening_socket = struct
type t = Fd.t

Expand All @@ -226,7 +85,7 @@ module Listening_socket = struct
| Unix.ADDR_UNIX path -> `Unix path
| Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Net.Ipaddr.of_unix host, port)
in
let flow = (flow client :> _ Eio.Net.stream_socket) in
let flow = (Flow.of_fd client :> _ Eio.Net.stream_socket) in
flow, client_addr

let listening_addr fd =
Expand Down Expand Up @@ -254,7 +113,7 @@ let connect ~sw connect_addr =
let sock_unix = Unix.socket ~cloexec:true (socket_domain_of connect_addr) Unix.SOCK_STREAM 0 in
let sock = Fd.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
Low_level.connect sock addr;
(flow sock :> _ Eio_unix.Net.stream_socket)
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)

module Impl = struct
type t = unit
Expand Down Expand Up @@ -488,7 +347,7 @@ end = struct
~flags:Uring.Open_flags.cloexec
~perm:0
in
(flow fd :> Eio.File.ro_ty r)
(Flow.of_fd fd :> Eio.File.ro_ty r)

let open_out t ~sw ~append ~create path =
let perm, flags =
Expand All @@ -504,7 +363,7 @@ end = struct
~flags:Uring.Open_flags.(cloexec + flags)
~perm
in
(flow fd :> Eio.File.rw_ty r)
(Flow.of_fd fd :> Eio.File.rw_ty r)

let native_internal t path =
if Filename.is_relative path then (
Expand Down Expand Up @@ -585,7 +444,7 @@ end = struct
~flags:Uring.Open_flags.(cloexec + path + (if follow then empty else nofollow))
~perm:0
in
Flow.stat fd
Low_level.fstat fd
)

let rename t old_path t2 new_path =
Expand Down Expand Up @@ -615,34 +474,21 @@ end

let dir ~label ~path fd = Eio.Resource.T (Dir.v ~label ~path fd, Dir_handler.v)

module Secure_random = struct
type t = unit
let single_read () buf = Low_level.getrandom buf; Cstruct.length buf
let read_methods = []
end

let secure_random =
let ops = Eio.Flow.Pi.source (module Secure_random) in
Eio.Resource.T ((), ops)

let stdenv ~run_event_loop =
let stdin = source Eio_unix.Fd.stdin in
let stdout = sink Eio_unix.Fd.stdout in
let stderr = sink Eio_unix.Fd.stderr in
let fs = (dir ~label:"fs" ~path:"" Fs, "") in
let cwd = (dir ~label:"cwd" ~path:"" Cwd, "") in
object (_ : stdenv)
method stdin = stdin
method stdout = stdout
method stderr = stderr
method stdin = Flow.stdin
method stdout = Flow.stdout
method stderr = Flow.stderr
method net = net
method process_mgr = process_mgr
method domain_mgr = domain_mgr ~run_event_loop
method clock = clock
method mono_clock = mono_clock
method fs = (fs :> Eio.Fs.dir_ty Eio.Path.t)
method cwd = (cwd :> Eio.Fs.dir_ty Eio.Path.t)
method secure_random = secure_random
method secure_random = Flow.secure_random
method debug = Eio.Private.Debug.v
method backend_id = "linux"
end
Expand All @@ -656,7 +502,7 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k mono_clock)
| Eio_unix.Net.Import_socket_stream (sw, close_unix, fd) -> Some (fun k ->
let fd = Fd.of_unix ~sw ~seekable:false ~close_unix fd in
continue k (flow fd :> _ Eio_unix.Net.stream_socket)
continue k (Flow.of_fd fd :> _ Eio_unix.Net.stream_socket)
)
| Eio_unix.Net.Import_socket_datagram (sw, close_unix, fd) -> Some (fun k ->
let fd = Fd.of_unix ~sw ~seekable:false ~close_unix fd in
Expand All @@ -665,8 +511,8 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Net.Socketpair_stream (sw, domain, protocol) -> Some (fun k ->
match
let a, b = Unix.socketpair ~cloexec:true domain Unix.SOCK_STREAM protocol in
let a = Fd.of_unix ~sw ~seekable:false ~close_unix:true a |> flow in
let b = Fd.of_unix ~sw ~seekable:false ~close_unix:true b |> flow in
let a = Fd.of_unix ~sw ~seekable:false ~close_unix:true a |> Flow.of_fd in
let b = Fd.of_unix ~sw ~seekable:false ~close_unix:true b |> Flow.of_fd in
((a :> _ Eio_unix.Net.stream_socket), (b :> _ Eio_unix.Net.stream_socket))
with
| r -> continue k r
Expand All @@ -687,8 +533,8 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Private.Pipe sw -> Some (fun k ->
match
let r, w = Low_level.pipe ~sw in
let r = (flow r :> _ Eio_unix.source) in
let w = (flow w :> _ Eio_unix.sink) in
let r = (Flow.of_fd r :> _ Eio_unix.source) in
let w = (Flow.of_fd w :> _ Eio_unix.sink) in
(r, w)
with
| r -> continue k r
Expand Down
Loading

0 comments on commit 3c93b44

Please sign in to comment.