Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): more comprehensive peer management, dial retries, persistence fix #6953

Merged
merged 13 commits into from
Jun 20, 2024
10 changes: 4 additions & 6 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { type AztecKVStore } from '@aztec/kv-store';
import { P2PClient } from '../client/p2p_client.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from '../service/discV5_service.js';
import { DummyP2PService, DummyPeerDiscoveryService } from '../service/dummy_service.js';
import { DummyP2PService } from '../service/dummy_service.js';
import { LibP2PService, createLibP2PPeerId } from '../service/index.js';
import { type TxPool } from '../tx_pool/index.js';
import { getPublicIp, splitAddressPort } from '../util.js';
Expand All @@ -17,7 +17,6 @@ export const createP2PClient = async (
txPool: TxPool,
l2BlockSource: L2BlockSource,
) => {
let discv5Service;
let p2pService;

if (config.p2pEnabled) {
Expand All @@ -40,7 +39,7 @@ export const createP2PClient = async (
config.tcpAnnounceAddress = tcpAnnounceAddress;
} else {
throw new Error(
`Invalid announceTcpAddress provided: ${splitTcpAnnounceAddress}. Expected format: <addr>:<port>`,
`Invalid announceTcpAddress provided: ${configTcpAnnounceAddress}. Expected format: <addr>:<port>`,
);
}
}
Expand All @@ -59,11 +58,10 @@ export const createP2PClient = async (

// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
discv5Service = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discv5Service, peerId, txPool);
const discoveryService = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, store);
} else {
p2pService = new DummyP2PService();
discv5Service = new DummyPeerDiscoveryService();
}
return new P2PClient(store, l2BlockSource, txPool, p2pService);
};
1 change: 0 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('In-Memory P2P Client', () => {
start: jest.fn(),
stop: jest.fn(),
propagateTx: jest.fn(),
settledTxs: jest.fn(),
};

blockSource = new MockBlockSource();
Expand Down
1 change: 0 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ export class P2PClient implements P2P {
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.deleteTxs(txHashes);
this.p2pService.settledTxs(txHashes);
}
}

Expand Down
40 changes: 29 additions & 11 deletions yarn-project/p2p/src/service/discV5_service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { sleep } from '@aztec/foundation/sleep';

import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5';
import { type ENR, SignableENR } from '@chainsafe/enr';
import { ENR, SignableENR } from '@chainsafe/enr';
import type { PeerId } from '@libp2p/interface';
import { multiaddr } from '@multiformats/multiaddr';
import EventEmitter from 'events';
Expand All @@ -14,6 +13,8 @@ import { type PeerDiscoveryService, PeerDiscoveryState } from './service.js';

export const AZTEC_ENR_KEY = 'aztec_network';

const delayBeforeStart = 2000; // 2sec

export enum AztecENR {
devnet = 0x01,
testnet = 0x02,
Expand All @@ -33,11 +34,12 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
/** This instance's ENR */
private enr: SignableENR;

private runningPromise: RunningPromise;

private currentState = PeerDiscoveryState.STOPPED;

private bootstrapNodes: string[];
private bootstrapNodePeerIds: PeerId[] = [];

private startTime = 0;

constructor(private peerId: PeerId, config: P2PConfig, private logger = createDebugLogger('aztec:discv5_service')) {
super();
Expand Down Expand Up @@ -83,18 +85,17 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
const multiAddrUdp = await enr.getFullMultiaddr('udp');
this.logger.debug(`ENR multiaddr: ${multiAddrTcp?.toString()}, ${multiAddrUdp?.toString()}`);
});

this.runningPromise = new RunningPromise(async () => {
await this.discv5.findRandomNode();
}, config.p2pPeerCheckIntervalMS);
}

public async start(): Promise<void> {
// Do this conversion once since it involves an async function call
this.bootstrapNodePeerIds = await Promise.all(this.bootstrapNodes.map(enr => ENR.decodeTxt(enr).peerId()));
if (this.currentState === PeerDiscoveryState.RUNNING) {
throw new Error('DiscV5Service already started');
}
this.logger.info('Starting DiscV5');
await this.discv5.start();
this.startTime = Date.now();

this.logger.info('DiscV5 started');
this.currentState = PeerDiscoveryState.RUNNING;
Expand All @@ -110,12 +111,25 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
this.logger.error(`Error adding bootnode ENRs: ${e}`);
}
}
}

public async runRandomNodesQuery(): Promise<void> {
if (this.currentState !== PeerDiscoveryState.RUNNING) {
throw new Error('DiscV5Service not running');
}

// First, wait some time before starting the peer discovery
// reference: https://github.com/ChainSafe/lodestar/issues/3423
await sleep(2000);
const msSinceStart = Date.now() - this.startTime;
if (Date.now() - this.startTime <= delayBeforeStart) {
await sleep(delayBeforeStart - msSinceStart);
}

this.runningPromise.start();
try {
await this.discv5.findRandomNode();
} catch (err) {
this.logger.error(`Error running discV5 random node query: ${err}`);
}
}

public getAllPeers(): ENR[] {
Expand All @@ -134,8 +148,12 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
return this.currentState;
}

public isBootstrapPeer(peerId: PeerId): boolean {
return this.bootstrapNodePeerIds.some(node => node.equals(peerId));
}

public async stop(): Promise<void> {
await this.runningPromise.stop();
// await this.runningPromise.stop();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be deleted shouldn't it?

await this.discv5.stop();
this.currentState = PeerDiscoveryState.STOPPED;
}
Expand Down
16 changes: 14 additions & 2 deletions yarn-project/p2p/src/service/discv5_service.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { sleep } from '@aztec/foundation/sleep';

import { jest } from '@jest/globals';
import type { PeerId } from '@libp2p/interface';
import { SemVer } from 'semver';
Expand All @@ -8,7 +10,7 @@ import { createLibP2PPeerId } from './libp2p_service.js';
import { PeerDiscoveryState } from './service.js';

const waitForPeers = (node: DiscV5Service, expectedCount: number): Promise<void> => {
const timeout = 5_000;
const timeout = 7_000;
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Timeout: Failed to connect to ${expectedCount} peers within ${timeout} ms`));
Expand Down Expand Up @@ -67,7 +69,17 @@ describe('Discv5Service', () => {
const node2 = await createNode(basePort);
await node1.start();
await node2.start();
await waitForPeers(node2, 2);
await Promise.all([
waitForPeers(node2, 2),
(async () => {
await sleep(2000); // wait for peer discovery to be able to start
for (let i = 0; i < 5; i++) {
await node1.runRandomNodesQuery();
await node2.runRandomNodesQuery();
await sleep(100);
}
})(),
]);

const node1Peers = await Promise.all(node1.getAllPeers().map(async peer => (await peer.peerId()).toString()));
const node2Peers = await Promise.all(node2.getAllPeers().map(async peer => (await peer.peerId()).toString()));
Expand Down
11 changes: 10 additions & 1 deletion yarn-project/p2p/src/service/dummy_service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type Tx, type TxHash } from '@aztec/circuit-types';
import type { Tx, TxHash } from '@aztec/circuit-types';

import type { PeerId } from '@libp2p/interface';
import EventEmitter from 'events';

import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js';
Expand Down Expand Up @@ -66,6 +67,14 @@ export class DummyPeerDiscoveryService extends EventEmitter implements PeerDisco
return [];
}

public runRandomNodesQuery(): Promise<void> {
return Promise.resolve();
}

public isBootstrapPeer(_: PeerId): boolean {
return false;
}

public getStatus(): PeerDiscoveryState {
return this.currentState;
}
Expand Down
42 changes: 0 additions & 42 deletions yarn-project/p2p/src/service/known_txs.test.ts

This file was deleted.

56 changes: 0 additions & 56 deletions yarn-project/p2p/src/service/known_txs.ts

This file was deleted.

Loading
Loading