Skip to content

Commit

Permalink
Merge pull request #5252 from Agoric/mhofman/4542-queue-to-promise
Browse files Browse the repository at this point in the history
feat(swingset): queue to promise
  • Loading branch information
mergify[bot] authored May 5, 2022
2 parents 73f62e2 + c4f7a3e commit 92da539
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 91 deletions.
133 changes: 83 additions & 50 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,10 @@ export default function buildKernel(
insistMessage(msg);
kernelKeeper.incStat('dispatches');
kernelKeeper.incStat('dispatchDeliver');

// eslint-disable-next-line no-use-before-define
if (!vatWarehouse.lookup(vatID)) {
const vatInfo = vatWarehouse.lookup(vatID);
if (!vatInfo) {
// splat
if (msg.result) {
resolveToError(msg.result, VAT_TERMINATION_ERROR);
Expand All @@ -403,6 +405,11 @@ export default function buildKernel(
const kd = harden(['message', target, msg]);
// eslint-disable-next-line no-use-before-define
const vd = vatWarehouse.kernelDeliveryToVatDelivery(vatID, kd);

if (vatInfo.enablePipelining && msg.result) {
kernelKeeper.requeueKernelPromise(msg.result);
}

return deliverAndLogToVat(vatID, kd, vd);
}

Expand Down Expand Up @@ -730,7 +737,7 @@ export default function buildKernel(
* This does not decrement any refcounts. The caller should do that.
*
* @param { RunQueueEventSend } message
* @returns { { vatID: VatID, targetObject: string } | null }
* @returns { { vatID: VatID | null, target: string } | null }
*/
function routeSendEvent(message) {
const { target, msg } = message;
Expand All @@ -752,12 +759,12 @@ export default function buildKernel(
if (!vatID) {
return splat(VAT_TERMINATION_ERROR);
}
return { vatID, targetObject };
return { vatID, target: targetObject };
}

function enqueue() {
kernelKeeper.addMessageToPromiseQueue(target, msg);
return null; // message is queued, not sent to a vat right now
function requeue() {
// message will be requeued, not sent to a vat right now
return { vatID: null, target };
}

if (type === 'object') {
Expand All @@ -781,7 +788,7 @@ export default function buildKernel(
}
case 'unresolved': {
if (!kp.decider) {
return enqueue();
return requeue();
} else {
insistVatID(kp.decider);
// eslint-disable-next-line no-use-before-define
Expand All @@ -791,9 +798,9 @@ export default function buildKernel(
return splat(VAT_TERMINATION_ERROR);
}
if (deciderVat.enablePipelining) {
return { vatID: kp.decider, targetObject: target };
return { vatID: kp.decider, target };
}
return enqueue();
return requeue();
}
}
default:
Expand Down Expand Up @@ -871,13 +878,25 @@ export default function buildKernel(
let useMeter = false;
let deliverP = null;

// The common action should be delivering events to the vat. Any references
// in the events should no longer be the kernel's responsibility and the
// refcounts should be decremented
if (message.type === 'send') {
useMeter = true;
const route = routeSendEvent(message);
decrementSendEventRefCount(message);
if (route) {
if (!route) {
// Message went splat
decrementSendEventRefCount(message);
} else {
vatID = route.vatID;
deliverP = processSend(vatID, route.targetObject, message.msg);
if (vatID) {
decrementSendEventRefCount(message);
deliverP = processSend(vatID, route.target, message.msg);
} else {
// Message is requeued and stays the kernel's responsibility, do not
// decrement refcounts in this case
kernelKeeper.addMessageToPromiseQueue(route.target, message.msg);
}
}
} else if (message.type === 'notify') {
useMeter = true;
Expand Down Expand Up @@ -1064,46 +1083,63 @@ export default function buildKernel(
}

let processQueueRunning;
async function tryProcessDeliveryMessage(message) {
async function tryProcessMessage(processor, message) {
if (processQueueRunning) {
console.error(`We're currently already running at`, processQueueRunning);
assert.fail(X`Kernel reentrancy is forbidden`);
}
processQueueRunning = Error('here');
return processDeliveryMessage(message).finally(() => {
return processor(message).finally(() => {
processQueueRunning = undefined;
});
}

async function processAcceptanceMessage(message) {
kdebug(`processAcceptanceQ ${JSON.stringify(message)}`);
kdebug(legibilizeMessage(message));
if (processQueueRunning) {
console.error(`We're currently already running at`, processQueueRunning);
assert.fail(X`Kernel reentrancy is forbidden`);
}
kernelSlog.write({ type: 'crank-start', message });
/** @type { PolicyInput } */
const policyInput = ['none'];
try {
processQueueRunning = Error('here');

// By default we're moving events from one queue to another. Any references
// in the events remain the kernel's responsibility and the refcounts persist
if (message.type === 'send') {
const route = routeSendEvent(message);
if (!route) {
// Message went splat, no longer the kernel's responsibility
decrementSendEventRefCount(message);
} else {
const { vatID, target } = route;
if (target !== message.target) {
// Message has been re-targeted, other refcounts stay intact
kernelKeeper.decrementRefCount(message.target, `deq|msg|t`);
kernelKeeper.incrementRefCount(target, `enq|msg|t`);
}
if (vatID) {
kernelKeeper.addToRunQueue({
...message,
target,
});
} else {
kernelKeeper.addMessageToPromiseQueue(target, message.msg);
}
}
} else {
kernelKeeper.addToRunQueue(message);

kernelKeeper.processRefcounts();
kernelKeeper.saveStats();
const crankNum = kernelKeeper.getCrankNumber();
kernelKeeper.incrementCrankNumber();
const { crankhash, activityhash } = kernelKeeper.commitCrank();
kernelSlog.write({
type: 'crank-finish',
crankNum,
crankhash,
activityhash,
});
} finally {
processQueueRunning = undefined;
}

kernelKeeper.processRefcounts();
kernelKeeper.saveStats();
const crankNum = kernelKeeper.getCrankNumber();
kernelKeeper.incrementCrankNumber();
const { crankhash, activityhash } = kernelKeeper.commitCrank();
kernelSlog.write({
type: 'crank-finish',
crankNum,
crankhash,
activityhash,
});

return harden(policyInput);
}

Expand Down Expand Up @@ -1419,19 +1455,16 @@ export default function buildKernel(
return undefined;
}

function startProcessingNextMessageIfAny() {
/** @type {Promise<PolicyInput> | undefined} */
let resultPromise;
function getNextMessageAndProcessor() {
let message = getNextAcceptanceMessage();
if (message) {
resultPromise = processAcceptanceMessage(message);
} else {
/** @type {(message:any) => Promise<PolicyInput>} */
let processor = processAcceptanceMessage;
if (!message) {
message = getNextDeliveryMessage();
if (message) {
resultPromise = tryProcessDeliveryMessage(message);
}
processor = processDeliveryMessage;
}
return { resultPromise };

return { message, processor };
}

async function step() {
Expand All @@ -1441,10 +1474,10 @@ export default function buildKernel(
if (!started) {
throw new Error('must do kernel.start() before step()');
}
const { resultPromise } = startProcessingNextMessageIfAny();
const { processor, message } = getNextMessageAndProcessor();
// process a single message
if (resultPromise) {
await resultPromise;
if (message) {
await tryProcessMessage(processor, message);
if (kernelPanic) {
throw kernelPanic;
}
Expand Down Expand Up @@ -1473,14 +1506,14 @@ export default function buildKernel(
}
let count = 0;
for (;;) {
const { resultPromise } = startProcessingNextMessageIfAny();
if (!resultPromise) {
const { processor, message } = getNextMessageAndProcessor();
if (!message) {
break;
}
count += 1;
/** @type { PolicyInput } */
// eslint-disable-next-line no-await-in-loop
const policyInput = await resultPromise;
const policyInput = await tryProcessMessage(processor, message);
if (kernelPanic) {
throw kernelPanic;
}
Expand Down
64 changes: 38 additions & 26 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -672,18 +672,12 @@ export default function makeKernelKeeper(
kvStore.delete(`${kpid}.refCount`);
}

function resolveKernelPromise(kernelSlot, rejected, capdata) {
function requeueKernelPromise(kernelSlot) {
insistKernelType('promise', kernelSlot);
insistCapData(capdata);

let idx = 0;
for (const dataSlot of capdata.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`);
idx += 1;
}

// Re-queue all messages, so they can be delivered to the resolution.
// Re-queue all messages, so they can be delivered to the resolution if the
// promise was resolved, or to the pipelining vat if the decider was
// updated.
// This is a lateral move, so we retain their original refcounts. TODO:
// this is slightly lazy, sending the message back to the same promise
// that just got resolved. When this message makes it to the front of the
Expand All @@ -699,6 +693,23 @@ export default function makeKernelKeeper(
kvStore.set('acceptanceQueue', JSON.stringify(acceptanceQueue));
incStat('acceptanceQueueLength', p.queue.length);

kvStore.deletePrefixedKeys(`${kernelSlot}.queue.`);
kvStore.set(`${kernelSlot}.queue.nextID`, `0`);
}

function resolveKernelPromise(kernelSlot, rejected, capdata) {
insistKernelType('promise', kernelSlot);
insistCapData(capdata);

let idx = 0;
for (const dataSlot of capdata.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`);
idx += 1;
}

requeueKernelPromise(kernelSlot);

deleteKernelPromiseState(kernelSlot);
decStat('kpUnresolved');

Expand Down Expand Up @@ -826,22 +837,22 @@ export default function makeKernelKeeper(
// itself. This isn't strictly necessary (the promise will be kept alive
// by the deciding vat's clist, or the queued message that holds this
// promise as its result), but it matches our policy with run-queue
// messages (each holds a refcount on its target), and makes it easier to
// transfer these messages back to the run-queue in
// resolveKernelPromise() (which doesn't touch any of the refcounts).

// eslint-disable-next-line no-use-before-define
incrementRefCount(kernelSlot, `pq|${kernelSlot}|t`);
if (msg.result) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(msg.result, `pq|${kernelSlot}|r`);
}
let idx = 0;
for (const kref of msg.args.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(kref, `pq|${kernelSlot}|s${idx}`);
idx += 1;
}
// messages (each holds a refcount on its target).
//
// Messages are enqueued on a promise queue in 2 cases:
// - A message routed from the acceptance queue
// - A pipelined message had a decider change while in the run-queue
// Messages are dequeued from a promise queue in 2 cases:
// - The promise is resolved
// - The promise's decider is changed to a pipelining vat
// In all cases the messages are just moved from one queue to another so
// the caller should not need to change the refcounts when moving messages
// sent to promises between queues. Only when re-targeting after resolution
// would the targets refcount be updated (but not the result or slots).
//
// Since messages are moved from queue to queue, the tag describing the ref
// does not designate which current queue the message sits on, but that
// there is some kernel queue holding the message and its content.

const p = getKernelPromise(kernelSlot);
assert(
Expand Down Expand Up @@ -1525,6 +1536,7 @@ export default function makeKernelKeeper(
getKernelPromise,
getResolveablePromise,
hasKernelPromise,
requeueKernelPromise,
resolveKernelPromise,
addMessageToPromiseQueue,
addSubscriberToPromise,
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/test/message-patterns.js
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ export function buildPatterns(log) {
return pipe2;
};
}
out.a70 = ['pipe1', 'pipe2', 'pipe3', 'p1.then', 'p2.then', 'p3.then'];
out.a70 = ['pipe1', 'p1.then', 'pipe2', 'p2.then', 'pipe3', 'p3.then'];
outPipelined.a70 = [
'pipe1',
'pipe2',
Expand Down
Loading

0 comments on commit 92da539

Please sign in to comment.