Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swingset): queue to promise #5252

Merged
merged 5 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 83 additions & 50 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,10 @@ export default function buildKernel(
insistMessage(msg);
kernelKeeper.incStat('dispatches');
kernelKeeper.incStat('dispatchDeliver');

// eslint-disable-next-line no-use-before-define
if (!vatWarehouse.lookup(vatID)) {
const vatInfo = vatWarehouse.lookup(vatID);
if (!vatInfo) {
// splat
if (msg.result) {
resolveToError(msg.result, VAT_TERMINATION_ERROR);
Expand All @@ -403,6 +405,11 @@ export default function buildKernel(
const kd = harden(['message', target, msg]);
// eslint-disable-next-line no-use-before-define
const vd = vatWarehouse.kernelDeliveryToVatDelivery(vatID, kd);

if (vatInfo.enablePipelining && msg.result) {
mhofman marked this conversation as resolved.
Show resolved Hide resolved
kernelKeeper.requeueKernelPromise(msg.result);
}

return deliverAndLogToVat(vatID, kd, vd);
}

Expand Down Expand Up @@ -730,7 +737,7 @@ export default function buildKernel(
* This does not decrement any refcounts. The caller should do that.
*
* @param { RunQueueEventSend } message
* @returns { { vatID: VatID, targetObject: string } | null }
* @returns { { vatID: VatID | null, target: string } | null }
*/
function routeSendEvent(message) {
const { target, msg } = message;
Expand All @@ -752,12 +759,12 @@ export default function buildKernel(
if (!vatID) {
return splat(VAT_TERMINATION_ERROR);
}
return { vatID, targetObject };
return { vatID, target: targetObject };
}

function enqueue() {
kernelKeeper.addMessageToPromiseQueue(target, msg);
return null; // message is queued, not sent to a vat right now
function requeue() {
// message will be requeued, not sent to a vat right now
return { vatID: null, target };
}

if (type === 'object') {
Expand All @@ -781,7 +788,7 @@ export default function buildKernel(
}
case 'unresolved': {
if (!kp.decider) {
return enqueue();
return requeue();
} else {
insistVatID(kp.decider);
// eslint-disable-next-line no-use-before-define
Expand All @@ -791,9 +798,9 @@ export default function buildKernel(
return splat(VAT_TERMINATION_ERROR);
}
if (deciderVat.enablePipelining) {
return { vatID: kp.decider, targetObject: target };
return { vatID: kp.decider, target };
}
return enqueue();
return requeue();
}
}
default:
Expand Down Expand Up @@ -871,13 +878,25 @@ export default function buildKernel(
let useMeter = false;
let deliverP = null;

// The common action should be delivering events to the vat. Any references
// in the events should no longer be the kernel's responsibility and the
// refcounts should be decremented
if (message.type === 'send') {
useMeter = true;
const route = routeSendEvent(message);
decrementSendEventRefCount(message);
if (route) {
if (!route) {
// Message went splat
decrementSendEventRefCount(message);
} else {
vatID = route.vatID;
deliverP = processSend(vatID, route.targetObject, message.msg);
if (vatID) {
decrementSendEventRefCount(message);
deliverP = processSend(vatID, route.target, message.msg);
} else {
// Message is requeued and stays the kernel's responsibility, do not
// decrement refcounts in this case
kernelKeeper.addMessageToPromiseQueue(route.target, message.msg);
}
}
} else if (message.type === 'notify') {
useMeter = true;
Expand Down Expand Up @@ -1064,46 +1083,63 @@ export default function buildKernel(
}

let processQueueRunning;
async function tryProcessDeliveryMessage(message) {
async function tryProcessMessage(processor, message) {
if (processQueueRunning) {
console.error(`We're currently already running at`, processQueueRunning);
assert.fail(X`Kernel reentrancy is forbidden`);
}
processQueueRunning = Error('here');
return processDeliveryMessage(message).finally(() => {
return processor(message).finally(() => {
processQueueRunning = undefined;
});
}

async function processAcceptanceMessage(message) {
kdebug(`processAcceptanceQ ${JSON.stringify(message)}`);
kdebug(legibilizeMessage(message));
if (processQueueRunning) {
mhofman marked this conversation as resolved.
Show resolved Hide resolved
console.error(`We're currently already running at`, processQueueRunning);
assert.fail(X`Kernel reentrancy is forbidden`);
}
kernelSlog.write({ type: 'crank-start', message });
/** @type { PolicyInput } */
const policyInput = ['none'];
try {
processQueueRunning = Error('here');

// By default we're moving events from one queue to another. Any references
// in the events remain the kernel's responsibility and the refcounts persist
if (message.type === 'send') {
const route = routeSendEvent(message);
if (!route) {
// Message went splat, no longer the kernel's responsibility
decrementSendEventRefCount(message);
} else {
const { vatID, target } = route;
if (target !== message.target) {
// Message has been re-targeted, other refcounts stay intact
kernelKeeper.decrementRefCount(message.target, `deq|msg|t`);
kernelKeeper.incrementRefCount(target, `enq|msg|t`);
}
if (vatID) {
kernelKeeper.addToRunQueue({
...message,
target,
});
} else {
kernelKeeper.addMessageToPromiseQueue(target, message.msg);
}
}
} else {
kernelKeeper.addToRunQueue(message);

kernelKeeper.processRefcounts();
kernelKeeper.saveStats();
const crankNum = kernelKeeper.getCrankNumber();
kernelKeeper.incrementCrankNumber();
const { crankhash, activityhash } = kernelKeeper.commitCrank();
kernelSlog.write({
type: 'crank-finish',
crankNum,
crankhash,
activityhash,
});
} finally {
processQueueRunning = undefined;
}

kernelKeeper.processRefcounts();
kernelKeeper.saveStats();
const crankNum = kernelKeeper.getCrankNumber();
kernelKeeper.incrementCrankNumber();
const { crankhash, activityhash } = kernelKeeper.commitCrank();
kernelSlog.write({
type: 'crank-finish',
crankNum,
crankhash,
activityhash,
});

return harden(policyInput);
}

Expand Down Expand Up @@ -1419,19 +1455,16 @@ export default function buildKernel(
return undefined;
}

function startProcessingNextMessageIfAny() {
/** @type {Promise<PolicyInput> | undefined} */
let resultPromise;
function getNextMessageAndProcessor() {
let message = getNextAcceptanceMessage();
if (message) {
resultPromise = processAcceptanceMessage(message);
} else {
/** @type {(message:any) => Promise<PolicyInput>} */
let processor = processAcceptanceMessage;
if (!message) {
message = getNextDeliveryMessage();
if (message) {
resultPromise = tryProcessDeliveryMessage(message);
}
processor = processDeliveryMessage;
}
return { resultPromise };

return { message, processor };
}

async function step() {
Expand All @@ -1441,10 +1474,10 @@ export default function buildKernel(
if (!started) {
throw new Error('must do kernel.start() before step()');
}
const { resultPromise } = startProcessingNextMessageIfAny();
const { processor, message } = getNextMessageAndProcessor();
// process a single message
if (resultPromise) {
await resultPromise;
if (message) {
await tryProcessMessage(processor, message);
if (kernelPanic) {
throw kernelPanic;
}
Expand Down Expand Up @@ -1473,14 +1506,14 @@ export default function buildKernel(
}
let count = 0;
for (;;) {
const { resultPromise } = startProcessingNextMessageIfAny();
if (!resultPromise) {
const { processor, message } = getNextMessageAndProcessor();
if (!message) {
break;
}
count += 1;
/** @type { PolicyInput } */
// eslint-disable-next-line no-await-in-loop
const policyInput = await resultPromise;
const policyInput = await tryProcessMessage(processor, message);
if (kernelPanic) {
throw kernelPanic;
}
Expand Down
64 changes: 38 additions & 26 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -672,18 +672,12 @@ export default function makeKernelKeeper(
kvStore.delete(`${kpid}.refCount`);
}

function resolveKernelPromise(kernelSlot, rejected, capdata) {
function requeueKernelPromise(kernelSlot) {
insistKernelType('promise', kernelSlot);
insistCapData(capdata);

let idx = 0;
for (const dataSlot of capdata.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`);
idx += 1;
}

// Re-queue all messages, so they can be delivered to the resolution.
// Re-queue all messages, so they can be delivered to the resolution if the
// promise was resolved, or to the pipelining vat if the decider was
// updated.
// This is a lateral move, so we retain their original refcounts. TODO:
// this is slightly lazy, sending the message back to the same promise
// that just got resolved. When this message makes it to the front of the
Expand All @@ -699,6 +693,23 @@ export default function makeKernelKeeper(
kvStore.set('acceptanceQueue', JSON.stringify(acceptanceQueue));
incStat('acceptanceQueueLength', p.queue.length);

kvStore.deletePrefixedKeys(`${kernelSlot}.queue.`);
kvStore.set(`${kernelSlot}.queue.nextID`, `0`);
}

function resolveKernelPromise(kernelSlot, rejected, capdata) {
insistKernelType('promise', kernelSlot);
insistCapData(capdata);

let idx = 0;
for (const dataSlot of capdata.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`);
idx += 1;
}

requeueKernelPromise(kernelSlot);

deleteKernelPromiseState(kernelSlot);
decStat('kpUnresolved');

Expand Down Expand Up @@ -826,22 +837,22 @@ export default function makeKernelKeeper(
// itself. This isn't strictly necessary (the promise will be kept alive
// by the deciding vat's clist, or the queued message that holds this
// promise as its result), but it matches our policy with run-queue
// messages (each holds a refcount on its target), and makes it easier to
// transfer these messages back to the run-queue in
// resolveKernelPromise() (which doesn't touch any of the refcounts).

// eslint-disable-next-line no-use-before-define
incrementRefCount(kernelSlot, `pq|${kernelSlot}|t`);
if (msg.result) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(msg.result, `pq|${kernelSlot}|r`);
}
let idx = 0;
for (const kref of msg.args.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(kref, `pq|${kernelSlot}|s${idx}`);
idx += 1;
}
// messages (each holds a refcount on its target).
//
// Messages are enqueued on a promise queue in 2 cases:
// - A message routed from the acceptance queue
// - A pipelined message had a decider change while in the run-queue
// Messages are dequeued from a promise queue in 2 cases:
// - The promise is resolved
// - The promise's decider is changed to a pipelining vat
// In all cases the messages are just moved from one queue to another so
// the caller should not need to change the refcounts when moving messages
// sent to promises between queues. Only when re-targeting after resolution
// would the targets refcount be updated (but not the result or slots).
//
// Since messages are moved from queue to queue, the tag describing the ref
// does not designate which current queue the message sits on, but that
// there is some kernel queue holding the message and its content.

const p = getKernelPromise(kernelSlot);
assert(
Expand Down Expand Up @@ -1525,6 +1536,7 @@ export default function makeKernelKeeper(
getKernelPromise,
getResolveablePromise,
hasKernelPromise,
requeueKernelPromise,
resolveKernelPromise,
addMessageToPromiseQueue,
addSubscriberToPromise,
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/test/message-patterns.js
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ export function buildPatterns(log) {
return pipe2;
};
}
out.a70 = ['pipe1', 'pipe2', 'pipe3', 'p1.then', 'p2.then', 'p3.then'];
out.a70 = ['pipe1', 'p1.then', 'pipe2', 'p2.then', 'pipe3', 'p3.then'];
outPipelined.a70 = [
'pipe1',
'pipe2',
Expand Down
Loading