diff --git a/src/core/lwt_stream.ml b/src/core/lwt_stream.ml index e5f6e05473..d39667540e 100644 --- a/src/core/lwt_stream.ml +++ b/src/core/lwt_stream.ml @@ -469,6 +469,22 @@ let rec get_exn_rec s node = let map_exn s = from (fun () -> get_exn_rec s s.node) +let rec get_exn_rec' s node = + if node == !(s.last) then + Lwt.try_bind + (fun () -> feed s) + (fun () -> get_exn_rec' s node) + (fun exn -> Lwt.return (Some (Result.Error exn))) + else + match node.data with + | Some value -> + consume s node; + Lwt.return (Some (Result.Ok value)) + | None -> + Lwt.return_none + +let wrap_exn s = from (fun () -> get_exn_rec' s s.node) + let rec nget_rec node acc n s = if n <= 0 then Lwt.return (List.rev acc) diff --git a/src/core/lwt_stream.mli b/src/core/lwt_stream.mli index 4f89bda5ee..b68d6fd4b5 100644 --- a/src/core/lwt_stream.mli +++ b/src/core/lwt_stream.mli @@ -323,17 +323,12 @@ val concat : 'a t t -> 'a t val flatten : 'a list t -> 'a t (** [flatten st = map_list (fun l -> l) st] *) -(** A value or an error. *) -type 'a result = - | Value of 'a - | Error of exn - -val map_exn : 'a t -> 'a result t - (** [map_exn s] returns a stream that captures all exceptions raised - by the source of the stream (the function passed to {!from}). +val wrap_exn : 'a t -> 'a Lwt.result t +(** [wrap_exn s] is a stream [s'] such that each time [s] yields a value [v], + [s'] yields [Result.Ok v], and when the source of [s] raises an exception + [e], [s'] yields [Result.Error e]. - Note that for push-streams (as returned by {!create}) all - elements of the mapped streams are values. *) + Note that push-streams (as returned by {!create}) never raise exceptions. *) (** {2 Parsing} *) @@ -356,3 +351,28 @@ val hexdump : char t -> string t let () = Lwt_main.run (Lwt_io.write_lines Lwt_io.stdout (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin))) ]} *) + +(** {2 Deprecated} *) + +type 'a result = + | Value of 'a + | Error of exn + [@@ocaml.deprecated +"This type is being replaced by Lwt.result and the corresponding function +Lwt_stream.wrap_exn."] +(** A value or an error. + + @deprecated Replaced by {!wrap_exn}, which uses {!Lwt.result}. *) + +[@@@ocaml.warning "-3"] +val map_exn : 'a t -> 'a result t + [@@ocaml.deprecated "Use Lwt_stream.wrap_exn"] +(** [map_exn s] returns a stream that captures all exceptions raised + by the source of the stream (the function passed to {!from}). + + Note that for push-streams (as returned by {!create}) all + elements of the mapped streams are values. + + @deprecated Use {!wrap_exn}. *) + +[@@@ocaml.warning "+3"] diff --git a/tests/core/test_lwt_stream.ml b/tests/core/test_lwt_stream.ml index b4b9816f1c..ef21b5752a 100644 --- a/tests/core/test_lwt_stream.ml +++ b/tests/core/test_lwt_stream.ml @@ -276,7 +276,13 @@ let suite = suite "lwt_stream" [ (* TODO: This will no longer be a shadowing open once Lwt_stream.error is removed. *) let open! Lwt_stream in - let l = [Value 1; Error Exit; Error (Failure "plop"); Value 42; Error End_of_file] in + let l = + [Result.Ok 1; + Result.Error Exit; + Result.Error (Failure "plop"); + Result.Ok 42; + Result.Error End_of_file] + in let q = ref l in let stream = Lwt_stream.from @@ -284,14 +290,14 @@ let suite = suite "lwt_stream" [ match !q with | [] -> return None - | Value x :: l -> + | (Result.Ok x)::l -> q := l; return (Some x) - | Error e :: l -> + | (Result.Error e)::l -> q := l; Lwt.fail e) in - Lwt_stream.to_list (Lwt_stream.map_exn stream) >>= fun l' -> + Lwt_stream.to_list (Lwt_stream.wrap_exn stream) >>= fun l' -> return (l = l')); test "is_closed"