Skip to content

Commit

Permalink
fix: autorun halted processes on pipe run (#951)
Browse files Browse the repository at this point in the history
relates #949
  • Loading branch information
antongolub authored Nov 23, 2024
1 parent 15bb135 commit 8900e45
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .size-limit.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
"name": "zx/core",
"path": ["build/core.cjs", "build/util.cjs", "build/vendor-core.cjs"],
"limit": "71.1 kB",
"limit": "72 kB",
"brotli": false,
"gzip": false
},
Expand Down Expand Up @@ -30,7 +30,7 @@
{
"name": "all",
"path": "build/*",
"limit": "833.6 kB",
"limit": "835 kB",
"brotli": false,
"gzip": false
}
Expand Down
32 changes: 23 additions & 9 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { type AsyncHook, AsyncLocalStorage, createHook } from 'node:async_hooks'
import { type Readable, type Writable } from 'node:stream'
import { inspect } from 'node:util'
import { EOL as _EOL } from 'node:os'
import { EventEmitter } from 'node:events'
import {
exec,
buildCmd,
Expand Down Expand Up @@ -205,6 +206,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
private _resolved = false
private _halted?: boolean
private _piped = false
private _pipedFrom?: ProcessPromise
private _run = false
private _ee = new EventEmitter()
private _stdin = new VoidStream()
private _zurk: ReturnType<typeof exec> | null = null
private _output: ProcessOutput | null = null
private _reject: Resolve = noop
Expand All @@ -225,7 +230,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
}

run(): ProcessPromise {
if (this.child) return this // The _run() can be called from a few places.
if (this._run) return this // The _run() can be called from a few places.
this._halted = false
this._run = true
this._pipedFrom?.run()

const $ = this._snapshot
const self = this
Expand Down Expand Up @@ -255,9 +263,11 @@ export class ProcessPromise extends Promise<ProcessOutput> {
spawn: $.spawn,
spawnSync: $.spawnSync,
store: $.store,
stdin: self._stdin,
stdio: self._stdio ?? $.stdio,
sync: $[SYNC],
detached: $.detached,
ee: self._ee,
run: (cb) => cb(),
on: {
start: () => {
Expand Down Expand Up @@ -326,20 +336,18 @@ export class ProcessPromise extends Promise<ProcessOutput> {
...args: any[]
): (Writable & PromiseLike<Writable>) | ProcessPromise {
if (isStringLiteral(dest, ...args))
return this.pipe($(dest as TemplateStringsArray, ...args))
return this.pipe($({ halt: true })(dest as TemplateStringsArray, ...args))
if (isString(dest))
throw new Error('The pipe() method does not take strings. Forgot $?')

this._piped = true
const { store, ee, fulfilled } = this._zurk!
const ee = this._ee
const from = new VoidStream()
const fill = () => {
for (const chunk of store.stdout) {
from.write(chunk)
}
for (const chunk of this._zurk!.store.stdout) from.write(chunk)
}

if (fulfilled) {
if (this._resolved) {
fill()
from.end()
} else {
Expand All @@ -354,8 +362,14 @@ export class ProcessPromise extends Promise<ProcessOutput> {
}

if (dest instanceof ProcessPromise) {
this.catch((e) => (dest.isNothrow() ? noop : dest._reject(e)))
from.pipe(dest.stdin)
dest._pipedFrom = this

if (dest.isHalted() && this.isHalted()) {
ee.once('start', () => from.pipe(dest.run()._stdin))
} else {
this.catch((e) => (dest.isNothrow() ? noop : dest._reject(e)))
from.pipe(dest.run()._stdin)
}
return dest
}
from.once('end', () => dest.emit('end-piped-from')).pipe(dest)
Expand Down
26 changes: 26 additions & 0 deletions test/core.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,32 @@ describe('core', () => {
assert.equal(o2, 'HELLO WORLD\n')
})

test('$ > $ halted', async () => {
const $h = $({ halt: true })
const { stdout } = await $`echo "hello"`
.pipe($h`awk '{print $1" world"}'`)
.pipe($h`tr '[a-z]' '[A-Z]'`)

assert.equal(stdout, 'HELLO WORLD\n')
})

test('$ halted > $ halted', async () => {
const $h = $({ halt: true })
const { stdout } = await $h`echo "hello"`
.pipe($h`awk '{print $1" world"}'`)
.pipe($h`tr '[a-z]' '[A-Z]'`)
.run()

assert.equal(stdout, 'HELLO WORLD\n')
})

test('$ halted > $ literal', async () => {
const { stdout } = await $({ halt: true })`echo "hello"`
.pipe`awk '{print $1" world"}'`.pipe`tr '[a-z]' '[A-Z]'`.run()

assert.equal(stdout, 'HELLO WORLD\n')
})

test('$ > stream', async () => {
const file = tempfile()
const fileStream = fs.createWriteStream(file)
Expand Down

0 comments on commit 8900e45

Please sign in to comment.