Skip to content

Commit

Permalink
feat: Add timeouts for request / response stream connections (#8434)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Sep 11, 2024
1 parent a0c8915 commit 190c27f
Show file tree
Hide file tree
Showing 17 changed files with 248 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ describe('L1Publisher integration', () => {

writeJson(`mixed_block_${block.number}`, block, l1ToL2Content, recipientAddress, deployerAccount.address);

await publisher.processL2Block(block);
await publisher.proposeL2Block(block);

const logs = await publicClient.getLogs({
address: rollupAddress,
Expand Down Expand Up @@ -485,7 +485,7 @@ describe('L1Publisher integration', () => {

writeJson(`empty_block_${block.number}`, block, [], AztecAddress.ZERO, deployerAccount.address);

await publisher.processL2Block(block);
await publisher.proposeL2Block(block);

const logs = await publicClient.getLogs({
address: rollupAddress,
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export type EnvVar =
| 'TX_GOSSIP_VERSION'
| 'P2P_QUERY_FOR_IP'
| 'P2P_TX_POOL_KEEP_PROVEN_FOR'
| 'P2P_REQRESP_OVERALL_REQUEST_TIMEOUT_MS'
| 'P2P_REQRESP_INDIVIDUAL_REQUEST_TIMEOUT_MS'
| 'TELEMETRY'
| 'OTEL_SERVICE_NAME'
| 'OTEL_EXPORTER_OTLP_METRICS_ENDPOINT'
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/foundation/src/timer/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export { TimeoutTask } from './timeout.js';
export { TimeoutTask, executeTimeoutWithCustomError } from './timeout.js';
export { Timer } from './timer.js';
export { elapsed, elapsedSync } from './elapsed.js';
19 changes: 17 additions & 2 deletions yarn-project/foundation/src/timer/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ export class TimeoutTask<T> {
private interrupt = () => {};
private totalTime = 0;

constructor(private fn: () => Promise<T>, private timeout = 0, fnName = '') {
constructor(
private fn: () => Promise<T>,
private timeout = 0,
fnName = '',
error = () => new Error(`Timeout${fnName ? ` running ${fnName}` : ''} after ${timeout}ms.`),
) {
this.interruptPromise = new Promise<T>((_, reject) => {
this.interrupt = () => reject(new Error(`Timeout${fnName ? ` running ${fnName}` : ''} after ${timeout}ms.`));
this.interrupt = () => reject(error());
});
}

Expand Down Expand Up @@ -62,3 +67,13 @@ export const executeTimeout = async <T>(fn: () => Promise<T>, timeout = 0, fnNam
const task = new TimeoutTask(fn, timeout, fnName);
return await task.exec();
};

export const executeTimeoutWithCustomError = async <T>(
fn: () => Promise<T>,
timeout = 0,
error = () => new Error('No custom error provided'),
fnName = '',
) => {
const task = new TimeoutTask(fn, timeout, fnName, error);
return await task.exec();
};
5 changes: 4 additions & 1 deletion yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import {
pickConfigMappings,
} from '@aztec/foundation/config';

import { type P2PReqRespConfig, p2pReqRespConfigMappings } from './service/reqresp/config.js';

/**
* P2P client configuration values.
*/
export interface P2PConfig {
export interface P2PConfig extends P2PReqRespConfig {
/**
* A flag dictating whether the P2P subsystem should be enabled.
*/
Expand Down Expand Up @@ -170,6 +172,7 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
'How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven)',
...numberConfigHelper(0),
},
...p2pReqRespConfigMappings,
};

/**
Expand Down
21 changes: 21 additions & 0 deletions yarn-project/p2p/src/errors/reqresp.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/** Individual request timeout error
*
* This error will be thrown when a request to a specific peer times out.
* @category Errors
*/
export class IndiviualReqRespTimeoutError extends Error {
constructor() {
super(`Request to peer timed out`);
}
}

/** Collective request timeout error
*
* This error will be thrown when a req resp request times out regardless of the peer.
* @category Errors
*/
export class CollectiveReqRespTimeoutError extends Error {
constructor() {
super(`Request to all peers timed out`);
}
}
7 changes: 6 additions & 1 deletion yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { type P2PReqRespConfig } from '../service/reqresp/config.js';
import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import {
PING_PROTOCOL,
Expand Down Expand Up @@ -80,7 +81,11 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
const config: P2PReqRespConfig = {
overallRequestTimeoutMs: 4000,
individualRequestTimeoutMs: 2000,
};
const req = new ReqResp(config, p2p);
return {
p2p,
req,
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/service/discv5_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { BootstrapNode } from '../bootstrap/bootstrap.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from './discV5_service.js';
import { createLibP2PPeerId } from './libp2p_service.js';
import { DEFAULT_P2P_REQRESP_CONFIG } from './reqresp/config.js';
import { PeerDiscoveryState } from './service.js';

const waitForPeers = (node: DiscV5Service, expectedCount: number): Promise<void> => {
Expand Down Expand Up @@ -135,6 +136,7 @@ describe('Discv5Service', () => {
p2pEnabled: true,
l2QueueSize: 100,
keepProvenTxsInPoolFor: 0,
...DEFAULT_P2P_REQRESP_CONFIG,
};
return new DiscV5Service(peerId, config);
};
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class LibP2PService implements P2PService {
private logger = createDebugLogger('aztec:libp2p_service'),
) {
this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.reqresp = new ReqResp(node);
this.reqresp = new ReqResp(config, node);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
Expand Down
35 changes: 35 additions & 0 deletions yarn-project/p2p/src/service/reqresp/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { type ConfigMapping, numberConfigHelper } from '@aztec/foundation/config';

export const DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS = 2000;
export const DEFAULT_OVERALL_REQUEST_TIMEOUT_MS = 4000;

// For use in tests.
export const DEFAULT_P2P_REQRESP_CONFIG: P2PReqRespConfig = {
overallRequestTimeoutMs: DEFAULT_OVERALL_REQUEST_TIMEOUT_MS,
individualRequestTimeoutMs: DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS,
};

export interface P2PReqRespConfig {
/**
* The overall timeout for a request response operation.
*/
overallRequestTimeoutMs: number;

/**
* The timeout for an individual request response peer interaction.
*/
individualRequestTimeoutMs: number;
}

export const p2pReqRespConfigMappings: Record<keyof P2PReqRespConfig, ConfigMapping> = {
overallRequestTimeoutMs: {
env: 'P2P_REQRESP_OVERALL_REQUEST_TIMEOUT_MS',
description: 'The overall timeout for a request response operation.',
...numberConfigHelper(DEFAULT_OVERALL_REQUEST_TIMEOUT_MS),
},
individualRequestTimeoutMs: {
env: 'P2P_REQRESP_INDIVIDUAL_REQUEST_TIMEOUT_MS',
description: 'The timeout for an individual request response peer interaction.',
...numberConfigHelper(DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS),
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { type P2PClient } from '../../client/p2p_client.js';
import { type BootnodeConfig, type P2PConfig } from '../../config.js';
import { type TxPool } from '../../tx_pool/index.js';
import { createLibP2PPeerId } from '../index.js';
import { DEFAULT_P2P_REQRESP_CONFIG } from './config.js';

/**
* Mockify helper for testing purposes.
Expand Down Expand Up @@ -92,6 +93,7 @@ describe('Req Resp p2p client integration', () => {
queryForIp: false,
dataDirectory: undefined,
l1Contracts: { rollupAddress: EthAddress.ZERO },
...DEFAULT_P2P_REQRESP_CONFIG,
};

txPool = {
Expand Down
59 changes: 58 additions & 1 deletion yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TxHash, mockTx } from '@aztec/circuit-types';
import { sleep } from '@aztec/foundation/sleep';

import { describe, expect, it } from '@jest/globals';
import { describe, expect, it, jest } from '@jest/globals';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { MOCK_SUB_PROTOCOL_HANDLERS, connectToPeers, createNodes, startNodes, stopNodes } from '../../mocks/index.js';
import { PING_PROTOCOL, TX_REQ_PROTOCOL } from './interface.js';

Expand Down Expand Up @@ -120,5 +121,61 @@ describe('ReqResp', () => {

await stopNodes(nodes);
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(2);

await startNodes(nodes);

jest.spyOn((nodes[1].req as any).subProtocolHandlers, TX_REQ_PROTOCOL).mockImplementation(() => {
return new Promise(() => {});
});

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');

await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, Buffer.from('tx'));
expect(res).toBeUndefined();

// Make sure the error message is logged
const errorMessage = `${
new IndiviualReqRespTimeoutError().message
} | peerId: ${nodes[1].p2p.peerId.toString()} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(4);

await startNodes(nodes);

for (let i = 1; i < nodes.length; i++) {
jest.spyOn((nodes[i].req as any).subProtocolHandlers, TX_REQ_PROTOCOL).mockImplementation(() => {
return new Promise(() => {});
});
}

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');

await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, Buffer.from('tx'));
expect(res).toBeUndefined();

// Make sure the error message is logged
const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});
});
});
Loading

0 comments on commit 190c27f

Please sign in to comment.