Skip to content

Commit

Permalink
Merge branch '1775-protocol-fix'
Browse files Browse the repository at this point in the history
This fixes the two ends of the netstring-based "kernel-worker" protocol: the
previous version failed to parse large inbound messages, such as non-trivial
vat bundles.

The replacement netstring parser is based on Node.js "Streams", in their
"object mode". We intend to replace this with one based on async iterators,
once I can figure out some other problems with that branch.

We re-enable test-worker.js for all worker types, now that the decoding
problem is fixed.

refs #1299
refs #1127
  • Loading branch information
warner committed Oct 1, 2020
2 parents c169ffd + 38c63fb commit ecc0d7c
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 47 deletions.
1 change: 0 additions & 1 deletion packages/SwingSet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
"@babel/generator": "^7.6.4",
"anylogger": "^0.21.0",
"esm": "^3.2.5",
"netstring-stream": "^1.0.1",
"re2": "^1.10.5",
"rollup": "^1.23.1",
"rollup-plugin-node-resolve": "^5.2.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/SwingSet/src/kernel/loadVat.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ export function makeVatLoader(stuff) {

const allowedDynamicOptions = [
'metered',
'managerType',
'vatParameters',
'enableSetup',
'enablePipelining',
];

const allowedStaticOptions = [
'vatParameters',
'managerType',
'enableSetup',
'enablePipelining',
];
Expand Down
20 changes: 13 additions & 7 deletions packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import '@agoric/install-ses';

import anylogger from 'anylogger';
import fs from 'fs';
import Netstring from 'netstring-stream';

import { assert } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { Remotable, getInterfaceOf, makeMarshal } from '@agoric/marshal';
import { arrayEncoderStream, arrayDecoderStream } from '../../worker-protocol';
import {
netstringEncoderStream,
netstringDecoderStream,
} from '../../netstring';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';

Expand Down Expand Up @@ -71,17 +75,19 @@ function doNotify(vpid, vp) {
}
}

const toParent = Netstring.writeStream();
toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' }));
const toParent = arrayEncoderStream();
toParent
.pipe(netstringEncoderStream())
.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' }));

const fromParent = fs
.createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' })
.pipe(Netstring.readStream());
fromParent.setEncoding('utf-8');
.pipe(netstringDecoderStream())
.pipe(arrayDecoderStream());

function sendUplink(msg) {
assert(msg instanceof Array, `msg must be an Array`);
toParent.write(JSON.stringify(msg));
toParent.write(msg);
}

// fromParent.on('data', data => {
Expand All @@ -90,7 +96,7 @@ function sendUplink(msg) {
// });

fromParent.on('data', data => {
const [type, ...margs] = JSON.parse(data);
const [type, ...margs] = data;
workerLog(`received`, type);
if (type === 'start') {
// TODO: parent should send ['start', vatID]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ export function makeNodeSubprocessFactory(tools) {
const { fromChild, toChild, terminate, done } = startSubprocessWorker();

function sendToWorker(msg) {
assert(msg instanceof Array);
toChild.write(JSON.stringify(msg));
assert(Array.isArray(msg));
toChild.write(msg);
}

const {
Expand Down Expand Up @@ -114,10 +114,7 @@ export function makeNodeSubprocessFactory(tools) {
}
}

fromChild.on('data', data => {
const msg = JSON.parse(data);
handleUpstream(msg);
});
fromChild.on('data', handleUpstream);

parentLog(`instructing worker to load bundle..`);
sendToWorker(['setBundle', bundle, vatParameters]);
Expand Down
9 changes: 7 additions & 2 deletions packages/SwingSet/src/netstring.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export function encode(data) {
}

// input is a sequence of strings, output is a byte pipe
export function encoderStream() {
export function netstringEncoderStream() {
function transform(chunk, encoding, callback) {
if (!Buffer.isBuffer(chunk)) {
throw Error('stream requires Buffers');
Expand All @@ -25,6 +25,8 @@ export function encoderStream() {
}
callback(err);
}
// (maybe empty) Buffer in, Buffer out. We use writableObjectMode to
// indicate that empty input buffers are important
return new Transform({ transform, writableObjectMode: true });
}

Expand Down Expand Up @@ -64,7 +66,7 @@ export function decode(data) {
}

// input is a byte pipe, output is a sequence of Buffers
export function decoderStream() {
export function netstringDecoderStream() {
let buffered = Buffer.from('');

function transform(chunk, encoding, callback) {
Expand All @@ -88,5 +90,8 @@ export function decoderStream() {
callback(err);
}

// Buffer in, Buffer out, except that each output Buffer is precious, even
// empty ones, and without readableObjectMode the Stream will discard empty
// buffers
return new Transform({ transform, readableObjectMode: true });
}
24 changes: 15 additions & 9 deletions packages/SwingSet/src/spawnSubprocessWorker.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// this file is loaded by the controller, in the start compartment
import { spawn } from 'child_process';
import Netstring from 'netstring-stream';

import { makePromiseKit } from '@agoric/promise-kit';
import { arrayEncoderStream, arrayDecoderStream } from './worker-protocol';
import { netstringEncoderStream, netstringDecoderStream } from './netstring';

// Start a subprocess from a given executable, and arrange a bidirectional
// message channel with a "supervisor" within that process. Return a {
// toChild, fromChild } pair of Streams which accept/emit hardened Arrays of
// JSON-serializable data.

// eslint-disable-next-line no-unused-vars
function parentLog(first, ...args) {
Expand All @@ -18,11 +23,12 @@ const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']);
export function startSubprocessWorker(execPath, procArgs = []) {
const proc = spawn(execPath, procArgs, { stdio });

const toChild = Netstring.writeStream();
toChild.pipe(proc.stdio[3]);
const toChild = arrayEncoderStream();
toChild.pipe(netstringEncoderStream()).pipe(proc.stdio[3]);
// proc.stdio[4].setEncoding('utf-8');
const fromChild = proc.stdio[4].pipe(Netstring.readStream());
fromChild.setEncoding('utf-8');
const fromChild = proc.stdio[4]
.pipe(netstringDecoderStream())
.pipe(arrayDecoderStream());

// fromChild.addListener('data', data => parentLog(`fd4 data`, data));
// toChild.write('hello child');
Expand All @@ -43,13 +49,13 @@ export function startSubprocessWorker(execPath, procArgs = []) {
proc.kill();
}

// the Netstring objects don't like being hardened, so we wrap the methods
// the Transform objects don't like being hardened, so we wrap the methods
// that get used
const wrappedFromChild = {
on: (evName, f) => fromChild.on(evName, f),
on: (...args) => fromChild.on(...args),
};
const wrappedToChild = {
write: data => toChild.write(data),
write: (...args) => toChild.write(...args),
};

return harden({
Expand Down
41 changes: 41 additions & 0 deletions packages/SwingSet/src/worker-protocol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Transform } from 'stream';

// Transform objects which convert from hardened Arrays of JSON-serializable
// data into Buffers suitable for netstring conversion.

export function arrayEncoderStream() {
function transform(object, encoding, callback) {
if (!Array.isArray(object)) {
throw Error('stream requires Arrays');
}
let err;
try {
this.push(Buffer.from(JSON.stringify(object)));
} catch (e) {
err = e;
}
callback(err);
}
// Array in, Buffer out, hence writableObjectMode
return new Transform({ transform, writableObjectMode: true });
}

export function arrayDecoderStream() {
function transform(buf, encoding, callback) {
let err;
try {
if (!Buffer.isBuffer(buf)) {
throw Error('stream expects Buffers');
}
this.push(JSON.parse(buf));
} catch (e) {
err = e;
}
// this Transform is a one-to-one conversion of Buffer into Array, so we
// always consume the input each time we're called
callback(err);
}

// Buffer in, Array out, hence readableObjectMode
return new Transform({ transform, readableObjectMode: true });
}
11 changes: 8 additions & 3 deletions packages/SwingSet/test/test-netstring.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import '@agoric/install-ses'; // adds 'harden' to global

import test from 'ava';
import { encode, encoderStream, decode, decoderStream } from '../src/netstring';
import {
encode,
decode,
netstringEncoderStream,
netstringDecoderStream,
} from '../src/netstring';

const umlaut = 'ümlaut';
const umlautBuffer = Buffer.from(umlaut, 'utf-8');
Expand Down Expand Up @@ -51,7 +56,7 @@ test('encode', t => {
});

test('encode stream', async t => {
const e = encoderStream();
const e = netstringEncoderStream();
const chunks = [];
e.on('data', data => chunks.push(data));
e.write(Buffer.from(''));
Expand Down Expand Up @@ -106,7 +111,7 @@ test('decode', t => {
});

test('decode stream', async t => {
const d = decoderStream();
const d = netstringDecoderStream();
function write(s) {
d.write(Buffer.from(s));
}
Expand Down
76 changes: 76 additions & 0 deletions packages/SwingSet/test/test-worker-protocol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import '@agoric/install-ses'; // adds 'harden' to global

import test from 'ava';
import { arrayEncoderStream, arrayDecoderStream } from '../src/worker-protocol';
import {
encode,
netstringEncoderStream,
netstringDecoderStream,
} from '../src/netstring';

test('arrayEncoderStream', async t => {
const e = arrayEncoderStream();
const chunks = [];
e.on('data', data => chunks.push(data));
e.write([]);

function eq(expected) {
t.deepEqual(
chunks.map(buf => buf.toString()),
expected,
);
}
eq([`[]`]);

e.write(['command', { foo: 1 }]);
eq([`[]`, `["command",{"foo":1}]`]);
});

test('encode stream', async t => {
const aStream = arrayEncoderStream();
const nsStream = netstringEncoderStream();
aStream.pipe(nsStream);
const chunks = [];
nsStream.on('data', data => chunks.push(data));
function eq(expected) {
t.deepEqual(
chunks.map(buf => buf.toString()),
expected,
);
}

aStream.write([1]);
eq(['3:[1],']);

aStream.write(['command', { foo: 4 }]);
eq(['3:[1],', '21:["command",{"foo":4}],']);
});

test('decode stream', async t => {
const nsStream = netstringDecoderStream();
const aStream = arrayDecoderStream();
nsStream.pipe(aStream);
function write(s) {
nsStream.write(Buffer.from(s));
}

const msgs = [];
aStream.on('data', msg => msgs.push(msg));

function eq(expected) {
t.deepEqual(msgs, expected);
}

let buf = encode(Buffer.from(JSON.stringify([1])));
write(buf.slice(0, 1));
eq([]);
write(buf.slice(1));
eq([[1]]);
msgs.pop();

buf = encode(Buffer.from(JSON.stringify(['command', { foo: 2 }])));
write(buf.slice(0, 4));
eq([]);
write(buf.slice(4));
eq([['command', { foo: 2 }]]);
});
15 changes: 4 additions & 11 deletions packages/SwingSet/test/workers/test-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,31 @@ maybeTestXS('xs vat manager', async t => {
const config = await loadBasedir(__dirname);
config.vats.target.creationOptions = { managerType: 'xs-worker' };
const c = await buildVatController(config, []);
t.teardown(c.shutdown);

await c.run();
t.is(c.kpStatus(c.bootstrapResult), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);

await c.shutdown();
});

// XXX Test temporarily disabled on account of breakage due to some kind of
// mysterious node worker mysteriousity.
test.skip('nodeWorker vat manager', async t => {
test('nodeWorker vat manager', async t => {
const config = await loadBasedir(__dirname);
config.vats.target.creationOptions = { managerType: 'nodeWorker' };
const c = await buildVatController(config, []);
t.teardown(c.shutdown);

await c.run();
t.is(c.kpStatus(c.bootstrapResult), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);

await c.shutdown();
});

/* // disabling for now due to possible buffering issue on MacOS
test('node-subprocess vat manager', async t => {
const config = await loadBasedir(__dirname);
config.vats.target.creationOptions = { managerType: 'node-subprocess' };
const c = await buildVatController(config, []);
t.teardown(c.shutdown);

await c.run();
t.is(c.kpStatus(c.bootstrapResult), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);
await c.shutdown();
});
*/
Loading

0 comments on commit ecc0d7c

Please sign in to comment.