From 6b87148a3e7765d0b9f0b5524d106337331ea24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Fri, 5 Apr 2019 16:55:52 +0100 Subject: [PATCH] Refactors pipes --- packages/berry-shell/sources/index.ts | 252 ++++++++----------------- packages/berry-shell/sources/pipe.ts | 260 ++++++++++++++++++++++++++ 2 files changed, 334 insertions(+), 178 deletions(-) create mode 100644 packages/berry-shell/sources/pipe.ts diff --git a/packages/berry-shell/sources/index.ts b/packages/berry-shell/sources/index.ts index bd665d70b039..2c97a8efd0b8 100644 --- a/packages/berry-shell/sources/index.ts +++ b/packages/berry-shell/sources/index.ts @@ -1,11 +1,12 @@ import {xfs, NodeFS} from '@berry/fslib'; import {CommandSegment, CommandChain, CommandLine, ShellLine, parseShell} from '@berry/parsers'; -import crossSpawn from 'cross-spawn'; import {posix} from 'path'; import {PassThrough, Readable, Stream, Writable} from 'stream'; +import {Handle, ProtectedStream, Stdio, start, makeBuiltin, makeProcess} from './pipe'; + export type UserOptions = { - builtins: {[key: string]: UserBuiltin}, + builtins: {[key: string]: ShellBuiltin}, cwd: string, env: {[key: string]: string | undefined}, stdin: Readable, @@ -14,21 +15,11 @@ export type UserOptions = { variables: {[key: string]: string}, }; -export type UserBuiltin = ( - args: Array, - opts: ShellOptions, - state: ShellState, -) => Promise; - export type ShellBuiltin = ( args: Array, opts: ShellOptions, state: ShellState, - leftMost: boolean, -) => Promise<{ - stdin: Writable | null, - promise: Promise, -}>; +) => Promise; export type ShellOptions = { args: Array, @@ -48,41 +39,6 @@ export type ShellState = { variables: {[key: string]: string}, }; -function makeBuiltin(builtin: (args: Array, opts: ShellOptions, state: ShellState) => Promise): ShellBuiltin { - return async (args: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { - const stdin = !leftMost - ? new PassThrough() - : null; - - if (stdin !== null) - state = {... state, stdin}; - - const close = () => { - if (state.stdin !== opts.initialStdin) { - // @ts-ignore - state.stdin.end(); - } - - if (state.stdout !== opts.initialStdout) - state.stdout.end(); - if (state.stderr !== opts.initialStderr) { - state.stderr.end(); - } - }; - - return { - stdin, - promise: builtin(args, opts, state).then(result => { - close(); - return result; - }, error => { - close(); - throw error; - }), - }; - }; -} - function cloneState(state: ShellState, mergeWith: Partial = {}) { const newState = {... state, ... mergeWith}; @@ -93,7 +49,7 @@ function cloneState(state: ShellState, mergeWith: Partial = {}) { } const BUILTINS = new Map([ - [`cd`, makeBuiltin(async ([target, ... rest]: Array, opts: ShellOptions, state: ShellState) => { + [`cd`, async ([target, ... rest]: Array, opts: ShellOptions, state: ShellState) => { const resolvedTarget = posix.resolve(state.cwd, NodeFS.toPortablePath(target)); const stat = await xfs.statPromise(resolvedTarget); @@ -104,86 +60,28 @@ const BUILTINS = new Map([ state.cwd = target; return 0; } - })], + }], - [`pwd`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`pwd`, async (args: Array, opts: ShellOptions, state: ShellState) => { state.stdout.write(`${NodeFS.fromPortablePath(state.cwd)}\n`); return 0; - })], + }], - [`true`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`true`, async (args: Array, opts: ShellOptions, state: ShellState) => { return 0; - })], + }], - [`false`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`false`, async (args: Array, opts: ShellOptions, state: ShellState) => { return 1; - })], + }], - [`exit`, makeBuiltin(async ([code, ... rest]: Array, opts: ShellOptions, state: ShellState) => { + [`exit`, async ([code, ... rest]: Array, opts: ShellOptions, state: ShellState) => { return state.exitCode = parseInt(code, 10); - })], + }], - [`echo`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`echo`, async (args: Array, opts: ShellOptions, state: ShellState) => { state.stdout.write(`${args.join(` `)}\n`); return 0; - })], - - [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { - if (typeof ident === `undefined`) - return makeBuiltin(async () => 0)([], opts, state, leftMost); - - const stdio: Array = [state.stdin, state.stdout, state.stderr]; - const isUserStream = (stream: Stream) => stream instanceof PassThrough; - - if (isUserStream(state.stdin) || !leftMost) - stdio[0] = `pipe`; - if (isUserStream(state.stdout)) - stdio[1] = `pipe`; - if (isUserStream(state.stderr)) - stdio[2] = `pipe`; - - const subprocess = crossSpawn(ident, rest, { - cwd: NodeFS.fromPortablePath(state.cwd), - env: state.environment, - stdio, - }); - - if (isUserStream(state.stdin) && !leftMost) - state.stdin.pipe(subprocess.stdin); - if (isUserStream(state.stdout)) - subprocess.stdout.pipe(state.stdout); - if (isUserStream(state.stderr)) - subprocess.stderr.pipe(state.stderr); - - return { - stdin: subprocess.stdin, - promise: new Promise(resolve => { - subprocess.on(`error`, error => { - // @ts-ignore - switch (error.code) { - case `ENOENT`: { - state.stderr.write(`command not found: ${ident}\n`); - resolve(127); - } break; - case `EACCESS`: { - state.stderr.write(`permission denied: ${ident}\n`); - resolve(128); - } break; - default: { - state.stderr.write(`uncaught error: ${error.message}\n`); - resolve(1); - } break; - } - }); - subprocess.on(`exit`, code => { - if (code !== null) { - resolve(code); - } else { - resolve(129); - } - }); - }), - }; }], ]); @@ -306,110 +204,108 @@ async function interpolateArguments(commandArgs: Array>, o * $ cat hello | grep world | grep -v foobar */ -function makeCommandAction(args: Array, opts: ShellOptions) { +function makeCommandAction(args: Array, opts: ShellOptions, state: ShellState) { if (!opts.builtins.has(args[0])) args = [`command`, ... args]; const [name, ... rest] = args; + if (name === `command`) { + return makeProcess(rest[0], rest.slice(1), { + cwd: NodeFS.fromPortablePath(state.cwd), + env: state.environment, + }); + } const builtin = opts.builtins.get(name); if (typeof builtin === `undefined`) throw new Error(`Assertion failed: A builtin should exist for "${name}"`) - return async (state: ShellState, leftMost: boolean) => { - const {stdin, promise} = await builtin(rest, opts, state, leftMost); - return {stdin, promise}; - }; + return makeBuiltin(async ({stdin, stdout, stderr}) => { + state.stdin = stdin; + state.stdout = stdout; + state.stderr = stderr; + + return await builtin(rest, opts, state); + }); } -function makeSubshellAction(ast: ShellLine, opts: ShellOptions) { - return async (state: ShellState, mustPipe: boolean) => { +function makeSubshellAction(ast: ShellLine, opts: ShellOptions, state: ShellState) { + return (stdio: Stdio) => { const stdin = new PassThrough(); const promise = executeShellLine(ast, opts, cloneState(state, {stdin})); - return {stdin: stdin as Writable, promise}; + return {stdin, promise}; }; } async function executeCommandChain(node: CommandChain, opts: ShellOptions, state: ShellState) { const parts = []; - // leftMost we interpolate all the commands (we don't interpolate subshells - // because they operate in their own contexts and are allowed to define - // new internal variables) - let current: CommandChain | null = node; let pipeType = null; + let execution: Handle | null = null; + while (current) { - let action; + // Only the final segment is allowed to modify the shell state; all the + // other ones are isolated + const activeState = current.then + ? {... state} + : state; + let action; switch (current.type) { case `command`: { - action = makeCommandAction(await interpolateArguments(current.args, opts, state), opts); + action = makeCommandAction(await interpolateArguments(current.args, opts, state), opts, activeState); } break; case `subshell`: { - action = makeSubshellAction(current.subshell, opts); + // We don't interpolate the subshell because it will be recursively + // interpolated within its own context + action = makeSubshellAction(current.subshell, opts, activeState); } break; } if (typeof action === `undefined`) throw new Error(`Assertion failed: An action should have been generated`); - parts.push({action, pipeType}); + if (pipeType === null) { + // If we're processing the left-most segment of the command, we start a + // new execution pipeline + execution = start(action, { + stdin: new ProtectedStream(activeState.stdin), + stdout: new ProtectedStream(activeState.stdout), + stderr: new ProtectedStream(activeState.stderr), + }); + } else { + if (execution === null) + throw new Error(`The execution pipeline should have been setup`); + + // Otherwise, depending on the exaxct pipe type, we either pipe stdout + // only or stdout and stderr + switch (pipeType) { + case `|`: { + execution = execution.pipeTo(action); + } break; + + case `|&`: { + execution = execution.pipeTo(action); + } break; + } + } - if (typeof current.then !== `undefined`) { + if (current.then) { pipeType = current.then.type; current = current.then.chain; } else { current = null; - pipeType = null; - } - } - - // Note that the execution starts from the right-most command and - // progressively moves towards the left-most command. We run them in this - // order because otherwise we would risk a race condition where (let's - // use A | B as example) A would start writing before B is ready, which - // could cause the pipe buffer to overflow and some writes to be lost. - - let stdout = state.stdout; - let stderr = state.stderr; - - const promises = []; - - for (let t = parts.length - 1; t >= 0; --t) { - const {action, pipeType} = parts[t]; - const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), t === 0); - - promises.push(promise); - - switch (pipeType) { - case null: { - // no pipe! - } break; - - case `|`: { - if (stdin === null) - throw new Error(`Assertion failed: The pipe is expected to return a writable stream`); - - stdout = stdin; - } break; - - case `|&`: { - if (stdin === null) - throw new Error(`Assertion failed: The pipe is expected to return a writable stream`); - - stdout = stdin; - stderr = stdin; - } break; } } - const exitCodes = await Promise.all(promises); + if (execution === null) + throw new Error(`Assertion failed: The execution pipeline should have been setup`); - return exitCodes[exitCodes.length - 1]; + return await execution.run(); } /** @@ -542,8 +438,8 @@ export async function execute(command: string, args: Array = [], { normalizedEnv[key] = value; const normalizedBuiltins = new Map(BUILTINS); - for (const [key, action] of Object.entries(builtins)) - normalizedBuiltins.set(key, makeBuiltin(action)); + for (const [key, builtin] of Object.entries(builtins)) + normalizedBuiltins.set(key, builtin); const ast = parseShell(command); diff --git a/packages/berry-shell/sources/pipe.ts b/packages/berry-shell/sources/pipe.ts new file mode 100644 index 000000000000..121511c2491b --- /dev/null +++ b/packages/berry-shell/sources/pipe.ts @@ -0,0 +1,260 @@ +import crossSpawn from 'cross-spawn'; +import EventEmitter from 'events'; +import {PassThrough, Readable, Writable} from 'stream'; + +enum Pipe { + STDOUT = 0b01, + STDERR = 0b10, +}; + +export type Stdio = [ + any, + any, + any +]; + +type ProcessImplementation = ( + stdio: Stdio, +) => { + stdin: Writable, + promise: Promise, +}; + +function nextTick() { + return new Promise(resolve => { + process.nextTick(resolve); + }); +} + +export function makeProcess(name: string, args: Array, opts: any): ProcessImplementation { + return (stdio: Stdio) => { + const stdin = stdio[0]; + + const stdout = stdio[1] instanceof PassThrough + ? `pipe` + : stdio[1]; + + const stderr = stdio[2] instanceof PassThrough + ? `pipe` + : stdio[2]; + + const child = crossSpawn(name, args, {... opts, stdio: [ + stdin, + stdout, + stderr, + ]}); + + if (stdout !== stdio[1]) + child.stdout.pipe(stdio[1]); + if (stdout !== stdio[2]) + child.stdout.pipe(stdio[2]); + + return { + stdin: child.stdin, + promise: new Promise(resolve => { + child.on(`error`, error => { + // @ts-ignore + switch (error.code) { + case `ENOENT`: { + stdio[2].write(`command not found: ${name}\n`); + resolve(127); + } break; + case `EACCESS`: { + stdio[2].write(`permission denied: ${name}\n`); + resolve(128); + } break; + default: { + stdio[2].write(`uncaught error: ${error.message}\n`); + resolve(1); + } break; + } + }); + + child.on(`exit`, code => { + if (code !== null) { + resolve(code); + } else { + resolve(129); + } + }); + }), + }; + }; +} + +export function makeBuiltin(builtin: (opts: any) => Promise): ProcessImplementation { + return (stdio: Stdio) => { + const stdin = stdio[0] === `pipe` + ? new PassThrough() + : stdio[0]; + + return { + stdin, + promise: nextTick().then(() => builtin({ + stdin, + stdout: stdio[1], + stderr: stdio[2], + })), + }; + }; +} + +interface StreamLock { + close(): void; + get(): StreamType; +} + +export class ProtectedStream implements StreamLock { + private stream: StreamType; + + constructor(stream: StreamType) { + this.stream = stream; + } + + close() { + // Ignore close request + } + + get() { + return this.stream; + } +} + +class PipeStream implements StreamLock { + private stream: Writable | null = null; + + close() { + if (this.stream === null) { + throw new Error(`Assertion failed: No stream attached`); + } else { + this.stream.end(); + } + } + + attach(stream: Writable) { + this.stream = stream; + } + + get() { + if (this.stream === null) { + throw new Error(`Assertion failed: No stream attached`); + } else { + return this.stream; + } + } +} + +type StartOptions = { + stdin: StreamLock, + stdout: StreamLock, + stderr: StreamLock, +}; + +export class Handle { + private ancestor: Handle | null; + private implementation: ProcessImplementation; + + private stdin: StreamLock | null = null; + private stdout: StreamLock | null = null; + private stderr: StreamLock | null = null; + + private pipe: PipeStream | null = null; + + static start(implementation: ProcessImplementation, {stdin, stdout, stderr}: StartOptions) { + const chain = new Handle(null, implementation); + + chain.stdin = stdin; + chain.stdout = stdout; + chain.stderr = stderr; + + return chain; + } + + constructor(ancestor: Handle | null, implementation: ProcessImplementation) { + this.ancestor = ancestor; + this.implementation = implementation; + } + + pipeTo(implementation: ProcessImplementation, source = Pipe.STDOUT) { + const next = new Handle(this, implementation); + + const pipe = new PipeStream(); + next.pipe = pipe; + + next.stdout = this.stdout; + next.stderr = this.stderr; + + if ((source & Pipe.STDOUT) === Pipe.STDOUT) { + this.stdout = pipe; + } else if (this.ancestor !== null) { + this.stderr = this.ancestor.stdout; + } + + if ((source & Pipe.STDERR) === Pipe.STDERR) { + this.stderr = pipe; + } else if (this.ancestor !== null) { + this.stderr = this.ancestor.stderr; + } + + return next; + } + + async exec() { + const stdio: Stdio = [ + `ignore`, + `ignore`, + `ignore`, + ]; + + if (this.pipe) { + stdio[0] = `pipe`; + } else { + if (this.stdin === null) { + throw new Error(`Assertion failed: No input stream registered`); + } else { + stdio[0] = this.stdin.get(); + } + } + + let stdoutLock: StreamLock; + if (this.stdout === null) { + throw new Error(`Assertion failed: No output stream registered`); + } else { + stdoutLock = this.stdout; + stdio[1] = stdoutLock.get(); + } + + let stderrLock: StreamLock; + if (this.stderr === null) { + throw new Error(`Assertion failed: No error stream registered`); + } else { + stderrLock = this.stderr; + stdio[2] = stderrLock.get(); + } + + const child = this.implementation(stdio); + + if (this.pipe) + this.pipe.attach(child.stdin); + + return await child.promise.then(code => { + stdoutLock.close(); + stderrLock.close(); + + return code; + }); + } + + async run() { + const promises = []; + for (let handle: Handle | null = this; handle; handle = handle.ancestor) + promises.push(handle.exec()); + + const exitCodes = await Promise.all(promises); + return exitCodes[0]; + } +} + +export function start(p: ProcessImplementation, opts: StartOptions) { + return Handle.start(p, opts); +}