Skip to content

Commit

Permalink
xsnap worker: completes 1 delivery; what does doSyscall return???
Browse files Browse the repository at this point in the history
 - Use Tagged type consistently;
   don't constrain tag to be string.
 - clean up logging: use parentLog(), trace(), ...
 - static typing for doProcess: capture dispatch while
   it's known to be not null
  • Loading branch information
dckc committed Jan 19, 2021
1 parent 1da1341 commit 5fe2224
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 43 deletions.
39 changes: 17 additions & 22 deletions packages/SwingSet/src/kernel/vatManager/manager-subprocess-xsnap.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ function parentLog(first, ...args) {
console.error(`--parent: ${first}`, ...args);
}

const trace = label => x => {
parentLog(label, x);
return x;
};

const encoder = new TextEncoder();
const decoder = new TextDecoder();

Expand All @@ -25,7 +30,7 @@ const decoder = new TextDecoder();
* @typedef { ReturnType<typeof import('@agoric/xsnap').xsnap> } XSnap
* @typedef { ReturnType<typeof import('../state/kernelKeeper').default> } KernelKeeper
* @typedef { ReturnType<typeof import('./manager-nodeworker').makeNodeWorkerVatManagerFactory> } VatManagerFactory
* @typedef { [string, ...unknown[]] } Tagged
* @typedef { [unknown, ...unknown[]] } Tagged
*/
export function makeXsSubprocessFactory({
startXSnap,
Expand All @@ -39,7 +44,7 @@ export function makeXsSubprocessFactory({
* @param { ManagerOptions } managerOptions
*/
async function createFromBundle(vatID, bundle, managerOptions) {
console.log('@@createFromBundle', { vatID, managerOptions: { ...managerOptions, bundle: '<suppressed>' } });
parentLog('createFromBundle', { vatID });
const { vatParameters, virtualObjectCacheSize } = managerOptions;
assert(!managerOptions.metered, 'not supported yet');
assert(!managerOptions.enableSetup, 'not supported at all');
Expand All @@ -60,24 +65,24 @@ export function makeXsSubprocessFactory({
transcriptManager,
);

/** @type { (vatSyscallObject: Tagged) => Tagged } */
//@@??? should this return Tagged?
/** @type { (vatSyscallObject: Tagged) => unknown } */
function handleSyscall(vatSyscallObject) {
return doSyscall(vatSyscallObject);
return trace('doSyscall')(doSyscall(vatSyscallObject));
}

/** @type { (vref: unknown, count: number) => void } */
function vatDecref(vref, count) {
decref(vatID, vref, count);
}

/** @type { (item: Tagged) => Tagged } */
/** @type { (item: Tagged) => unknown } */
function handleUpstream([type, ...args]) {
parentLog(`handleUpstream`, type, args.length);
switch (type) {
case 'syscall': {
parentLog(`syscall`, args);
const [scTag, ...vatSyscallArgs] = args;
assert(typeof scTag === 'string');
return handleSyscall([scTag, ...vatSyscallArgs]);
}
case 'testLog':
Expand All @@ -90,47 +95,38 @@ export function makeXsSubprocessFactory({
return ['OK'];
}
default:
parentLog(`unrecognized uplink message ${type}`);
return ['?'];
throw new Error(`unrecognized uplink message ${type}`);
}
}

/** @type { (msg: Uint8Array) => Uint8Array } */
function handleCommand(msg) {
console.log('handleCommand', { length: msg.byteLength });
parentLog('handleCommand', { length: msg.byteLength });
const tagged = handleUpstream(JSON.parse(decoder.decode(msg)));
return encoder.encode(JSON.stringify(tagged));
}

console.log('@@do we return from startXSnap?');
// start the worker and establish a connection
const { worker, bundles } = startXSnap(`${vatID}`, handleCommand);
console.log('@@YES, we return from startXSnap.');
await worker.evaluate('1+1'); //@@
console.log('and evaluate() finishes @@');
console.log('@@bundle keys', Object.keys(bundles.lockdown));
for await (const [it, superCode] of Object.entries(bundles)) {
parentLog('bundle', it);
assert(
superCode.moduleFormat === 'getExport',
details`${it} unexpected: ${superCode.moduleFormat}`,
);
console.log('@@evaluating...', it);
const x = await worker.evaluate(
await worker.evaluate(
`(${superCode.source}
)()`.trim(),
);
console.log('@@evaluated.', it, x);
}

/** @type { (item: Tagged) => Promise<Tagged> } */
async function issueTagged(item) {
console.log('@@issueTagged', item[0]);
parentLog('issueTagged', item[0]);
const txt = await worker.issueStringCommand(JSON.stringify(item));
const reply = JSON.parse(txt);
console.log('@@reply', Array.isArray(reply), reply[0]);
assert(Array.isArray(reply));
const [tag, ...rest] = reply;
assert(typeof tag === 'string');
return [tag, ...rest];
}

Expand Down Expand Up @@ -160,7 +156,6 @@ export function makeXsSubprocessFactory({
/** @type { (item: Tagged) => Promise<Tagged> } */
async function deliver(delivery) {
parentLog(`sending delivery`, delivery);
// ensure return tag is 'deliverDone'?
const result = await commandResult(['deliver', ...delivery]);
parentLog(`deliverDone`, result[0], result.length);
return result;
Expand Down Expand Up @@ -189,7 +184,7 @@ export function makeXsSubprocessFactory({
shutdown,
});

console.log('@@returning manager', Object.keys(manager));
parentLog('manager', Object.keys(manager));
return manager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@
import { assert, details } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { Remotable, getInterfaceOf, makeMarshal } from '@agoric/marshal';
// grumble... waitUntilQuiescent is exported and closes over ambient authority
import { waitUntilQuiescent } from '../../waitUntilQuiescent';

import { makeLiveSlots } from '../liveSlots';

const encoder = new TextEncoder();
const decoder = new TextDecoder();

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
// @ts-ignore
// eslint-disable-next-line
print(`---worker: ${first}`, ...args);
}

workerLog(`supervisor started`);

/**
* @typedef { [unknown, ...unknown[]] } Tagged
*/
const Item = {
/** @type { (item: Tagged) => ArrayBuffer } */
encode: tagged => encoder.encode(JSON.stringify(tagged)).buffer,

/** @type { (msg: ArrayBuffer) => Tagged } */
decode(msg) {
const txt = decoder.decode(msg);
Expand Down Expand Up @@ -44,26 +56,36 @@ function testLog(...args) {
issueCommand(Item.encode(['testLog', ...args]));
}

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
testLog(`---worker: ${first}`, ...args);
/**
* @param { (value: void) => void } f
* @param { string } errmsg
*/
function runAndWait(f, errmsg) {
Promise.resolve()
.then(f)
.then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err));
return waitUntilQuiescent();
}

workerLog(`supervisor started`);

function makeWorker() {
let dispatch;
/** @type { Record<string, (...args: unknown[]) => void> | null } */
let dispatch = null;

/** @type { (dr: Tagged, errmsg: string) => Promise<Tagged> } */
async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
assert(dispatch);
const theDispatch = dispatch;
const [dispatchOp, ...dispatchArgs] = dispatchRecord;
assert(typeof dispatchOp === 'string');
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
await runAndWait(() => theDispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
/** @type { Tagged } */
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

/** @type { (ts: unknown, msg: any) => Promise<Tagged> } */
function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
Expand All @@ -72,6 +94,7 @@ function makeWorker() {
);
}

/** @type { (pv: unknown, msg: any) => Promise<Tagged> } */
function doNotify(primaryVpid, resolutions) {
const errmsg = `vat.promise[${primaryVpid}] failed`;
return doProcess(['notify', primaryVpid, resolutions], errmsg);
Expand All @@ -90,12 +113,15 @@ function makeWorker() {
vatParameters,
virtualObjectCacheSize,
) {
/** @type { (item: Tagged) => Tagged } */
//@@??? should this return Tagged?
/** @type { (item: Tagged) => unknown } */
function doSyscall(vatSyscallObject) {
return Item.decode(
// @ts-ignore
// eslint-disable-next-line no-undef
issueCommand(Item.encode(['syscall', ...vatSyscallObject])),
return JSON.parse(
decoder.decode(
// @ts-ignore
// eslint-disable-next-line no-undef
issueCommand(Item.encode(['syscall', ...vatSyscallObject])),
),
);
}

Expand Down Expand Up @@ -137,6 +163,7 @@ function makeWorker() {
workerLog(`got vatNS:`, Object.keys(vatNS).join(','));
ls.setBuildRootObject(vatNS.buildRootObject);
dispatch = ls.dispatch;
assert(dispatch);
workerLog(`got dispatch:`, Object.keys(dispatch).join(','));
return ['dispatchReady'];
}
Expand All @@ -152,11 +179,11 @@ function makeWorker() {
const [dtype, ...dargs] = args;
switch (dtype) {
case 'message': {
const res = await doMessage(...dargs);
const res = await doMessage(dargs[0], dargs[1]);
return ['deliverDone', ...res];
}
case 'notify': {
const res = await doNotify(...dargs);
const res = await doNotify(dargs[0], dargs[1]);
return ['deliverDone', ...res];
}
default:
Expand Down
7 changes: 2 additions & 5 deletions packages/SwingSet/test/workers/test-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ test('xs vat manager', async t => {
const c = await buildVatController(config, []);
t.teardown(c.shutdown);

console.log('@@ready to run...');
await c.run();
t.is(c.kpStatus(c.bootstrapResult), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);
});

// @@ SKIP?
test.skip('nodeWorker vat manager', async t => {
test('nodeWorker vat manager', async t => {
const config = await loadBasedir(__dirname);
config.vats.target.creationOptions = { managerType: 'nodeWorker' };
const c = await buildVatController(config, []);
Expand All @@ -26,8 +24,7 @@ test.skip('nodeWorker vat manager', async t => {
t.deepEqual(c.dump().log, ['testLog works']);
});

// @@ SKIP?
test.skip('node-subprocess vat manager', async t => {
test('node-subprocess vat manager', async t => {
const config = await loadBasedir(__dirname);
config.vats.target.creationOptions = { managerType: 'node-subprocess' };
const c = await buildVatController(config, []);
Expand Down

0 comments on commit 5fe2224

Please sign in to comment.