From ad81c5a4d118021453b1e8320afbae8cb167d023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Wed, 3 Apr 2019 19:52:57 +0100 Subject: [PATCH 1/9] Fixes tests --- packages/berry-shell/tests/shell.test.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/berry-shell/tests/shell.test.ts b/packages/berry-shell/tests/shell.test.ts index 5b3e9e9a6261..f5262d6c7903 100644 --- a/packages/berry-shell/tests/shell.test.ts +++ b/packages/berry-shell/tests/shell.test.ts @@ -70,9 +70,19 @@ describe(`Simple shell features`, () => { }); }); - it(`should pipe the result of a command into another`, async () => { - await expect(bufferResult(`echo hello world | wc -w | tr -d ' '`)).resolves.toMatchObject({ - stdout: `2\n`, + it(`should pipe the result of a command into another (two commands)`, async () => { + await expect(bufferResult(`echo hello world | node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`)).resolves.toMatchObject({ + stdout: `HELLO WORLD\n`, + }); + }); + + it(`should pipe the result of a command into another (three commands)`, async () => { + await expect(bufferResult([ + `echo hello world`, + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'` + ].join(` | `))).resolves.toMatchObject({ + stdout: `{H}{E}{L}{L}{O}{ }{W}{O}{R}{L}{D}\n`, }); }); From c92fde5170375c443158a99979e424b4994ff684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Wed, 3 Apr 2019 21:27:27 +0100 Subject: [PATCH 2/9] Fixes piping timing --- packages/berry-shell/sources/index.ts | 108 +++++++++++--------------- 1 file changed, 46 insertions(+), 62 deletions(-) diff --git a/packages/berry-shell/sources/index.ts b/packages/berry-shell/sources/index.ts index 53e562bdfb2e..b6610661b305 100644 --- a/packages/berry-shell/sources/index.ts +++ b/packages/berry-shell/sources/index.ts @@ -25,7 +25,8 @@ export type ShellBuiltin = ( args: Array, opts: ShellOptions, state: ShellState, - mustPipe: boolean, + leftMost: boolean, + rightMost: boolean, ) => Promise<{ stdin: Writable | null, promise: Promise, @@ -47,25 +48,33 @@ export type ShellState = { }; function makeBuiltin(builtin: (args: Array, opts: ShellOptions, state: ShellState) => Promise): ShellBuiltin { - return async (args: Array, opts: ShellOptions, state: ShellState, mustPipe: boolean) => { - if (!mustPipe) { - return { - stdin: null, - promise: builtin(args, opts, state), - }; - } else { - const stdin = new PassThrough(); - return { - stdin, - promise: builtin(args, opts, {... state, stdin}).then(result => { - stdin.end(); - return result; - }, error => { - stdin.end(); - throw error; - }), + return async (args: Array, opts: ShellOptions, state: ShellState, leftMost: boolean, rightMost: boolean) => { + const copy = {... state}; + + const stdin = !leftMost + ? copy.stdin = new PassThrough() + : null; + + const close = () => { + if (!leftMost) + state.stdin.end(); + + if (!rightMost) { + state.stdout.end(); + state.stderr.end(); } - } + }; + + return { + stdin, + promise: builtin(args, opts, state).then(result => { + close(); + return result; + }, error => { + close(); + throw error; + }), + }; }; } @@ -114,14 +123,14 @@ const BUILTINS = new Map([ return 0; })], - [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, mustPipe: boolean) => { + [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, leftMost: boolean, rightMost: boolean) => { if (typeof ident === `undefined`) - return makeBuiltin(async () => 0)([], opts, state, mustPipe); + return makeBuiltin(async () => 0)([], opts, state, leftMost, rightMost); const stdio: Array = [state.stdin, state.stdout, state.stderr]; const isUserStream = (stream: Stream) => stream instanceof PassThrough; - if (isUserStream(state.stdin) || mustPipe) + if (isUserStream(state.stdin) || !leftMost) stdio[0] = `pipe`; if (isUserStream(state.stdout)) stdio[1] = `pipe`; @@ -135,7 +144,7 @@ const BUILTINS = new Map([ stdio, }); - if (isUserStream(state.stdin) && !mustPipe) + if (isUserStream(state.stdin) && !leftMost) state.stdin.pipe(subprocess.stdin); if (isUserStream(state.stdout)) subprocess.stdout.pipe(state.stdout); @@ -174,19 +183,6 @@ const BUILTINS = new Map([ }], ]); -// Keep this function is sync with its implementation in: -// @berry/core/sources/miscUtils.ts -async function releaseAfterUseAsync(fn: () => Promise, cleanup?: () => any) { - if (!cleanup) - return await fn(); - - try { - return await fn(); - } finally { - await cleanup(); - } -} - async function executeBufferedSubshell(ast: ShellLine, opts: ShellOptions, state: ShellState) { const chunks: Array = []; const stdout = new PassThrough(); @@ -316,8 +312,8 @@ function makeCommandAction(args: Array, opts: ShellOptions) { if (typeof builtin === `undefined`) throw new Error(`Assertion failed: A builtin should exist for "${name}"`) - return async (state: ShellState, mustPipe: boolean) => { - const {stdin, promise} = await builtin(rest, opts, state, mustPipe); + return async (state: ShellState, leftMost: boolean, rightMost: boolean) => { + const {stdin, promise} = await builtin(rest, opts, state, leftMost, rightMost); return {stdin, promise}; }; } @@ -334,7 +330,7 @@ function makeSubshellAction(ast: ShellLine, opts: ShellOptions) { async function executeCommandChain(node: CommandChain, opts: ShellOptions, state: ShellState) { const parts = []; - // First we interpolate all the commands (we don't interpolate subshells + // leftMost we interpolate all the commands (we don't interpolate subshells // because they operate in their own contexts and are allowed to define // new internal variables) @@ -381,22 +377,7 @@ async function executeCommandChain(node: CommandChain, opts: ShellOptions, state for (let t = parts.length - 1; t >= 0; --t) { const {action, pipeType} = parts[t]; - const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), pipeType !== null); - - // If stdout has been piped (ie if the command we're execting isn't the - // right-most one), then we must close the pipe after our process has - // finished writing into it. We don't need to do this for the last command - // because this responsibility goes to the caller (otherwise we would risk - // closing the real stdout, which isn't meant to happen). - if (t !== parts.length - 1) { - const thisStdout = stdout; - - promise.then(() => { - thisStdout.end(); - }, () => { - thisStdout.end(); - }); - } + const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), t === 0, t === parts.length - 1); promises.push(promise); @@ -441,7 +422,8 @@ async function executeCommandLine(node: CommandLine, opts: ShellOptions, state: if (state.exitCode !== null) return state.exitCode; - // We must update $?, which always contains the exit code from the last command + // We must update $?, which always contains the exit code from + // the right-most command state.variables[`?`] = String(code); switch (node.then.type) { @@ -468,20 +450,21 @@ async function executeCommandLine(node: CommandLine, opts: ShellOptions, state: } async function executeShellLine(node: ShellLine, opts: ShellOptions, state: ShellState) { - let lastExitCode = 0; + let rightMostExitCode = 0; for (const command of node) { - lastExitCode = await executeCommandLine(command, opts, state); + rightMostExitCode = await executeCommandLine(command, opts, state); // If the execution aborted (usually through "exit"), we must bailout if (state.exitCode !== null) return state.exitCode; - // We must update $?, which always contains the exit code from the last command - state.variables[`?`] = String(lastExitCode); + // We must update $?, which always contains the exit code from + // the right-most command + state.variables[`?`] = String(rightMostExitCode); } - return lastExitCode; + return rightMostExitCode; } function locateArgsVariable(node: ShellLine): boolean { @@ -560,7 +543,8 @@ export async function execute(command: string, args: Array = [], { const ast = parseShell(command); - // If the shell line doesn't use the args, inject it at the end of the last command + // If the shell line doesn't use the args, inject it at the end of the + // right-most command if (!locateArgsVariable(ast) && ast.length > 0) { let command = ast[ast.length - 1]; while (command.then) From 31f6cf42a2ba626319e646714e6ac549fe72f698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Wed, 3 Apr 2019 21:35:47 +0100 Subject: [PATCH 3/9] Disables the enoent test on Windows --- packages/berry-shell/tests/shell.test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/berry-shell/tests/shell.test.ts b/packages/berry-shell/tests/shell.test.ts index f5262d6c7903..6e209727c890 100644 --- a/packages/berry-shell/tests/shell.test.ts +++ b/packages/berry-shell/tests/shell.test.ts @@ -1,6 +1,10 @@ import {execute} from '@berry/shell'; import {PassThrough} from 'stream'; +const ifNotWin32It = process.platform !== `win32` + ? it + : it.skip; + const bufferResult = async (command: string, args: Array = []) => { const stdout = new PassThrough(); const stderr = new PassThrough(); @@ -53,7 +57,7 @@ describe(`Simple shell features`, () => { }); }); - it(`should throw an error when a command doesn't exist`, async () => { + ifNotWin32It(`should throw an error when a command doesn't exist`, async () => { await expect(bufferResult(`this-command-doesnt-exist-sorry`)).resolves.toMatchObject({ exitCode: 127, stderr: `command not found: this-command-doesnt-exist-sorry\n`, From 847c0b3990eff6ab1b46f08adfe256a993d4031a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Wed, 3 Apr 2019 21:49:29 +0100 Subject: [PATCH 4/9] Adds a test that doesnt use builtins --- packages/berry-shell/tests/shell.test.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/berry-shell/tests/shell.test.ts b/packages/berry-shell/tests/shell.test.ts index 6e209727c890..cf265fa672cf 100644 --- a/packages/berry-shell/tests/shell.test.ts +++ b/packages/berry-shell/tests/shell.test.ts @@ -90,6 +90,16 @@ describe(`Simple shell features`, () => { }); }); + it(`should pipe the result of a command into another (no builtins)`, async () => { + await expect(bufferResult([ + `node -e 'process.stdout.write("hello world\\\\n")'`, + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'` + ].join(` | `))).resolves.toMatchObject({ + stdout: `{H}{E}{L}{L}{O}{ }{W}{O}{R}{L}{D}\n`, + }); + }); + it(`should shortcut the right branch of a '||' when the left branch succeeds`, async () => { await expect(bufferResult(`true || echo failed`)).resolves.toMatchObject({ stdout: ``, From e04dbb7d2a7614253a9c6feb566cdcce25c56275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Thu, 4 Apr 2019 00:06:18 +0100 Subject: [PATCH 5/9] More fixes --- packages/berry-shell/sources/index.ts | 32 ++++++++++++-------- packages/berry-shell/tests/shell.test.ts | 38 +++++++++++++++++++++--- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/packages/berry-shell/sources/index.ts b/packages/berry-shell/sources/index.ts index b6610661b305..5013ad094efb 100644 --- a/packages/berry-shell/sources/index.ts +++ b/packages/berry-shell/sources/index.ts @@ -26,7 +26,6 @@ export type ShellBuiltin = ( opts: ShellOptions, state: ShellState, leftMost: boolean, - rightMost: boolean, ) => Promise<{ stdin: Writable | null, promise: Promise, @@ -35,6 +34,9 @@ export type ShellBuiltin = ( export type ShellOptions = { args: Array, builtins: Map, + initialStdin: Readable, + initialStdout: Writable, + initialStderr: Writable, }; export type ShellState = { @@ -48,19 +50,22 @@ export type ShellState = { }; function makeBuiltin(builtin: (args: Array, opts: ShellOptions, state: ShellState) => Promise): ShellBuiltin { - return async (args: Array, opts: ShellOptions, state: ShellState, leftMost: boolean, rightMost: boolean) => { - const copy = {... state}; - + return async (args: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { const stdin = !leftMost - ? copy.stdin = new PassThrough() + ? new PassThrough() : null; + if (stdin !== null) + state = {... state, stdin}; + const close = () => { - if (!leftMost) + if (state.stdin !== opts.initialStdin) state.stdin.end(); - if (!rightMost) { + if (state.stdout !== opts.initialStdout) state.stdout.end(); + + if (state.stderr !== opts.initialStderr) { state.stderr.end(); } }; @@ -123,9 +128,9 @@ const BUILTINS = new Map([ return 0; })], - [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, leftMost: boolean, rightMost: boolean) => { + [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { if (typeof ident === `undefined`) - return makeBuiltin(async () => 0)([], opts, state, leftMost, rightMost); + return makeBuiltin(async () => 0)([], opts, state, leftMost); const stdio: Array = [state.stdin, state.stdout, state.stderr]; const isUserStream = (stream: Stream) => stream instanceof PassThrough; @@ -312,8 +317,8 @@ function makeCommandAction(args: Array, opts: ShellOptions) { if (typeof builtin === `undefined`) throw new Error(`Assertion failed: A builtin should exist for "${name}"`) - return async (state: ShellState, leftMost: boolean, rightMost: boolean) => { - const {stdin, promise} = await builtin(rest, opts, state, leftMost, rightMost); + return async (state: ShellState, leftMost: boolean) => { + const {stdin, promise} = await builtin(rest, opts, state, leftMost); return {stdin, promise}; }; } @@ -377,7 +382,7 @@ async function executeCommandChain(node: CommandChain, opts: ShellOptions, state for (let t = parts.length - 1; t >= 0; --t) { const {action, pipeType} = parts[t]; - const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), t === 0, t === parts.length - 1); + const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), t === 0); promises.push(promise); @@ -564,6 +569,9 @@ export async function execute(command: string, args: Array = [], { return await executeShellLine(ast, { args, builtins: normalizedBuiltins, + initialStdin: stdin, + initialStdout: stdout, + initialStderr: stderr, }, { cwd, environment: normalizedEnv, diff --git a/packages/berry-shell/tests/shell.test.ts b/packages/berry-shell/tests/shell.test.ts index cf265fa672cf..fb75d769a836 100644 --- a/packages/berry-shell/tests/shell.test.ts +++ b/packages/berry-shell/tests/shell.test.ts @@ -20,7 +20,24 @@ const bufferResult = async (command: string, args: Array = []) => { stderrChunks.push(chunk); }); - const exitCode = await execute(command, args, {stdout, stderr}); + const exitCode = await execute(command, args, {stdout, stderr, builtins: { + [`test-builtin`]: async (args, opts, state) => { + const stdinChunks = []; + + state.stdin.on(`data`, chunk => { + stdinChunks.push(chunk); + }); + + return await new Promise (resolve => { + state.stdin.on(`end`, () => { + const content = Buffer.concat(stdinChunks).toString().trim(); + state.stdout.write(`${content.replace(/(.)./g, `$1`)}\n`); + + resolve(0); + }); + }); + }, + }}); return { exitCode, @@ -75,7 +92,10 @@ describe(`Simple shell features`, () => { }); it(`should pipe the result of a command into another (two commands)`, async () => { - await expect(bufferResult(`echo hello world | node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`)).resolves.toMatchObject({ + await expect(bufferResult([ + `echo hello world`, + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, + ].join(` | `))).resolves.toMatchObject({ stdout: `HELLO WORLD\n`, }); }); @@ -84,7 +104,7 @@ describe(`Simple shell features`, () => { await expect(bufferResult([ `echo hello world`, `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, - `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'` + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'`, ].join(` | `))).resolves.toMatchObject({ stdout: `{H}{E}{L}{L}{O}{ }{W}{O}{R}{L}{D}\n`, }); @@ -94,12 +114,22 @@ describe(`Simple shell features`, () => { await expect(bufferResult([ `node -e 'process.stdout.write("hello world\\\\n")'`, `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, - `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'` + `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().replace(/./g, $0 => \`{\${$0}}\`)))'`, ].join(` | `))).resolves.toMatchObject({ stdout: `{H}{E}{L}{L}{O}{ }{W}{O}{R}{L}{D}\n`, }); }); + it(`should pipe the result of a command into another (only builtins)`, async () => { + await expect(bufferResult([ + `echo abcdefghijkl`, + `test-builtin`, + `test-builtin`, + ].join(` | `))).resolves.toMatchObject({ + stdout: `aei\n`, + }); + }); + it(`should shortcut the right branch of a '||' when the left branch succeeds`, async () => { await expect(bufferResult(`true || echo failed`)).resolves.toMatchObject({ stdout: ``, From 203ccff6390b5b818b856ee792072286e0b5c5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Thu, 4 Apr 2019 00:32:04 +0100 Subject: [PATCH 6/9] Uses cross-spawn for portability --- .pnp.js | 8 ++++++++ packages/berry-shell/package.json | 2 ++ packages/berry-shell/sources/index.ts | 13 ++++++------- yarn.lock | 2 ++ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/.pnp.js b/.pnp.js index 68429989ca0e..dc635024a329 100755 --- a/.pnp.js +++ b/.pnp.js @@ -9913,6 +9913,14 @@ function $$SETUP_STATE(hydrateRuntimeState) { "@berry/parsers", "workspace:packages/berry-parsers" ], + [ + "@types/cross-spawn", + "npm:6.0.0" + ], + [ + "cross-spawn", + "npm:6.0.5" + ], [ "execa", "npm:1.0.0" diff --git a/packages/berry-shell/package.json b/packages/berry-shell/package.json index fbb487601163..8854805d70d7 100644 --- a/packages/berry-shell/package.json +++ b/packages/berry-shell/package.json @@ -5,6 +5,8 @@ "dependencies": { "@berry/fslib": "workspace:*", "@berry/parsers": "workspace:*", + "@types/cross-spawn": "6.0.0", + "cross-spawn": "^6.0.5", "execa": "^1.0.0", "stream-buffers": "^3.0.2" }, diff --git a/packages/berry-shell/sources/index.ts b/packages/berry-shell/sources/index.ts index 5013ad094efb..bd665d70b039 100644 --- a/packages/berry-shell/sources/index.ts +++ b/packages/berry-shell/sources/index.ts @@ -1,9 +1,8 @@ import {xfs, NodeFS} from '@berry/fslib'; import {CommandSegment, CommandChain, CommandLine, ShellLine, parseShell} from '@berry/parsers'; -import {spawn} from 'child_process'; -import {delimiter, posix} from 'path'; +import crossSpawn from 'cross-spawn'; +import {posix} from 'path'; import {PassThrough, Readable, Stream, Writable} from 'stream'; -import {StringDecoder} from 'string_decoder'; export type UserOptions = { builtins: {[key: string]: UserBuiltin}, @@ -59,12 +58,13 @@ function makeBuiltin(builtin: (args: Array, opts: ShellOptions, state: S state = {... state, stdin}; const close = () => { - if (state.stdin !== opts.initialStdin) + if (state.stdin !== opts.initialStdin) { + // @ts-ignore state.stdin.end(); + } if (state.stdout !== opts.initialStdout) state.stdout.end(); - if (state.stderr !== opts.initialStderr) { state.stderr.end(); } @@ -142,9 +142,8 @@ const BUILTINS = new Map([ if (isUserStream(state.stderr)) stdio[2] = `pipe`; - const subprocess = spawn(ident, rest, { + const subprocess = crossSpawn(ident, rest, { cwd: NodeFS.fromPortablePath(state.cwd), - shell: process.platform === `win32`, // Needed to execute .cmd files env: state.environment, stdio, }); diff --git a/yarn.lock b/yarn.lock index 707f190d2ed4..6ab1dbb39ff9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1951,6 +1951,8 @@ __metadata: dependencies: "@berry/fslib": "workspace:*" "@berry/parsers": "workspace:*" + "@types/cross-spawn": "npm:6.0.0" + cross-spawn: "npm:^6.0.5" execa: "npm:^1.0.0" stream-buffers: "npm:^3.0.2" languageName: unknown From 7ce76ab5971b532a2fde03344a1199e39c0ba917 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Thu, 4 Apr 2019 00:55:50 +0100 Subject: [PATCH 7/9] Adds an extra test --- packages/berry-shell/tests/shell.test.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/berry-shell/tests/shell.test.ts b/packages/berry-shell/tests/shell.test.ts index fb75d769a836..346a0ad7189d 100644 --- a/packages/berry-shell/tests/shell.test.ts +++ b/packages/berry-shell/tests/shell.test.ts @@ -91,7 +91,7 @@ describe(`Simple shell features`, () => { }); }); - it(`should pipe the result of a command into another (two commands)`, async () => { + it(`should pipe the result of a command into another (two commands, builtin into native)`, async () => { await expect(bufferResult([ `echo hello world`, `node -e 'process.stdin.on("data", data => process.stdout.write(data.toString().toUpperCase()))'`, @@ -100,6 +100,15 @@ describe(`Simple shell features`, () => { }); }); + it(`should pipe the result of a command into another (two commands, native into pipe)`, async () => { + await expect(bufferResult([ + `node -e 'process.stdout.write("abcdefgh\\\\n");'`, + `test-builtin`, + ].join(` | `))).resolves.toMatchObject({ + stdout: `aceg\n`, + }); + }); + it(`should pipe the result of a command into another (three commands)`, async () => { await expect(bufferResult([ `echo hello world`, From 8b5f2634b7cdb421c8a32095499625ca542f36f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Fri, 5 Apr 2019 13:40:13 +0100 Subject: [PATCH 8/9] Prints the node version --- scripts/azure-run-tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/azure-run-tests.yml b/scripts/azure-run-tests.yml index e835cf400060..68de42e42759 100644 --- a/scripts/azure-run-tests.yml +++ b/scripts/azure-run-tests.yml @@ -6,6 +6,7 @@ steps: displayName: 'Install Node.js' - bash: | + node --version node ./scripts/run-yarn.js build:cli node ./scripts/run-yarn.js build:plugin-pack node ./scripts/run-yarn.js build:plugin-stage From efe9e78179eb289995fb7ae59cee1cc82bf6644b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C3=ABl=20Nison?= Date: Fri, 5 Apr 2019 16:55:52 +0100 Subject: [PATCH 9/9] Refactors pipes --- packages/berry-shell/sources/index.ts | 252 ++++++++---------------- packages/berry-shell/sources/pipe.ts | 264 ++++++++++++++++++++++++++ 2 files changed, 338 insertions(+), 178 deletions(-) create mode 100644 packages/berry-shell/sources/pipe.ts diff --git a/packages/berry-shell/sources/index.ts b/packages/berry-shell/sources/index.ts index bd665d70b039..2c97a8efd0b8 100644 --- a/packages/berry-shell/sources/index.ts +++ b/packages/berry-shell/sources/index.ts @@ -1,11 +1,12 @@ import {xfs, NodeFS} from '@berry/fslib'; import {CommandSegment, CommandChain, CommandLine, ShellLine, parseShell} from '@berry/parsers'; -import crossSpawn from 'cross-spawn'; import {posix} from 'path'; import {PassThrough, Readable, Stream, Writable} from 'stream'; +import {Handle, ProtectedStream, Stdio, start, makeBuiltin, makeProcess} from './pipe'; + export type UserOptions = { - builtins: {[key: string]: UserBuiltin}, + builtins: {[key: string]: ShellBuiltin}, cwd: string, env: {[key: string]: string | undefined}, stdin: Readable, @@ -14,21 +15,11 @@ export type UserOptions = { variables: {[key: string]: string}, }; -export type UserBuiltin = ( - args: Array, - opts: ShellOptions, - state: ShellState, -) => Promise; - export type ShellBuiltin = ( args: Array, opts: ShellOptions, state: ShellState, - leftMost: boolean, -) => Promise<{ - stdin: Writable | null, - promise: Promise, -}>; +) => Promise; export type ShellOptions = { args: Array, @@ -48,41 +39,6 @@ export type ShellState = { variables: {[key: string]: string}, }; -function makeBuiltin(builtin: (args: Array, opts: ShellOptions, state: ShellState) => Promise): ShellBuiltin { - return async (args: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { - const stdin = !leftMost - ? new PassThrough() - : null; - - if (stdin !== null) - state = {... state, stdin}; - - const close = () => { - if (state.stdin !== opts.initialStdin) { - // @ts-ignore - state.stdin.end(); - } - - if (state.stdout !== opts.initialStdout) - state.stdout.end(); - if (state.stderr !== opts.initialStderr) { - state.stderr.end(); - } - }; - - return { - stdin, - promise: builtin(args, opts, state).then(result => { - close(); - return result; - }, error => { - close(); - throw error; - }), - }; - }; -} - function cloneState(state: ShellState, mergeWith: Partial = {}) { const newState = {... state, ... mergeWith}; @@ -93,7 +49,7 @@ function cloneState(state: ShellState, mergeWith: Partial = {}) { } const BUILTINS = new Map([ - [`cd`, makeBuiltin(async ([target, ... rest]: Array, opts: ShellOptions, state: ShellState) => { + [`cd`, async ([target, ... rest]: Array, opts: ShellOptions, state: ShellState) => { const resolvedTarget = posix.resolve(state.cwd, NodeFS.toPortablePath(target)); const stat = await xfs.statPromise(resolvedTarget); @@ -104,86 +60,28 @@ const BUILTINS = new Map([ state.cwd = target; return 0; } - })], + }], - [`pwd`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`pwd`, async (args: Array, opts: ShellOptions, state: ShellState) => { state.stdout.write(`${NodeFS.fromPortablePath(state.cwd)}\n`); return 0; - })], + }], - [`true`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`true`, async (args: Array, opts: ShellOptions, state: ShellState) => { return 0; - })], + }], - [`false`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`false`, async (args: Array, opts: ShellOptions, state: ShellState) => { return 1; - })], + }], - [`exit`, makeBuiltin(async ([code, ... rest]: Array, opts: ShellOptions, state: ShellState) => { + [`exit`, async ([code, ... rest]: Array, opts: ShellOptions, state: ShellState) => { return state.exitCode = parseInt(code, 10); - })], + }], - [`echo`, makeBuiltin(async (args: Array, opts: ShellOptions, state: ShellState) => { + [`echo`, async (args: Array, opts: ShellOptions, state: ShellState) => { state.stdout.write(`${args.join(` `)}\n`); return 0; - })], - - [`command`, async ([ident, ... rest]: Array, opts: ShellOptions, state: ShellState, leftMost: boolean) => { - if (typeof ident === `undefined`) - return makeBuiltin(async () => 0)([], opts, state, leftMost); - - const stdio: Array = [state.stdin, state.stdout, state.stderr]; - const isUserStream = (stream: Stream) => stream instanceof PassThrough; - - if (isUserStream(state.stdin) || !leftMost) - stdio[0] = `pipe`; - if (isUserStream(state.stdout)) - stdio[1] = `pipe`; - if (isUserStream(state.stderr)) - stdio[2] = `pipe`; - - const subprocess = crossSpawn(ident, rest, { - cwd: NodeFS.fromPortablePath(state.cwd), - env: state.environment, - stdio, - }); - - if (isUserStream(state.stdin) && !leftMost) - state.stdin.pipe(subprocess.stdin); - if (isUserStream(state.stdout)) - subprocess.stdout.pipe(state.stdout); - if (isUserStream(state.stderr)) - subprocess.stderr.pipe(state.stderr); - - return { - stdin: subprocess.stdin, - promise: new Promise(resolve => { - subprocess.on(`error`, error => { - // @ts-ignore - switch (error.code) { - case `ENOENT`: { - state.stderr.write(`command not found: ${ident}\n`); - resolve(127); - } break; - case `EACCESS`: { - state.stderr.write(`permission denied: ${ident}\n`); - resolve(128); - } break; - default: { - state.stderr.write(`uncaught error: ${error.message}\n`); - resolve(1); - } break; - } - }); - subprocess.on(`exit`, code => { - if (code !== null) { - resolve(code); - } else { - resolve(129); - } - }); - }), - }; }], ]); @@ -306,110 +204,108 @@ async function interpolateArguments(commandArgs: Array>, o * $ cat hello | grep world | grep -v foobar */ -function makeCommandAction(args: Array, opts: ShellOptions) { +function makeCommandAction(args: Array, opts: ShellOptions, state: ShellState) { if (!opts.builtins.has(args[0])) args = [`command`, ... args]; const [name, ... rest] = args; + if (name === `command`) { + return makeProcess(rest[0], rest.slice(1), { + cwd: NodeFS.fromPortablePath(state.cwd), + env: state.environment, + }); + } const builtin = opts.builtins.get(name); if (typeof builtin === `undefined`) throw new Error(`Assertion failed: A builtin should exist for "${name}"`) - return async (state: ShellState, leftMost: boolean) => { - const {stdin, promise} = await builtin(rest, opts, state, leftMost); - return {stdin, promise}; - }; + return makeBuiltin(async ({stdin, stdout, stderr}) => { + state.stdin = stdin; + state.stdout = stdout; + state.stderr = stderr; + + return await builtin(rest, opts, state); + }); } -function makeSubshellAction(ast: ShellLine, opts: ShellOptions) { - return async (state: ShellState, mustPipe: boolean) => { +function makeSubshellAction(ast: ShellLine, opts: ShellOptions, state: ShellState) { + return (stdio: Stdio) => { const stdin = new PassThrough(); const promise = executeShellLine(ast, opts, cloneState(state, {stdin})); - return {stdin: stdin as Writable, promise}; + return {stdin, promise}; }; } async function executeCommandChain(node: CommandChain, opts: ShellOptions, state: ShellState) { const parts = []; - // leftMost we interpolate all the commands (we don't interpolate subshells - // because they operate in their own contexts and are allowed to define - // new internal variables) - let current: CommandChain | null = node; let pipeType = null; + let execution: Handle | null = null; + while (current) { - let action; + // Only the final segment is allowed to modify the shell state; all the + // other ones are isolated + const activeState = current.then + ? {... state} + : state; + let action; switch (current.type) { case `command`: { - action = makeCommandAction(await interpolateArguments(current.args, opts, state), opts); + action = makeCommandAction(await interpolateArguments(current.args, opts, state), opts, activeState); } break; case `subshell`: { - action = makeSubshellAction(current.subshell, opts); + // We don't interpolate the subshell because it will be recursively + // interpolated within its own context + action = makeSubshellAction(current.subshell, opts, activeState); } break; } if (typeof action === `undefined`) throw new Error(`Assertion failed: An action should have been generated`); - parts.push({action, pipeType}); + if (pipeType === null) { + // If we're processing the left-most segment of the command, we start a + // new execution pipeline + execution = start(action, { + stdin: new ProtectedStream(activeState.stdin), + stdout: new ProtectedStream(activeState.stdout), + stderr: new ProtectedStream(activeState.stderr), + }); + } else { + if (execution === null) + throw new Error(`The execution pipeline should have been setup`); + + // Otherwise, depending on the exaxct pipe type, we either pipe stdout + // only or stdout and stderr + switch (pipeType) { + case `|`: { + execution = execution.pipeTo(action); + } break; + + case `|&`: { + execution = execution.pipeTo(action); + } break; + } + } - if (typeof current.then !== `undefined`) { + if (current.then) { pipeType = current.then.type; current = current.then.chain; } else { current = null; - pipeType = null; - } - } - - // Note that the execution starts from the right-most command and - // progressively moves towards the left-most command. We run them in this - // order because otherwise we would risk a race condition where (let's - // use A | B as example) A would start writing before B is ready, which - // could cause the pipe buffer to overflow and some writes to be lost. - - let stdout = state.stdout; - let stderr = state.stderr; - - const promises = []; - - for (let t = parts.length - 1; t >= 0; --t) { - const {action, pipeType} = parts[t]; - const {stdin, promise} = await action(Object.assign(state, {stdout, stderr}), t === 0); - - promises.push(promise); - - switch (pipeType) { - case null: { - // no pipe! - } break; - - case `|`: { - if (stdin === null) - throw new Error(`Assertion failed: The pipe is expected to return a writable stream`); - - stdout = stdin; - } break; - - case `|&`: { - if (stdin === null) - throw new Error(`Assertion failed: The pipe is expected to return a writable stream`); - - stdout = stdin; - stderr = stdin; - } break; } } - const exitCodes = await Promise.all(promises); + if (execution === null) + throw new Error(`Assertion failed: The execution pipeline should have been setup`); - return exitCodes[exitCodes.length - 1]; + return await execution.run(); } /** @@ -542,8 +438,8 @@ export async function execute(command: string, args: Array = [], { normalizedEnv[key] = value; const normalizedBuiltins = new Map(BUILTINS); - for (const [key, action] of Object.entries(builtins)) - normalizedBuiltins.set(key, makeBuiltin(action)); + for (const [key, builtin] of Object.entries(builtins)) + normalizedBuiltins.set(key, builtin); const ast = parseShell(command); diff --git a/packages/berry-shell/sources/pipe.ts b/packages/berry-shell/sources/pipe.ts new file mode 100644 index 000000000000..09e649853390 --- /dev/null +++ b/packages/berry-shell/sources/pipe.ts @@ -0,0 +1,264 @@ +import crossSpawn from 'cross-spawn'; +import EventEmitter from 'events'; +import {PassThrough, Readable, Writable} from 'stream'; + +enum Pipe { + STDOUT = 0b01, + STDERR = 0b10, +}; + +export type Stdio = [ + any, + any, + any +]; + +type ProcessImplementation = ( + stdio: Stdio, +) => { + stdin: Writable, + promise: Promise, +}; + +function nextTick() { + return new Promise(resolve => { + process.nextTick(resolve); + }); +} + +export function makeProcess(name: string, args: Array, opts: any): ProcessImplementation { + return (stdio: Stdio) => { + const stdin = stdio[0] instanceof PassThrough + ? `pipe` + : stdio[0]; + + const stdout = stdio[1] instanceof PassThrough + ? `pipe` + : stdio[1]; + + const stderr = stdio[2] instanceof PassThrough + ? `pipe` + : stdio[2]; + + const child = crossSpawn(name, args, {... opts, stdio: [ + stdin, + stdout, + stderr, + ]}); + + if (stdio[0] instanceof PassThrough) + stdio[0].pipe(child.stdin); + if (stdio[1] instanceof PassThrough) + child.stdout.pipe(stdio[1]); + if (stdio[2] instanceof PassThrough) + child.stderr.pipe(stdio[2]); + + return { + stdin: child.stdin, + promise: new Promise(resolve => { + child.on(`error`, error => { + // @ts-ignore + switch (error.code) { + case `ENOENT`: { + stdio[2].write(`command not found: ${name}\n`); + resolve(127); + } break; + case `EACCESS`: { + stdio[2].write(`permission denied: ${name}\n`); + resolve(128); + } break; + default: { + stdio[2].write(`uncaught error: ${error.message}\n`); + resolve(1); + } break; + } + }); + + child.on(`exit`, code => { + if (code !== null) { + resolve(code); + } else { + resolve(129); + } + }); + }), + }; + }; +} + +export function makeBuiltin(builtin: (opts: any) => Promise): ProcessImplementation { + return (stdio: Stdio) => { + const stdin = stdio[0] === `pipe` + ? new PassThrough() + : stdio[0]; + + return { + stdin, + promise: nextTick().then(() => builtin({ + stdin, + stdout: stdio[1], + stderr: stdio[2], + })), + }; + }; +} + +interface StreamLock { + close(): void; + get(): StreamType; +} + +export class ProtectedStream implements StreamLock { + private stream: StreamType; + + constructor(stream: StreamType) { + this.stream = stream; + } + + close() { + // Ignore close request + } + + get() { + return this.stream; + } +} + +class PipeStream implements StreamLock { + private stream: Writable | null = null; + + close() { + if (this.stream === null) { + throw new Error(`Assertion failed: No stream attached`); + } else { + this.stream.end(); + } + } + + attach(stream: Writable) { + this.stream = stream; + } + + get() { + if (this.stream === null) { + throw new Error(`Assertion failed: No stream attached`); + } else { + return this.stream; + } + } +} + +type StartOptions = { + stdin: StreamLock, + stdout: StreamLock, + stderr: StreamLock, +}; + +export class Handle { + private ancestor: Handle | null; + private implementation: ProcessImplementation; + + private stdin: StreamLock | null = null; + private stdout: StreamLock | null = null; + private stderr: StreamLock | null = null; + + private pipe: PipeStream | null = null; + + static start(implementation: ProcessImplementation, {stdin, stdout, stderr}: StartOptions) { + const chain = new Handle(null, implementation); + + chain.stdin = stdin; + chain.stdout = stdout; + chain.stderr = stderr; + + return chain; + } + + constructor(ancestor: Handle | null, implementation: ProcessImplementation) { + this.ancestor = ancestor; + this.implementation = implementation; + } + + pipeTo(implementation: ProcessImplementation, source = Pipe.STDOUT) { + const next = new Handle(this, implementation); + + const pipe = new PipeStream(); + next.pipe = pipe; + + next.stdout = this.stdout; + next.stderr = this.stderr; + + if ((source & Pipe.STDOUT) === Pipe.STDOUT) { + this.stdout = pipe; + } else if (this.ancestor !== null) { + this.stderr = this.ancestor.stdout; + } + + if ((source & Pipe.STDERR) === Pipe.STDERR) { + this.stderr = pipe; + } else if (this.ancestor !== null) { + this.stderr = this.ancestor.stderr; + } + + return next; + } + + async exec() { + const stdio: Stdio = [ + `ignore`, + `ignore`, + `ignore`, + ]; + + if (this.pipe) { + stdio[0] = `pipe`; + } else { + if (this.stdin === null) { + throw new Error(`Assertion failed: No input stream registered`); + } else { + stdio[0] = this.stdin.get(); + } + } + + let stdoutLock: StreamLock; + if (this.stdout === null) { + throw new Error(`Assertion failed: No output stream registered`); + } else { + stdoutLock = this.stdout; + stdio[1] = stdoutLock.get(); + } + + let stderrLock: StreamLock; + if (this.stderr === null) { + throw new Error(`Assertion failed: No error stream registered`); + } else { + stderrLock = this.stderr; + stdio[2] = stderrLock.get(); + } + + const child = this.implementation(stdio); + + if (this.pipe) + this.pipe.attach(child.stdin); + + return await child.promise.then(code => { + stdoutLock.close(); + stderrLock.close(); + + return code; + }); + } + + async run() { + const promises = []; + for (let handle: Handle | null = this; handle; handle = handle.ancestor) + promises.push(handle.exec()); + + const exitCodes = await Promise.all(promises); + return exitCodes[0]; + } +} + +export function start(p: ProcessImplementation, opts: StartOptions) { + return Handle.start(p, opts); +}