Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: better reqresp logging + handle empty responses in snappy #10657

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion yarn-project/p2p/src/errors/reqresp.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* This error will be thrown when a request to a specific peer times out.
* @category Errors
*/
export class IndiviualReqRespTimeoutError extends Error {
export class IndividualReqRespTimeoutError extends Error {
constructor() {
super(`Request to peer timed out`);
}
Expand All @@ -19,3 +19,17 @@ export class CollectiveReqRespTimeoutError extends Error {
super(`Request to all peers timed out`);
}
}

/** Invalid response error
*
* This error will be thrown when a response is received that is not valid.
*
* This error does not need to be punished as message validators will handle punishing invalid
* requests
* @category Errors
*/
export class InvalidResponseError extends Error {
constructor() {
super(`Invalid response received`);
}
}
24 changes: 21 additions & 3 deletions yarn-project/p2p/src/service/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,31 @@ export function getMsgIdFn(message: Message) {
return sha256(Buffer.concat(vec)).subarray(0, 20);
}

/**
* Snappy transform for libp2p gossipsub
*/
export class SnappyTransform implements DataTransform {
// Topic string included to satisfy DataTransform interface
inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true }));
return new Uint8Array(uncompressed);
return this.inboundTransformNoTopic(Buffer.from(data));
}

public inboundTransformNoTopic(data: Buffer): Buffer {
if (data.length === 0) {
return data;
}
return Buffer.from(uncompressSync(data, { asBuffer: true }));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it neiter the asBuffer: true nor the Uint8Array wrapper are needed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same for the Uint8Array in output)


// Topic string included to satisfy DataTransform interface
outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
return new Uint8Array(compressSync(Buffer.from(data)));
return this.outboundTransformNoTopic(Buffer.from(data));
}

public outboundTransformNoTopic(data: Buffer): Buffer {
if (data.length === 0) {
return data;
}
return Buffer.from(compressSync(data));
}
}
59 changes: 52 additions & 7 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep';
import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { CollectiveReqRespTimeoutError, IndividualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import {
MOCK_SUB_PROTOCOL_HANDLERS,
MOCK_SUB_PROTOCOL_VALIDATORS,
Expand Down Expand Up @@ -86,9 +86,27 @@ describe('ReqResp', () => {
void nodes[1].req.stop();
void nodes[2].req.stop();

const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

// send from the first node
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST);

// We expect the logger to have been called twice with the peer ids citing the inability to connect
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`Connection reset: ${nodes[1].p2p.peerId.toString()}`),
{
peerId: nodes[1].p2p.peerId.toString(),
subProtocol: PING_PROTOCOL,
},
);
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`Connection reset: ${nodes[2].p2p.peerId.toString()}`),
{
peerId: nodes[2].p2p.peerId.toString(),
subProtocol: PING_PROTOCOL,
},
);

expect(res?.toBuffer().toString('utf-8')).toEqual('pong');
});

Expand All @@ -111,7 +129,7 @@ describe('ReqResp', () => {

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
});

describe('Tx req protocol', () => {
Expand Down Expand Up @@ -139,6 +157,29 @@ describe('ReqResp', () => {
expect(res).toEqual(tx);
});

it('Handle returning empty buffers', async () => {
const tx = mockTx();
const txHash = tx.getTxHash();

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => {
return Promise.resolve(Buffer.alloc(0));
};

nodes = await createNodes(peerManager, 2);

const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer');

await startNodes(nodes, protocolHandlers);
await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(spySendRequestToPeer).toHaveBeenCalledTimes(1);
expect(res).toEqual(undefined);
});

it('Does not crash if tx hash returns undefined', async () => {
const tx = mockTx();
const txHash = tx.getTxHash();
Expand Down Expand Up @@ -170,7 +211,7 @@ describe('ReqResp', () => {
});

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

await sleep(500);
await connectToPeers(nodes);
Expand All @@ -183,9 +224,13 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const peerId = nodes[1].p2p.peerId.toString();
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringMatching(/Error sending request to peer/i),
expect.any(Error),
{ peerId, subProtocol: '/aztec/req/tx/0.1.0' },
`Timeout error: ${
new IndividualReqRespTimeoutError().message
} | peerId: ${peerId} | subProtocol: ${TX_REQ_PROTOCOL}`,
expect.objectContaining({
peerId: peerId,
subProtocol: TX_REQ_PROTOCOL,
}),
);

// Expect the peer to be penalized for timing out
Expand All @@ -209,7 +254,7 @@ describe('ReqResp', () => {
}

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

await sleep(500);
await connectToPeers(nodes);
Expand Down
98 changes: 82 additions & 16 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer';
import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { compressSync, uncompressSync } from 'snappy';
import { type Uint8ArrayList } from 'uint8arraylist';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import {
CollectiveReqRespTimeoutError,
IndividualReqRespTimeoutError,
InvalidResponseError,
} from '../../errors/reqresp.error.js';
import { SnappyTransform } from '../encoding.js';
import { type PeerManager } from '../peer_manager.js';
import { PeerErrorSeverity } from '../peer_scoring.js';
import { type P2PReqRespConfig } from './config.js';
Expand Down Expand Up @@ -49,13 +53,16 @@ export class ReqResp {

private rateLimiter: RequestResponseRateLimiter;

private snappyTransform: SnappyTransform;

constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) {
this.logger = createLogger('p2p:reqresp');

this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
this.individualRequestTimeoutMs = config.individualRequestTimeoutMs;

this.rateLimiter = new RequestResponseRateLimiter(peerManager);
this.snappyTransform = new SnappyTransform();
}

/**
Expand Down Expand Up @@ -143,8 +150,7 @@ export class ReqResp {
// The response validator handles peer punishment within
const isValid = await responseValidator(request, object, peer);
if (!isValid) {
this.logger.error(`Invalid response for ${subProtocol} from ${peer.toString()}`);
return undefined;
throw new InvalidResponseError();
}
return object;
}
Expand All @@ -159,7 +165,7 @@ export class ReqResp {
() => new CollectiveReqRespTimeoutError(),
);
} catch (e: any) {
this.logger.error(`${e.message} | subProtocol: ${subProtocol}`);
this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`);
return undefined;
}
}
Expand Down Expand Up @@ -200,18 +206,14 @@ export class ReqResp {

// Open the stream with a timeout
const result = await executeTimeoutWithCustomError<Buffer>(
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage),
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage.bind(this)),
this.individualRequestTimeoutMs,
() => new IndiviualReqRespTimeoutError(),
() => new IndividualReqRespTimeoutError(),
);

await stream.close();
this.logger.trace(`Stream closed with ${peerId.toString()} for ${subProtocol}`);

return result;
} catch (e: any) {
this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol });
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
this.handleResponseError(e, peerId, subProtocol);
} finally {
if (stream) {
try {
Expand All @@ -224,7 +226,70 @@ export class ReqResp {
}
}
}
return undefined;
}

/**
* Handle a response error
*
* ReqResp errors are punished differently depending on the severity of the offense
*
* @param e - The error
* @param peerId - The peer id
* @param subProtocol - The sub protocol
* @returns If the error is non pubishable, then undefined is returned, otherwise the peer is penalized
*/
private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void {
const severity = this.categorizeError(e, peerId, subProtocol);
if (severity) {
this.peerManager.penalizePeer(peerId, severity);
}
}

/**
* Categorize the error and log it.
*/
private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined {
// Non pubishable errors
// We do not punish a collective timeout, as the node triggers this interupt, independent of the peer's behaviour
const logTags = {
peerId: peerId.toString(),
subProtocol,
};
if (e instanceof CollectiveReqRespTimeoutError || e instanceof InvalidResponseError) {
this.logger.debug(
`Non-punishable error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`,
logTags,
);
return undefined;
}

// Pubishable errors
// Connection reset errors in the networking stack are punished with high severity
// it just signals an unreliable peer
// We assume that the requesting node has a functioning networking stack.
if (e?.code === 'ECONNRESET' || e?.code === 'EPIPE') {
this.logger.debug(`Connection reset: ${peerId.toString()}`, logTags);
return PeerErrorSeverity.HighToleranceError;
}

if (e?.code === 'ECONNREFUSED') {
this.logger.debug(`Connection refused: ${peerId.toString()}`, logTags);
return PeerErrorSeverity.HighToleranceError;
}

// Timeout errors are punished with high tolerance, they can be due to a geogrpahically far away peer or an
// overloaded peer
if (e instanceof IndividualReqRespTimeoutError) {
this.logger.debug(
`Timeout error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`,
logTags,
);
return PeerErrorSeverity.HighToleranceError;
}

// Catch all error
this.logger.error(`Unexpected error sending request to peer`, e, logTags);
return PeerErrorSeverity.HighToleranceError;
}

/**
Expand All @@ -235,8 +300,8 @@ export class ReqResp {
for await (const chunk of source) {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer;
const messageData = Buffer.concat(chunks);
return this.snappyTransform.inboundTransformNoTopic(messageData);
}

/**
Expand Down Expand Up @@ -266,6 +331,7 @@ export class ReqResp {
}

const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;

try {
await pipe(
Expand All @@ -274,7 +340,7 @@ export class ReqResp {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
const response = await handler(msg);
yield new Uint8Array(compressSync(response));
yield new Uint8Array(transform.outboundTransformNoTopic(response));
}
},
stream,
Expand Down
Loading