Skip to content

Commit

Permalink
Add stream/lines(-channel)
Browse files Browse the repository at this point in the history
These functions read lines from a stream.
  • Loading branch information
amano-kenji committed Feb 15, 2024
1 parent 985ffbc commit f64e229
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 98 deletions.
98 changes: 0 additions & 98 deletions spork/sh.janet
Original file line number Diff line number Diff line change
Expand Up @@ -181,101 +181,3 @@
"Output a string with all arguments correctly quoted"
[& args]
(string/join (map shell-quote args) " "))

(defn lines
```
It executes a list of command arguments described by args, starts an asynchronous task that retrieves each line from
the command's standard output as a buffer, and returns a struct with the following keys.
* `:chan` - The channel that gives each line from the task.
* `:fiber` - A fiber that yields each line from the channel.
* `:cancel` - An object-oriented function that accepts the struct and an optional argument which is signal passed to
`os/proc-kill`. The default signal is `:term`. It kills the process and ends the task. You can call this function
multiple times without errors.
* `:exit-code` - An object-oriented function that accepts the struct and returns the process exit code after waiting
for the task to end. After this function returns the exit code, it returns `nil`.
Each line is terminated by a newline character, `\n`. After end of process, the channel gives the last line from the
process. After the channel gives the last line, the process exit code is retrieved, and the process is closed before
the task ends. Make sure to either take all lines from the channel or call `:cancel`. If you don't, the process
remains frozen in the background or becomes a zombie process.
If `stderr` is false or nil, /dev/null is opened and fed the command's standard error and closed when the task
finishes. If `stderr` is a `core/file` value or a `core/stream` value, `stderr` is fed the command's standard error,
and it is not closed by this function. If `stderr` is any other value, the returned struct gains a key, `:stderr`.
`:stderr` is an object-oriented function that accepts the struct and returns standard error of the command as a
buffer after waiting for the task to finish. After it is called for the first time, it returns `nil`.
```
[args &named stderr]
(def /dev/null (unless stderr
(devnull)))
(def proc (os/spawn args :p {:out :pipe :err (cond
# If `stderr` is false or nil
(not (nil? /dev/null))
/dev/null
# If `stderr` is core/file or core/stream
(let [stderr-type (type stderr)]
(or (= stderr-type :core/file)
(= stderr-type :core/stream)))
stderr
# If `stderr` is any other value
:pipe)}))
(def {:out out :err err} proc)
(def exit-code (ev/chan 1))
(def stderr-ch (when err
(ev/chan 1)))
(when stderr-ch
(ev/spawn
(ev/give stderr-ch (ev/read err :all))))
# lines channel must have capacity of 0. Otherwise, it can be closed before `ev/take` can take the last line.
(def lines (ev/chan))
(defn fetch-lines
[chunk]
# if-let breaks tail call optimization
(def idx (string/find "\n" chunk))
(if idx
(do
# Give the first line
(ev/give lines (buffer/slice chunk 0 idx))
# Eliminate the first line from chunk without creating a new buffer
(def idx+1 (inc idx))
(buffer/blit chunk chunk 0 idx+1)
(fetch-lines (buffer/popn chunk idx+1)))
(if (ev/read out 1024 chunk)
(fetch-lines chunk)
(when (not (empty? chunk))
(ev/give lines chunk)))))
(ev/spawn
(defer (do
(:close lines)
(when /dev/null
(:close /dev/null))
(ev/give exit-code (:close proc)))
(try
(when-let [chunk (ev/read out 1024)]
(fetch-lines chunk))
# :cancel causes an error.
([_]))))
{:chan lines
:fiber (fiber/new (fn recurse []
# if-let breaks tail call optimization
(def line (ev/take lines))
(when line
(yield line)
(recurse)))
:yi)
:cancel (fn [self &opt signal]
(default signal :term)
(try
(os/proc-kill proc false signal)
# after the process exit code is retrieved, `os/proc-kill` throws an error.
([_]))
(:close lines)
nil)
:exit-code (fn [self] (when-let [ec (ev/take exit-code)]
(ev/chan-close exit-code)
ec))
:stderr (when stderr-ch
(fn [self] (when-let [buf (ev/take stderr-ch)]
(ev/chan-close stderr-ch)
buf)))})
62 changes: 62 additions & 0 deletions spork/stream.janet
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
(defn lines
```
Returns a fiber that yields each line from a core/stream value. If separator is not specified, the default separator
is `\n`. If the stream is closed before the fiber yields all lines, an error is thrown from the stream.
```
[stream &opt separator]
(default separator "\n")
(defn yield-lines
[chunk]
# if-let breaks tail call optimization. when-let depends on if-let.
# https://github.com/janet-lang/janet/issues/1401
(def idx (string/find "\n" chunk))
(when idx
# Yield the first line
(yield (buffer/slice chunk 0 idx))
# Eliminate the first line from chunk without creating a new buffer
(def idx+1 (inc idx))
(buffer/blit chunk chunk 0 idx+1)
(yield-lines (buffer/popn chunk idx+1))))
(defn fetch-lines
[chunk]
(if (ev/read stream 1024 chunk)
(do
(yield-lines chunk)
(fetch-lines chunk))
(do
(yield-lines chunk)
(when (not (empty? chunk))
(yield chunk)))))
(coro (fetch-lines @"")))

(defn lines-channel
```
Returns a channel that gives each line from a core/stream value. An asynchronous task feeds lines to the channel. If
separator is not specified, the default separator is `\n`. To make sure that the task is finished, drain all lines
from the channel, or close the stream and the channel. Otherwise, the task remains frozen in the background. The
channel gives `nil` after one of those conditions applies.
* end of stream is reached.
* stream or channel is closed.
This function ignores errors from closed streams and closed channels.
```
[stream &opt separator]
(def fiber (lines stream separator))
(def ch (ev/chan))
(defn give-lines
[]
# if-let breaks tail call optimization. when-let depends on if-let.
# https://github.com/janet-lang/janet/issues/1401
(def line (resume fiber))
(when line
(do
(ev/give ch line)
(give-lines))))
(ev/spawn
(try
(defer (:close ch)
(give-lines))
# Ignore errors caused by closed streams and closed channels.
([_])))
ch)

0 comments on commit f64e229

Please sign in to comment.