From 19b0f807471d487a6156eba75c78f0b60f972f27 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Wed, 18 Dec 2024 15:43:39 -0300 Subject: [PATCH] feat: Revamped sequencer timetable and tx processing timeout --- .../aztec-node/src/aztec-node/server.ts | 11 +- .../aztec.js/src/utils/anvil_test_watcher.ts | 3 + .../aztec.js/src/utils/cheat_codes.ts | 19 +- .../circuit-types/src/interfaces/configs.ts | 3 + .../end-to-end/src/e2e_block_building.test.ts | 99 ++++++++++- yarn-project/end-to-end/src/fixtures/utils.ts | 4 +- yarn-project/ethereum/src/config.ts | 4 +- yarn-project/ethereum/src/eth_cheat_codes.ts | 30 ++-- yarn-project/foundation/src/config/env_var.ts | 1 + .../foundation/src/log/pino-logger.ts | 2 +- yarn-project/foundation/src/sleep/index.ts | 4 +- yarn-project/foundation/src/timer/date.ts | 2 +- yarn-project/foundation/src/timer/index.ts | 6 +- yarn-project/foundation/src/timer/timeout.ts | 37 ++-- .../p2p/src/services/reqresp/reqresp.ts | 6 +- .../prover-client/src/mocks/test_context.ts | 2 + yarn-project/prover-node/src/prover-node.ts | 8 +- .../pxe/src/kernel_prover/kernel_prover.ts | 2 +- .../src/client/sequencer-client.ts | 30 +++- yarn-project/sequencer-client/src/config.ts | 5 + yarn-project/sequencer-client/src/index.ts | 1 + .../src/sequencer/sequencer.test.ts | 24 +-- .../src/sequencer/sequencer.ts | 167 ++++++++++++++---- .../sequencer-client/src/sequencer/utils.ts | 13 +- .../src/public/public_processor.test.ts | 34 +++- .../simulator/src/public/public_processor.ts | 86 +++++++-- 26 files changed, 462 insertions(+), 141 deletions(-) diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 36ca8f16ce86..a801fbabf285 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -196,6 +196,7 @@ export class AztecNodeService implements AztecNode, Traceable { l2BlockSource: archiver, l1ToL2MessageSource: archiver, telemetry, + dateProvider, ...deps, }); @@ -230,6 +231,10 @@ export class AztecNodeService implements AztecNode, Traceable { return this.blockSource; } + public getContractDataSource(): ContractDataSource { + return this.contractDataSource; + } + public getP2P(): P2P { return this.p2pClient; } @@ -815,7 +820,11 @@ export class AztecNodeService implements AztecNode, Traceable { feeRecipient, ); const prevHeader = (await this.blockSource.getBlock(-1))?.header; - const publicProcessorFactory = new PublicProcessorFactory(this.contractDataSource, this.telemetry); + const publicProcessorFactory = new PublicProcessorFactory( + this.contractDataSource, + new DateProvider(), + this.telemetry, + ); const fork = await this.worldStateSynchronizer.fork(); this.log.verbose(`Simulating public calls for tx ${tx.getTxHash()}`, { diff --git a/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts b/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts index 307d52a18f04..b73b1e09a172 100644 --- a/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts +++ b/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts @@ -1,6 +1,7 @@ import { type EthCheatCodes, type Logger, createLogger } from '@aztec/aztec.js'; import { type EthAddress } from '@aztec/circuits.js'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { type TestDateProvider } from '@aztec/foundation/timer'; import { RollupAbi } from '@aztec/l1-artifacts'; import { type GetContractReturnType, type HttpTransport, type PublicClient, getAddress, getContract } from 'viem'; @@ -24,6 +25,7 @@ export class AnvilTestWatcher { private cheatcodes: EthCheatCodes, rollupAddress: EthAddress, publicClient: PublicClient, + private dateProvider?: TestDateProvider, ) { this.rollup = getContract({ address: getAddress(rollupAddress.toString()), @@ -69,6 +71,7 @@ export class AnvilTestWatcher { const timestamp = await this.rollup.read.getTimestampForSlot([currentSlot + 1n]); try { await this.cheatcodes.warp(Number(timestamp)); + this.dateProvider?.setTime(Number(timestamp) * 1000); } catch (e) { this.logger.error(`Failed to warp to timestamp ${timestamp}: ${e}`); } diff --git a/yarn-project/aztec.js/src/utils/cheat_codes.ts b/yarn-project/aztec.js/src/utils/cheat_codes.ts index b92366b94e57..507460ca4d94 100644 --- a/yarn-project/aztec.js/src/utils/cheat_codes.ts +++ b/yarn-project/aztec.js/src/utils/cheat_codes.ts @@ -108,8 +108,17 @@ export class RollupCheatCodes { const slotsUntilNextEpoch = epochDuration - (slot % epochDuration) + 1n; const timeToNextEpoch = slotsUntilNextEpoch * slotDuration; const l1Timestamp = BigInt((await this.client.getBlock()).timestamp); - await this.ethCheatCodes.warp(Number(l1Timestamp + timeToNextEpoch)); - this.logger.verbose(`Advanced to next epoch`); + await this.ethCheatCodes.warp(Number(l1Timestamp + timeToNextEpoch), true); + this.logger.warn(`Advanced to next epoch`); + } + + /** Warps time in L1 until the beginning of the next slot. */ + public async advanceToNextSlot() { + const currentSlot = await this.getSlot(); + const timestamp = await this.rollup.read.getTimestampForSlot([currentSlot + 1n]); + await this.ethCheatCodes.warp(Number(timestamp)); + this.logger.warn(`Advanced to slot ${currentSlot + 1n}`); + return [timestamp, currentSlot + 1n]; } /** @@ -120,9 +129,9 @@ export class RollupCheatCodes { const l1Timestamp = (await this.client.getBlock()).timestamp; const slotDuration = await this.rollup.read.SLOT_DURATION(); const timeToWarp = BigInt(howMany) * slotDuration; - await this.ethCheatCodes.warp(l1Timestamp + timeToWarp); + await this.ethCheatCodes.warp(l1Timestamp + timeToWarp, true); const [slot, epoch] = await Promise.all([this.getSlot(), this.getEpoch()]); - this.logger.verbose(`Advanced ${howMany} slots up to slot ${slot} in epoch ${epoch}`); + this.logger.warn(`Advanced ${howMany} slots up to slot ${slot} in epoch ${epoch}`); } /** Returns the current proof claim (if any) */ @@ -163,7 +172,7 @@ export class RollupCheatCodes { await this.asOwner(async account => { await this.rollup.write.setAssumeProvenThroughBlockNumber([blockNumber], { account, chain: this.client.chain }); - this.logger.verbose(`Marked ${blockNumber} as proven`); + this.logger.warn(`Marked ${blockNumber} as proven`); }); } diff --git a/yarn-project/circuit-types/src/interfaces/configs.ts b/yarn-project/circuit-types/src/interfaces/configs.ts index 661e893a4503..baafd3642945 100644 --- a/yarn-project/circuit-types/src/interfaces/configs.ts +++ b/yarn-project/circuit-types/src/interfaces/configs.ts @@ -38,6 +38,8 @@ export interface SequencerConfig { governanceProposerPayload?: EthAddress; /** Whether to enforce the time table when building blocks */ enforceTimeTable?: boolean; + /** How many seconds into an L1 slot we can still send a tx and get it mined. */ + maxL1TxInclusionTimeIntoSlot?: number; } const AllowedElementSchema = z.union([ @@ -59,4 +61,5 @@ export const SequencerConfigSchema = z.object({ maxBlockSizeInBytes: z.number().optional(), enforceFees: z.boolean().optional(), gerousiaPayload: schemas.EthAddress.optional(), + maxL1TxInclusionTimeIntoSlot: z.number().optional(), }) satisfies ZodFor; diff --git a/yarn-project/end-to-end/src/e2e_block_building.test.ts b/yarn-project/end-to-end/src/e2e_block_building.test.ts index d80f7f1ff87a..d72587c64e8f 100644 --- a/yarn-project/end-to-end/src/e2e_block_building.test.ts +++ b/yarn-project/end-to-end/src/e2e_block_building.test.ts @@ -1,4 +1,5 @@ import { getSchnorrAccount } from '@aztec/accounts/schnorr'; +import { type AztecNodeService } from '@aztec/aztec-node'; import { type AztecAddress, type AztecNode, @@ -7,6 +8,7 @@ import { ContractFunctionInteraction, Fq, Fr, + type GlobalVariables, L1EventPayload, L1NotePayload, type Logger, @@ -17,12 +19,20 @@ import { retryUntil, sleep, } from '@aztec/aztec.js'; +// eslint-disable-next-line no-restricted-imports +import { type MerkleTreeWriteOperations, type Tx } from '@aztec/circuit-types'; import { getL1ContractsConfigEnvVars } from '@aztec/ethereum'; -import { times } from '@aztec/foundation/collection'; +import { asyncMap } from '@aztec/foundation/async-map'; +import { times, unique } from '@aztec/foundation/collection'; import { poseidon2Hash } from '@aztec/foundation/crypto'; +import { type TestDateProvider } from '@aztec/foundation/timer'; import { StatefulTestContract, StatefulTestContractArtifact } from '@aztec/noir-contracts.js/StatefulTest'; import { TestContract } from '@aztec/noir-contracts.js/Test'; import { TokenContract } from '@aztec/noir-contracts.js/Token'; +import { type Sequencer, type SequencerClient, SequencerState } from '@aztec/sequencer-client'; +import { PublicProcessorFactory, type PublicTxResult, PublicTxSimulator, type WorldStateDB } from '@aztec/simulator'; +import { type TelemetryClient } from '@aztec/telemetry-client'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { jest } from '@jest/globals'; import 'jest-extended'; @@ -38,6 +48,9 @@ describe('e2e_block_building', () => { let owner: Wallet; let minter: Wallet; let aztecNode: AztecNode; + let sequencer: TestSequencerClient; + let dateProvider: TestDateProvider | undefined; + let cheatCodes: CheatCodes; let teardown: () => Promise; const { aztecEpochProofClaimWindowInL2Slots } = getL1ContractsConfigEnvVars(); @@ -46,13 +59,19 @@ describe('e2e_block_building', () => { const artifact = StatefulTestContractArtifact; beforeAll(async () => { + let sequencerClient; ({ teardown, pxe, logger, aztecNode, wallets: [owner, minter], + sequencer: sequencerClient, + dateProvider, + cheatCodes, } = await setup(2)); + // Bypass accessibility modifiers in sequencer + sequencer = sequencerClient! as unknown as TestSequencerClient; }); afterEach(() => aztecNode.setConfig({ minTxsPerBlock: 1 })); @@ -106,7 +125,7 @@ describe('e2e_block_building', () => { // Assemble N contract deployment txs // We need to create them sequentially since we cannot have parallel calls to a circuit - const TX_COUNT = 8; + const TX_COUNT = 4; await aztecNode.setConfig({ minTxsPerBlock: TX_COUNT }); const methods = times(TX_COUNT, i => contract.methods.increment_public_value(ownerAddress, i)); @@ -127,6 +146,57 @@ describe('e2e_block_building', () => { expect(receipts.map(r => r.blockNumber)).toEqual(times(TX_COUNT, () => receipts[0].blockNumber)); }); + it('processes txs until hitting timetable', async () => { + const TX_COUNT = 32; + + const ownerAddress = owner.getCompleteAddress().address; + const contract = await StatefulTestContract.deploy(owner, ownerAddress, ownerAddress, 1).send().deployed(); + logger.info(`Deployed stateful test contract at ${contract.address}`); + + // We have to set minTxsPerBlock to 1 or we could end with dangling txs. + // We also set enforceTimetable so the deadline makes sense, otherwise we may be starting the + // block too late into the slot, and start processing when the deadline has already passed. + logger.info(`Updating aztec node config`); + await aztecNode.setConfig({ minTxsPerBlock: 1, maxTxsPerBlock: TX_COUNT, enforceTimeTable: true }); + + // We tweak the sequencer so it uses a fake simulator that adds a 200ms delay to every public tx. + const archiver = (aztecNode as AztecNodeService).getContractDataSource(); + sequencer.sequencer.publicProcessorFactory = new TestPublicProcessorFactory( + archiver, + dateProvider!, + new NoopTelemetryClient(), + ); + + // We also cheat the sequencer's timetable so it allocates little time to processing. + // This will leave the sequencer with just 2s to build the block, so it shouldn't be + // able to squeeze in more than 10 txs in each. This is sensitive to the time it takes + // to pick up and validate the txs, so we may need to bump it to work on CI. + sequencer.sequencer.timeTable[SequencerState.WAITING_FOR_TXS] = 2; + sequencer.sequencer.timeTable[SequencerState.CREATING_BLOCK] = 2; + sequencer.sequencer.processTxTime = 1; + + // Flood the mempool with TX_COUNT simultaneous txs + const methods = times(TX_COUNT, i => contract.methods.increment_public_value(ownerAddress, i)); + const provenTxs = await asyncMap(methods, method => method.prove({ skipPublicSimulation: true })); + logger.info(`Sending ${TX_COUNT} txs to the node`); + const txs = await Promise.all(provenTxs.map(tx => tx.send())); + logger.info(`All ${TX_COUNT} txs have been sent`, { txs: await Promise.all(txs.map(tx => tx.getTxHash())) }); + + // We forcefully mine a block to make the L1 timestamp move and sync to it, otherwise the sequencer will + // stay continuously trying to build a block for the same slot, even if the time for it has passed. + // Keep in mind the anvil test watcher only moves the anvil blocks when there is a block mined. + // This is quite ugly, and took me a very long time to realize it was needed. + // Maybe we should change it? And have it always mine a block every 12s even if there is no activity? + const [timestamp] = await cheatCodes.rollup.advanceToNextSlot(); + dateProvider!.setTime(Number(timestamp) * 1000); + + // Await txs to be mined and assert they are mined across multiple different blocks. + const receipts = await Promise.all(txs.map(tx => tx.wait())); + const blockNumbers = receipts.map(r => r.blockNumber!).sort((a, b) => a - b); + logger.info(`Txs mined on blocks: ${unique(blockNumbers)}`); + expect(blockNumbers.at(-1)! - blockNumbers[0]).toBeGreaterThan(1); + }); + it.skip('can call public function from different tx in same block as deployed', async () => { // Ensure both txs will land on the same block await aztecNode.setConfig({ minTxsPerBlock: 2 }); @@ -503,3 +573,28 @@ async function sendAndWait(calls: ContractFunctionInteraction[]) { .map(p => p.wait()), ); } + +type TestSequencer = Omit & { + publicProcessorFactory: PublicProcessorFactory; + timeTable: Record; + processTxTime: number; +}; +type TestSequencerClient = Omit & { sequencer: TestSequencer }; + +class TestPublicTxSimulator extends PublicTxSimulator { + public override async simulate(tx: Tx): Promise { + await sleep(200); + return super.simulate(tx); + } +} +class TestPublicProcessorFactory extends PublicProcessorFactory { + protected override createPublicTxSimulator( + db: MerkleTreeWriteOperations, + worldStateDB: WorldStateDB, + telemetryClient: TelemetryClient, + globalVariables: GlobalVariables, + doMerkleOperations?: boolean, + ): PublicTxSimulator { + return new TestPublicTxSimulator(db, worldStateDB, telemetryClient, globalVariables, doMerkleOperations); + } +} diff --git a/yarn-project/end-to-end/src/fixtures/utils.ts b/yarn-project/end-to-end/src/fixtures/utils.ts index b931dfe1260c..0a4380282c7a 100644 --- a/yarn-project/end-to-end/src/fixtures/utils.ts +++ b/yarn-project/end-to-end/src/fixtures/utils.ts @@ -414,10 +414,13 @@ export async function setup( await ethCheatCodes.warp(opts.l2StartTime); } + const dateProvider = new TestDateProvider(); + const watcher = new AnvilTestWatcher( new EthCheatCodesWithState(config.l1RpcUrl), deployL1ContractsValues.l1ContractAddresses.rollupAddress, deployL1ContractsValues.publicClient, + dateProvider, ); await watcher.start(); @@ -439,7 +442,6 @@ export async function setup( const telemetry = await telemetryPromise; const publisher = new TestL1Publisher(config, telemetry); - const dateProvider = new TestDateProvider(); const aztecNode = await AztecNodeService.createAndSync(config, { telemetry, publisher, dateProvider }); const sequencer = aztecNode.getSequencer(); diff --git a/yarn-project/ethereum/src/config.ts b/yarn-project/ethereum/src/config.ts index eda0024870b3..9d1fee99530b 100644 --- a/yarn-project/ethereum/src/config.ts +++ b/yarn-project/ethereum/src/config.ts @@ -13,13 +13,13 @@ export type L1ContractsConfig = { aztecEpochProofClaimWindowInL2Slots: number; }; -export const DefaultL1ContractsConfig: L1ContractsConfig = { +export const DefaultL1ContractsConfig = { ethereumSlotDuration: 12, aztecSlotDuration: 24, aztecEpochDuration: 16, aztecTargetCommitteeSize: 48, aztecEpochProofClaimWindowInL2Slots: 13, -}; +} satisfies L1ContractsConfig; export const l1ContractsConfigMappings: ConfigMappingsType = { ethereumSlotDuration: { diff --git a/yarn-project/ethereum/src/eth_cheat_codes.ts b/yarn-project/ethereum/src/eth_cheat_codes.ts index 2ec0706619f4..ebb4d95434c6 100644 --- a/yarn-project/ethereum/src/eth_cheat_codes.ts +++ b/yarn-project/ethereum/src/eth_cheat_codes.ts @@ -77,7 +77,7 @@ export class EthCheatCodes { */ public async mine(numberOfBlocks = 1): Promise { await this.doMine(numberOfBlocks); - this.logger.verbose(`Mined ${numberOfBlocks} L1 blocks`); + this.logger.warn(`Mined ${numberOfBlocks} L1 blocks`); } private async doMine(numberOfBlocks = 1): Promise { @@ -107,7 +107,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting balance for ${account}: ${res.error.message}`); } - this.logger.verbose(`Set balance for ${account} to ${balance}`); + this.logger.warn(`Set balance for ${account} to ${balance}`); } /** @@ -119,7 +119,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting block interval: ${res.error.message}`); } - this.logger.verbose(`Set L1 block interval to ${interval}`); + this.logger.warn(`Set L1 block interval to ${interval}`); } /** @@ -131,7 +131,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting next block base fee per gas: ${res.error.message}`); } - this.logger.verbose(`Set L1 next block base fee per gas to ${baseFee}`); + this.logger.warn(`Set L1 next block base fee per gas to ${baseFee}`); } /** @@ -143,7 +143,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting interval mining: ${res.error.message}`); } - this.logger.verbose(`Set L1 interval mining to ${seconds} seconds`); + this.logger.warn(`Set L1 interval mining to ${seconds} seconds`); } /** @@ -155,7 +155,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting automine: ${res.error.message}`); } - this.logger.verbose(`Set L1 automine to ${automine}`); + this.logger.warn(`Set L1 automine to ${automine}`); } /** @@ -167,7 +167,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error dropping transaction: ${res.error.message}`); } - this.logger.verbose(`Dropped transaction ${txHash}`); + this.logger.warn(`Dropped transaction ${txHash}`); } /** @@ -179,20 +179,22 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting next block timestamp: ${res.error.message}`); } - this.logger.verbose(`Set L1 next block timestamp to ${timestamp}`); + this.logger.warn(`Set L1 next block timestamp to ${timestamp}`); } /** * Set the next block timestamp and mines the block * @param timestamp - The timestamp to set the next block to */ - public async warp(timestamp: number | bigint): Promise { + public async warp(timestamp: number | bigint, silent = false): Promise { const res = await this.rpcCall('evm_setNextBlockTimestamp', [Number(timestamp)]); if (res.error) { throw new Error(`Error warping: ${res.error.message}`); } await this.doMine(); - this.logger.verbose(`Warped L1 timestamp to ${timestamp}`); + if (!silent) { + this.logger.warn(`Warped L1 timestamp to ${timestamp}`); + } } /** @@ -218,7 +220,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting storage for contract ${contract} at ${slot}: ${res.error.message}`); } - this.logger.verbose(`Set L1 storage for contract ${contract} at ${slot} to ${value}`); + this.logger.warn(`Set L1 storage for contract ${contract} at ${slot} to ${value}`); } /** @@ -242,7 +244,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error impersonating ${who}: ${res.error.message}`); } - this.logger.verbose(`Impersonating ${who}`); + this.logger.warn(`Impersonating ${who}`); } /** @@ -254,7 +256,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error when stopping the impersonation of ${who}: ${res.error.message}`); } - this.logger.verbose(`Stopped impersonating ${who}`); + this.logger.warn(`Stopped impersonating ${who}`); } /** @@ -267,7 +269,7 @@ export class EthCheatCodes { if (res.error) { throw new Error(`Error setting bytecode for ${contract}: ${res.error.message}`); } - this.logger.verbose(`Set bytecode for ${contract} to ${bytecode}`); + this.logger.warn(`Set bytecode for ${contract} to ${bytecode}`); } /** diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 4828878d721c..cf8d34bfd05e 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -144,6 +144,7 @@ export type EnvVar = | 'SEQ_REQUIRED_CONFIRMATIONS' | 'SEQ_TX_POLLING_INTERVAL_MS' | 'SEQ_ENFORCE_TIME_TABLE' + | 'SEQ_MAX_L1_TX_INCLUSION_TIME_INTO_SLOT' | 'STAKING_ASSET_CONTRACT_ADDRESS' | 'REWARD_DISTRIBUTOR_CONTRACT_ADDRESS' | 'TELEMETRY' diff --git a/yarn-project/foundation/src/log/pino-logger.ts b/yarn-project/foundation/src/log/pino-logger.ts index 4178c4ff6648..f8e749483146 100644 --- a/yarn-project/foundation/src/log/pino-logger.ts +++ b/yarn-project/foundation/src/log/pino-logger.ts @@ -122,7 +122,7 @@ export const pinoPrettyOpts = { destination: 2, sync: true, colorize: useColor, - ignore: 'module,pid,hostname,trace_id,span_id,trace_flags', + ignore: 'module,pid,hostname,trace_id,span_id,trace_flags,severity', messageFormat: `${bold('{module}')} ${reset('{msg}')}`, customLevels: 'fatal:60,error:50,warn:40,info:30,verbose:25,debug:20,trace:10', customColors: 'fatal:bgRed,error:red,warn:yellow,info:green,verbose:magenta,debug:blue,trace:gray', diff --git a/yarn-project/foundation/src/sleep/index.ts b/yarn-project/foundation/src/sleep/index.ts index 725e8fb4b220..2629c1830fa2 100644 --- a/yarn-project/foundation/src/sleep/index.ts +++ b/yarn-project/foundation/src/sleep/index.ts @@ -70,6 +70,6 @@ export class InterruptibleSleep { * @param returnValue - The return value of the promise. * @returns A Promise that resolves after the specified duration, allowing the use of 'await' to pause execution. */ -export function sleep(ms: number, returnValue?: T): Promise { - return new Promise(resolve => setTimeout(() => resolve(returnValue), ms)); +export function sleep(ms: number, returnValue?: T): Promise { + return new Promise(resolve => setTimeout(() => resolve(returnValue as T), ms)); } diff --git a/yarn-project/foundation/src/timer/date.ts b/yarn-project/foundation/src/timer/date.ts index f1ed1ee43618..2c210e948283 100644 --- a/yarn-project/foundation/src/timer/date.ts +++ b/yarn-project/foundation/src/timer/date.ts @@ -19,6 +19,6 @@ export class TestDateProvider implements DateProvider { public setTime(timeMs: number) { this.offset = timeMs - Date.now(); - this.logger.warn(`Time set to ${timeMs}`); + this.logger.warn(`Time set to ${new Date(timeMs).toISOString()}`, { offset: this.offset, timeMs }); } } diff --git a/yarn-project/foundation/src/timer/index.ts b/yarn-project/foundation/src/timer/index.ts index afb200bb32f4..6b36d76cf62d 100644 --- a/yarn-project/foundation/src/timer/index.ts +++ b/yarn-project/foundation/src/timer/index.ts @@ -1,4 +1,4 @@ -export { TimeoutTask, executeTimeoutWithCustomError } from './timeout.js'; -export { Timer } from './timer.js'; -export { elapsed, elapsedSync } from './elapsed.js'; export * from './date.js'; +export { elapsed, elapsedSync } from './elapsed.js'; +export { TimeoutTask, executeDeadline, executeTimeout } from './timeout.js'; +export { Timer } from './timer.js'; diff --git a/yarn-project/foundation/src/timer/timeout.ts b/yarn-project/foundation/src/timer/timeout.ts index f599543bc5e4..ae00efc93188 100644 --- a/yarn-project/foundation/src/timer/timeout.ts +++ b/yarn-project/foundation/src/timer/timeout.ts @@ -1,3 +1,5 @@ +import { TimeoutError } from '../error/index.js'; + /** * TimeoutTask class creates an instance for managing and executing a given asynchronous function with a specified timeout duration. * The task will be automatically interrupted if it exceeds the given timeout duration, and will throw a custom error message. @@ -10,14 +12,9 @@ export class TimeoutTask { private interrupt = () => {}; private totalTime = 0; - constructor( - private fn: () => Promise, - private timeout = 0, - fnName = '', - error = () => new Error(`Timeout${fnName ? ` running ${fnName}` : ''} after ${timeout}ms.`), - ) { + constructor(private fn: () => Promise, private timeout: number, errorFn: () => any) { this.interruptPromise = new Promise((_, reject) => { - this.interrupt = () => reject(error()); + this.interrupt = () => reject(errorFn()); }); } @@ -63,17 +60,19 @@ export class TimeoutTask { } } -export const executeTimeout = async (fn: () => Promise, timeout = 0, fnName = '') => { - const task = new TimeoutTask(fn, timeout, fnName); +export async function executeTimeout(fn: () => Promise, timeout: number, errorOrFnName?: string | (() => any)) { + const errorFn = + typeof errorOrFnName === 'function' + ? errorOrFnName + : () => new TimeoutError(`Timeout running ${errorOrFnName ?? 'function'} after ${timeout}ms.`); + const task = new TimeoutTask(fn, timeout, errorFn); return await task.exec(); -}; +} -export const executeTimeoutWithCustomError = async ( - fn: () => Promise, - timeout = 0, - error = () => new Error('No custom error provided'), - fnName = '', -) => { - const task = new TimeoutTask(fn, timeout, fnName, error); - return await task.exec(); -}; +export async function executeDeadline(fn: () => Promise, deadline: Date, error: () => any) { + const timeout = +deadline - Date.now(); + if (timeout <= 0) { + throw error(); + } + return await executeTimeout(fn, timeout, error); +} diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index f7fb2efebb95..e6b8d386c9cb 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -1,7 +1,7 @@ // @attribution: lodestar impl for inspiration import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { executeTimeoutWithCustomError } from '@aztec/foundation/timer'; +import { executeTimeout } from '@aztec/foundation/timer'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; @@ -159,7 +159,7 @@ export class ReqResp { }; try { - return await executeTimeoutWithCustomError | undefined>( + return await executeTimeout | undefined>( requestFunction, this.overallRequestTimeoutMs, () => new CollectiveReqRespTimeoutError(), @@ -205,7 +205,7 @@ export class ReqResp { this.logger.trace(`Stream opened with ${peerId.toString()} for ${subProtocol}`); // Open the stream with a timeout - const result = await executeTimeoutWithCustomError( + const result = await executeTimeout( (): Promise => pipe([payload], stream!, this.readMessage.bind(this)), this.individualRequestTimeoutMs, () => new IndividualReqRespTimeoutError(), diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index 803b8d8300e7..907649c93048 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ b/yarn-project/prover-client/src/mocks/test_context.ts @@ -18,6 +18,7 @@ import { import { times } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/fields'; import { type Logger } from '@aztec/foundation/log'; +import { TestDateProvider } from '@aztec/foundation/timer'; import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types'; import { protocolContractTreeRoot } from '@aztec/protocol-contracts'; import { @@ -91,6 +92,7 @@ export class TestContext { BlockHeader.empty(), worldStateDB, publicTxSimulator, + new TestDateProvider(), telemetry, ); diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 8eec703ffedc..5bb23e97378b 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -16,6 +16,7 @@ import { import { type ContractDataSource } from '@aztec/circuits.js'; import { compact } from '@aztec/foundation/collection'; import { createLogger } from '@aztec/foundation/log'; +import { DateProvider } from '@aztec/foundation/timer'; import { type Maybe } from '@aztec/foundation/types'; import { type P2P } from '@aztec/p2p'; import { type L1Publisher } from '@aztec/sequencer-client'; @@ -44,6 +45,7 @@ export type ProverNodeOptions = { */ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, ProverNodeApi, Traceable { private log = createLogger('prover-node'); + private dateProvider = new DateProvider(); private latestEpochWeAreProving: bigint | undefined; private jobs: Map = new Map(); @@ -266,7 +268,11 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr await this.worldState.syncImmediate(fromBlock - 1); // Create a processor using the forked world state - const publicProcessorFactory = new PublicProcessorFactory(this.contractDataSource, this.telemetryClient); + const publicProcessorFactory = new PublicProcessorFactory( + this.contractDataSource, + this.dateProvider, + this.telemetryClient, + ); const cleanUp = () => { this.jobs.delete(job.getId()); diff --git a/yarn-project/pxe/src/kernel_prover/kernel_prover.ts b/yarn-project/pxe/src/kernel_prover/kernel_prover.ts index a43676a6bbe3..f35cd8a22770 100644 --- a/yarn-project/pxe/src/kernel_prover/kernel_prover.ts +++ b/yarn-project/pxe/src/kernel_prover/kernel_prover.ts @@ -290,7 +290,7 @@ export class KernelProver { tailOutput.profileResult = { gateCounts }; } - this.log.info(`Witness generation took ${timer.ms()}ms`); + this.log.verbose(`Private kernel witness generation took ${timer.ms()}ms`); // TODO(#7368) how do we 'bincode' encode these inputs? if (!dryRun) { diff --git a/yarn-project/sequencer-client/src/client/sequencer-client.ts b/yarn-project/sequencer-client/src/client/sequencer-client.ts index ba9987262c27..81bf69ad10fd 100644 --- a/yarn-project/sequencer-client/src/client/sequencer-client.ts +++ b/yarn-project/sequencer-client/src/client/sequencer-client.ts @@ -1,6 +1,8 @@ import { type L1ToL2MessageSource, type L2BlockSource, type WorldStateSynchronizer } from '@aztec/circuit-types'; import { type ContractDataSource } from '@aztec/circuits.js'; +import { isAnvilTestChain } from '@aztec/ethereum'; import { type EthAddress } from '@aztec/foundation/eth-address'; +import { type DateProvider } from '@aztec/foundation/timer'; import { type P2P } from '@aztec/p2p'; import { LightweightBlockBuilderFactory } from '@aztec/prover-client/block-builder'; import { PublicProcessorFactory } from '@aztec/simulator'; @@ -17,7 +19,7 @@ import { TxValidatorFactory } from '../tx_validator/tx_validator_factory.js'; * Encapsulates the full sequencer and publisher. */ export class SequencerClient { - constructor(private sequencer: Sequencer) {} + constructor(protected sequencer: Sequencer) {} /** * Initializes and starts a new instance. @@ -43,6 +45,7 @@ export class SequencerClient { l1ToL2MessageSource: L1ToL2MessageSource; telemetry: TelemetryClient; publisher?: L1Publisher; + dateProvider: DateProvider; }, ) { const { @@ -57,7 +60,7 @@ export class SequencerClient { const publisher = deps.publisher ?? new L1Publisher(config, telemetryClient); const globalsBuilder = new GlobalVariableBuilder(config); - const publicProcessorFactory = new PublicProcessorFactory(contractDataSource, telemetryClient); + const publicProcessorFactory = new PublicProcessorFactory(contractDataSource, deps.dateProvider, telemetryClient); const rollup = publisher.getRollupContract(); const [l1GenesisTime, slotDuration] = await Promise.all([ @@ -65,6 +68,23 @@ export class SequencerClient { rollup.read.SLOT_DURATION(), ] as const); + const ethereumSlotDuration = config.ethereumSlotDuration; + + // When running in anvil, assume we can post a tx up until the very last second of an L1 slot. + // Otherwise, assume we must have broadcasted the tx before the slot started (we use a default + // maxL1TxInclusionTimeIntoSlot of zero) to get the tx into that L1 slot. + // In theory, the L1 slot has an initial 4s phase where the block is propagated, so we could + // make it with a propagation time into slot equal to 4s. However, we prefer being conservative. + // See https://www.blocknative.com/blog/anatomy-of-a-slot#7 for more info. + const maxL1TxInclusionTimeIntoSlot = + config.maxL1TxInclusionTimeIntoSlot ?? isAnvilTestChain(config.l1ChainId) ? ethereumSlotDuration : 0; + + const l1Constants = { + l1GenesisTime, + slotDuration: Number(slotDuration), + ethereumSlotDuration, + }; + const sequencer = new Sequencer( publisher, validatorClient, @@ -76,10 +96,10 @@ export class SequencerClient { l1ToL2MessageSource, publicProcessorFactory, new TxValidatorFactory(worldStateSynchronizer.getCommitted(), contractDataSource, !!config.enforceFees), - Number(l1GenesisTime), - Number(slotDuration), + l1Constants, + deps.dateProvider, telemetryClient, - config, + { ...config, maxL1TxInclusionTimeIntoSlot }, ); await validatorClient?.start(); await sequencer.start(); diff --git a/yarn-project/sequencer-client/src/config.ts b/yarn-project/sequencer-client/src/config.ts index 795b8f4b5645..10f714b6cf60 100644 --- a/yarn-project/sequencer-client/src/config.ts +++ b/yarn-project/sequencer-client/src/config.ts @@ -106,6 +106,11 @@ export const sequencerConfigMappings: ConfigMappingsType = { parseEnv: (val: string) => EthAddress.fromString(val), defaultValue: EthAddress.ZERO, }, + maxL1TxInclusionTimeIntoSlot: { + env: 'SEQ_MAX_L1_TX_INCLUSION_TIME_INTO_SLOT', + description: 'How many seconds into an L1 slot we can still send a tx and get it mined.', + parseEnv: (val: string) => (val ? parseInt(val, 10) : undefined), + }, }; export const chainConfigMappings: ConfigMappingsType = { diff --git a/yarn-project/sequencer-client/src/index.ts b/yarn-project/sequencer-client/src/index.ts index cb826f2c5451..dcac430ac134 100644 --- a/yarn-project/sequencer-client/src/index.ts +++ b/yarn-project/sequencer-client/src/index.ts @@ -1,6 +1,7 @@ export * from './client/index.js'; export * from './config.js'; export * from './publisher/index.js'; +export { Sequencer, SequencerState } from './sequencer/index.js'; // Used by the node to simulate public parts of transactions. Should these be moved to a shared library? // ISSUE(#9832) diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index d65fe4202177..b1a024014505 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -35,6 +35,7 @@ import { Buffer32 } from '@aztec/foundation/buffer'; import { times } from '@aztec/foundation/collection'; import { randomBytes } from '@aztec/foundation/crypto'; import { Signature } from '@aztec/foundation/eth-signature'; +import { TestDateProvider } from '@aztec/foundation/timer'; import { type Writeable } from '@aztec/foundation/types'; import { type P2P, P2PClientState } from '@aztec/p2p'; import { type BlockBuilderFactory } from '@aztec/prover-client/block-builder'; @@ -70,8 +71,12 @@ describe('sequencer', () => { let sequencer: TestSubject; - const epochDuration = DefaultL1ContractsConfig.aztecEpochDuration; - const slotDuration = DefaultL1ContractsConfig.aztecSlotDuration; + const { + aztecEpochDuration: epochDuration, + aztecSlotDuration: slotDuration, + ethereumSlotDuration, + } = DefaultL1ContractsConfig; + const chainId = new Fr(12345); const version = Fr.ZERO; const coinbase = EthAddress.random(); @@ -188,10 +193,11 @@ describe('sequencer', () => { createBlockProposal: mockFn().mockResolvedValue(createBlockProposal()), }); - const l1GenesisTime = Math.floor(Date.now() / 1000); + const l1GenesisTime = BigInt(Math.floor(Date.now() / 1000)); + const l1Constants = { l1GenesisTime, slotDuration, ethereumSlotDuration }; sequencer = new TestSubject( publisher, - // TDOO(md): add the relevant methods to the validator client that will prevent it stalling when waiting for attestations + // TODO(md): add the relevant methods to the validator client that will prevent it stalling when waiting for attestations validatorClient, globalVariableBuilder, p2p, @@ -201,8 +207,8 @@ describe('sequencer', () => { l1ToL2MessageSource, publicProcessorFactory, new TxValidatorFactory(merkleTreeOps, contractSource, false), - l1GenesisTime, - slotDuration, + l1Constants, + new TestDateProvider(), new NoopTelemetryClient(), { enforceTimeTable: true, maxTxsPerBlock: 4 }, ); @@ -231,9 +237,7 @@ describe('sequencer', () => { }); it.each([ - { - delayedState: SequencerState.WAITING_FOR_TXS, - }, + { delayedState: SequencerState.WAITING_FOR_TXS }, // It would be nice to add the other states, but we would need to inject delays within the `work` loop ])('does not build a block if it does not have enough time left in the slot', async ({ delayedState }) => { // trick the sequencer into thinking that we are just too far into slot 1 @@ -798,7 +802,7 @@ class TestSubject extends Sequencer { } public setL1GenesisTime(l1GenesisTime: number) { - this.l1GenesisTime = l1GenesisTime; + this.l1Constants.l1GenesisTime = BigInt(l1GenesisTime); } public override doRealWork() { diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index bc5457666cb1..5baa91c34df9 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -1,5 +1,6 @@ import { type EpochProofQuote, + type L1RollupConstants, type L1ToL2MessageSource, type L2Block, type L2BlockSource, @@ -27,7 +28,7 @@ import { Fr } from '@aztec/foundation/fields'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { pickFromSchema } from '@aztec/foundation/schemas'; -import { Timer, elapsed } from '@aztec/foundation/timer'; +import { type DateProvider, Timer, elapsed } from '@aztec/foundation/timer'; import { type P2P } from '@aztec/p2p'; import { type BlockBuilderFactory } from '@aztec/prover-client/block-builder'; import { type PublicProcessorFactory } from '@aztec/simulator'; @@ -41,7 +42,9 @@ import { type TxValidatorFactory } from '../tx_validator/tx_validator_factory.js import { getDefaultAllowedSetupFunctions } from './allowed.js'; import { type SequencerConfig } from './config.js'; import { SequencerMetrics } from './metrics.js'; -import { SequencerState, getSecondsIntoSlot, orderAttestations } from './utils.js'; +import { SequencerState, orderAttestations } from './utils.js'; + +export { SequencerState }; export type ShouldProposeArgs = { pendingTxsCount?: number; @@ -63,6 +66,8 @@ export class SequencerTooSlowError extends Error { } } +type SequencerRollupConstants = Pick; + /** * Sequencer client * - Wins a period of time to become the sequencer (depending on finalized protocol). @@ -77,12 +82,14 @@ export class Sequencer { private pollingIntervalMs: number = 1000; private maxTxsPerBlock = 32; private minTxsPerBLock = 1; + private maxL1TxInclusionTimeIntoSlot = 0; // TODO: zero values should not be allowed for the following 2 values in PROD private _coinbase = EthAddress.ZERO; private _feeRecipient = AztecAddress.ZERO; private state = SequencerState.STOPPED; private allowedInSetup: AllowedElement[] = getDefaultAllowedSetupFunctions(); private maxBlockSizeInBytes: number = 1024 * 1024; + private processTxTime: number = 12; private metrics: SequencerMetrics; private isFlushing: boolean = false; @@ -104,8 +111,8 @@ export class Sequencer { private l1ToL2MessageSource: L1ToL2MessageSource, private publicProcessorFactory: PublicProcessorFactory, private txValidatorFactory: TxValidatorFactory, - protected l1GenesisTime: number, - private aztecSlotDuration: number, + protected l1Constants: SequencerRollupConstants, + private dateProvider: DateProvider, telemetry: TelemetryClient, private config: SequencerConfig = {}, private log = createLogger('sequencer'), @@ -152,6 +159,9 @@ export class Sequencer { if (config.governanceProposerPayload) { this.publisher.setPayload(config.governanceProposerPayload); } + if (config.maxL1TxInclusionTimeIntoSlot !== undefined) { + this.maxL1TxInclusionTimeIntoSlot = config.maxL1TxInclusionTimeIntoSlot; + } this.enforceTimeTable = config.enforceTimeTable === true; this.setTimeTable(); @@ -161,20 +171,59 @@ export class Sequencer { } private setTimeTable() { + // How late into the slot can we be to start working + const initialTime = 1; + + // How long it takes to validate the txs collected and get ready to start building + const blockPrepareTime = 2; + + // How long it takes to for attestations to travel across the p2p layer. + const attestationPropagationTime = 2; + + // How long it takes to get a published block into L1. L1 builders typically accept txs up to 4 seconds into their slot, + // but we'll timeout sooner to give it more time to propagate (remember we also have blobs!). Still, when working in anvil, + // we can just post in the very last second of the L1 slot. + const l1PublishingTime = this.l1Constants.ethereumSlotDuration - this.maxL1TxInclusionTimeIntoSlot; + + // How much time we spend validating and processing a block after building it + const blockValidationTime = 1; + + // How much time we have left in the slot for actually processing txs and building the block. + const remainingTimeInSlot = + this.aztecSlotDuration - + initialTime - + blockPrepareTime - + l1PublishingTime - + 2 * attestationPropagationTime - + blockValidationTime; + + // Check that numbers make sense + if (this.enforceTimeTable && remainingTimeInSlot < 0) { + throw new Error(`Not enough time for block building in ${this.aztecSlotDuration}s slot`); + } + + // How much time we have for actually processing txs. Note that we need both the sequencer and the validators to execute txs. + const processTxsTime = remainingTimeInSlot / 2; + this.processTxTime = processTxsTime; + const newTimeTable: Record = { + // No checks needed for any of these transitions [SequencerState.STOPPED]: this.aztecSlotDuration, [SequencerState.IDLE]: this.aztecSlotDuration, [SequencerState.SYNCHRONIZING]: this.aztecSlotDuration, - [SequencerState.PROPOSER_CHECK]: this.aztecSlotDuration, // We always want to allow the full slot to check if we are the proposer - [SequencerState.WAITING_FOR_TXS]: 5, - [SequencerState.CREATING_BLOCK]: 7, - [SequencerState.PUBLISHING_BLOCK_TO_PEERS]: 7 + this.maxTxsPerBlock * 2, // if we take 5 seconds to create block, then 4 transactions at 2 seconds each - [SequencerState.WAITING_FOR_ATTESTATIONS]: 7 + this.maxTxsPerBlock * 2 + 3, // it shouldn't take 3 seconds to publish to peers - [SequencerState.PUBLISHING_BLOCK]: 7 + this.maxTxsPerBlock * 2 + 3 + 5, // wait 5 seconds for attestations + // We always want to allow the full slot to check if we are the proposer + [SequencerState.PROPOSER_CHECK]: this.aztecSlotDuration, + // First transition towards building a block + [SequencerState.WAITING_FOR_TXS]: initialTime, + // We then validate the txs and prepare to start building the block + [SequencerState.CREATING_BLOCK]: initialTime + blockPrepareTime, + // We start collecting attestations after building the block + [SequencerState.COLLECTING_ATTESTATIONS]: initialTime + blockPrepareTime + processTxsTime + blockValidationTime, + // We publish the block after collecting attestations + [SequencerState.PUBLISHING_BLOCK]: this.aztecSlotDuration - l1PublishingTime, }; - if (this.enforceTimeTable && newTimeTable[SequencerState.PUBLISHING_BLOCK] > this.aztecSlotDuration) { - throw new Error('Sequencer cannot publish block in less than a slot'); - } + + this.log.verbose(`Sequencer time table updated with ${processTxsTime}s for processing txs`, newTimeTable); this.timeTable = newTimeTable; } @@ -297,7 +346,8 @@ export class Sequencer { Fr.ZERO, ); - // TODO: It should be responsibility of the P2P layer to validate txs before passing them on here + // TODO: It should be responsibility of the P2P layer to validate txs before passing them on here. + // TODO: We should validate only the number of txs we need to speed up this process. const allValidTxs = await this.takeValidTxs( pendingTxs, this.txValidatorFactory.validatorForNewTxs(newGlobalVariables, this.allowedInSetup), @@ -372,24 +422,21 @@ export class Sequencer { return true; } - if (this.timeTable[proposedState] === this.aztecSlotDuration) { + const maxAllowedTime = this.timeTable[proposedState]; + if (maxAllowedTime === this.aztecSlotDuration) { return true; } - const bufferSeconds = this.timeTable[proposedState] - secondsIntoSlot; + const bufferSeconds = maxAllowedTime - secondsIntoSlot; if (bufferSeconds < 0) { - this.log.warn( - `Too far into slot to transition to ${proposedState}. max allowed: ${this.timeTable[proposedState]}s, time into slot: ${secondsIntoSlot}s`, - ); + this.log.warn(`Too far into slot to transition to ${proposedState}`, { maxAllowedTime, secondsIntoSlot }); return false; } this.metrics.recordStateTransitionBufferMs(Math.floor(bufferSeconds * 1000), proposedState); - this.log.debug( - `Enough time to transition to ${proposedState}, max allowed: ${this.timeTable[proposedState]}s, time into slot: ${secondsIntoSlot}s`, - ); + this.log.trace(`Enough time to transition to ${proposedState}`, { maxAllowedTime, secondsIntoSlot }); return true; } @@ -407,7 +454,7 @@ export class Sequencer { this.log.warn(`Cannot set sequencer from ${this.state} to ${proposedState} as it is stopped.`); return; } - const secondsIntoSlot = getSecondsIntoSlot(this.l1GenesisTime, this.aztecSlotDuration, Number(currentSlotNumber)); + const secondsIntoSlot = this.getSecondsIntoSlot(currentSlotNumber); if (!this.doIHaveEnoughTimeLeft(proposedState, secondsIntoSlot)) { throw new SequencerTooSlowError(this.state, proposedState, this.timeTable[proposedState], secondsIntoSlot); } @@ -511,19 +558,42 @@ export class Sequencer { const blockBuilder = this.blockBuilderFactory.create(orchestratorFork); await blockBuilder.startNewBlock(newGlobalVariables, l1ToL2Messages); + // We set the deadline for tx processing to the start of the CREATING_BLOCK phase, plus the expected time for tx processing. + // Deadline is only set if enforceTimeTable is enabled. + const processingEndTimeWithinSlot = this.timeTable[SequencerState.CREATING_BLOCK] + this.processTxTime; + const processingDeadline = this.enforceTimeTable + ? new Date((this.getSlotStartTimestamp(slot) + processingEndTimeWithinSlot) * 1000) + : undefined; + this.log.verbose(`Processing ${validTxs.length} txs`, { + slot, + slotStart: new Date(this.getSlotStartTimestamp(slot) * 1000), + now: new Date(this.dateProvider.now()), + deadline: processingDeadline, + }); + const processingTxValidator = this.txValidatorFactory.validatorForProcessedTxs(publicProcessorFork); const [publicProcessorDuration, [processedTxs, failedTxs]] = await elapsed(() => - processor.process(validTxs, blockSize, this.txValidatorFactory.validatorForProcessedTxs(publicProcessorFork)), + processor.process(validTxs, blockSize, processingTxValidator, processingDeadline), ); + if (failedTxs.length > 0) { const failedTxData = failedTxs.map(fail => fail.tx); this.log.verbose(`Dropping failed txs ${Tx.getHashes(failedTxData).join(', ')}`); await this.p2pClient.deleteTxs(Tx.getHashes(failedTxData)); } + + if (!this.isFlushing && this.minTxsPerBLock !== undefined && processedTxs.length < this.minTxsPerBLock) { + this.log.warn( + `Block ${blockNumber} has too few txs to be proposed (got ${processedTxs.length} but required ${this.minTxsPerBLock})`, + { slot, blockNumber, processedTxCount: processedTxs.length }, + ); + throw new Error(`Block has too few successful txs to be proposed`); + } + await blockBuilder.addTxs(processedTxs); await interrupt?.(processedTxs); - // All real transactions have been added, set the block as full and complete the proving. + // All real transactions have been added, set the block as full and pad if needed const block = await blockBuilder.setBlockCompleted(); return { @@ -535,8 +605,16 @@ export class Sequencer { }; } finally { // We create a fresh processor each time to reset any cached state (eg storage writes) - await publicProcessorFork.close(); - await orchestratorFork.close(); + // We wait a bit to close the forks since the processor may still be working on a dangling tx + // which was interrupted due to the processingDeadline being hit. + setTimeout(async () => { + try { + await publicProcessorFork.close(); + await orchestratorFork.close(); + } catch (err) { + this.log.error(`Error closing forks`, err); + } + }, 5000); } } @@ -588,6 +666,9 @@ export class Sequencer { } }; + // Start collecting proof quotes for the previous epoch if needed in the background + const proofQuotePromise = this.createProofClaimForPreviousEpoch(slot); + try { const buildBlockRes = await this.buildBlock(validTxs, newGlobalVariables, historicalHeader, interrupt); const { block, publicProcessorDuration, numProcessedTxs, numMsgs, blockBuildingTimer } = buildBlockRes; @@ -625,12 +706,12 @@ export class Sequencer { const stopCollectingAttestationsTimer = this.metrics.startCollectingAttestationsTimer(); const attestations = await this.collectAttestations(block, txHashes); if (attestations !== undefined) { - this.log.verbose(`Collected ${attestations.length} attestations`); + this.log.verbose(`Collected ${attestations.length} attestations`, { blockHash, blockNumber }); } stopCollectingAttestationsTimer(); - this.log.debug('Collecting proof quotes'); - const proofQuote = await this.createProofClaimForPreviousEpoch(newGlobalVariables.slotNumber.toBigInt()); + // Get the proof quote for the previous epoch, if any + const proofQuote = await proofQuotePromise; await this.publishL2Block(block, attestations, txHashes, proofQuote); this.metrics.recordPublishedBlock(workDuration); @@ -682,21 +763,19 @@ export class Sequencer { } const numberOfRequiredAttestations = Math.floor((committee.length * 2) / 3) + 1; + const slotNumber = block.header.globalVariables.slotNumber.toBigInt(); + this.setState(SequencerState.COLLECTING_ATTESTATIONS, slotNumber); - this.log.debug('Creating block proposal'); + this.log.debug('Creating block proposal for validators'); const proposal = await this.validatorClient.createBlockProposal(block.header, block.archive.root, txHashes); if (!proposal) { this.log.warn(`Failed to create block proposal, skipping collecting attestations`); return undefined; } - const slotNumber = block.header.globalVariables.slotNumber.toBigInt(); - - this.setState(SequencerState.PUBLISHING_BLOCK_TO_PEERS, slotNumber); this.log.debug('Broadcasting block proposal to validators'); this.validatorClient.broadcastBlockProposal(proposal); - this.setState(SequencerState.WAITING_FOR_ATTESTATIONS, slotNumber); const attestations = await this.validatorClient.collectAttestations(proposal, numberOfRequiredAttestations); // note: the smart contract requires that the signatures are provided in the order of the committee @@ -713,6 +792,7 @@ export class Sequencer { } // Get quotes for the epoch to be proven + this.log.debug(`Collecting proof quotes for epoch ${epochToProve}`); const quotes = await this.p2pClient.getEpochProofQuotes(epochToProve); this.log.verbose(`Retrieved ${quotes.length} quotes for slot ${slotNumber} epoch ${epochToProve}`, { epochToProve, @@ -856,6 +936,19 @@ export class Sequencer { return result; } + private getSlotStartTimestamp(slotNumber: number | bigint): number { + return Number(this.l1Constants.l1GenesisTime) + Number(slotNumber) * this.l1Constants.slotDuration; + } + + private getSecondsIntoSlot(slotNumber: number | bigint): number { + const slotStartTimestamp = this.getSlotStartTimestamp(slotNumber); + return Number((this.dateProvider.now() / 1000 - slotStartTimestamp).toFixed(3)); + } + + get aztecSlotDuration() { + return this.l1Constants.slotDuration; + } + get coinbase(): EthAddress { return this._coinbase; } @@ -864,7 +957,3 @@ export class Sequencer { return this._feeRecipient; } } - -/** - * State of the sequencer. - */ diff --git a/yarn-project/sequencer-client/src/sequencer/utils.ts b/yarn-project/sequencer-client/src/sequencer/utils.ts index 8bb4b440dc2b..5939b43c353c 100644 --- a/yarn-project/sequencer-client/src/sequencer/utils.ts +++ b/yarn-project/sequencer-client/src/sequencer/utils.ts @@ -27,13 +27,9 @@ export enum SequencerState { */ CREATING_BLOCK = 'CREATING_BLOCK', /** - * Publishing blocks to validator peers. Will move to WAITING_FOR_ATTESTATIONS. + * Collecting attestations from its peers. Will move to PUBLISHING_BLOCK. */ - PUBLISHING_BLOCK_TO_PEERS = 'PUBLISHING_BLOCK_TO_PEERS', - /** - * The block has been published to peers, and we are waiting for attestations. Will move to PUBLISHING_CONTRACT_DATA. - */ - WAITING_FOR_ATTESTATIONS = 'WAITING_FOR_ATTESTATIONS', + COLLECTING_ATTESTATIONS = 'COLLECTING_ATTESTATIONS', /** * Sending the tx to L1 with the L2 block data and awaiting it to be mined. Will move to SYNCHRONIZING. */ @@ -72,8 +68,3 @@ export function orderAttestations(attestations: BlockAttestation[], orderAddress return orderedAttestations; } - -export function getSecondsIntoSlot(l1GenesisTime: number, aztecSlotDuration: number, slotNumber: number): number { - const slotStartTimestamp = l1GenesisTime + slotNumber * aztecSlotDuration; - return Number((Date.now() / 1000 - slotStartTimestamp).toFixed(3)); -} diff --git a/yarn-project/simulator/src/public/public_processor.test.ts b/yarn-project/simulator/src/public/public_processor.test.ts index 51a816e46cf4..c73a6e086772 100644 --- a/yarn-project/simulator/src/public/public_processor.test.ts +++ b/yarn-project/simulator/src/public/public_processor.test.ts @@ -19,6 +19,9 @@ import { RevertCode, } from '@aztec/circuits.js'; import { computePublicDataTreeLeafSlot } from '@aztec/circuits.js/hash'; +import { times } from '@aztec/foundation/collection'; +import { sleep } from '@aztec/foundation/sleep'; +import { TestDateProvider } from '@aztec/foundation/timer'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { type MockProxy, mock } from 'jest-mock-extended'; @@ -31,7 +34,7 @@ import { type PublicTxResult, type PublicTxSimulator } from './public_tx_simulat describe('public_processor', () => { let db: MockProxy; let worldStateDB: MockProxy; - let publicTxProcessor: MockProxy; + let publicTxSimulator: MockProxy; let root: Buffer; let mockedEnqueuedCallsResult: PublicTxResult; @@ -50,7 +53,7 @@ describe('public_processor', () => { beforeEach(() => { db = mock(); worldStateDB = mock(); - publicTxProcessor = mock(); + publicTxSimulator = mock(); root = Buffer.alloc(32, 5); @@ -72,7 +75,7 @@ describe('public_processor', () => { worldStateDB.storageRead.mockResolvedValue(Fr.ZERO); - publicTxProcessor.simulate.mockImplementation(() => { + publicTxSimulator.simulate.mockImplementation(() => { return Promise.resolve(mockedEnqueuedCallsResult); }); @@ -81,7 +84,8 @@ describe('public_processor', () => { globalVariables, BlockHeader.empty(), worldStateDB, - publicTxProcessor, + publicTxSimulator, + new TestDateProvider(), new NoopTelemetryClient(), ); }); @@ -127,7 +131,7 @@ describe('public_processor', () => { }); it('returns failed txs without aborting entire operation', async function () { - publicTxProcessor.simulate.mockRejectedValue(new SimulationError(`Failed`, [])); + publicTxSimulator.simulate.mockRejectedValue(new SimulationError(`Failed`, [])); const tx = mockTxWithPublicCalls(); const [processed, failed] = await processor.process([tx], 1); @@ -166,6 +170,26 @@ describe('public_processor', () => { expect(failed.length).toBe(1); expect(failed[0].tx).toEqual(tx); }); + + it('does not go past the deadline', async function () { + const txs = times(3, seed => mockTxWithPublicCalls({ seed })); + + // The simulator will take 400ms to process each tx + publicTxSimulator.simulate.mockImplementation(async () => { + await sleep(400); + return mockedEnqueuedCallsResult; + }); + + // We allocate a deadline of 1s, so only one 2 txs should fit + const deadline = new Date(Date.now() + 1000); + const [processed, failed] = await processor.process(txs, 3, undefined, deadline); + + expect(processed.length).toBe(2); + expect(processed[0].hash).toEqual(txs[0].getTxHash()); + expect(processed[1].hash).toEqual(txs[1].getTxHash()); + expect(failed).toEqual([]); + expect(worldStateDB.commit).toHaveBeenCalledTimes(2); + }); }); describe('with fee payer', () => { diff --git a/yarn-project/simulator/src/public/public_processor.ts b/yarn-project/simulator/src/public/public_processor.ts index e0f3f328ba33..b9b09a0bc2e5 100644 --- a/yarn-project/simulator/src/public/public_processor.ts +++ b/yarn-project/simulator/src/public/public_processor.ts @@ -23,7 +23,7 @@ import { } from '@aztec/circuits.js'; import { padArrayEnd } from '@aztec/foundation/collection'; import { createLogger } from '@aztec/foundation/log'; -import { Timer } from '@aztec/foundation/timer'; +import { type DateProvider, Timer, elapsed, executeTimeout } from '@aztec/foundation/timer'; import { ProtocolContractAddress } from '@aztec/protocol-contracts'; import { ContractClassRegisteredEvent } from '@aztec/protocol-contracts/class-registerer'; import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; @@ -37,7 +37,11 @@ import { PublicTxSimulator } from './public_tx_simulator.js'; * Creates new instances of PublicProcessor given the provided merkle tree db and contract data source. */ export class PublicProcessorFactory { - constructor(private contractDataSource: ContractDataSource, private telemetryClient: TelemetryClient) {} + constructor( + private contractDataSource: ContractDataSource, + private dateProvider: DateProvider, + private telemetryClient: TelemetryClient, + ) {} /** * Creates a new instance of a PublicProcessor. @@ -55,7 +59,7 @@ export class PublicProcessorFactory { const historicalHeader = maybeHistoricalHeader ?? merkleTree.getInitialHeader(); const worldStateDB = new WorldStateDB(merkleTree, this.contractDataSource); - const publicTxSimulator = new PublicTxSimulator( + const publicTxSimulator = this.createPublicTxSimulator( merkleTree, worldStateDB, this.telemetryClient, @@ -70,9 +74,27 @@ export class PublicProcessorFactory { historicalHeader, worldStateDB, publicTxSimulator, + this.dateProvider, this.telemetryClient, ); } + + protected createPublicTxSimulator( + db: MerkleTreeWriteOperations, + worldStateDB: WorldStateDB, + telemetryClient: TelemetryClient, + globalVariables: GlobalVariables, + doMerkleOperations: boolean = false, + ) { + return new PublicTxSimulator(db, worldStateDB, telemetryClient, globalVariables, doMerkleOperations); + } +} + +class PublicProcessorTimeoutError extends Error { + constructor(message: string = 'Timed out while processing tx') { + super(message); + this.name = 'PublicProcessorTimeoutError'; + } } /** @@ -87,6 +109,7 @@ export class PublicProcessor implements Traceable { protected historicalHeader: BlockHeader, protected worldStateDB: WorldStateDB, protected publicTxSimulator: PublicTxSimulator, + private dateProvider: DateProvider, telemetryClient: TelemetryClient, private log = createLogger('simulator:public-processor'), ) { @@ -107,6 +130,7 @@ export class PublicProcessor implements Traceable { txs: Tx[], maxTransactions = txs.length, txValidator?: TxValidator, + deadline?: Date, ): Promise<[ProcessedTx[], FailedTx[], NestedProcessReturnValues[]]> { // The processor modifies the tx objects in place, so we need to clone them. txs = txs.map(tx => Tx.clone(tx)); @@ -120,17 +144,18 @@ export class PublicProcessor implements Traceable { break; } try { - const [processedTx, returnValues] = await this.processTx(tx, txValidator); + const [processedTx, returnValues] = await this.processTx(tx, txValidator, deadline); result.push(processedTx); returns = returns.concat(returnValues); } catch (err: any) { + if (err?.name === 'PublicProcessorTimeoutError') { + this.log.warn(`Stopping tx processing due to timeout.`); + break; + } const errorMessage = err instanceof Error ? err.message : 'Unknown error'; this.log.warn(`Failed to process tx ${tx.getTxHash()}: ${errorMessage} ${err?.stack}`); - failed.push({ - tx, - error: err instanceof Error ? err : new Error(errorMessage), - }); + failed.push({ tx, error: err instanceof Error ? err : new Error(errorMessage) }); returns.push(new NestedProcessReturnValues([])); } } @@ -142,15 +167,14 @@ export class PublicProcessor implements Traceable { private async processTx( tx: Tx, txValidator?: TxValidator, + deadline?: Date, ): Promise<[ProcessedTx, NestedProcessReturnValues[]]> { - const [processedTx, returnValues] = !tx.hasPublicCalls() - ? await this.processPrivateOnlyTx(tx) - : await this.processTxWithPublicCalls(tx); + const [time, [processedTx, returnValues]] = await elapsed(() => this.processTxWithinDeadline(tx, deadline)); this.log.verbose( !tx.hasPublicCalls() - ? `Processed tx ${processedTx.hash} with no public calls` - : `Processed tx ${processedTx.hash} with ${tx.enqueuedPublicFunctionCalls.length} public calls`, + ? `Processed tx ${processedTx.hash} with no public calls in ${time}ms` + : `Processed tx ${processedTx.hash} with ${tx.enqueuedPublicFunctionCalls.length} public calls in ${time}ms`, { txHash: processedTx.hash, txFee: processedTx.txEffect.transactionFee.toBigInt(), @@ -164,6 +188,7 @@ export class PublicProcessor implements Traceable { unencryptedLogCount: processedTx.txEffect.unencryptedLogs.getTotalLogCount(), privateLogCount: processedTx.txEffect.privateLogs.length, l2ToL1MessageCount: processedTx.txEffect.l2ToL1Msgs.length, + durationMs: time, }, ); @@ -215,6 +240,37 @@ export class PublicProcessor implements Traceable { return [processedTx, returnValues ?? []]; } + /** Processes the given tx within deadline. Returns timeout if deadline is hit. */ + private async processTxWithinDeadline( + tx: Tx, + deadline?: Date, + ): Promise<[ProcessedTx, NestedProcessReturnValues[] | undefined]> { + const processFn: () => Promise<[ProcessedTx, NestedProcessReturnValues[] | undefined]> = tx.hasPublicCalls() + ? () => this.processTxWithPublicCalls(tx) + : () => this.processPrivateOnlyTx(tx); + + if (!deadline) { + return await processFn(); + } + + const timeout = +deadline - this.dateProvider.now(); + this.log.debug(`Processing tx ${tx.getTxHash().toString()} within ${timeout}ms`, { + deadline: deadline.toISOString(), + now: new Date(this.dateProvider.now()).toISOString(), + txHash: tx.getTxHash().toString(), + }); + + if (timeout < 0) { + throw new PublicProcessorTimeoutError(); + } + + return await executeTimeout( + () => processFn(), + timeout, + () => new PublicProcessorTimeoutError(), + ); + } + /** * Creates the public data write for paying the tx fee. * This is used in private only txs, since for txs with public calls @@ -249,7 +305,7 @@ export class PublicProcessor implements Traceable { @trackSpan('PublicProcessor.processPrivateOnlyTx', (tx: Tx) => ({ [Attributes.TX_HASH]: tx.getTxHash().toString(), })) - private async processPrivateOnlyTx(tx: Tx): Promise<[ProcessedTx]> { + private async processPrivateOnlyTx(tx: Tx): Promise<[ProcessedTx, undefined]> { const gasFees = this.globalVariables.gasFees; const transactionFee = tx.data.gasUsed.computeFee(gasFees); @@ -268,7 +324,7 @@ export class PublicProcessor implements Traceable { .filter(log => ContractClassRegisteredEvent.isContractClassRegisteredEvent(log.data)) .map(log => ContractClassRegisteredEvent.fromLog(log.data)), ); - return [processedTx]; + return [processedTx, undefined]; } @trackSpan('PublicProcessor.processTxWithPublicCalls', tx => ({