Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eio_linux: split flow into its own file #727

Merged
merged 1 commit into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 14 additions & 168 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,82 +38,6 @@ let get_dir_fd_opt (Eio.Resource.T (t, ops)) =
| Some f -> Some (f t)
| None -> None

(* When copying between a source with an FD and a sink with an FD, we can share the chunk
and avoid copying. *)
let fast_copy src dst =
let fallback () =
(* No chunks available. Use regular memory instead. *)
let buf = Cstruct.create 4096 in
try
while true do
let got = Low_level.readv src [buf] in
Low_level.writev dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()
in
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in
try
while true do
let got = Low_level.read_upto src chunk chunk_size in
Low_level.write dst chunk got
done
with End_of_file -> ()

(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *)
let _fast_copy_try_splice src dst =
try
while true do
let _ : int = Low_level.splice src ~dst ~len:max_int in
()
done
with
| End_of_file -> ()
| Eio.Exn.Io (Eio.Exn.X Eio_unix.Unix_error ((EAGAIN | EINVAL), "splice", _), _) -> fast_copy src dst

(* XXX workaround for issue #319, PR #327 *)
let fast_copy_try_splice src dst = fast_copy src dst

let[@tail_mod_cons] rec list_take n = function
| [] -> []
| x :: xs ->
if n = 0 then []
else x :: list_take (n - 1) xs

let truncate_to_iomax xs =
if List.compare_length_with xs Uring.iov_max <= 0 then xs
else list_take Uring.iov_max xs

(* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
let write xs = Low_level.writev_single dst (truncate_to_iomax xs) in
try
while true do rsb write done
with End_of_file -> ()

(* Copy by allocating a chunk from the pre-shared buffer and asking
the source to write into it. This used when the other methods
aren't available. *)
let fallback_copy (type src) (module Src : Eio.Flow.Pi.SOURCE with type t = src) src dst =
let fallback () =
(* No chunks available. Use regular memory instead. *)
let buf = Cstruct.create 4096 in
try
while true do
let got = Src.single_read src buf in
Low_level.writev dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()
in
Low_level.with_chunk ~fallback @@ fun chunk ->
let chunk_cs = Uring.Region.to_cstruct chunk in
try
while true do
let got = Src.single_read src chunk_cs in
Low_level.write dst chunk got
done
with End_of_file -> ()

module Datagram_socket = struct
type tag = [`Generic | `Unix]
Expand Down Expand Up @@ -145,71 +69,6 @@ let datagram_handler = Eio_unix.Pi.datagram_handler (module Datagram_socket)
let datagram_socket fd =
Eio.Resource.T (fd, datagram_handler)

module Flow = struct
type tag = [`Generic | `Unix]

type t = Eio_unix.Fd.t

let fd t = t

let close = Eio_unix.Fd.close

let stat = Low_level.fstat

let single_read t buf =
Low_level.readv t [buf]

let pread t ~file_offset bufs =
Low_level.readv ~file_offset t bufs

let pwrite t ~file_offset bufs =
Low_level.writev_single ~file_offset t (truncate_to_iomax bufs)

let read_methods = []

let single_write t bufs = Low_level.writev_single t (truncate_to_iomax bufs)

let copy t ~src =
match Eio_unix.Resource.fd_opt src with
| Some src -> fast_copy_try_splice src t
| None ->
let Eio.Resource.T (src, ops) = src in
let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
let rec aux = function
| Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src) t
| _ :: xs -> aux xs
| [] -> fallback_copy (module Src) src t
in
aux Src.read_methods

let shutdown t cmd =
Low_level.shutdown t @@ match cmd with
| `Receive -> Unix.SHUTDOWN_RECEIVE
| `Send -> Unix.SHUTDOWN_SEND
| `All -> Unix.SHUTDOWN_ALL

let send_msg t ~fds data =
Low_level.send_msg t ~fds data

let recv_msg_with_fds t ~sw ~max_fds data =
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds data in
n, fds

let seek = Low_level.lseek
let sync = Low_level.fsync
let truncate = Low_level.ftruncate
end

let flow_handler = Eio_unix.Pi.flow_handler (module Flow)

let flow fd =
let r = Eio.Resource.T (fd, flow_handler) in
(r : [`Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :>
[< `Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r)

let source fd = (flow fd :> _ Eio_unix.source)
let sink fd = (flow fd :> _ Eio_unix.sink)

module Listening_socket = struct
type t = Fd.t

Expand All @@ -226,7 +85,7 @@ module Listening_socket = struct
| Unix.ADDR_UNIX path -> `Unix path
| Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Net.Ipaddr.of_unix host, port)
in
let flow = (flow client :> _ Eio.Net.stream_socket) in
let flow = (Flow.of_fd client :> _ Eio.Net.stream_socket) in
flow, client_addr

let listening_addr fd =
Expand Down Expand Up @@ -254,7 +113,7 @@ let connect ~sw connect_addr =
let sock_unix = Unix.socket ~cloexec:true (socket_domain_of connect_addr) Unix.SOCK_STREAM 0 in
let sock = Fd.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
Low_level.connect sock addr;
(flow sock :> _ Eio_unix.Net.stream_socket)
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)

module Impl = struct
type t = unit
Expand Down Expand Up @@ -488,7 +347,7 @@ end = struct
~flags:Uring.Open_flags.cloexec
~perm:0
in
(flow fd :> Eio.File.ro_ty r)
(Flow.of_fd fd :> Eio.File.ro_ty r)

let open_out t ~sw ~append ~create path =
let perm, flags =
Expand All @@ -504,7 +363,7 @@ end = struct
~flags:Uring.Open_flags.(cloexec + flags)
~perm
in
(flow fd :> Eio.File.rw_ty r)
(Flow.of_fd fd :> Eio.File.rw_ty r)

let native_internal t path =
if Filename.is_relative path then (
Expand Down Expand Up @@ -585,7 +444,7 @@ end = struct
~flags:Uring.Open_flags.(cloexec + path + (if follow then empty else nofollow))
~perm:0
in
Flow.stat fd
Low_level.fstat fd
)

let rename t old_path t2 new_path =
Expand Down Expand Up @@ -615,34 +474,21 @@ end

let dir ~label ~path fd = Eio.Resource.T (Dir.v ~label ~path fd, Dir_handler.v)

module Secure_random = struct
type t = unit
let single_read () buf = Low_level.getrandom buf; Cstruct.length buf
let read_methods = []
end

let secure_random =
let ops = Eio.Flow.Pi.source (module Secure_random) in
Eio.Resource.T ((), ops)

let stdenv ~run_event_loop =
let stdin = source Eio_unix.Fd.stdin in
let stdout = sink Eio_unix.Fd.stdout in
let stderr = sink Eio_unix.Fd.stderr in
let fs = (dir ~label:"fs" ~path:"" Fs, "") in
let cwd = (dir ~label:"cwd" ~path:"" Cwd, "") in
object (_ : stdenv)
method stdin = stdin
method stdout = stdout
method stderr = stderr
method stdin = Flow.stdin
method stdout = Flow.stdout
method stderr = Flow.stderr
method net = net
method process_mgr = process_mgr
method domain_mgr = domain_mgr ~run_event_loop
method clock = clock
method mono_clock = mono_clock
method fs = (fs :> Eio.Fs.dir_ty Eio.Path.t)
method cwd = (cwd :> Eio.Fs.dir_ty Eio.Path.t)
method secure_random = secure_random
method secure_random = Flow.secure_random
method debug = Eio.Private.Debug.v
method backend_id = "linux"
end
Expand All @@ -656,7 +502,7 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k mono_clock)
| Eio_unix.Net.Import_socket_stream (sw, close_unix, fd) -> Some (fun k ->
let fd = Fd.of_unix ~sw ~seekable:false ~close_unix fd in
continue k (flow fd :> _ Eio_unix.Net.stream_socket)
continue k (Flow.of_fd fd :> _ Eio_unix.Net.stream_socket)
)
| Eio_unix.Net.Import_socket_datagram (sw, close_unix, fd) -> Some (fun k ->
let fd = Fd.of_unix ~sw ~seekable:false ~close_unix fd in
Expand All @@ -665,8 +511,8 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Net.Socketpair_stream (sw, domain, protocol) -> Some (fun k ->
match
let a, b = Unix.socketpair ~cloexec:true domain Unix.SOCK_STREAM protocol in
let a = Fd.of_unix ~sw ~seekable:false ~close_unix:true a |> flow in
let b = Fd.of_unix ~sw ~seekable:false ~close_unix:true b |> flow in
let a = Fd.of_unix ~sw ~seekable:false ~close_unix:true a |> Flow.of_fd in
let b = Fd.of_unix ~sw ~seekable:false ~close_unix:true b |> Flow.of_fd in
((a :> _ Eio_unix.Net.stream_socket), (b :> _ Eio_unix.Net.stream_socket))
with
| r -> continue k r
Expand All @@ -687,8 +533,8 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
| Eio_unix.Private.Pipe sw -> Some (fun k ->
match
let r, w = Low_level.pipe ~sw in
let r = (flow r :> _ Eio_unix.source) in
let w = (flow w :> _ Eio_unix.sink) in
let r = (Flow.of_fd r :> _ Eio_unix.source) in
let w = (Flow.of_fd w :> _ Eio_unix.sink) in
(r, w)
with
| r -> continue k r
Expand Down
Loading
Loading