Skip to content

Commit

Permalink
feat(prover): perform prover coordination via p2p layer (#9325)
Browse files Browse the repository at this point in the history
fixes: #9264
  • Loading branch information
Maddiaa0 authored Oct 29, 2024
1 parent 468c100 commit 2132bc2
Show file tree
Hide file tree
Showing 25 changed files with 479 additions and 168 deletions.
2 changes: 1 addition & 1 deletion scripts/run_native_testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Options:
'

# Default values
TEST_SCRIPT=./test-transfer.sh
TEST_SCRIPT="./test-transfer.sh"
PROVER_SCRIPT="\"./prover-node.sh 8078 false\""
NUM_VALIDATORS=3
INTERLEAVED=false
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export class AztecNodeService implements AztecNode {
}

addEpochProofQuote(quote: EpochProofQuote): Promise<void> {
return Promise.resolve(this.p2pClient.broadcastEpochProofQuote(quote));
return Promise.resolve(this.p2pClient.addEpochProofQuote(quote));
}

getEpochProofQuotes(epoch: bigint): Promise<EpochProofQuote[]> {
Expand Down Expand Up @@ -397,7 +397,7 @@ export class AztecNodeService implements AztecNode {
* @returns - The tx if it exists.
*/
public getTxByHash(txHash: TxHash) {
return Promise.resolve(this.p2pClient!.getTxByHash(txHash));
return Promise.resolve(this.p2pClient!.getTxByHashFromPool(txHash));
}

/**
Expand Down
5 changes: 4 additions & 1 deletion yarn-project/bb-prover/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
"version": "0.1.0",
"type": "module",
"exports": {
".": "./dest/index.js"
".": "./dest/index.js",
"./prover": "./dest/prover/index.js",
"./verifier": "./dest/verifier/index.js",
"./test": "./dest/test/index.js"
},
"bin": {
"bb-cli": "./dest/bb/index.js"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ export interface ProverCoordination {
* @param quote - The quote to store
*/
addEpochProofQuote(quote: EpochProofQuote): Promise<void>;

/**
* Stops the coordination service.
*/
stop(): Promise<void>;
}
2 changes: 1 addition & 1 deletion yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import { AZTEC_SLOT_DURATION, ETHEREUM_SLOT_DURATION, EthAddress } from '@aztec/
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RollupAbi } from '@aztec/l1-artifacts';
import { type BootstrapNode } from '@aztec/p2p';
import { createBootstrapNodeFromPrivateKey } from '@aztec/p2p/mocks';

import getPort from 'get-port';
import { getContract } from 'viem';
import { privateKeyToAccount } from 'viem/accounts';

import {
createBootstrapNodeFromPrivateKey,
createValidatorConfig,
generateNodePrivateKeys,
generatePeerIdPrivateKeys,
Expand Down
29 changes: 0 additions & 29 deletions yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import { type AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node';
import { type SentTx, createDebugLogger } from '@aztec/aztec.js';
import { type AztecAddress } from '@aztec/circuits.js';
import { type BootnodeConfig, BootstrapNode, createLibP2PPeerId } from '@aztec/p2p';
import { type PXEService } from '@aztec/pxe';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

Expand Down Expand Up @@ -121,31 +120,3 @@ export async function createValidatorConfig(

return nodeConfig;
}

export function createBootstrapNodeConfig(privateKey: string, port: number): BootnodeConfig {
return {
udpListenAddress: `0.0.0.0:${port}`,
udpAnnounceAddress: `127.0.0.1:${port}`,
peerIdPrivateKey: privateKey,
minPeerCount: 10,
maxPeerCount: 100,
};
}

export function createBootstrapNodeFromPrivateKey(privateKey: string, port: number): Promise<BootstrapNode> {
const config = createBootstrapNodeConfig(privateKey, port);
return startBootstrapNode(config);
}

export async function createBootstrapNode(port: number): Promise<BootstrapNode> {
const peerId = await createLibP2PPeerId();
const config = createBootstrapNodeConfig(Buffer.from(peerId.privateKey!).toString('hex'), port);

return startBootstrapNode(config);
}

async function startBootstrapNode(config: BootnodeConfig) {
const bootstrapNode = new BootstrapNode();
await bootstrapNode.start(config);
return bootstrapNode;
}
10 changes: 9 additions & 1 deletion yarn-project/end-to-end/src/fixtures/snapshot_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ async function createAndSyncProverNode(
aztecNodeConfig: AztecNodeConfig,
aztecNode: AztecNode,
) {
// Disable stopping the aztec node as the prover coordination test will kill it otherwise
// This is only required when stopping the prover node for testing
const aztecNodeWithoutStop = {
addEpochProofQuote: aztecNode.addEpochProofQuote.bind(aztecNode),
getTxByHash: aztecNode.getTxByHash.bind(aztecNode),
stop: () => Promise.resolve(),
};

// Creating temp store and archiver for simulated prover node
const archiverConfig = { ...aztecNodeConfig, dataDirectory: undefined };
const archiver = await createArchiver(archiverConfig, new NoopTelemetryClient(), { blockUntilSync: true });
Expand All @@ -277,7 +285,7 @@ async function createAndSyncProverNode(
proverTargetEscrowAmount: 2000n,
};
const proverNode = await createProverNode(proverConfig, {
aztecNodeTxProvider: aztecNode,
aztecNodeTxProvider: aztecNodeWithoutStop,
archiver: archiver as Archiver,
});
await proverNode.start();
Expand Down
7 changes: 6 additions & 1 deletion yarn-project/p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
"name": "@aztec/p2p",
"version": "0.0.0",
"type": "module",
"exports": "./dest/index.js",
"exports": {
".": "./dest/index.js",
"./mocks": "./dest/mocks/index.js",
"./bootstrap": "./dest/bootstrap/bootstrap.js"
},
"typedocOptions": {
"entryPoints": [
"./src/index.ts"
Expand Down Expand Up @@ -93,6 +97,7 @@
"@jest/globals": "^29.5.0",
"@types/jest": "^29.5.0",
"@types/node": "^18.14.6",
"get-port": "^7.1.0",
"it-drain": "^3.0.5",
"it-length": "^3.0.6",
"jest": "^29.5.0",
Expand Down
49 changes: 1 addition & 48 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { AztecKVTxPool, type TxPool } from '../mem_pools/tx_pool/index.js';
import { DiscV5Service } from '../service/discV5_service.js';
import { DummyP2PService } from '../service/dummy_service.js';
import { LibP2PService, createLibP2PPeerId } from '../service/index.js';
import { getPublicIp, resolveAddressIfNecessary, splitAddressPort } from '../util.js';
import { configureP2PClientAddresses } from '../util.js';

export * from './p2p_client.js';

Expand Down Expand Up @@ -67,50 +67,3 @@ export const createP2PClient = async (
}
return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry);
};

async function configureP2PClientAddresses(_config: P2PConfig & DataStoreConfig): Promise<P2PConfig & DataStoreConfig> {
const config = { ..._config };
const {
tcpAnnounceAddress: configTcpAnnounceAddress,
udpAnnounceAddress: configUdpAnnounceAddress,
queryForIp,
} = config;

config.tcpAnnounceAddress = configTcpAnnounceAddress
? await resolveAddressIfNecessary(configTcpAnnounceAddress)
: undefined;
config.udpAnnounceAddress = configUdpAnnounceAddress
? await resolveAddressIfNecessary(configUdpAnnounceAddress)
: undefined;

// create variable for re-use if needed
let publicIp;

// check if no announce IP was provided
const splitTcpAnnounceAddress = splitAddressPort(configTcpAnnounceAddress || '', true);
if (splitTcpAnnounceAddress.length == 2 && splitTcpAnnounceAddress[0] === '') {
if (queryForIp) {
publicIp = await getPublicIp();
const tcpAnnounceAddress = `${publicIp}:${splitTcpAnnounceAddress[1]}`;
config.tcpAnnounceAddress = tcpAnnounceAddress;
} else {
throw new Error(
`Invalid announceTcpAddress provided: ${configTcpAnnounceAddress}. Expected format: <addr>:<port>`,
);
}
}

const splitUdpAnnounceAddress = splitAddressPort(configUdpAnnounceAddress || '', true);
if (splitUdpAnnounceAddress.length == 2 && splitUdpAnnounceAddress[0] === '') {
// If announceUdpAddress is not provided, use announceTcpAddress
if (!queryForIp && config.tcpAnnounceAddress) {
config.udpAnnounceAddress = config.tcpAnnounceAddress;
} else if (queryForIp) {
const udpPublicIp = publicIp || (await getPublicIp());
const udpAnnounceAddress = `${udpPublicIp}:${splitUdpAnnounceAddress[1]}`;
config.udpAnnounceAddress = udpAnnounceAddress;
}
}

return config;
}
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ describe('In-Memory P2P Client', () => {
];

for (const quote of proofQuotes) {
client.broadcastEpochProofQuote(quote);
await client.addEpochProofQuote(quote);
}
expect(epochProofQuotePool.addQuote).toBeCalledTimes(proofQuotes.length);

Expand Down
46 changes: 39 additions & 7 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/

import { type ENR } from '@chainsafe/enr';

import { getP2PConfigEnvVars } from '../config.js';
import { getP2PConfigFromEnv } from '../config.js';
import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js';
import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js';
import { type MemPools } from '../mem_pools/interface.js';
Expand Down Expand Up @@ -77,11 +77,11 @@ export interface P2P {
getEpochProofQuotes(epoch: bigint): Promise<EpochProofQuote[]>;

/**
* Broadcasts an EpochProofQuote to other peers.
* Adds an EpochProofQuote to the pool and broadcasts an EpochProofQuote to other peers.
*
* @param quote - the quote to broadcast
*/
broadcastEpochProofQuote(quote: EpochProofQuote): void;
addEpochProofQuote(quote: EpochProofQuote): Promise<void>;

/**
* Registers a callback from the validator client that determines how to behave when
Expand Down Expand Up @@ -130,7 +130,14 @@ export interface P2P {
* @param txHash - Hash of tx to return.
* @returns A single tx or undefined.
*/
getTxByHash(txHash: TxHash): Tx | undefined;
getTxByHashFromPool(txHash: TxHash): Tx | undefined;

/**
* Returns a transaction in the transaction pool by its hash, requesting it from the network if it is not found.
* @param txHash - Hash of tx to return.
* @returns A single tx or undefined.
*/
getTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Returns whether the given tx hash is flagged as pending or mined.
Expand Down Expand Up @@ -217,7 +224,7 @@ export class P2PClient extends WithTracer implements P2P {
) {
super(telemetryClient, 'P2PClient');

const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigEnvVars();
const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigFromEnv();
const l2DownloaderOpts = { maxQueueSize: p2pL2QueueSize, pollIntervalMS: checkInterval };
// TODO(palla/prover-node): This effectively downloads blocks twice from the archiver, which is an issue
// if the archiver is remote. We should refactor this so the downloader keeps a single queue and handles
Expand All @@ -234,18 +241,29 @@ export class P2PClient extends WithTracer implements P2P {
}

#assertIsReady() {
// this.log.info('Checking if p2p client is ready, current state: ', this.currentState);
if (!this.isReady()) {
throw new Error('P2P client not ready');
}
}

/**
* Adds an EpochProofQuote to the pool and broadcasts an EpochProofQuote to other peers.
* @param quote - the quote to broadcast
*/
addEpochProofQuote(quote: EpochProofQuote): Promise<void> {
this.epochProofQuotePool.addQuote(quote);
this.broadcastEpochProofQuote(quote);
return Promise.resolve();
}

getEpochProofQuotes(epoch: bigint): Promise<EpochProofQuote[]> {
return Promise.resolve(this.epochProofQuotePool.getQuotes(epoch));
}

broadcastEpochProofQuote(quote: EpochProofQuote): void {
this.#assertIsReady();
this.epochProofQuotePool.addQuote(quote);
this.log.info('Broadcasting epoch proof quote', quote.toViemArgs());
return this.p2pService.propagate(quote);
}

Expand Down Expand Up @@ -406,10 +424,24 @@ export class P2PClient extends WithTracer implements P2P {
* @param txHash - Hash of the transaction to look for in the pool.
* @returns A single tx or undefined.
*/
getTxByHash(txHash: TxHash): Tx | undefined {
getTxByHashFromPool(txHash: TxHash): Tx | undefined {
return this.txPool.getTxByHash(txHash);
}

/**
* Returns a transaction in the transaction pool by its hash.
* If the transaction is not in the pool, it will be requested from the network.
* @param txHash - Hash of the transaction to look for in the pool.
* @returns A single tx or undefined.
*/
getTxByHash(txHash: TxHash): Promise<Tx | undefined> {
const tx = this.txPool.getTxByHash(txHash);
if (tx) {
return Promise.resolve(tx);
}
return this.requestTxByHash(txHash);
}

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The tx to verify.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
* Gets the config values for p2p client from environment variables.
* @returns The config values for p2p client.
*/
export function getP2PConfigEnvVars(): P2PConfig {
export function getP2PConfigFromEnv(): P2PConfig {
return getConfigFromMappings<P2PConfig>(p2pConfigMappings);
}

Expand Down
Loading

0 comments on commit 2132bc2

Please sign in to comment.