Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix second chance #13

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions lib/events.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type 'a res = ('a,exn) result

(* Events in progress *)
type _ in_progress =
| Line : Bytes.t * Buffer.t -> string in_progress
| Line : int * Bytes.t * Buffer.t -> string in_progress
| Bytes : int * Bytes.t -> Bytes.t in_progress

(* Reified function composition *)
Expand Down Expand Up @@ -126,7 +126,7 @@ let mkReadInProgress fd = function
| FCons _ as f -> Advanced (ReadInProgress(fd,f))
| FNil x -> Yes x

let one_line () = Line (Bytes.make 1 '0', Buffer.create 40)
let one_line ?(at_least=1) () = Line (at_least,Bytes.make at_least '0', Buffer.create (max 40 at_least))
let some_bytes n ?(buff=Bytes.create n) () = Bytes(n,buff)

module On = struct
Expand Down Expand Up @@ -161,10 +161,12 @@ let parse_content_length_or err k s =
with
(Scanf.Scan_failure _ | Failure _ | End_of_file | Invalid_argument _) as e ->
err (Error e)

let len_httpcle_header = String.length "CONTENT-LENGTH: \n"

let an_httpcle (k : Bytes.t res -> 'b) : 'b fcomp =
let (--?) x y = err k x y in
one_line ()
one_line ~at_least:len_httpcle_header ()
--? (parse_content_length_or (finish_with k) (fun length ->
one_line ()
--? (fun _discard ->
Expand Down Expand Up @@ -199,19 +201,19 @@ let advance_system ready_fds _ = function
else ready_fds, Yes (k code)
| ReadInProgress(_, FNil _) -> assert false
| ReadInProgress(fd,_) as x when not (List.mem fd ready_fds) -> ready_fds, No x
| ReadInProgress(fd, FCons(Line (buff,acc) as line,rest)) ->
| ReadInProgress(fd, FCons(Line (m,buff,acc),rest)) ->
let ready_fds = List.filter ((<>) fd) ready_fds in
ready_fds,
begin try
let n = Unix.read fd buff 0 1 in
let n = Unix.read fd buff 0 m in
if n = 0 then begin
Buffer.clear acc;
mkReadInProgress fd (rest (Error End_of_file))
end else
let c = Bytes.get buff 0 in
let c = Bytes.get buff (n-1) in
if c != '\n' then begin
Buffer.add_char acc c;
mkReadInProgress fd (FCons(line,rest))
Buffer.add_bytes acc (Bytes.sub buff 0 n);
mkReadInProgress fd (FCons(Line (max (m-n) 1,buff,acc),rest))
end else begin
let one_line = Buffer.contents acc in
Buffer.clear acc;
Expand Down
36 changes: 19 additions & 17 deletions lib/sel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,21 @@ end
(* Like List.filter but also returns the minimum priority of ready events.
Moreover ~advance can make the event advance (whilst not being ready yet)*)
let pull_ready ~advance st l =
let rec pull_ready yes no min_priority st l =
let rec pull_ready yes min_priority_ready no st l =
match Sorted.look l with
| Sorted.Nil -> yes, no, min_priority
| Sorted.Nil -> yes, no, min_priority_ready
| Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
match advance st cancelled it with
| st, Yes y ->
let min_priority = Sorted.min_user min_priority priority in
let min_priority_ready = Sorted.min_user min_priority_ready priority in
let e = drop_event_type y e in
pull_ready (Sorted.cons e e.priority yes) no min_priority st rest
pull_ready (Sorted.cons e e.priority yes) min_priority_ready no st rest
| st, Advanced x ->
let min_priority = Sorted.min_user min_priority priority in
pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
| st, No x ->
pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
in
pull_ready Sorted.nil Sorted.nil Sorted.max_priority st l
pull_ready Sorted.nil Sorted.max_priority Sorted.nil st l

type ('a,'b) ev_checker =
'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority
Expand All @@ -128,21 +127,21 @@ let filter_file_descriptor fds = function
The result is that it when reading 'n' bytes, it is no longer necessary to interleave up to 'n' ready tasks.
*)
let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting ->
let rec check_for_system_events new_ready waiting_skipped min_prio waiting =
let rec check_for_system_events new_ready waiting_skipped min_prio_ready waiting =
let fds = file_descriptors_of waiting in
let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
let new_ready_1, waiting, min_prio_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready_1, waiting, min_prio_ready_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready = Sorted.append new_ready_1 new_ready in
let min_prio = Sorted.min_user min_prio_1 min_prio in
let min_prio_ready = Sorted.min_user min_prio_ready_1 min_prio_ready in
if ready_fds = [] then
new_ready, Sorted.append waiting waiting_skipped, min_prio
new_ready, Sorted.append waiting waiting_skipped, min_prio_ready
else
let waiting, waiting_skipped_1 = Sorted.partition (filter_file_descriptor ready_fds) waiting in
let waiting, waiting_skipped_2 = Sorted.partition_priority (Sorted.le_user min_prio) waiting in
let waiting_skipped = Sorted.concat [waiting_skipped_2; waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio waiting
let waiting_skipped = Sorted.concat [waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio_ready waiting
in
check_for_system_events Sorted.nil Sorted.nil min_prio_task_queue waiting
let waiting, waiting_skipped = Sorted.partition_priority (fun x -> Sorted.le_user x min_prio_task_queue) waiting in
check_for_system_events Sorted.nil waiting_skipped Sorted.max_priority waiting

let check_for_queue_events : ('a queue_event,'a) ev_checker =
fun waiting ->
Expand All @@ -158,7 +157,10 @@ let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:advance_system ready_fds sys in
let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue () queue in
if ready_sys <> Sorted.nil || ready_queue <> Sorted.nil
then ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_queue min_prio_sys
then
let min_prio = Sorted.min_priority min_prio_queue min_prio_sys in
let new_ready_sys, waiting_sys, min_prio_new_ready_sys = check_for_system_events min_prio waiting_sys in
Sorted.append new_ready_sys ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_new_ready_sys min_prio
else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

let wait_for_system_or_queue_events ~deadline sys queue =
Expand Down
33 changes: 32 additions & 1 deletion test/fairness_prio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let%test_unit "sel.loop" =
write "Content-Length: 3\n\n123\n";
let x = Sel.now ~priority:2 (Ok "bad") in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let loop todo =
let ready, _todo = Sel.pop todo in
match ready with
| Ok "bad" -> [%test_eq: string] "" "bad1"
Expand All @@ -71,3 +71,34 @@ let%test_unit "sel.loop" =
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

let%test_unit "sel.loop2" =
let read, write = pipe () in
let e = On.httpcle ~priority:1 read (fun x -> Result.map ~f:Bytes.to_string x) in
write "Content-Length: 3\n\n12";
let x = Sel.now ~priority:2 (Ok "bad") in
let todo = Todo.add Todo.empty [e;x] in
let loop todo =
let ready, _todo = Sel.pop todo in
match ready with
| Ok "bad" -> ()
| Ok _ -> [%test_eq: string] "" "bad2"
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

(* pop_opt terminates *)
let%test_unit "sel.loop3" =
let read, write = pipe () in
let e = On.line ~priority:1 read (fun x -> x) in
write "aa\nbb\ncc\n";
let read2, write2 = pipe () in
let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let ready, todo = Sel.pop todo in
match ready with
| Ok "cc" -> ()
| Ok s -> write2 s; loop (Todo.add todo [e])
| Error End_of_file -> ()
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

30 changes: 14 additions & 16 deletions test/perf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,18 @@ let read_leftover read n =

(*****************************************************************************)

(* pop_opt terminates *)
let%test_unit "sel.loop" =
let %test_unit "sel.event.http_cle" =
let read, write = pipe () in
let e = On.line ~priority:1 read (fun x -> x) in
write "aa\nbb\ncc\n";
let read2, write2 = pipe () in
let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let ready, todo = Sel.pop todo in
match ready with
| Ok "cc" -> ()
| Ok s -> write2 s; loop (Todo.add todo [e])
| Error End_of_file -> ()
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

let e = On.httpcle read b2s in
let t0 = Unix.gettimeofday () in
let n = 99999 in
for _i = 1 to n do
let todo = Todo.add Todo.empty [e] in
write "content-Length: 4\n\n1\n3.";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
[%test_eq: string option] ready (Some "1\n3.");
done;
let t1 = Unix.gettimeofday () in
Stdlib.Printf.eprintf "time to pop %d httpcle events: %f\n" n (t1 -. t0)
;;
6 changes: 6 additions & 0 deletions test/test1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ let %test_unit "sel.event.http_cle.recurring.err" =
write "a\n";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
(* depending on how eager the event is to read data *)
[%test_eq: bool] (osmatch "\\(.*Scan_failure.*\\|End_of_file\\)" ready) true;
let todo = Todo.add todo [e] in
write "content-Lengtx: 2\n";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
[%test_eq: bool] (osmatch ".*Scan_failure.*" ready) true;
let todo = Todo.add todo [e] in
write "content-Length: 4\n\n4\n6.";
Expand Down
Loading