From e2bbaa25c53fd37bc26cd2d5d9f01710b0e06243 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 20 Sep 2020 23:40:49 -0700 Subject: [PATCH 1/6] fix: rename netstring exports, clean up object modes --- packages/SwingSet/src/netstring.js | 9 +++++++-- packages/SwingSet/test/test-netstring.js | 11 ++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/packages/SwingSet/src/netstring.js b/packages/SwingSet/src/netstring.js index 938bf4c888c..61deba1bc42 100644 --- a/packages/SwingSet/src/netstring.js +++ b/packages/SwingSet/src/netstring.js @@ -12,7 +12,7 @@ export function encode(data) { } // input is a sequence of strings, output is a byte pipe -export function encoderStream() { +export function netstringEncoderStream() { function transform(chunk, encoding, callback) { if (!Buffer.isBuffer(chunk)) { throw Error('stream requires Buffers'); @@ -25,6 +25,8 @@ export function encoderStream() { } callback(err); } + // (maybe empty) Buffer in, Buffer out. We use writableObjectMode to + // indicate that empty input buffers are important return new Transform({ transform, writableObjectMode: true }); } @@ -64,7 +66,7 @@ export function decode(data) { } // input is a byte pipe, output is a sequence of Buffers -export function decoderStream() { +export function netstringDecoderStream() { let buffered = Buffer.from(''); function transform(chunk, encoding, callback) { @@ -88,5 +90,8 @@ export function decoderStream() { callback(err); } + // Buffer in, Buffer out, except that each output Buffer is precious, even + // empty ones, and without readableObjectMode the Stream will discard empty + // buffers return new Transform({ transform, readableObjectMode: true }); } diff --git a/packages/SwingSet/test/test-netstring.js b/packages/SwingSet/test/test-netstring.js index 7421b44d053..18443f804da 100644 --- a/packages/SwingSet/test/test-netstring.js +++ b/packages/SwingSet/test/test-netstring.js @@ -1,7 +1,12 @@ import '@agoric/install-ses'; // adds 'harden' to global import test from 'ava'; -import { encode, encoderStream, decode, decoderStream } from '../src/netstring'; +import { + encode, + decode, + netstringEncoderStream, + netstringDecoderStream, +} from '../src/netstring'; const umlaut = 'ümlaut'; const umlautBuffer = Buffer.from(umlaut, 'utf-8'); @@ -51,7 +56,7 @@ test('encode', t => { }); test('encode stream', async t => { - const e = encoderStream(); + const e = netstringEncoderStream(); const chunks = []; e.on('data', data => chunks.push(data)); e.write(Buffer.from('')); @@ -106,7 +111,7 @@ test('decode', t => { }); test('decode stream', async t => { - const d = decoderStream(); + const d = netstringDecoderStream(); function write(s) { d.write(Buffer.from(s)); } From e23b7bb40e20bacf7f64c627333918e7d5137560 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 20 Sep 2020 23:42:52 -0700 Subject: [PATCH 2/6] fix: new 'worker-protocol' module to do Array-to-Buffer conversion --- packages/SwingSet/src/worker-protocol.js | 41 ++++++++++ .../SwingSet/test/test-worker-protocol.js | 76 +++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 packages/SwingSet/src/worker-protocol.js create mode 100644 packages/SwingSet/test/test-worker-protocol.js diff --git a/packages/SwingSet/src/worker-protocol.js b/packages/SwingSet/src/worker-protocol.js new file mode 100644 index 00000000000..3fd0abc5cf1 --- /dev/null +++ b/packages/SwingSet/src/worker-protocol.js @@ -0,0 +1,41 @@ +import { Transform } from 'stream'; + +// Transform objects which convert from hardened Arrays of JSON-serializable +// data into Buffers suitable for netstring conversion. + +export function arrayEncoderStream() { + function transform(object, encoding, callback) { + if (!Array.isArray(object)) { + throw Error('stream requires Arrays'); + } + let err; + try { + this.push(Buffer.from(JSON.stringify(object))); + } catch (e) { + err = e; + } + callback(err); + } + // Array in, Buffer out, hence writableObjectMode + return new Transform({ transform, writableObjectMode: true }); +} + +export function arrayDecoderStream() { + function transform(buf, encoding, callback) { + let err; + try { + if (!Buffer.isBuffer(buf)) { + throw Error('stream expects Buffers'); + } + this.push(JSON.parse(buf)); + } catch (e) { + err = e; + } + // this Transform is a one-to-one conversion of Buffer into Array, so we + // always consume the input each time we're called + callback(err); + } + + // Buffer in, Array out, hence readableObjectMode + return new Transform({ transform, readableObjectMode: true }); +} diff --git a/packages/SwingSet/test/test-worker-protocol.js b/packages/SwingSet/test/test-worker-protocol.js new file mode 100644 index 00000000000..01ce1d1873c --- /dev/null +++ b/packages/SwingSet/test/test-worker-protocol.js @@ -0,0 +1,76 @@ +import '@agoric/install-ses'; // adds 'harden' to global + +import test from 'ava'; +import { arrayEncoderStream, arrayDecoderStream } from '../src/worker-protocol'; +import { + encode, + netstringEncoderStream, + netstringDecoderStream, +} from '../src/netstring'; + +test('arrayEncoderStream', async t => { + const e = arrayEncoderStream(); + const chunks = []; + e.on('data', data => chunks.push(data)); + e.write([]); + + function eq(expected) { + t.deepEqual( + chunks.map(buf => buf.toString()), + expected, + ); + } + eq([`[]`]); + + e.write(['command', { foo: 1 }]); + eq([`[]`, `["command",{"foo":1}]`]); +}); + +test('encode stream', async t => { + const aStream = arrayEncoderStream(); + const nsStream = netstringEncoderStream(); + aStream.pipe(nsStream); + const chunks = []; + nsStream.on('data', data => chunks.push(data)); + function eq(expected) { + t.deepEqual( + chunks.map(buf => buf.toString()), + expected, + ); + } + + aStream.write([1]); + eq(['3:[1],']); + + aStream.write(['command', { foo: 4 }]); + eq(['3:[1],', '21:["command",{"foo":4}],']); +}); + +test('decode stream', async t => { + const nsStream = netstringDecoderStream(); + const aStream = arrayDecoderStream(); + nsStream.pipe(aStream); + function write(s) { + nsStream.write(Buffer.from(s)); + } + + const msgs = []; + aStream.on('data', msg => msgs.push(msg)); + + function eq(expected) { + t.deepEqual(msgs, expected); + } + + let buf = encode(Buffer.from(JSON.stringify([1]))); + write(buf.slice(0, 1)); + eq([]); + write(buf.slice(1)); + eq([[1]]); + msgs.pop(); + + buf = encode(Buffer.from(JSON.stringify(['command', { foo: 2 }]))); + write(buf.slice(0, 4)); + eq([]); + write(buf.slice(4)); + eq([['command', { foo: 2 }]]); +}); From 8eb13fa4940dbd4574e15bbcd14adcc812520b27 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 20 Sep 2020 23:50:44 -0700 Subject: [PATCH 3/6] fix: change encoders/decoders for kernel-worker protocol endpoints Also remove the `setEncoding()` call on kernel-worker pipes, I think it was disabling the objectMode settings --- .../kernel/vatManager/subprocessSupervisor.js | 20 ++++++++++------ .../vatManager/worker-subprocess-node.js | 9 +++---- .../SwingSet/src/spawnSubprocessWorker.js | 24 ++++++++++++------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js index 5ed4a2dd7a3..a3247d24481 100644 --- a/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js +++ b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js @@ -3,11 +3,15 @@ import '@agoric/install-ses'; import anylogger from 'anylogger'; import fs from 'fs'; -import Netstring from 'netstring-stream'; import { assert } from '@agoric/assert'; import { importBundle } from '@agoric/import-bundle'; import { Remotable, getInterfaceOf, makeMarshal } from '@agoric/marshal'; +import { arrayEncoderStream, arrayDecoderStream } from '../../worker-protocol'; +import { + netstringEncoderStream, + netstringDecoderStream, +} from '../../netstring'; import { waitUntilQuiescent } from '../../waitUntilQuiescent'; import { makeLiveSlots } from '../liveSlots'; @@ -71,17 +75,19 @@ function doNotify(vpid, vp) { } } -const toParent = Netstring.writeStream(); -toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' })); +const toParent = arrayEncoderStream(); +toParent + .pipe(netstringEncoderStream()) + .pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' })); const fromParent = fs .createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' }) - .pipe(Netstring.readStream()); -fromParent.setEncoding('utf-8'); + .pipe(netstringDecoderStream()) + .pipe(arrayDecoderStream()); function sendUplink(msg) { assert(msg instanceof Array, `msg must be an Array`); - toParent.write(JSON.stringify(msg)); + toParent.write(msg); } // fromParent.on('data', data => { @@ -90,7 +96,7 @@ function sendUplink(msg) { // }); fromParent.on('data', data => { - const [type, ...margs] = JSON.parse(data); + const [type, ...margs] = data; workerLog(`received`, type); if (type === 'start') { // TODO: parent should send ['start', vatID] diff --git a/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js index d5d1bfc466a..7c4f45db8c4 100644 --- a/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js +++ b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js @@ -75,8 +75,8 @@ export function makeNodeSubprocessFactory(tools) { const { fromChild, toChild, terminate, done } = startSubprocessWorker(); function sendToWorker(msg) { - assert(msg instanceof Array); - toChild.write(JSON.stringify(msg)); + assert(Array.isArray(msg)); + toChild.write(msg); } const { @@ -114,10 +114,7 @@ export function makeNodeSubprocessFactory(tools) { } } - fromChild.on('data', data => { - const msg = JSON.parse(data); - handleUpstream(msg); - }); + fromChild.on('data', handleUpstream); parentLog(`instructing worker to load bundle..`); sendToWorker(['setBundle', bundle, vatParameters]); diff --git a/packages/SwingSet/src/spawnSubprocessWorker.js b/packages/SwingSet/src/spawnSubprocessWorker.js index e02008d6225..355e44d4e9c 100644 --- a/packages/SwingSet/src/spawnSubprocessWorker.js +++ b/packages/SwingSet/src/spawnSubprocessWorker.js @@ -1,8 +1,13 @@ // this file is loaded by the controller, in the start compartment import { spawn } from 'child_process'; -import Netstring from 'netstring-stream'; - import { makePromiseKit } from '@agoric/promise-kit'; +import { arrayEncoderStream, arrayDecoderStream } from './worker-protocol'; +import { netstringEncoderStream, netstringDecoderStream } from './netstring'; + +// Start a subprocess from a given executable, and arrange a bidirectional +// message channel with a "supervisor" within that process. Return a { +// toChild, fromChild } pair of Streams which accept/emit hardened Arrays of +// JSON-serializable data. // eslint-disable-next-line no-unused-vars function parentLog(first, ...args) { @@ -18,11 +23,12 @@ const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']); export function startSubprocessWorker(execPath, procArgs = []) { const proc = spawn(execPath, procArgs, { stdio }); - const toChild = Netstring.writeStream(); - toChild.pipe(proc.stdio[3]); + const toChild = arrayEncoderStream(); + toChild.pipe(netstringEncoderStream()).pipe(proc.stdio[3]); // proc.stdio[4].setEncoding('utf-8'); - const fromChild = proc.stdio[4].pipe(Netstring.readStream()); - fromChild.setEncoding('utf-8'); + const fromChild = proc.stdio[4] + .pipe(netstringDecoderStream()) + .pipe(arrayDecoderStream()); // fromChild.addListener('data', data => parentLog(`fd4 data`, data)); // toChild.write('hello child'); @@ -43,13 +49,13 @@ export function startSubprocessWorker(execPath, procArgs = []) { proc.kill(); } - // the Netstring objects don't like being hardened, so we wrap the methods + // the Transform objects don't like being hardened, so we wrap the methods // that get used const wrappedFromChild = { - on: (evName, f) => fromChild.on(evName, f), + on: (...args) => fromChild.on(...args), }; const wrappedToChild = { - write: data => toChild.write(data), + write: (...args) => toChild.write(...args), }; return harden({ From 6ac996c00b876bac89ba99677bc5b66502506f4b Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 20 Sep 2020 23:51:27 -0700 Subject: [PATCH 4/6] fix: stop using netstring-stream --- packages/SwingSet/package.json | 1 - yarn.lock | 9 +-------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/packages/SwingSet/package.json b/packages/SwingSet/package.json index a8fadf1da49..ee9460a04d5 100644 --- a/packages/SwingSet/package.json +++ b/packages/SwingSet/package.json @@ -47,7 +47,6 @@ "@babel/generator": "^7.6.4", "anylogger": "^0.21.0", "esm": "^3.2.5", - "netstring-stream": "^1.0.1", "re2": "^1.10.5", "rollup": "^1.23.1", "rollup-plugin-node-resolve": "^5.2.0", diff --git a/yarn.lock b/yarn.lock index 3d039872b7f..8ecc8af9acd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7012,13 +7012,6 @@ neo-async@^2.6.0: resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.1.tgz#ac27ada66167fa8849a6addd837f6b189ad2081c" integrity sha512-iyam8fBuCUpWeKPGpaNMetEocMt364qkCsfL9JuhjXX6dRnguRVOfk2GZaDpPjcOKiiXCPINZC1GczQ7iTq3Zw== -netstring-stream@^1.0.1: - version "1.0.1" - resolved "https://registry.yarnpkg.com/netstring-stream/-/netstring-stream-1.0.1.tgz#d1babecbc4715428154d2956201bc8be3a526729" - integrity sha512-/lXoL4KEi8Cty/AsjPkDF7S/cPaHSDMU8PU4NbJNDbW7EhMIb8o6JJA9BD4LJPqPBchpEEoKAluI+BxuqJkR9g== - dependencies: - through2 "^2.0.3" - nice-try@^1.0.4: version "1.0.5" resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366" @@ -10174,7 +10167,7 @@ thenify-all@^1.0.0: dependencies: any-promise "^1.0.0" -through2@^2.0.0, through2@^2.0.2, through2@^2.0.3: +through2@^2.0.0, through2@^2.0.2: version "2.0.5" resolved "https://registry.yarnpkg.com/through2/-/through2-2.0.5.tgz#01c1e39eb31d07cb7d03a96a70823260b23132cd" integrity sha512-/mrRod8xqpA+IHSLyGCQ2s8SPHiCDEeQJSep1jqLYeEUClOFG2Qsh+4FU6G9VeqpZnGW/Su8LQGc4YKni5rYSQ== From e9838f13853baa2f1c63d78dde0ca04bba688196 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 1 Oct 2020 13:19:06 -0700 Subject: [PATCH 5/6] fix: loadVat should accept managerType= in options This got lost during the startup overhaul in 23c3f9d, and the tests which exercised it were disabled for reasons that were fixed in the rest of this branch. --- packages/SwingSet/src/kernel/loadVat.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/SwingSet/src/kernel/loadVat.js b/packages/SwingSet/src/kernel/loadVat.js index e2192339792..7cd3c4f1116 100644 --- a/packages/SwingSet/src/kernel/loadVat.js +++ b/packages/SwingSet/src/kernel/loadVat.js @@ -68,6 +68,7 @@ export function makeVatLoader(stuff) { const allowedDynamicOptions = [ 'metered', + 'managerType', 'vatParameters', 'enableSetup', 'enablePipelining', @@ -75,6 +76,7 @@ export function makeVatLoader(stuff) { const allowedStaticOptions = [ 'vatParameters', + 'managerType', 'enableSetup', 'enablePipelining', ]; From 38c63fbc28b1da44f1b34cc0d656b4fa27ce747e Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 21 Sep 2020 11:16:47 -0700 Subject: [PATCH 6/6] test: reenable test-worker.js for all workers Also use t.shutdown in attempt to cleanup after failure, although I think this doesn't work in all cases (the test run hangs if something goes wrong, which is annoying). --- packages/SwingSet/test/workers/test-worker.js | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/packages/SwingSet/test/workers/test-worker.js b/packages/SwingSet/test/workers/test-worker.js index aa9c22dd5e8..2e6f65fc576 100644 --- a/packages/SwingSet/test/workers/test-worker.js +++ b/packages/SwingSet/test/workers/test-worker.js @@ -12,38 +12,31 @@ maybeTestXS('xs vat manager', async t => { const config = await loadBasedir(__dirname); config.vats.target.creationOptions = { managerType: 'xs-worker' }; const c = await buildVatController(config, []); + t.teardown(c.shutdown); await c.run(); t.is(c.kpStatus(c.bootstrapResult), 'fulfilled'); t.deepEqual(c.dump().log, ['testLog works']); - - await c.shutdown(); }); -// XXX Test temporarily disabled on account of breakage due to some kind of -// mysterious node worker mysteriousity. -test.skip('nodeWorker vat manager', async t => { +test('nodeWorker vat manager', async t => { const config = await loadBasedir(__dirname); config.vats.target.creationOptions = { managerType: 'nodeWorker' }; const c = await buildVatController(config, []); + t.teardown(c.shutdown); await c.run(); t.is(c.kpStatus(c.bootstrapResult), 'fulfilled'); t.deepEqual(c.dump().log, ['testLog works']); - - await c.shutdown(); }); -/* // disabling for now due to possible buffering issue on MacOS test('node-subprocess vat manager', async t => { const config = await loadBasedir(__dirname); config.vats.target.creationOptions = { managerType: 'node-subprocess' }; const c = await buildVatController(config, []); + t.teardown(c.shutdown); await c.run(); t.is(c.kpStatus(c.bootstrapResult), 'fulfilled'); t.deepEqual(c.dump().log, ['testLog works']); - - await c.shutdown(); }); -*/