From f64e2299de6bf59364a41add75dbaa216d11a865 Mon Sep 17 00:00:00 2001 From: "amano.kenji" Date: Thu, 15 Feb 2024 11:45:25 +0000 Subject: [PATCH] Add stream/lines(-channel) These functions read lines from a stream. --- spork/sh.janet | 98 ---------------------------------------------- spork/stream.janet | 62 +++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 98 deletions(-) create mode 100644 spork/stream.janet diff --git a/spork/sh.janet b/spork/sh.janet index 2e725c7..fba9c4d 100644 --- a/spork/sh.janet +++ b/spork/sh.janet @@ -181,101 +181,3 @@ "Output a string with all arguments correctly quoted" [& args] (string/join (map shell-quote args) " ")) - -(defn lines - ``` - It executes a list of command arguments described by args, starts an asynchronous task that retrieves each line from - the command's standard output as a buffer, and returns a struct with the following keys. - - * `:chan` - The channel that gives each line from the task. - * `:fiber` - A fiber that yields each line from the channel. - * `:cancel` - An object-oriented function that accepts the struct and an optional argument which is signal passed to - `os/proc-kill`. The default signal is `:term`. It kills the process and ends the task. You can call this function - multiple times without errors. - * `:exit-code` - An object-oriented function that accepts the struct and returns the process exit code after waiting - for the task to end. After this function returns the exit code, it returns `nil`. - - Each line is terminated by a newline character, `\n`. After end of process, the channel gives the last line from the - process. After the channel gives the last line, the process exit code is retrieved, and the process is closed before - the task ends. Make sure to either take all lines from the channel or call `:cancel`. If you don't, the process - remains frozen in the background or becomes a zombie process. - - If `stderr` is false or nil, /dev/null is opened and fed the command's standard error and closed when the task - finishes. If `stderr` is a `core/file` value or a `core/stream` value, `stderr` is fed the command's standard error, - and it is not closed by this function. If `stderr` is any other value, the returned struct gains a key, `:stderr`. - `:stderr` is an object-oriented function that accepts the struct and returns standard error of the command as a - buffer after waiting for the task to finish. After it is called for the first time, it returns `nil`. - ``` - [args &named stderr] - (def /dev/null (unless stderr - (devnull))) - (def proc (os/spawn args :p {:out :pipe :err (cond - # If `stderr` is false or nil - (not (nil? /dev/null)) - /dev/null - # If `stderr` is core/file or core/stream - (let [stderr-type (type stderr)] - (or (= stderr-type :core/file) - (= stderr-type :core/stream))) - stderr - # If `stderr` is any other value - :pipe)})) - (def {:out out :err err} proc) - (def exit-code (ev/chan 1)) - (def stderr-ch (when err - (ev/chan 1))) - (when stderr-ch - (ev/spawn - (ev/give stderr-ch (ev/read err :all)))) - # lines channel must have capacity of 0. Otherwise, it can be closed before `ev/take` can take the last line. - (def lines (ev/chan)) - (defn fetch-lines - [chunk] - # if-let breaks tail call optimization - (def idx (string/find "\n" chunk)) - (if idx - (do - # Give the first line - (ev/give lines (buffer/slice chunk 0 idx)) - # Eliminate the first line from chunk without creating a new buffer - (def idx+1 (inc idx)) - (buffer/blit chunk chunk 0 idx+1) - (fetch-lines (buffer/popn chunk idx+1))) - (if (ev/read out 1024 chunk) - (fetch-lines chunk) - (when (not (empty? chunk)) - (ev/give lines chunk))))) - (ev/spawn - (defer (do - (:close lines) - (when /dev/null - (:close /dev/null)) - (ev/give exit-code (:close proc))) - (try - (when-let [chunk (ev/read out 1024)] - (fetch-lines chunk)) - # :cancel causes an error. - ([_])))) - {:chan lines - :fiber (fiber/new (fn recurse [] - # if-let breaks tail call optimization - (def line (ev/take lines)) - (when line - (yield line) - (recurse))) - :yi) - :cancel (fn [self &opt signal] - (default signal :term) - (try - (os/proc-kill proc false signal) - # after the process exit code is retrieved, `os/proc-kill` throws an error. - ([_])) - (:close lines) - nil) - :exit-code (fn [self] (when-let [ec (ev/take exit-code)] - (ev/chan-close exit-code) - ec)) - :stderr (when stderr-ch - (fn [self] (when-let [buf (ev/take stderr-ch)] - (ev/chan-close stderr-ch) - buf)))}) diff --git a/spork/stream.janet b/spork/stream.janet new file mode 100644 index 0000000..4ae5bed --- /dev/null +++ b/spork/stream.janet @@ -0,0 +1,62 @@ +(defn lines + ``` + Returns a fiber that yields each line from a core/stream value. If separator is not specified, the default separator + is `\n`. If the stream is closed before the fiber yields all lines, an error is thrown from the stream. + ``` + [stream &opt separator] + (default separator "\n") + (defn yield-lines + [chunk] + # if-let breaks tail call optimization. when-let depends on if-let. + # https://github.com/janet-lang/janet/issues/1401 + (def idx (string/find "\n" chunk)) + (when idx + # Yield the first line + (yield (buffer/slice chunk 0 idx)) + # Eliminate the first line from chunk without creating a new buffer + (def idx+1 (inc idx)) + (buffer/blit chunk chunk 0 idx+1) + (yield-lines (buffer/popn chunk idx+1)))) + (defn fetch-lines + [chunk] + (if (ev/read stream 1024 chunk) + (do + (yield-lines chunk) + (fetch-lines chunk)) + (do + (yield-lines chunk) + (when (not (empty? chunk)) + (yield chunk))))) + (coro (fetch-lines @""))) + +(defn lines-channel + ``` + Returns a channel that gives each line from a core/stream value. An asynchronous task feeds lines to the channel. If + separator is not specified, the default separator is `\n`. To make sure that the task is finished, drain all lines + from the channel, or close the stream and the channel. Otherwise, the task remains frozen in the background. The + channel gives `nil` after one of those conditions applies. + + * end of stream is reached. + * stream or channel is closed. + + This function ignores errors from closed streams and closed channels. + ``` + [stream &opt separator] + (def fiber (lines stream separator)) + (def ch (ev/chan)) + (defn give-lines + [] + # if-let breaks tail call optimization. when-let depends on if-let. + # https://github.com/janet-lang/janet/issues/1401 + (def line (resume fiber)) + (when line + (do + (ev/give ch line) + (give-lines)))) + (ev/spawn + (try + (defer (:close ch) + (give-lines)) + # Ignore errors caused by closed streams and closed channels. + ([_]))) + ch)