Skip to content

Commit

Permalink
Body: yield to other threads when reading (#100)
Browse files Browse the repository at this point in the history
* Body: yield to other threads when reading

this diff gives running threads a chance to make progress (via
`Lwt.pause`) whenever Piaf is reading a message body from the peer.

related: anmonteiro/httpun#41

* tweaks

* update
  • Loading branch information
anmonteiro authored Oct 22, 2021
1 parent 4cd3eef commit 77c703c
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 12 deletions.
34 changes: 28 additions & 6 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,19 @@ type t =
{ length : length
; contents : contents
; mutable error_received : Error.t Lwt.t
; mutable read_counter : int
}

(* Never resolves, giving a chance for the normal successful flow to always
* resolve. *)
let default_error_received, _ = Lwt.wait ()

let create ~length contents =
{ length; contents; error_received = default_error_received }
{ length
; contents
; error_received = default_error_received
; read_counter = 0
}

let length { length; _ } = length

Expand Down Expand Up @@ -261,7 +266,7 @@ end

let embed_error_received t error_received = t.error_received <- error_received

let of_prim_body
let of_raw_body
: type a.
(module BODY with type Reader.t = a)
-> ?on_eof:(t -> unit)
Expand All @@ -272,16 +277,33 @@ let of_prim_body
fun (module Http_body) ?on_eof ~body_length body ->
let module Body = Http_body.Reader in
let read_fn t () =
let t = Lazy.force t in
let waiter, wakener = Lwt.task () in
let on_read_direct buffer ~off ~len =
Lwt.wakeup_later wakener (Some (IOVec.make buffer ~off ~len))
and on_read_with_yield buffer ~off ~len =
Lwt.async (fun () ->
let* () = Lwt.pause () in
Lwt.wrap2
Lwt.wakeup_later
wakener
(Some (IOVec.make buffer ~off ~len)))
in
t.read_counter <- t.read_counter + 1;
let on_read =
if t.read_counter > 128 then (
t.read_counter <- 0;
on_read_with_yield)
else
on_read_direct
in
Body.schedule_read
body
~on_eof:(fun () ->
Option.iter (fun f -> f (Lazy.force t)) on_eof;
Option.iter (fun f -> f t) on_eof;
Body.close body;
Lwt.wakeup_later wakener None)
~on_read:(fun buffer ~off ~len ->
Lwt.wakeup_later wakener (Some (IOVec.make buffer ~off ~len)));
let t = Lazy.force t in
~on_read;
Lwt.choose
[ waiter
; Lwt.bind t.error_received (fun _ ->
Expand Down
2 changes: 1 addition & 1 deletion lib/http1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ module MakeHTTP1
(* TODO: revisit whether this is necessary. *)
if request_method = `HEAD then Body.Reader.close body;
let body =
Piaf_body.of_prim_body
Piaf_body.of_raw_body
(module Body : BODY with type Reader.t = Httpaf.Body.Reader.t)
~body_length:
(Httpaf.Response.body_length ~request_method response
Expand Down
4 changes: 2 additions & 2 deletions lib/http2.ml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ module MakeHTTP2
let request t req ~error_handler ~response_handler =
let response_handler response body =
let body =
Piaf_body.of_prim_body
Piaf_body.of_raw_body
(module Body : BODY with type Reader.t = [ `read ] H2.Body.t)
~body_length:(H2.Response.body_length response :> Piaf_body.length)
body
Expand Down Expand Up @@ -136,7 +136,7 @@ module HTTP : Http_intf.HTTP2 = struct
=
let response_handler response body =
let body =
Piaf_body.of_prim_body
Piaf_body.of_raw_body
(module Body : BODY with type Reader.t = [ `read ] H2.Body.t)
~body_length:(H2.Response.body_length response :> Piaf_body.length)
body
Expand Down
2 changes: 1 addition & 1 deletion lib/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ let request_handler handler client_addr reqd =
let request = Reqd.request reqd in
let body_length = Httpaf.Request.body_length request in
let request_body =
Body.of_prim_body
Body.of_raw_body
(module Http1.Body : Body.BODY with type Reader.t = Httpaf.Body.Reader.t)
~body_length:(body_length :> Body.length)
~on_eof:(fun body ->
Expand Down
2 changes: 1 addition & 1 deletion nix/sources.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
let
overlays =
builtins.fetchTarball
https://github.com/anmonteiro/nix-overlays/archive/095501b827.tar.gz;
https://github.com/anmonteiro/nix-overlays/archive/152f359.tar.gz;

in
import "${overlays}/boot.nix" {
Expand Down
2 changes: 1 addition & 1 deletion vendor/httpaf

0 comments on commit 77c703c

Please sign in to comment.