From 8717849b31f03c37b7640d00f1e8b12dc62cfd40 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Fri, 26 Jan 2024 18:02:22 -0500 Subject: [PATCH 1/2] feat: support pipe sequences --- mod.test.ts | 15 +++ mod.ts | 9 +- src/command.ts | 35 +++++-- src/commands/cat.ts | 12 ++- src/pipes.ts | 42 ++++++++ src/shell.ts | 235 ++++++++++++++++++++++++++++++++------------ 6 files changed, 271 insertions(+), 77 deletions(-) diff --git a/mod.test.ts b/mod.test.ts index a607949..a025bb9 100644 --- a/mod.test.ts +++ b/mod.test.ts @@ -1085,6 +1085,21 @@ Deno.test("command .lines()", async () => { assertEquals(result, ["1", "2"]); }); +Deno.test("piping in command", async () => { + { + const result = await $`echo 1 | cat -`.text(); + assertEquals(result, "1"); + } + { + const result = await $`echo 1 && echo 2 | cat -`.text(); + assertEquals(result, "1\n2"); + } + { + const result = await $`echo 1 || echo 2 | cat -`.text(); + assertEquals(result, "1"); + } +}); + Deno.test("redirects", async () => { await withTempDir(async (tempDir) => { // absolute diff --git a/mod.ts b/mod.ts index f3b3941..e6b9e7b 100644 --- a/mod.ts +++ b/mod.ts @@ -33,7 +33,14 @@ import { createPathRef, PathRef } from "./src/path.ts"; export type { Delay, DelayIterator } from "./src/common.ts"; export { FsFileWrapper, PathRef } from "./src/path.ts"; export type { ExpandGlobOptions, PathSymlinkOptions, SymlinkOptions, WalkEntry, WalkOptions } from "./src/path.ts"; -export { CommandBuilder, CommandChild, CommandResult, KillSignal, KillSignalController } from "./src/command.ts"; +export { + CommandBuilder, + CommandChild, + CommandResult, + KillSignal, + KillSignalController, + type KillSignalListener, +} from "./src/command.ts"; export type { CommandContext, CommandHandler, CommandPipeReader, CommandPipeWriter } from "./src/command_handler.ts"; export type { Closer, Reader, ShellPipeReaderKind, ShellPipeWriterKind, WriterSync } from "./src/pipes.ts"; export type { diff --git a/src/command.ts b/src/command.ts index 477d0b3..9b983a1 100644 --- a/src/command.ts +++ b/src/command.ts @@ -1075,7 +1075,7 @@ function validateCommandName(command: string) { const SHELL_SIGNAL_CTOR_SYMBOL = Symbol(); interface KillSignalState { - aborted: boolean; + abortedCode: number; listeners: ((signal: Deno.Signal) => void)[]; } @@ -1086,7 +1086,7 @@ export class KillSignalController { constructor() { this.#state = { - aborted: false, + abortedCode: 0, listeners: [], }; this.#killSignal = new KillSignal(SHELL_SIGNAL_CTOR_SYMBOL, this.#state); @@ -1106,6 +1106,9 @@ export class KillSignalController { } } +/** Listener for when a KillSignal is killed. */ +export type KillSignalListener = (signal: Deno.Signal) => void; + /** Similar to `AbortSignal`, but for `Deno.Signal`. * * A `KillSignal` is considered aborted if its controller @@ -1128,7 +1131,7 @@ export class KillSignal { * SIGKILL, SIGABRT, SIGQUIT, SIGINT, or SIGSTOP */ get aborted(): boolean { - return this.#state.aborted; + return this.#state.abortedCode !== 0; } /** @@ -1147,39 +1150,51 @@ export class KillSignal { }; } - addListener(listener: (signal: Deno.Signal) => void) { + addListener(listener: KillSignalListener) { this.#state.listeners.push(listener); } - removeListener(listener: (signal: Deno.Signal) => void) { + removeListener(listener: KillSignalListener) { const index = this.#state.listeners.indexOf(listener); if (index >= 0) { this.#state.listeners.splice(index, 1); } } + + /** @internal - DO NOT USE. Very unstable. Not sure about this. */ + get _abortedExitCode() { + return this.#state.abortedCode; + } } function sendSignalToState(state: KillSignalState, signal: Deno.Signal) { - if (signalCausesAbort(signal)) { - state.aborted = true; + const code = getSignalAbortCode(signal); + if (code !== 0) { + state.abortedCode = code; } for (const listener of state.listeners) { listener(signal); } } -function signalCausesAbort(signal: Deno.Signal) { +function getSignalAbortCode(signal: Deno.Signal) { // consider the command aborted if the signal is any one of these switch (signal) { case "SIGTERM": + return 128 + 15; case "SIGKILL": + return 128 + 9; case "SIGABRT": + return 128 + 6; case "SIGQUIT": + return 128 + 3; case "SIGINT": + return 128 + 2; case "SIGSTOP": - return true; + // should SIGSTOP be considered an abort? + return 128 + 19; default: - return false; + return 0; } } diff --git a/src/commands/cat.ts b/src/commands/cat.ts index e386bf9..93392e4 100644 --- a/src/commands/cat.ts +++ b/src/commands/cat.ts @@ -21,17 +21,18 @@ export async function catCommand( async function executeCat(context: CommandContext) { const flags = parseCatArgs(context.args); - let exit_code = 0; + let exitCode = 0; const buf = new Uint8Array(1024); for (const path of flags.paths) { if (path === "-") { // read from stdin if (typeof context.stdin === "object") { // stdin is a Reader - while (true) { + while (!context.signal.aborted) { const size = await context.stdin.read(buf); if (!size || size === 0) break; else context.stdout.writeSync(buf.slice(0, size)); } + exitCode = context.signal._abortedExitCode; } else { const _assertValue: "null" | "inherit" = context.stdin; throw new Error(`not supported. stdin was '${context.stdin}'`); @@ -40,21 +41,22 @@ async function executeCat(context: CommandContext) { let file; try { file = await Deno.open(pathUtils.join(context.cwd, path), { read: true }); - while (true) { + while (!context.signal.aborted) { // NOTE: rust supports cancellation here const size = file.readSync(buf); if (!size || size === 0) break; else context.stdout.writeSync(buf.slice(0, size)); } + exitCode = context.signal._abortedExitCode; } catch (err) { context.stderr.writeLine(`cat ${path}: ${err}`); - exit_code = 1; + exitCode = 1; } finally { if (file) file.close(); } } } - return exit_code; + return exitCode; } export function parseCatArgs(args: string[]): CatFlags { diff --git a/src/pipes.ts b/src/pipes.ts index 84c6edd..f540862 100644 --- a/src/pipes.ts +++ b/src/pipes.ts @@ -187,3 +187,45 @@ export class PipedBuffer implements WriterSync { this.#hasSet = true; } } + +export class PipeSequencePipe implements Reader, WriterSync { + #inner = new Buffer(); + #readListener: (() => void) | undefined; + #closed = false; + + close() { + this.#readListener?.(); + this.#closed = true; + } + + writeSync(p: Uint8Array): number { + const value = this.#inner.writeSync(p); + if (this.#readListener !== undefined) { + const listener = this.#readListener; + this.#readListener = undefined; + listener(); + } + return value; + } + + read(p: Uint8Array): Promise { + if (this.#readListener !== undefined) { + // doesn't support multiple read listeners at the moment + throw new Error("Misuse of PipeSequencePipe"); + } + + if (this.#inner.length === 0) { + if (this.#closed) { + return Promise.resolve(null); + } else { + return new Promise((resolve) => { + this.#readListener = () => { + resolve(this.#inner.readSync(p)); + }; + }); + } + } else { + return Promise.resolve(this.#inner.readSync(p)); + } + } +} diff --git a/src/shell.ts b/src/shell.ts index 9d66ffd..9e6d631 100644 --- a/src/shell.ts +++ b/src/shell.ts @@ -5,6 +5,7 @@ import { DenoWhichRealEnvironment, fs, path, which } from "./deps.ts"; import { wasmInstance } from "./lib/mod.ts"; import { NullPipeWriter, + PipeSequencePipe, Reader, ShellPipeReaderKind, ShellPipeWriter, @@ -83,6 +84,15 @@ export interface TaggedSequentialList extends SequentialList { kind: "sequentialList"; } +export interface PipeSequence { + kind: "pipeSequence"; + current: Command; + op: PipeSequenceOp; + next: PipelineInner; +} + +export type PipeSequenceOp = "stdout" | "stdoutstderr"; + export type RedirectFd = RedirectFdFd | RedirectFdStdoutStderr; export interface RedirectFdFd { @@ -94,11 +104,6 @@ export interface RedirectFdStdoutStderr { kind: "stdoutStderr"; } -export interface PipeSequence { - kind: "pipeSequence"; - // todo... -} - export type RedirectOp = "redirect" | "append"; export interface Redirect { @@ -248,6 +253,16 @@ function cloneEnv(env: Env) { // } // } +interface ContextOptions { + stdin: CommandPipeReader; + stdout: ShellPipeWriter; + stderr: ShellPipeWriter; + env: Env; + commands: Record; + shellVars: Record; + signal: KillSignal; +} + export class Context { stdin: CommandPipeReader; stdout: ShellPipeWriter; @@ -257,15 +272,7 @@ export class Context { #commands: Record; #signal: KillSignal; - constructor(opts: { - stdin: CommandPipeReader; - stdout: ShellPipeWriter; - stderr: ShellPipeWriter; - env: Env; - commands: Record; - shellVars: Record; - signal: KillSignal; - }) { + constructor(opts: ContextOptions) { this.stdin = opts.stdin; this.stdout = opts.stdout; this.stderr = opts.stderr; @@ -382,6 +389,18 @@ export class Context { }; } + withInner(opts: Partial>) { + return new Context({ + stdin: opts.stdin ?? this.stdin, + stdout: opts.stdout ?? this.stdout, + stderr: opts.stderr ?? this.stderr, + env: this.#env.clone(), + commands: { ...this.#commands }, + shellVars: { ...this.#shellVars }, + signal: this.#signal, + }); + } + clone() { return new Context({ stdin: this.stdin, @@ -580,7 +599,11 @@ function executePipelineInner(inner: PipelineInner, context: Context): Promise, signal: AbortSignal) { - const abortedPromise = new Promise((resolve) => { - signal.addEventListener("abort", listener); - function listener() { - signal.removeEventListener("abort", listener); - resolve(); - } - }); - const writer = writable.getWriter(); - try { - while (!signal.aborted) { - const buffer = new Uint8Array(1024); - const length = await Promise.race([abortedPromise, reader.read(buffer)]); - if (length === 0 || length == null) { - break; - } - await writer.write(buffer.subarray(0, length)); - } - } finally { - await writer.close(); + function getStdioStringValue(value: ShellPipeReaderKind | ShellPipeWriterKind) { + if (value === "inheritPiped") { + return "piped"; + } else if (value === "inherit" || value === "null" || value === "piped") { + return value; + } else { + return "piped"; } } +} - async function pipeReaderToWriterSync( - readable: ReadableStream, - writer: WriterSync, - signal: AbortSignal, - ) { - const reader = readable.getReader(); +async function pipeReaderToWritable(reader: Reader, writable: WritableStream, signal: AbortSignal) { + const abortedPromise = new Promise((resolve) => { + signal.addEventListener("abort", listener); + function listener() { + signal.removeEventListener("abort", listener); + resolve(); + } + }); + const writer = writable.getWriter(); + try { while (!signal.aborted) { - const result = await reader.read(); - if (result.done) { + const buffer = new Uint8Array(1024); + const length = await Promise.race([abortedPromise, reader.read(buffer)]); + if (length === 0 || length == null) { break; } - writeAllSync(result.value); + await writer.write(buffer.subarray(0, length)); } + } finally { + await writer.close(); + } +} - function writeAllSync(arr: Uint8Array) { - let nwritten = 0; - while (nwritten < arr.length && !signal.aborted) { - nwritten += writer.writeSync(arr.subarray(nwritten)); - } +async function pipeReadableToWriterSync( + readable: ReadableStream, + writer: WriterSync, + signal: AbortSignal | KillSignal, +) { + const reader = readable.getReader(); + while (!signal.aborted) { + const result = await reader.read(); + if (result.done) { + break; } + writeAllSync(result.value); } - function getStdioStringValue(value: ShellPipeReaderKind | ShellPipeWriterKind) { - if (value === "inheritPiped") { - return "piped"; - } else if (value === "inherit" || value === "null" || value === "piped") { - return value; - } else { - return "piped"; + function writeAllSync(arr: Uint8Array) { + let nwritten = 0; + while (nwritten < arr.length && !signal.aborted) { + nwritten += writer.writeSync(arr.subarray(nwritten)); + } + } +} + +async function pipeReaderToWriterSync( + reader: Reader, + writer: WriterSync, + signal: AbortSignal | KillSignal, +) { + const buffer = new Uint8Array(1024); + while (!signal.aborted) { + const bytesRead = await reader.read(buffer); + if (bytesRead == null || bytesRead === 0) { + break; + } + writeAllSync(buffer.slice(0, bytesRead)); + } + + function writeAllSync(arr: Uint8Array) { + let nwritten = 0; + while (nwritten < arr.length && !signal.aborted) { + nwritten += writer.writeSync(arr.subarray(nwritten)); + } + } +} + +function pipeCommandPipeReaderToWriterSync( + reader: CommandPipeReader, + writer: ShellPipeWriter, + signal: KillSignal, +) { + switch (reader) { + case "inherit": + return pipeReadableToWriterSync(Deno.stdin.readable, writer, signal); + case "null": + return Promise.resolve(); + default: { + return pipeReaderToWriterSync(reader, writer, signal); } } } @@ -929,6 +994,54 @@ async function resolveCommand(commandName: string, context: Context): Promise { + const waitTasks: Promise[] = []; + let lastOutput = context.stdin; + let nextInner: PipelineInner | undefined = sequence; + while (nextInner != null) { + switch (nextInner.kind) { + case "pipeSequence": + switch (nextInner.op) { + case "stdout": { + const buffer = new PipeSequencePipe(); + const newContext = context.withInner({ + stdout: new ShellPipeWriter("piped", buffer), + stdin: lastOutput, + }); + const commandPromise = executeCommand(nextInner.current, newContext); + waitTasks.push(commandPromise); + commandPromise.finally(() => { + buffer.close(); + }); + lastOutput = buffer; + break; + } + case "stdoutstderr": { + context.stderr.writeLine(`piping to both stdout and stderr is not implemented (ex. |&)`); + return resultFromCode(1); + } + default: { + const _assertNever: never = nextInner.op; + context.stderr.writeLine(`not implemented pipe sequence op: ${nextInner.op}`); + return resultFromCode(1); + } + } + nextInner = nextInner.next; + break; + case "command": + nextInner = undefined; + break; + } + } + waitTasks.push( + pipeCommandPipeReaderToWriterSync(lastOutput, context.stdout, context.signal).then(() => resultFromCode(0)), + ); + const results = await Promise.all(waitTasks); + // the second last result is the last command + const secondLastResult = results[results.length - 2]; + return secondLastResult; +} + async function parseShebangArgs(info: ShebangInfo, context: Context): Promise { function throwUnsupported(): never { throw new Error("Unsupported shebang. Please report this as a bug."); From 202f2fc8d2692f9f2421f489344441bc11ab22c8 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Fri, 26 Jan 2024 18:03:43 -0500 Subject: [PATCH 2/2] update --- src/command.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/command.ts b/src/command.ts index 9b983a1..5160f30 100644 --- a/src/command.ts +++ b/src/command.ts @@ -1169,7 +1169,7 @@ export class KillSignal { function sendSignalToState(state: KillSignalState, signal: Deno.Signal) { const code = getSignalAbortCode(signal); - if (code !== 0) { + if (code !== undefined) { state.abortedCode = code; } for (const listener of state.listeners) { @@ -1194,7 +1194,7 @@ function getSignalAbortCode(signal: Deno.Signal) { // should SIGSTOP be considered an abort? return 128 + 19; default: - return 0; + return undefined; } }