From 5c3398428787de999c8cc20bbd1d8e4d0d85afbc Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Wed, 30 Aug 2023 00:03:59 +0000 Subject: [PATCH 1/2] refactor(cosmic-swingset): share runSwingset with bootstrap --- packages/cosmic-swingset/src/launch-chain.js | 182 ++++++++++--------- packages/cosmic-swingset/src/sim-chain.js | 1 + 2 files changed, 94 insertions(+), 89 deletions(-) diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index e001d2f6e47..ad0668a1a0f 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -198,7 +198,8 @@ export async function buildSwingset( /** * @typedef {import('@agoric/swingset-vat').RunPolicy & { * shouldRun(): boolean; - * remainingBeans(): bigint; + * remainingBeans(): bigint | undefined; + * totalBeans(): bigint; * }} ChainRunPolicy */ @@ -211,19 +212,24 @@ export async function buildSwingset( /** * @param {BeansPerUnit} beansPerUnit + * @param {boolean} [ignoreBlockLimit] * @returns {ChainRunPolicy} */ -function computronCounter({ - [BeansPerBlockComputeLimit]: blockComputeLimit, - [BeansPerVatCreation]: vatCreation, - [BeansPerXsnapComputron]: xsnapComputron, -}) { +function computronCounter( + { + [BeansPerBlockComputeLimit]: blockComputeLimit, + [BeansPerVatCreation]: vatCreation, + [BeansPerXsnapComputron]: xsnapComputron, + }, + ignoreBlockLimit = false, +) { assert.typeof(blockComputeLimit, 'bigint'); assert.typeof(vatCreation, 'bigint'); assert.typeof(xsnapComputron, 'bigint'); let totalBeans = 0n; - const shouldRun = () => totalBeans < blockComputeLimit; - const remainingBeans = () => blockComputeLimit - totalBeans; + const shouldRun = () => ignoreBlockLimit || totalBeans < blockComputeLimit; + const remainingBeans = () => + ignoreBlockLimit ? undefined : blockComputeLimit - totalBeans; const policy = harden({ vatCreated() { @@ -247,23 +253,17 @@ function computronCounter({ return shouldRun(); }, emptyCrank() { - return true; + return shouldRun(); }, shouldRun, remainingBeans, + totalBeans() { + return totalBeans; + }, }); return policy; } -function neverStop() { - return harden({ - vatCreated: () => true, - crankComplete: () => true, - crankFailed: () => true, - emptyCrank: () => true, - }); -} - export async function launch({ actionQueueStorage, highPriorityQueueStorage, @@ -353,14 +353,55 @@ export async function launch({ inboundQueueMetrics, }); - async function bootstrapBlock(_blockHeight, blockTime) { + /** + * @param {number} blockHeight + * @param {ChainRunPolicy} runPolicy + */ + function makeRunSwingset(blockHeight, runPolicy) { + let runNum = 0; + async function runSwingset() { + const startBeans = runPolicy.totalBeans(); + controller.writeSlogObject({ + type: 'cosmic-swingset-run-start', + blockHeight, + runNum, + startBeans, + remainingBeans: runPolicy.remainingBeans(), + }); + // TODO: crankScheduler does a schedulerBlockTimeHistogram thing + // that needs to be revisited, it used to be called once per + // block, now it's once per processed inbound queue item + await crankScheduler(runPolicy); + const finishBeans = runPolicy.totalBeans(); + controller.writeSlogObject({ + type: 'kernel-stats', + stats: controller.getStats(), + }); + controller.writeSlogObject({ + type: 'cosmic-swingset-run-finish', + blockHeight, + runNum, + startBeans, + finishBeans, + usedBeans: finishBeans - startBeans, + remainingBeans: runPolicy.remainingBeans(), + }); + runNum += 1; + return runPolicy.shouldRun(); + } + return runSwingset; + } + + async function bootstrapBlock(blockHeight, blockTime, params) { // We need to let bootstrap know of the chain time. The time of the first // block may be the genesis time, or the block time of the upgrade block. timer.poll(blockTime); // This is before the initial block, we need to finish processing the // entire bootstrap before opening for business. - const policy = neverStop(); - await crankScheduler(policy); + const runPolicy = computronCounter(params.beansPerUnit, true); + const runSwingset = makeRunSwingset(blockHeight, runPolicy); + + await runSwingset(); } async function saveChainState() { @@ -510,67 +551,39 @@ export async function launch({ return p; } - async function runKernel(runPolicy, blockHeight, blockTime) { - let runNum = 0; - async function runSwingset() { - const initialBeans = runPolicy.remainingBeans(); - controller.writeSlogObject({ - type: 'cosmic-swingset-run-start', - blockHeight, - runNum, - initialBeans, - }); - // TODO: crankScheduler does a schedulerBlockTimeHistogram thing - // that needs to be revisited, it used to be called once per - // block, now it's once per processed inbound queue item - await crankScheduler(runPolicy); - const remainingBeans = runPolicy.remainingBeans(); - controller.writeSlogObject({ - type: 'kernel-stats', - stats: controller.getStats(), - }); - controller.writeSlogObject({ - type: 'cosmic-swingset-run-finish', - blockHeight, - runNum, - remainingBeans, - usedBeans: initialBeans - remainingBeans, - }); - runNum += 1; - return runPolicy.shouldRun(); - } - - /** - * Process as much as we can from an inbound queue, which contains - * first the old actions not previously processed, followed by actions - * newly added, running the kernel to completion after each. - * - * @param {InboundQueue} inboundQueue - */ - async function processActions(inboundQueue) { - let keepGoing = true; - for (const { action, context } of inboundQueue.consumeAll()) { - const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`; - inboundQueueMetrics.decStat(); - // eslint-disable-next-line no-await-in-loop - await performAction(action, inboundNum); - // eslint-disable-next-line no-await-in-loop - keepGoing = await runSwingset(); - if (!keepGoing) { - // any leftover actions will remain on the inbound queue for possible - // processing in the next block - break; - } + /** + * Process as much as we can from an inbound queue, which contains + * first the old actions not previously processed, followed by actions + * newly added, running the kernel to completion after each. + * + * @param {InboundQueue} inboundQueue + * @param {ReturnType} runSwingset + */ + async function processActions(inboundQueue, runSwingset) { + let keepGoing = true; + for (const { action, context } of inboundQueue.consumeAll()) { + const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`; + inboundQueueMetrics.decStat(); + // eslint-disable-next-line no-await-in-loop + await performAction(action, inboundNum); + // eslint-disable-next-line no-await-in-loop + keepGoing = await runSwingset(); + if (!keepGoing) { + // any leftover actions will remain on the inbound queue for possible + // processing in the next block + break; } - return keepGoing; } + return keepGoing; + } + async function runKernel(runSwingset, blockHeight, blockTime) { // First, complete leftover work, if any let keepGoing = await runSwingset(); if (!keepGoing) return; // Then, process as much as we can from the priorityQueue. - keepGoing = await processActions(highPriorityQueue); + keepGoing = await processActions(highPriorityQueue, runSwingset); if (!keepGoing) return; // Then, update the timer device with the new external time, which might @@ -587,7 +600,7 @@ export async function launch({ if (!keepGoing) return; // Finally, process as much as we can from the actionQueue. - await processActions(actionQueue); + await processActions(actionQueue, runSwingset); } async function endBlock(blockHeight, blockTime, params) { @@ -603,8 +616,9 @@ export async function launch({ // make a runPolicy that will be shared across all cycles const runPolicy = computronCounter(params.beansPerUnit); + const runSwingset = makeRunSwingset(blockHeight, runPolicy); - await runKernel(runPolicy, blockHeight, blockTime); + await runKernel(runSwingset, blockHeight, blockTime); if (END_BLOCK_SPIN_MS) { // Introduce a busy-wait to artificially put load on the chain. @@ -708,34 +722,24 @@ export async function launch({ // ); switch (action.type) { case ActionType.AG_COSMOS_INIT: { - const { isBootstrap, upgradePlan, blockTime } = action; + const { isBootstrap, upgradePlan, blockTime, params } = action; // This only runs for the very first block on the chain. if (isBootstrap) { verboseBlocks && blockManagerConsole.info('block bootstrap'); (savedHeight === 0 && savedBeginHeight === 0) || Fail`Cannot run a bootstrap block at height ${savedHeight}`; + const bootstrapBlockParams = parseParams(params); const blockHeight = 0; - const runNum = 0; controller.writeSlogObject({ type: 'cosmic-swingset-bootstrap-block-start', blockTime, }); - controller.writeSlogObject({ - type: 'cosmic-swingset-run-start', - blockHeight, - runNum, - }); // Start a block transaction, but without changing state // for the upcoming begin block check saveBeginHeight(savedBeginHeight); await processAction(action.type, async () => - bootstrapBlock(blockHeight, blockTime), + bootstrapBlock(blockHeight, blockTime, bootstrapBlockParams), ); - controller.writeSlogObject({ - type: 'cosmic-swingset-run-finish', - blockHeight, - runNum, - }); controller.writeSlogObject({ type: 'cosmic-swingset-bootstrap-block-finish', blockTime, diff --git a/packages/cosmic-swingset/src/sim-chain.js b/packages/cosmic-swingset/src/sim-chain.js index 78f113594a6..4faf71a6142 100644 --- a/packages/cosmic-swingset/src/sim-chain.js +++ b/packages/cosmic-swingset/src/sim-chain.js @@ -216,6 +216,7 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { type: 'AG_COSMOS_INIT', blockTime: scaleBlockTime(Date.now()), isBootstrap: true, + params: DEFAULT_SIM_SWINGSET_PARAMS, }); blockHeight = initialHeight; }; From 1cf7c92ee422c794c1b706214a21d3c06a36fdf0 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Thu, 31 Aug 2023 00:10:28 +0000 Subject: [PATCH 2/2] feat(cosmic-swingset): run upgrade actions to completion --- packages/cosmic-swingset/src/launch-chain.js | 85 +++++++++++++------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index ad0668a1a0f..56a4dfe95c0 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -40,7 +40,7 @@ import { BeansPerXsnapComputron, } from './sim-params.js'; import { parseParams } from './params.js'; -import { makeQueue } from './helpers/make-queue.js'; +import { makeQueue, makeQueueStorageMock } from './helpers/make-queue.js'; import { exportStorage } from './export-storage.js'; import { parseLocatedJson } from './helpers/json.js'; @@ -312,6 +312,14 @@ export async function launch({ const actionQueue = makeQueue(actionQueueStorage); /** @type {InboundQueue} */ const highPriorityQueue = makeQueue(highPriorityQueueStorage); + /** + * In memory queue holding actions that must be consumed entirely + * during the block. If it's not drained, we open the gates to + * hangover hell. + * + * @type {InboundQueue} + */ + const runThisBlock = makeQueue(makeQueueStorageMock().storage); // Not to be confused with the gas model, this meter is for OpenTelemetry. const metricMeter = metricsProvider.getMeter('ag-chain-cosmos'); @@ -582,6 +590,13 @@ export async function launch({ let keepGoing = await runSwingset(); if (!keepGoing) return; + // Then, if we have anything in the special runThisBlock queue, process + // it and do no further work. + if (runThisBlock.size()) { + await processActions(runThisBlock, runSwingset); + return; + } + // Then, process as much as we can from the priorityQueue. keepGoing = await processActions(highPriorityQueue, runSwingset); if (!keepGoing) return; @@ -611,11 +626,15 @@ export async function launch({ // First, record new actions (bridge/mailbox/etc events that cosmos // added up for delivery to swingset) into our inboundQueue metrics inboundQueueMetrics.updateLength( - actionQueue.size() + highPriorityQueue.size(), + actionQueue.size() + highPriorityQueue.size() + runThisBlock.size(), ); + // If we have work to complete this block, it needs to run to completion. + // It will also run to completion any work that swingset still had pending. + const neverStop = runThisBlock.size() > 0; + // make a runPolicy that will be shared across all cycles - const runPolicy = computronCounter(params.beansPerUnit); + const runPolicy = computronCounter(params.beansPerUnit, neverStop); const runSwingset = makeRunSwingset(blockHeight, runPolicy); await runKernel(runSwingset, blockHeight, blockTime); @@ -747,38 +766,36 @@ export async function launch({ } if (upgradePlan) { const blockHeight = upgradePlan.height; - if (blockNeedsExecution(blockHeight)) { - controller.writeSlogObject({ - type: 'cosmic-swingset-upgrade-start', - blockHeight, - blockTime, - upgradePlan, - }); - // Process upgrade plan - const upgradedAction = { - type: ActionType.ENACTED_UPGRADE, - upgradePlan, - blockHeight, - blockTime, - }; - await doBlockingSend(upgradedAction); - controller.writeSlogObject({ - type: 'cosmic-swingset-upgrade-finish', - blockHeight, - blockTime, - }); - } + + // Process upgrade plan + const upgradedAction = { + type: ActionType.ENACTED_UPGRADE, + upgradePlan, + blockHeight, + blockTime, + }; + await doBlockingSend(upgradedAction); } return true; } case ActionType.ENACTED_UPGRADE: { // Install and execute new core proposals. - const { - upgradePlan: { info: upgradeInfoJson = null } = {}, + const { upgradePlan, blockHeight, blockTime } = action; + + if (!blockNeedsExecution(blockHeight)) { + return undefined; + } + + controller.writeSlogObject({ + type: 'cosmic-swingset-upgrade-start', blockHeight, blockTime, - } = action; + upgradePlan, + }); + + const { info: upgradeInfoJson = null } = upgradePlan || {}; + const upgradePlanInfo = upgradeInfoJson && parseLocatedJson(upgradeInfoJson, 'ENACTED_UPGRADE upgradePlan.info'); @@ -813,9 +830,6 @@ export async function launch({ } // Now queue the code for evaluation. - // - // TODO: Once SwingSet sprouts some tools for preemption, we should use - // them to help the upgrade process finish promptly. const coreEvalAction = { type: ActionType.CORE_EVAL, blockHeight, @@ -827,7 +841,7 @@ export async function launch({ }, ], }; - highPriorityQueue.push({ + runThisBlock.push({ context: { blockHeight, txHash: 'x/upgrade', @@ -836,6 +850,12 @@ export async function launch({ action: coreEvalAction, }); + controller.writeSlogObject({ + type: 'cosmic-swingset-upgrade-finish', + blockHeight, + blockTime, + }); + return undefined; } @@ -849,6 +869,9 @@ export async function launch({ ); } + runThisBlock.size() === 0 || + Fail`We didn't process all "run this block" actions`; + controller.writeSlogObject({ type: 'cosmic-swingset-commit-block-start', blockHeight,