Skip to content

Commit

Permalink
feat!: configurable transport service and remote enr (#169)
Browse files Browse the repository at this point in the history
* Transport interface, debug log enabler, enr in talkreq/talkresp

* Transport interface, debug log enabler, enr in talkreq/talkresp

* feat: update TALK* methods

BREAKING CHANGE: sendTalkReq and sendTalkResp now require an ENR rather
than a NodeId. The findENR method is now exposed to retrieve any locally
cached ENRs for this purpose.

Co-authored-by: Cayman <caymannava@gmail.com>
  • Loading branch information
acolytec3 and wemeetagain committed Apr 26, 2022
1 parent 88c1f7c commit 72aaa0b
Showing 1 changed file with 55 additions and 52 deletions.
107 changes: 55 additions & 52 deletions src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { randomBytes } from "libp2p-crypto";
import { Multiaddr } from "multiaddr";
import PeerId from "peer-id";

import { UDPTransportService } from "../transport";
import { ITransportService, UDPTransportService } from "../transport";
import { MAX_PACKET_SIZE } from "../packet";
import { ConnectionDirection, RequestErrorType, SessionService } from "../session";
import { ENR, NodeId, MAX_RECORD_SIZE, createNodeId } from "../enr";
Expand Down Expand Up @@ -59,6 +59,7 @@ export interface IDiscv5CreateOptions {
multiaddr: Multiaddr;
config?: Partial<IDiscv5Config>;
metrics?: IDiscv5Metrics;
transport?: ITransportService;
}

/**
Expand Down Expand Up @@ -168,11 +169,15 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* @param peerId the PeerId with the keypair that identifies the enr
* @param multiaddr The multiaddr which contains the the network interface and port to which the UDP server binds
*/
public static create({ enr, peerId, multiaddr, config = {}, metrics }: IDiscv5CreateOptions): Discv5 {
public static create({ enr, peerId, multiaddr, config = {}, metrics, transport }: IDiscv5CreateOptions): Discv5 {
const fullConfig = { ...defaultConfig, ...config };
const decodedEnr = typeof enr === "string" ? ENR.decodeTxt(enr) : enr;
const udpTransport = new UDPTransportService(multiaddr, decodedEnr.nodeId);
const sessionService = new SessionService(fullConfig, decodedEnr, createKeypairFromPeerId(peerId), udpTransport);
const sessionService = new SessionService(
fullConfig,
decodedEnr,
createKeypairFromPeerId(peerId),
transport ?? new UDPTransportService(multiaddr, decodedEnr.nodeId)
);
return new Discv5(fullConfig, sessionService, metrics);
}

Expand Down Expand Up @@ -313,6 +318,27 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
});
}

/**
* Returns an ENR if one is known for the given NodeId
*
* This includes ENRs from any ongoing lookups not yet in the kad table
*/
public findEnr(nodeId: NodeId): ENR | undefined {
// check if we know this node id in our routing table
const enr = this.kbuckets.getValue(nodeId);
if (enr) {
return enr;
}
// Check the untrusted addresses for ongoing lookups
for (const lookup of this.activeLookups.values()) {
const enr = lookup.untrustedEnrs[nodeId];
if (enr) {
return enr;
}
}
return undefined;
}

/**
* Broadcast TALKREQ message to all nodes in routing table and returns response
*/
Expand All @@ -338,20 +364,13 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
});
}

/**
* Send TALKREQ message to dstId and returns response
*/
public async sendTalkReq(dstId: string, payload: Buffer, protocol: string | Uint8Array): Promise<Buffer> {
public async sendTalkReq(remote: ENR | Multiaddr, payload: Buffer, protocol: string | Uint8Array): Promise<Buffer> {
return await new Promise((resolve, reject) => {
const enr = this.findEnr(dstId);
if (!enr) {
log("Talkreq requested an unknown ENR, node: %s", dstId);
return;
}

this.sendRpcRequest({
contact: createNodeContact(enr),
contact: createNodeContact(remote),
request: createTalkRequestMessage(payload, protocol),
callback: (err: RequestErrorType | null, res: Buffer | null): void => {
if (err !== null) {
Expand All @@ -367,25 +386,17 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
/**
* Send TALKRESP message to requesting node
*/
public async sendTalkResp(srcId: NodeId, requestId: RequestId, payload: Uint8Array): Promise<void> {
public async sendTalkResp(remote: ENR | Multiaddr, requestId: RequestId, payload: Uint8Array): Promise<void> {
const msg = createTalkResponseMessage(requestId, payload);
const enr = this.findEnr(srcId);
const addr = enr?.getLocationMultiaddr("udp");
if (enr && addr) {
log(`Sending TALKRESP message to node ${enr.id}`);
try {
this.sessionService.sendResponse({ nodeId: srcId, socketAddr: addr }, msg);
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.TALKRESP] });
} catch (e) {
log("Failed to send a TALKRESP response. Error: %s", (e as Error).message);
}
} else {
if (!addr && enr) {
log(`No ip + udp port found for node ${srcId}`);
} else {
log(`Node ${srcId} not found`);
}
}
const nodeAddr = getNodeAddress(createNodeContact(remote));
this.sendRpcResponse(nodeAddr, msg);
}

/**
* Hack to get debug logs to work in browser
*/
public enableLogs(): void {
debug.enable("discv5*");
}

/**
Expand Down Expand Up @@ -451,6 +462,19 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
}

/**
* Sends generic RPC responses.
*/
private sendRpcResponse(nodeAddr: INodeAddress, response: ResponseMessage): void {
log("Sending %s to node: %o", MessageType[response.type], nodeAddr);
try {
this.sessionService.sendResponse(nodeAddr, response);
this.metrics?.sentMessageCount.inc({ type: MessageType[response.type] });
} catch (e) {
log("Error sending RPC to node: %o, :Error: %s", nodeAddr, (e as Error).message);
}
}

/**
* Update the conection status of a node in the routing table.
* This tracks whether or not we should be pinging peers.
Expand Down Expand Up @@ -545,27 +569,6 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
}

/**
* Returns an ENR if one is known for the given NodeId
*
* This includes ENRs from any ongoing lookups not yet in the kad table
*/
private findEnr(nodeId: NodeId): ENR | undefined {
// check if we know this node id in our routing table
const enr = this.kbuckets.getValue(nodeId);
if (enr) {
return enr;
}
// Check the untrusted addresses for ongoing lookups
for (const lookup of this.activeLookups.values()) {
const enr = lookup.untrustedEnrs[nodeId];
if (enr) {
return enr;
}
}
return undefined;
}

/**
* Processes discovered peers from a query
*/
Expand Down

0 comments on commit 72aaa0b

Please sign in to comment.