From 77c703c47a45d4bfdc8a484cb9b99fc38ee49f3e Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Fri, 22 Oct 2021 13:56:10 -0700 Subject: [PATCH] Body: yield to other threads when reading (#100) * Body: yield to other threads when reading this diff gives running threads a chance to make progress (via `Lwt.pause`) whenever Piaf is reading a message body from the peer. related: https://github.com/anmonteiro/httpaf/issues/41 * tweaks * update --- lib/body.ml | 34 ++++++++++++++++++++++++++++------ lib/http1.ml | 2 +- lib/http2.ml | 4 ++-- lib/server.ml | 2 +- nix/sources.nix | 2 +- vendor/httpaf | 2 +- 6 files changed, 34 insertions(+), 12 deletions(-) diff --git a/lib/body.ml b/lib/body.ml index 36957dab..a63d56f7 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -86,6 +86,7 @@ type t = { length : length ; contents : contents ; mutable error_received : Error.t Lwt.t + ; mutable read_counter : int } (* Never resolves, giving a chance for the normal successful flow to always @@ -93,7 +94,11 @@ type t = let default_error_received, _ = Lwt.wait () let create ~length contents = - { length; contents; error_received = default_error_received } + { length + ; contents + ; error_received = default_error_received + ; read_counter = 0 + } let length { length; _ } = length @@ -261,7 +266,7 @@ end let embed_error_received t error_received = t.error_received <- error_received -let of_prim_body +let of_raw_body : type a. (module BODY with type Reader.t = a) -> ?on_eof:(t -> unit) @@ -272,16 +277,33 @@ let of_prim_body fun (module Http_body) ?on_eof ~body_length body -> let module Body = Http_body.Reader in let read_fn t () = + let t = Lazy.force t in let waiter, wakener = Lwt.task () in + let on_read_direct buffer ~off ~len = + Lwt.wakeup_later wakener (Some (IOVec.make buffer ~off ~len)) + and on_read_with_yield buffer ~off ~len = + Lwt.async (fun () -> + let* () = Lwt.pause () in + Lwt.wrap2 + Lwt.wakeup_later + wakener + (Some (IOVec.make buffer ~off ~len))) + in + t.read_counter <- t.read_counter + 1; + let on_read = + if t.read_counter > 128 then ( + t.read_counter <- 0; + on_read_with_yield) + else + on_read_direct + in Body.schedule_read body ~on_eof:(fun () -> - Option.iter (fun f -> f (Lazy.force t)) on_eof; + Option.iter (fun f -> f t) on_eof; Body.close body; Lwt.wakeup_later wakener None) - ~on_read:(fun buffer ~off ~len -> - Lwt.wakeup_later wakener (Some (IOVec.make buffer ~off ~len))); - let t = Lazy.force t in + ~on_read; Lwt.choose [ waiter ; Lwt.bind t.error_received (fun _ -> diff --git a/lib/http1.ml b/lib/http1.ml index e3183bd9..3c0d904f 100644 --- a/lib/http1.ml +++ b/lib/http1.ml @@ -87,7 +87,7 @@ module MakeHTTP1 (* TODO: revisit whether this is necessary. *) if request_method = `HEAD then Body.Reader.close body; let body = - Piaf_body.of_prim_body + Piaf_body.of_raw_body (module Body : BODY with type Reader.t = Httpaf.Body.Reader.t) ~body_length: (Httpaf.Response.body_length ~request_method response diff --git a/lib/http2.ml b/lib/http2.ml index 4f93b2ff..0fbddbbb 100644 --- a/lib/http2.ml +++ b/lib/http2.ml @@ -96,7 +96,7 @@ module MakeHTTP2 let request t req ~error_handler ~response_handler = let response_handler response body = let body = - Piaf_body.of_prim_body + Piaf_body.of_raw_body (module Body : BODY with type Reader.t = [ `read ] H2.Body.t) ~body_length:(H2.Response.body_length response :> Piaf_body.length) body @@ -136,7 +136,7 @@ module HTTP : Http_intf.HTTP2 = struct = let response_handler response body = let body = - Piaf_body.of_prim_body + Piaf_body.of_raw_body (module Body : BODY with type Reader.t = [ `read ] H2.Body.t) ~body_length:(H2.Response.body_length response :> Piaf_body.length) body diff --git a/lib/server.ml b/lib/server.ml index 79cef854..a5294380 100644 --- a/lib/server.ml +++ b/lib/server.ml @@ -121,7 +121,7 @@ let request_handler handler client_addr reqd = let request = Reqd.request reqd in let body_length = Httpaf.Request.body_length request in let request_body = - Body.of_prim_body + Body.of_raw_body (module Http1.Body : Body.BODY with type Reader.t = Httpaf.Body.Reader.t) ~body_length:(body_length :> Body.length) ~on_eof:(fun body -> diff --git a/nix/sources.nix b/nix/sources.nix index adadb476..321ca338 100644 --- a/nix/sources.nix +++ b/nix/sources.nix @@ -2,7 +2,7 @@ let overlays = builtins.fetchTarball - https://github.com/anmonteiro/nix-overlays/archive/095501b827.tar.gz; + https://github.com/anmonteiro/nix-overlays/archive/152f359.tar.gz; in import "${overlays}/boot.nix" { diff --git a/vendor/httpaf b/vendor/httpaf index 4e3ae8bb..3a74fd88 160000 --- a/vendor/httpaf +++ b/vendor/httpaf @@ -1 +1 @@ -Subproject commit 4e3ae8bb8630627b33080820ee1dfda8300ebb3e +Subproject commit 3a74fd8851e3019f5889ae1bf9350e90ed40017d