Skip to content

Commit

Permalink
fix: improve error handling (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain authored Jan 24, 2024
1 parent 1b70558 commit 594166c
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 144 deletions.
1 change: 1 addition & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ module.exports = {
"prefer-const": "error",
"quotes": ["error", "double"],
"@chainsafe/node/file-extension-in-import": ["error", "always", {esm: true}],
"@typescript-eslint/no-floating-promises": "error",
},
"overrides": [
{
Expand Down
187 changes: 108 additions & 79 deletions packages/discv5/src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {

import { BindAddrs, IPMode, ITransportService, UDPTransportService } from "../transport/index.js";
import { MAX_PACKET_SIZE } from "../packet/index.js";
import { ConnectionDirection, RequestErrorType, SessionService } from "../session/index.js";
import { ConnectionDirection, RequestErrorType, ResponseErrorType, SessionService } from "../session/index.js";
import { IKeypair, createKeypair } from "../keypair/index.js";
import {
EntryStatus,
Expand Down Expand Up @@ -48,15 +48,17 @@ import { toBuffer } from "../util/index.js";
import { IDiscv5Config, defaultConfig } from "../config/index.js";
import { createNodeContact, getNodeAddress, getNodeId, INodeAddress, NodeContact } from "../session/nodeInfo.js";
import {
BufferCallback,
CodeError,
ConnectionStatus,
ConnectionStatusType,
Discv5EventEmitter,
ENRInput,
IActiveRequest,
INodesResponse,
PongResponse,
ResponseType,
SignableENRInput,
toResponseType,
} from "./types.js";
import { RateLimiter, RateLimiterOpts } from "../rateLimit/index.js";
import {
Expand Down Expand Up @@ -388,16 +390,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Send FINDNODE message to remote and returns response
*/
public async sendFindNode(remote: ENR | Multiaddr, distances: number[]): Promise<ENR[]> {
const contact = createNodeContact(remote, this.ipMode);
const request = createFindNodeMessage(distances);

return await new Promise((resolve, reject) => {
this.sendRpcRequest({
contact: createNodeContact(remote),
request: createFindNodeMessage(distances),
callback: (err: RequestErrorType | null, res: ENR[] | null): void => {
if (err !== null) {
reject(err);
return;
}
resolve(res as ENR[]);
contact,
request,
callbackPromise: {
resolve: resolve as (val: ENR[]) => void,
reject,
},
});
});
Expand All @@ -407,16 +409,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Send TALKREQ message to dstId and returns response
*/
public async sendTalkReq(remote: ENR | Multiaddr, payload: Buffer, protocol: string | Uint8Array): Promise<Buffer> {
const contact = createNodeContact(remote, this.ipMode);
const request = createTalkRequestMessage(payload, protocol);

return await new Promise((resolve, reject) => {
this.sendRpcRequest({
contact: createNodeContact(remote),
request: createTalkRequestMessage(payload, protocol),
callback: (err: RequestErrorType | null, res: Buffer | null): void => {
if (err !== null) {
reject(err);
return;
}
resolve(res as Buffer);
contact,
request,
callbackPromise: {
resolve: resolve as (val: Buffer) => void,
reject,
},
});
});
Expand All @@ -441,16 +443,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Sends a PING request to a node and returns response
*/
public async sendPing(nodeAddr: ENR | Multiaddr): Promise<PongResponse> {
const contact = createNodeContact(nodeAddr, this.ipMode);
const request = createPingMessage(this.enr.seq);

return await new Promise((resolve, reject) => {
this.sendRpcRequest({
contact: createNodeContact(nodeAddr),
request: createPingMessage(this.enr.seq),
callback: (err: RequestErrorType | null, res: PongResponse | null): void => {
if (err !== null) {
reject(err);
return;
}
resolve(res as PongResponse);
contact,
request,
callbackPromise: {
resolve: resolve as (val: PongResponse) => void,
reject,
},
});
});
Expand All @@ -462,16 +464,16 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
private pingConnectedPeers(): void {
for (const entry of this.kbuckets.rawValues()) {
if (entry.status === EntryStatus.Connected) {
this.sendPing(entry.value);
this.sendPing(entry.value).catch((e) => log("Error pinging peer %o: %s", entry.value, (e as Error).message));
}
}
}

/**
* Request an external node's ENR
*/
private requestEnr(contact: NodeContact, callback?: (err: RequestErrorType | null, res: ENR[] | null) => void): void {
this.sendRpcRequest({ request: createFindNodeMessage([0]), contact, callback });
private requestEnr(contact: NodeContact): void {
this.sendRpcRequest({ request: createFindNodeMessage([0]), contact });
}

/**
Expand All @@ -486,7 +488,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}

this.sendRpcRequest({
contact: createNodeContact(enr),
contact: createNodeContact(enr, this.ipMode),
request,
lookupId,
});
Expand All @@ -498,10 +500,13 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
*
* Returns true if the request was sent successfully
*/
private sendRpcRequest(activeRequest: IActiveRequest): void {
this.activeRequests.set(activeRequest.request.id, activeRequest);
private sendRpcRequest<T extends RequestMessage, U extends ResponseType>(activeRequest: IActiveRequest<T, U>): void {
this.activeRequests.set(
activeRequest.request.id,
activeRequest as unknown as IActiveRequest<RequestMessage, ResponseType>
);

const nodeAddr = getNodeAddress(activeRequest.contact, this.ipMode);
const nodeAddr = getNodeAddress(activeRequest.contact);
log("Sending %s to node: %o", MessageType[activeRequest.request.type], nodeAddr);
try {
this.sessionService.sendRequest(activeRequest.contact, activeRequest.request);
Expand Down Expand Up @@ -545,7 +550,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
setInterval(() => {
// If the node is in the routing table, keep pinging
if (this.kbuckets.getValue(nodeId)) {
this.sendPing(newStatus.enr);
this.sendPing(newStatus.enr).catch((e) =>
log("Error pinging peer %o: %s", newStatus.enr, (e as Error).message)
);
} else {
clearInterval(this.connectedPeers.get(nodeId) as NodeJS.Timeout);
this.connectedPeers.delete(nodeId);
Expand All @@ -566,7 +573,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
setInterval(() => {
// If the node is in the routing table, keep pinging
if (this.kbuckets.getValue(nodeId)) {
this.sendPing(newStatus.enr);
this.sendPing(newStatus.enr).catch((e) =>
log("Error pinging peer %o: %s", newStatus.enr, (e as Error).message)
);
} else {
clearInterval(this.connectedPeers.get(nodeId) as NodeJS.Timeout);
this.connectedPeers.delete(nodeId);
Expand Down Expand Up @@ -679,7 +688,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
// process kad updates

private onPendingEviction = (enr: ENR): void => {
this.sendPing(enr);
this.sendPing(enr).catch((e) => log("Error pinging peer %o: %s", enr, (e as Error).message));
};

private onAppliedEviction = (inserted: ENR, evicted?: ENR): void => {
Expand Down Expand Up @@ -723,18 +732,24 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Requests respond to the received socket address, rather than the IP of the known ENR.
*/
private handleRpcRequest = (nodeAddr: INodeAddress, request: RequestMessage): void => {
this.metrics?.rcvdMessageCount.inc({ type: MessageType[request.type] });
switch (request.type) {
case MessageType.PING:
return this.handlePing(nodeAddr, request as IPingMessage);
case MessageType.FINDNODE:
return this.handleFindNode(nodeAddr, request as IFindNodeMessage);
case MessageType.TALKREQ:
return this.handleTalkReq(nodeAddr, request as ITalkReqMessage);
default:
log("Received request which is unimplemented");
// TODO Implement all RPC methods
return;
const requestType = MessageType[request.type];
this.metrics?.rcvdMessageCount.inc({ type: requestType });

try {
switch (request.type) {
case MessageType.PING:
return this.handlePing(nodeAddr, request as IPingMessage);
case MessageType.FINDNODE:
return this.handleFindNode(nodeAddr, request as IFindNodeMessage);
case MessageType.TALKREQ:
return this.handleTalkReq(nodeAddr, request as ITalkReqMessage);
default:
log("Received request type which is unimplemented: %s", request.type);
// TODO Implement all RPC methods
return;
}
} catch (e) {
log("Error handling rpc request: node: %o, requestType: %s", nodeAddr, requestType);
}
};

Expand All @@ -743,7 +758,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const entry = this.kbuckets.getWithPending(nodeAddr.nodeId);
if (entry) {
if (entry.value.seq < message.enrSeq) {
this.requestEnr(createNodeContact(entry.value));
this.requestEnr(createNodeContact(entry.value, this.ipMode));
}
}

Expand Down Expand Up @@ -829,7 +844,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Processes an RPC response from a peer.
*/
private handleRpcResponse = (nodeAddr: INodeAddress, response: ResponseMessage): void => {
this.metrics?.rcvdMessageCount.inc({ type: MessageType[response.type] });
const responseType = MessageType[response.type];
this.metrics?.rcvdMessageCount.inc({ type: responseType });

// verify we know of the rpc id

Expand All @@ -841,37 +857,46 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
this.activeRequests.delete(response.id);

// Check that the responder matches the expected request
const requestNodeAddr = getNodeAddress(activeRequest.contact, this.ipMode);
const requestNodeAddr = getNodeAddress(activeRequest.contact);
if (requestNodeAddr.nodeId !== nodeAddr.nodeId || !requestNodeAddr.socketAddr.equals(nodeAddr.socketAddr)) {
log(
"Received a response from an unexpected address. Expected %o, received %o, request id: %s",
requestNodeAddr,
nodeAddr,
response.id
);
activeRequest.callbackPromise?.reject(new CodeError(ResponseErrorType.WrongAddress));
return;
}

// Check that the response type matches the request
if (!requestMatchesResponse(activeRequest.request, response)) {
log("Node gave an incorrect response type. Ignoring response from: %o", nodeAddr);
activeRequest.callbackPromise?.reject(new CodeError(ResponseErrorType.WrongResponseType));
return;
}

switch (response.type) {
case MessageType.PONG:
return this.handlePong(nodeAddr, activeRequest, response as IPongMessage);
case MessageType.NODES:
return this.handleNodes(nodeAddr, activeRequest, response as INodesMessage);
case MessageType.TALKRESP:
return this.handleTalkResp(
nodeAddr,
activeRequest as IActiveRequest<ITalkReqMessage, BufferCallback>,
response as ITalkRespMessage
);
default:
// TODO Implement all RPC methods
return;
try {
switch (response.type) {
case MessageType.PONG:
this.handlePong(nodeAddr, activeRequest, response as IPongMessage);
break;
case MessageType.NODES:
if (!this.handleNodes(nodeAddr, activeRequest as IActiveRequest<IFindNodeMessage>, response as INodesMessage))
return;
break;
case MessageType.TALKRESP:
this.handleTalkResp(nodeAddr, activeRequest as IActiveRequest<ITalkReqMessage>, response as ITalkRespMessage);
break;
default:
// TODO Implement all RPC methods
return;
}

activeRequest.callbackPromise?.resolve(toResponseType(response));
} catch (e) {
log("Error handling rpc response: node: %o response: %s", nodeAddr, responseType);
activeRequest.callbackPromise?.reject(new CodeError(ResponseErrorType.InternalError, (e as Error).message));
}
};

Expand Down Expand Up @@ -910,8 +935,15 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
}

private handleNodes(nodeAddr: INodeAddress, activeRequest: IActiveRequest, message: INodesMessage): void {
const { request, lookupId } = activeRequest as { request: IFindNodeMessage; lookupId: number };
/**
* Return true if this nodes message is the final in the response
*/
private handleNodes(
nodeAddr: INodeAddress,
activeRequest: IActiveRequest<IFindNodeMessage>,
message: INodesMessage
): boolean {
const { request, lookupId } = activeRequest;
// Currently a maximum of 16 peers can be returned.
// Datagrams have a max size of 1280 and ENRs have a max size of 300 bytes.
// There should be no more than 5 responses to return 16 peers
Expand All @@ -934,9 +966,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
if (currentResponse.count < 5 && currentResponse.count < message.total) {
currentResponse.count += 1;
currentResponse.enrs.push(...message.enrs);
this.activeRequests.set(message.id, activeRequest);
this.activeRequests.set(message.id, activeRequest as IActiveRequest<RequestMessage>);
this.activeNodesResponses.set(message.id, currentResponse);
return;
return false;
}

// Have received all the Nodes responses we are willing to accept
Expand All @@ -952,18 +984,17 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
this.activeNodesResponses.delete(message.id);

this.discovered(nodeAddr.nodeId, message.enrs, lookupId);

return true;
}

private handleTalkResp = (
nodeAddr: INodeAddress,
activeRequest: IActiveRequest<ITalkReqMessage, BufferCallback>,
activeRequest: IActiveRequest<ITalkReqMessage>,
message: ITalkRespMessage
): void => {
log("Received TALKRESP message from Node: %o", nodeAddr);
this.emit("talkRespReceived", nodeAddr, this.findEnr(nodeAddr.nodeId) ?? null, message);
if (activeRequest.callback) {
activeRequest.callback(null, message.response);
}
};

/**
Expand All @@ -975,14 +1006,9 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
if (!req) {
return;
}
const { request, contact, lookupId, callback } = req;
const { request, contact, lookupId, callbackPromise } = req;
this.activeRequests.delete(request.id);

// If this is initiated by the user, return an error on the callback.
if (callback) {
callback(error, null);
}

const nodeId = getNodeId(contact);
// If a failed FindNodes Request, ensure we haven't partially received responses.
// If so, process the partially found nodes
Expand Down Expand Up @@ -1017,5 +1043,8 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {

// report the node as being disconnected
this.connectionUpdated(nodeId, { type: ConnectionStatusType.Disconnected });

// If this is initiated by the user, return the error on the callback.
callbackPromise?.reject(new CodeError(error));
};
}
Loading

0 comments on commit 594166c

Please sign in to comment.