diff --git a/lib/events.ml b/lib/events.ml index c5e9368..9bd1dc8 100644 --- a/lib/events.ml +++ b/lib/events.ml @@ -16,7 +16,7 @@ type 'a res = ('a,exn) result (* Events in progress *) type _ in_progress = - | Line : Bytes.t * Buffer.t -> string in_progress + | Line : int * Bytes.t * Buffer.t -> string in_progress | Bytes : int * Bytes.t -> Bytes.t in_progress (* Reified function composition *) @@ -126,7 +126,7 @@ let mkReadInProgress fd = function | FCons _ as f -> Advanced (ReadInProgress(fd,f)) | FNil x -> Yes x -let one_line () = Line (Bytes.make 1 '0', Buffer.create 40) +let one_line ?(at_least=1) () = Line (at_least,Bytes.make at_least '0', Buffer.create (max 40 at_least)) let some_bytes n ?(buff=Bytes.create n) () = Bytes(n,buff) module On = struct @@ -161,10 +161,12 @@ let parse_content_length_or err k s = with (Scanf.Scan_failure _ | Failure _ | End_of_file | Invalid_argument _) as e -> err (Error e) + +let len_httpcle_header = String.length "CONTENT-LENGTH: \n" let an_httpcle (k : Bytes.t res -> 'b) : 'b fcomp = let (--?) x y = err k x y in - one_line () + one_line ~at_least:len_httpcle_header () --? (parse_content_length_or (finish_with k) (fun length -> one_line () --? (fun _discard -> @@ -199,19 +201,19 @@ let advance_system ready_fds _ = function else ready_fds, Yes (k code) | ReadInProgress(_, FNil _) -> assert false | ReadInProgress(fd,_) as x when not (List.mem fd ready_fds) -> ready_fds, No x - | ReadInProgress(fd, FCons(Line (buff,acc) as line,rest)) -> + | ReadInProgress(fd, FCons(Line (m,buff,acc),rest)) -> let ready_fds = List.filter ((<>) fd) ready_fds in ready_fds, begin try - let n = Unix.read fd buff 0 1 in + let n = Unix.read fd buff 0 m in if n = 0 then begin Buffer.clear acc; mkReadInProgress fd (rest (Error End_of_file)) end else - let c = Bytes.get buff 0 in + let c = Bytes.get buff (n-1) in if c != '\n' then begin - Buffer.add_char acc c; - mkReadInProgress fd (FCons(line,rest)) + Buffer.add_bytes acc (Bytes.sub buff 0 n); + mkReadInProgress fd (FCons(Line (max (m-n) 1,buff,acc),rest)) end else begin let one_line = Buffer.contents acc in Buffer.clear acc; diff --git a/lib/sel.ml b/lib/sel.ml index e4f9e99..8af7228 100644 --- a/lib/sel.ml +++ b/lib/sel.ml @@ -92,22 +92,21 @@ end (* Like List.filter but also returns the minimum priority of ready events. Moreover ~advance can make the event advance (whilst not being ready yet)*) let pull_ready ~advance st l = - let rec pull_ready yes no min_priority st l = + let rec pull_ready yes min_priority_ready no st l = match Sorted.look l with - | Sorted.Nil -> yes, no, min_priority + | Sorted.Nil -> yes, no, min_priority_ready | Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) -> match advance st cancelled it with | st, Yes y -> - let min_priority = Sorted.min_user min_priority priority in + let min_priority_ready = Sorted.min_user min_priority_ready priority in let e = drop_event_type y e in - pull_ready (Sorted.cons e e.priority yes) no min_priority st rest + pull_ready (Sorted.cons e e.priority yes) min_priority_ready no st rest | st, Advanced x -> - let min_priority = Sorted.min_user min_priority priority in - pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest + pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest | st, No x -> - pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest + pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest in - pull_ready Sorted.nil Sorted.nil Sorted.max_priority st l + pull_ready Sorted.nil Sorted.max_priority Sorted.nil st l type ('a,'b) ev_checker = 'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority @@ -128,21 +127,21 @@ let filter_file_descriptor fds = function The result is that it when reading 'n' bytes, it is no longer necessary to interleave up to 'n' ready tasks. *) let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting -> - let rec check_for_system_events new_ready waiting_skipped min_prio waiting = + let rec check_for_system_events new_ready waiting_skipped min_prio_ready waiting = let fds = file_descriptors_of waiting in let ready_fds, _, _ = Unix.select fds [] [] 0.0 in - let new_ready_1, waiting, min_prio_1 = pull_ready ~advance:advance_system ready_fds waiting in + let new_ready_1, waiting, min_prio_ready_1 = pull_ready ~advance:advance_system ready_fds waiting in let new_ready = Sorted.append new_ready_1 new_ready in - let min_prio = Sorted.min_user min_prio_1 min_prio in + let min_prio_ready = Sorted.min_user min_prio_ready_1 min_prio_ready in if ready_fds = [] then - new_ready, Sorted.append waiting waiting_skipped, min_prio + new_ready, Sorted.append waiting waiting_skipped, min_prio_ready else let waiting, waiting_skipped_1 = Sorted.partition (filter_file_descriptor ready_fds) waiting in - let waiting, waiting_skipped_2 = Sorted.partition_priority (Sorted.le_user min_prio) waiting in - let waiting_skipped = Sorted.concat [waiting_skipped_2; waiting_skipped_1; waiting_skipped] in - check_for_system_events new_ready waiting_skipped min_prio waiting + let waiting_skipped = Sorted.concat [waiting_skipped_1; waiting_skipped] in + check_for_system_events new_ready waiting_skipped min_prio_ready waiting in - check_for_system_events Sorted.nil Sorted.nil min_prio_task_queue waiting + let waiting, waiting_skipped = Sorted.partition_priority (fun x -> Sorted.le_user x min_prio_task_queue) waiting in + check_for_system_events Sorted.nil waiting_skipped Sorted.max_priority waiting let check_for_queue_events : ('a queue_event,'a) ev_checker = fun waiting -> @@ -158,7 +157,10 @@ let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue = let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:advance_system ready_fds sys in let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue () queue in if ready_sys <> Sorted.nil || ready_queue <> Sorted.nil - then ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_queue min_prio_sys + then + let min_prio = Sorted.min_priority min_prio_queue min_prio_sys in + let new_ready_sys, waiting_sys, min_prio_new_ready_sys = check_for_system_events min_prio waiting_sys in + Sorted.append new_ready_sys ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_new_ready_sys min_prio else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue let wait_for_system_or_queue_events ~deadline sys queue = diff --git a/test/fairness_prio.ml b/test/fairness_prio.ml index 28131bc..bffa8af 100644 --- a/test/fairness_prio.ml +++ b/test/fairness_prio.ml @@ -62,7 +62,7 @@ let%test_unit "sel.loop" = write "Content-Length: 3\n\n123\n"; let x = Sel.now ~priority:2 (Ok "bad") in let todo = Todo.add Todo.empty [e;x] in - let rec loop todo = + let loop todo = let ready, _todo = Sel.pop todo in match ready with | Ok "bad" -> [%test_eq: string] "" "bad1" @@ -71,3 +71,34 @@ let%test_unit "sel.loop" = | Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in loop todo +let%test_unit "sel.loop2" = + let read, write = pipe () in + let e = On.httpcle ~priority:1 read (fun x -> Result.map ~f:Bytes.to_string x) in + write "Content-Length: 3\n\n12"; + let x = Sel.now ~priority:2 (Ok "bad") in + let todo = Todo.add Todo.empty [e;x] in + let loop todo = + let ready, _todo = Sel.pop todo in + match ready with + | Ok "bad" -> () + | Ok _ -> [%test_eq: string] "" "bad2" + | Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in + loop todo + +(* pop_opt terminates *) +let%test_unit "sel.loop3" = + let read, write = pipe () in + let e = On.line ~priority:1 read (fun x -> x) in + write "aa\nbb\ncc\n"; + let read2, write2 = pipe () in + let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in + let todo = Todo.add Todo.empty [e;x] in + let rec loop todo = + let ready, todo = Sel.pop todo in + match ready with + | Ok "cc" -> () + | Ok s -> write2 s; loop (Todo.add todo [e]) + | Error End_of_file -> () + | Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in + loop todo + diff --git a/test/perf.ml b/test/perf.ml index d32b7e0..2af57ba 100644 --- a/test/perf.ml +++ b/test/perf.ml @@ -55,20 +55,18 @@ let read_leftover read n = (*****************************************************************************) -(* pop_opt terminates *) -let%test_unit "sel.loop" = +let %test_unit "sel.event.http_cle" = let read, write = pipe () in - let e = On.line ~priority:1 read (fun x -> x) in - write "aa\nbb\ncc\n"; - let read2, write2 = pipe () in - let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in - let todo = Todo.add Todo.empty [e;x] in - let rec loop todo = - let ready, todo = Sel.pop todo in - match ready with - | Ok "cc" -> () - | Ok s -> write2 s; loop (Todo.add todo [e]) - | Error End_of_file -> () - | Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in - loop todo - + let e = On.httpcle read b2s in + let t0 = Unix.gettimeofday () in + let n = 99999 in + for _i = 1 to n do + let todo = Todo.add Todo.empty [e] in + write "content-Length: 4\n\n1\n3."; + let ready, todo = pop_opt todo in + [%test_eq: bool] (Todo.is_empty todo) true; + [%test_eq: string option] ready (Some "1\n3."); + done; + let t1 = Unix.gettimeofday () in + Stdlib.Printf.eprintf "time to pop %d httpcle events: %f\n" n (t1 -. t0) +;; diff --git a/test/test1.ml b/test/test1.ml index 1773cb6..15d69ae 100644 --- a/test/test1.ml +++ b/test/test1.ml @@ -222,6 +222,12 @@ let %test_unit "sel.event.http_cle.recurring.err" = write "a\n"; let ready, todo = pop_opt todo in [%test_eq: bool] (Todo.is_empty todo) true; + (* depending on how eager the event is to read data *) + [%test_eq: bool] (osmatch "\\(.*Scan_failure.*\\|End_of_file\\)" ready) true; + let todo = Todo.add todo [e] in + write "content-Lengtx: 2\n"; + let ready, todo = pop_opt todo in + [%test_eq: bool] (Todo.is_empty todo) true; [%test_eq: bool] (osmatch ".*Scan_failure.*" ready) true; let todo = Todo.add todo [e] in write "content-Length: 4\n\n4\n6.";