Skip to content

Commit

Permalink
fix: Fix race condition between RPC Server and Aztec Node (#1700)
Browse files Browse the repository at this point in the history
This PR introduces the following changes:

1. Refactored the way in which the world-state syncs it's blocks from
the configured block source.
2. When the world state is accessed, the node first checks to see if it
is in sync. If not then it performs an immediate sync to bring it up to
the latest state.

# Checklist:
Remove the checklist to signal you've completed it. Enable auto-merge if
the PR is ready to merge.
- [ ] If the pull request requires a cryptography review (e.g.
cryptographic algorithm implementations) I have added the 'crypto' tag.
- [ ] I have reviewed my diff in github, line by line and removed
unexpected formatting changes, testing logs, or commented-out code.
- [ ] Every change is related to the PR description.
- [ ] I have
[linked](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue)
this pull request to relevant issues (if any exist).

---------

Co-authored-by: Santiago Palladino <santiago@aztecprotocol.com>
  • Loading branch information
PhilWindle and spalladino authored Aug 22, 2023
1 parent 95d1350 commit 4c89941
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 58 deletions.
70 changes: 48 additions & 22 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import {
import {
MerkleTrees,
ServerWorldStateSynchroniser,
WorldStateConfig,
WorldStateSynchroniser,
computePublicDataTreeLeafIndex,
getConfigEnvVars as getWorldStateConfig,
} from '@aztec/world-state';

import { default as levelup } from 'levelup';
Expand All @@ -54,7 +56,6 @@ export class AztecNodeService implements AztecNode {
protected unencryptedLogsSource: L2LogsSource,
protected contractDataSource: ContractDataSource,
protected l1ToL2MessageSource: L1ToL2MessageSource,
protected merkleTreeDB: MerkleTrees,
protected worldStateSynchroniser: WorldStateSynchroniser,
protected sequencer: SequencerClient,
protected chainId: number,
Expand All @@ -80,7 +81,8 @@ export class AztecNodeService implements AztecNode {

// now create the merkle trees and the world state syncher
const merkleTreeDB = await MerkleTrees.new(levelup(createMemDown()), await CircuitsWasm.get());
const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver);
const worldStateConfig: WorldStateConfig = getWorldStateConfig();
const worldStateSynchroniser = new ServerWorldStateSynchroniser(merkleTreeDB, archiver, worldStateConfig);

// start both and wait for them to sync from the block source
await Promise.all([p2pClient.start(), worldStateSynchroniser.start()]);
Expand All @@ -101,7 +103,6 @@ export class AztecNodeService implements AztecNode {
archiver,
archiver,
archiver,
merkleTreeDB,
worldStateSynchroniser,
sequencer,
config.chainId,
Expand Down Expand Up @@ -216,7 +217,6 @@ export class AztecNodeService implements AztecNode {
await this.sequencer.stop();
await this.p2pClient.stop();
await this.worldStateSynchroniser.stop();
await this.merkleTreeDB.stop();
await this.blockSource.stop();
this.log.info(`Stopped`);
}
Expand All @@ -243,35 +243,39 @@ export class AztecNodeService implements AztecNode {
* @param leafValue - The value to search for.
* @returns The index of the given leaf in the contracts tree or undefined if not found.
*/
public findContractIndex(leafValue: Buffer): Promise<bigint | undefined> {
return this.merkleTreeDB.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue, false);
public async findContractIndex(leafValue: Buffer): Promise<bigint | undefined> {
const committedDb = await this.getWorldState();
return committedDb.findLeafIndex(MerkleTreeId.CONTRACT_TREE, leafValue);
}

/**
* Returns the sibling path for the given index in the contract tree.
* @param leafIndex - The index of the leaf for which the sibling path is required.
* @returns The sibling path for the leaf index.
*/
public getContractPath(leafIndex: bigint): Promise<SiblingPath<typeof CONTRACT_TREE_HEIGHT>> {
return this.merkleTreeDB.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex, false);
public async getContractPath(leafIndex: bigint): Promise<SiblingPath<typeof CONTRACT_TREE_HEIGHT>> {
const committedDb = await this.getWorldState();
return committedDb.getSiblingPath(MerkleTreeId.CONTRACT_TREE, leafIndex);
}

/**
* Find the index of the given commitment.
* @param leafValue - The value to search for.
* @returns The index of the given leaf in the private data tree or undefined if not found.
*/
public findCommitmentIndex(leafValue: Buffer): Promise<bigint | undefined> {
return this.merkleTreeDB.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue, false);
public async findCommitmentIndex(leafValue: Buffer): Promise<bigint | undefined> {
const committedDb = await this.getWorldState();
return committedDb.findLeafIndex(MerkleTreeId.PRIVATE_DATA_TREE, leafValue);
}

/**
* Returns the sibling path for the given index in the data tree.
* @param leafIndex - The index of the leaf for which the sibling path is required.
* @returns The sibling path for the leaf index.
*/
public getDataTreePath(leafIndex: bigint): Promise<SiblingPath<typeof PRIVATE_DATA_TREE_HEIGHT>> {
return this.merkleTreeDB.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex, false);
public async getDataTreePath(leafIndex: bigint): Promise<SiblingPath<typeof PRIVATE_DATA_TREE_HEIGHT>> {
const committedDb = await this.getWorldState();
return committedDb.getSiblingPath(MerkleTreeId.PRIVATE_DATA_TREE, leafIndex);
}

/**
Expand All @@ -282,12 +286,9 @@ export class AztecNodeService implements AztecNode {
*/
public async getL1ToL2MessageAndIndex(messageKey: Fr): Promise<L1ToL2MessageAndIndex> {
// todo: #697 - make this one lookup.
const committedDb = await this.getWorldState();
const message = await this.l1ToL2MessageSource.getConfirmedL1ToL2Message(messageKey);
const index = (await this.merkleTreeDB.findLeafIndex(
MerkleTreeId.L1_TO_L2_MESSAGES_TREE,
messageKey.toBuffer(),
false,
))!;
const index = (await committedDb.findLeafIndex(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, messageKey.toBuffer()))!;
return Promise.resolve({ message, index });
}

Expand All @@ -296,8 +297,9 @@ export class AztecNodeService implements AztecNode {
* @param leafIndex - Index of the leaf in the tree.
* @returns The sibling path.
*/
public getL1ToL2MessagesTreePath(leafIndex: bigint): Promise<SiblingPath<typeof L1_TO_L2_MSG_TREE_HEIGHT>> {
return this.merkleTreeDB.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex, false);
public async getL1ToL2MessagesTreePath(leafIndex: bigint): Promise<SiblingPath<typeof L1_TO_L2_MSG_TREE_HEIGHT>> {
const committedDb = await this.getWorldState();
return committedDb.getSiblingPath(MerkleTreeId.L1_TO_L2_MESSAGES_TREE, leafIndex);
}

/**
Expand All @@ -308,16 +310,17 @@ export class AztecNodeService implements AztecNode {
* Note: Aztec's version of `eth_getStorageAt`.
*/
public async getPublicStorageAt(contract: AztecAddress, slot: bigint): Promise<Buffer | undefined> {
const committedDb = await this.getWorldState();
const leafIndex = computePublicDataTreeLeafIndex(contract, new Fr(slot), await CircuitsWasm.get());
return this.merkleTreeDB.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex, false);
return committedDb.getLeafValue(MerkleTreeId.PUBLIC_DATA_TREE, leafIndex);
}

/**
* Returns the current committed roots for the data trees.
* @returns The current committed roots for the data trees.
*/
public async getTreeRoots(): Promise<Record<MerkleTreeId, Fr>> {
const committedDb = this.worldStateSynchroniser.getCommitted();
const committedDb = await this.getWorldState();
const getTreeRoot = async (id: MerkleTreeId) => Fr.fromBuffer((await committedDb.getTreeInfo(id)).root);

const [privateDataTree, nullifierTree, contractTree, l1ToL2MessagesTree, blocksTree, publicDataTree] =
Expand Down Expand Up @@ -345,7 +348,7 @@ export class AztecNodeService implements AztecNode {
* @returns The current committed block data.
*/
public async getHistoricBlockData(): Promise<HistoricBlockData> {
const committedDb = this.worldStateSynchroniser.getCommitted();
const committedDb = await this.getWorldState();
const [roots, globalsHash] = await Promise.all([this.getTreeRoots(), committedDb.getLatestGlobalVariablesHash()]);

return new HistoricBlockData(
Expand All @@ -359,4 +362,27 @@ export class AztecNodeService implements AztecNode {
globalsHash,
);
}

/**
* Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched
* @returns An instance of a committed MerkleTreeOperations
*/
private async getWorldState() {
try {
// Attempt to sync the world state if necessary
await this.syncWorldState();
} catch (err) {
this.log.error(`Error getting world state: ${err}`);
}
return this.worldStateSynchroniser.getCommitted();
}

/**
* Ensure we fully sync the world state
* @returns A promise that fulfils once the world state is synced
*/
private async syncWorldState() {
const blockSourceHeight = await this.blockSource.getBlockHeight();
await this.worldStateSynchroniser.syncImmediate(blockSourceHeight);
}
}
2 changes: 1 addition & 1 deletion yarn-project/aztec-sandbox/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async function main() {
accountStrings.push(` Public Key: ${completeAddress.publicKey.toString()}\n\n`);
}
}
logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings));
logger.info(`${splash}\n${github}\n\n`.concat(...accountStrings).concat(`\nAztec Sandbox now ready for use!`));
}

main().catch(err => {
Expand Down
70 changes: 47 additions & 23 deletions yarn-project/types/src/l2_block_downloader/l2_block_downloader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MemoryFifo, Semaphore } from '@aztec/foundation/fifo';
import { MemoryFifo, Semaphore, SerialQueue } from '@aztec/foundation/fifo';
import { createDebugLogger } from '@aztec/foundation/log';
import { InterruptableSleep } from '@aztec/foundation/sleep';

Expand All @@ -18,7 +18,8 @@ export class L2BlockDownloader {
private from = 0;
private interruptableSleep = new InterruptableSleep();
private semaphore: Semaphore;
private queue = new MemoryFifo<L2Block[]>();
private jobQueue = new SerialQueue();
private blockQueue = new MemoryFifo<L2Block[]>();

constructor(private l2BlockSource: L2BlockSource, maxQueueSize: number, private pollIntervalMS = 10000) {
this.semaphore = new Semaphore(maxQueueSize);
Expand All @@ -29,59 +30,82 @@ export class L2BlockDownloader {
* @param from - The block number to start downloading from. Defaults to INITIAL_L2_BLOCK_NUM.
*/
public start(from = INITIAL_L2_BLOCK_NUM) {
this.from = from;

if (this.running) {
this.interruptableSleep.interrupt();
return;
}

this.from = from;
this.running = true;

const fn = async () => {
while (this.running) {
try {
const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10);

if (!blocks.length) {
await this.interruptableSleep.sleep(this.pollIntervalMS);
continue;
}

// Blocks if there are maxQueueSize results in the queue, until released after the callback.
await this.semaphore.acquire();
this.queue.put(blocks);
this.from += blocks.length;
await this.jobQueue.put(() => this.collectBlocks());
await this.interruptableSleep.sleep(this.pollIntervalMS);
} catch (err) {
log.error(err);
await this.interruptableSleep.sleep(this.pollIntervalMS);
}
}
};

this.jobQueue.start();
this.runningPromise = fn();
}

/**
* Repeatedly queries the block source and adds the received blocks to the block queue.
* Stops when no further blocks are received.
* @returns The total number of blocks added to the block queue.
*/
private async collectBlocks() {
let totalBlocks = 0;
while (true) {
const blocks = await this.l2BlockSource.getL2Blocks(this.from, 10);
if (!blocks.length) {
return totalBlocks;
}
await this.semaphore.acquire();
this.blockQueue.put(blocks);
this.from += blocks.length;
totalBlocks += blocks.length;
}
}

/**
* Stops the downloader.
*/
public async stop() {
this.running = false;
this.interruptableSleep.interrupt();
this.queue.cancel();
await this.jobQueue.cancel();
this.blockQueue.cancel();
await this.runningPromise;
}

/**
* Gets the next batch of blocks from the queue.
* @param timeout - optional timeout value to prevent permanent blocking
* @returns The next batch of blocks from the queue.
*/
public async getL2Blocks() {
const blocks = await this.queue.get();
if (!blocks) {
public async getL2Blocks(timeout?: number) {
try {
const blocks = await this.blockQueue.get(timeout);
if (!blocks) {
return [];
}
this.semaphore.release();
return blocks;
} catch (err) {
// nothing to do
return [];
}
this.semaphore.release();
return blocks;
}

/**
* Forces an immediate request for blocks.
* @returns A promise that fulfills once the poll is complete
*/
public pollImmediate(): Promise<number> {
return this.jobQueue.put(() => this.collectBlocks());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ export class MerkleTreeOperationsFacade implements MerkleTreeOperations {
* @param index - The index of the leaf for which a sibling path is required.
* @returns A promise with the sibling path of the specified leaf index.
*/
getSiblingPath(treeId: MerkleTreeId, index: bigint): Promise<SiblingPath<number>> {
return this.trees.getSiblingPath(treeId, index, this.includeUncommitted);
async getSiblingPath<N extends number>(treeId: MerkleTreeId, index: bigint): Promise<SiblingPath<N>> {
const path = await this.trees.getSiblingPath(treeId, index, this.includeUncommitted);
return path as unknown as SiblingPath<N>;
}

/**
Expand Down
Loading

0 comments on commit 4c89941

Please sign in to comment.