Skip to content

Commit

Permalink
Discv5 worker (#4988)
Browse files Browse the repository at this point in the history
* wip

* WIP

* Add nodejs metrics to discv5 worker

* Fix build error

* Add optional prefix to gc metrics

* Add to discv5 dashboard

* Add discv5 worker comments

* Patch some tests

* Update discv5 to 2.0.0

* Fix e2e test

* Bump timeouts

* fix typo
  • Loading branch information
wemeetagain authored Jan 20, 2023
1 parent 460ba59 commit c49ee3c
Show file tree
Hide file tree
Showing 23 changed files with 1,066 additions and 221 deletions.
820 changes: 722 additions & 98 deletions dashboards/lodestar_discv5.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"@chainsafe/as-chacha20poly1305": "^0.1.0",
"@chainsafe/as-sha256": "^0.3.1",
"@chainsafe/bls": "7.1.1",
"@chainsafe/discv5": "^1.4.0",
"@chainsafe/discv5": "^2.0.0",
"@chainsafe/libp2p-gossipsub": "^5.3.0",
"@chainsafe/libp2p-noise": "^10.2.0",
"@chainsafe/persistent-merkle-tree": "^0.4.2",
Expand Down
9 changes: 8 additions & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,15 @@ export function getLodestarApi({
},

async discv5GetKadValues() {
const discv5 = network.discv5();
if (!discv5) {
return {
data: [],
};
}

return {
data: network.discv5?.kadValues().map((enr) => enr.encodeTxt()) ?? [],
data: (await discv5.kadValues()).map((enr) => enr.encodeTxt()) ?? [],
};
},

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function getNodeApi(
): ServerApi<routes.node.Api> {
return {
async getNetworkIdentity() {
const enr = network.getEnr();
const enr = await network.getEnr();
const keypair = createKeypairFromPeerId(network.peerId);
const discoveryAddresses = [
enr?.getLocationMultiaddr("tcp")?.toString() ?? null,
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./metrics.js";
export * from "./server/index.js";
export * from "./interface.js";
export * from "./nodeJsMetrics.js";
export {RegistryMetricCreator} from "./utils/registryMetricCreator.js";
18 changes: 2 additions & 16 deletions packages/beacon-node/src/metrics/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {collectDefaultMetrics, Metric, Registry} from "prom-client";
import gcStats from "prometheus-gc-stats";
import {Metric, Registry} from "prom-client";
import {ILogger} from "@lodestar/utils";
import {BeaconStateAllForks, getCurrentSlot} from "@lodestar/state-transition";
import {IChainForkConfig} from "@lodestar/config";
Expand All @@ -8,6 +7,7 @@ import {createLodestarMetrics, ILodestarMetrics} from "./metrics/lodestar.js";
import {MetricsOptions} from "./options.js";
import {RegistryMetricCreator} from "./utils/registryMetricCreator.js";
import {createValidatorMonitor, IValidatorMonitor} from "./validatorMonitor.js";
import {collectNodeJSMetrics} from "./nodeJsMetrics.js";

export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: RegistryMetricCreator};

Expand Down Expand Up @@ -49,17 +49,3 @@ export function createMetrics(
register,
};
}

export function collectNodeJSMetrics(register: Registry): void {
collectDefaultMetrics({
register,
// eventLoopMonitoringPrecision with sampling rate in milliseconds
eventLoopMonitoringPrecision: 10,
});

// Collects GC metrics using a native binding module
// - nodejs_gc_runs_total: Counts the number of time GC is invoked
// - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
// - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
gcStats(register)();
}
37 changes: 0 additions & 37 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,43 +163,6 @@ export function createLodestarMetrics(
}),
},

discv5: {
kadTableSize: register.gauge({
name: "lodestar_discv5_kad_table_size",
help: "Total size of the discv5 kad table",
}),
lookupCount: register.gauge({
name: "lodestar_discv5_lookup_count",
help: "Total count of discv5 lookups",
}),
activeSessionCount: register.gauge({
name: "lodestar_discv5_active_session_count",
help: "Count of the discv5 active sessions",
}),
connectedPeerCount: register.gauge({
name: "lodestar_discv5_connected_peer_count",
help: "Count of the discv5 connected peers",
}),
sentMessageCount: register.gauge<"type">({
name: "lodestar_discv5_sent_message_count",
help: "Count of the discv5 messages sent by message type",
labelNames: ["type"],
}),
rcvdMessageCount: register.gauge<"type">({
name: "lodestar_discv5_rcvd_message_count",
help: "Count of the discv5 messages received by message type",
labelNames: ["type"],
}),
rateLimitHitIP: register.gauge({
name: "lodestar_discv5_rate_limit_hit_ip",
help: "Total count of rate limit hits by IP",
}),
rateLimitHitTotal: register.gauge({
name: "lodestar_discv5_rate_limit_hit_total",
help: "Total count of rate limit hits by total requests",
}),
},

gossipPeer: {
scoreByThreshold: register.gauge<"threshold">({
name: "lodestar_gossip_peer_score_by_threshold_count",
Expand Down
17 changes: 17 additions & 0 deletions packages/beacon-node/src/metrics/nodeJsMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import {collectDefaultMetrics, Registry} from "prom-client";
import gcStats from "prometheus-gc-stats";

export function collectNodeJSMetrics(register: Registry, prefix?: string): void {
collectDefaultMetrics({
register,
prefix,
// eventLoopMonitoringPrecision with sampling rate in milliseconds
eventLoopMonitoringPrecision: 10,
});

// Collects GC metrics using a native binding module
// - nodejs_gc_runs_total: Counts the number of time GC is invoked
// - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
// - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
gcStats(register, {prefix})();
}
21 changes: 15 additions & 6 deletions packages/beacon-node/src/metrics/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,24 @@ export type HttpMetricsServerOpts = {
export class HttpMetricsServer {
private readonly server: http.Server;
private readonly register: Registry;
private readonly getOtherMetrics: () => Promise<string>;
private readonly logger: ILogger;
private readonly activeSockets: HttpActiveSocketsTracker;

private readonly httpServerRegister: RegistryMetricCreator;
private readonly scrapeTimeMetric: HistogramExtra<"status">;

constructor(private readonly opts: HttpMetricsServerOpts, {register, logger}: {register: Registry; logger: ILogger}) {
constructor(
private readonly opts: HttpMetricsServerOpts,
{
register,
getOtherMetrics = async () => "",
logger,
}: {register: Registry; getOtherMetrics?: () => Promise<string>; logger: ILogger}
) {
this.logger = logger;
this.register = register;
this.getOtherMetrics = getOtherMetrics;
this.server = http.createServer(this.onRequest.bind(this));

// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
Expand Down Expand Up @@ -81,16 +90,16 @@ export class HttpMetricsServer {
private async onRequest(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> {
if (req.method === "GET" && req.url && req.url.includes("/metrics")) {
const timer = this.scrapeTimeMetric.startTimer();
const metricsRes = await wrapError(this.register.metrics());
timer({status: metricsRes.err ? "error" : "success"});
const metricsRes = await Promise.all([wrapError(this.register.metrics()), this.getOtherMetrics()]);
timer({status: metricsRes[0].err ? "error" : "success"});

// Ensure we only writeHead once
if (metricsRes.err) {
res.writeHead(500, {"content-type": "text/plain"}).end(metricsRes.err.stack);
if (metricsRes[0].err) {
res.writeHead(500, {"content-type": "text/plain"}).end(metricsRes[0].err.stack);
} else {
// Get scrape time metrics
const httpServerMetrics = await this.httpServerRegister.metrics();
const metricsStr = `${metricsRes.result}\n\n${httpServerMetrics}`;
const metricsStr = `${metricsRes[0].result}\n\n${metricsRes[1]}\n\n${httpServerMetrics}`;
res.writeHead(200, {"content-type": this.register.contentType}).end(metricsStr);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/metrics/utils/gauge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class GaugeExtra<T extends string> extends Gauge<T> implements IGauge {
/**
* @override Metric.collect
*/
private collect(): void {
collect(): void {
for (const collectFn of this.collectFns) {
collectFn(this);
}
Expand Down
121 changes: 121 additions & 0 deletions packages/beacon-node/src/network/discv5/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import EventEmitter from "events";
import {PeerId} from "@libp2p/interface-peer-id";
import StrictEventEmitter from "strict-event-emitter-types";
import {exportToProtobuf} from "@libp2p/peer-id-factory";
import {ENR, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {spawn, Thread, Worker} from "@chainsafe/threads";
import {ILogger} from "@lodestar/utils";
import {Discv5WorkerApi, Discv5WorkerData} from "./types.js";

export type Discv5Opts = {
peerId: PeerId;
discv5: Omit<IDiscv5DiscoveryInputOptions, "metrics" | "searchInterval" | "enabled">;
logger: ILogger;
metrics: boolean;
};

export interface IDiscv5Events {
discovered: (enr: ENR) => void;
}

type Discv5WorkerStatus =
| {status: "stopped"}
| {status: "started"; workerApi: Discv5WorkerApi; subscription: {unsubscribe(): void}};

/**
* Wrapper class abstracting the details of discv5 worker instantiation and message-passing
*/
export class Discv5Worker extends (EventEmitter as {new (): StrictEventEmitter<EventEmitter, IDiscv5Events>}) {
private logger: ILogger;
private status: Discv5WorkerStatus;

constructor(private opts: Discv5Opts) {
super();

this.logger = opts.logger;
this.status = {status: "stopped"};
}

async start(): Promise<void> {
if (this.status.status === "started") return;

const workerData: Discv5WorkerData = {
enrStr: (this.opts.discv5.enr as ENR).encodeTxt(),
peerIdProto: exportToProtobuf(this.opts.peerId),
bindAddr: this.opts.discv5.bindAddr,
config: this.opts.discv5,
bootEnrs: this.opts.discv5.bootEnrs as string[],
metrics: this.opts.metrics,
};
const worker = new Worker("./worker.js", {workerData} as ConstructorParameters<typeof Worker>[1]);

const workerApi = await spawn<Discv5WorkerApi>(worker, {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
});

const subscription = workerApi.discoveredBuf().subscribe((enrStr) => this.onDiscoveredStr(enrStr));

this.status = {status: "started", workerApi, subscription};
}

async stop(): Promise<void> {
if (this.status.status === "stopped") return;

this.status.subscription.unsubscribe();
await this.status.workerApi.close();
await Thread.terminate((this.status.workerApi as unknown) as Thread);

this.status = {status: "stopped"};
}

onDiscoveredStr(enrStr: Uint8Array): void {
try {
this.emit("discovered", ENR.decode(Buffer.from(enrStr)));
} catch (e) {
// TODO there is a bug in enr encoding(?) that causes many enrs to be incorrectly encoded (and thus incorrectly decoded)
// this.logger.error("Unable to decode enr", {enr: Buffer.from(enrStr).toString("hex")}, e as Error);
}
}

async enr(): Promise<ENR> {
if (this.status.status === "started") {
return ENR.decode(Buffer.from(await this.status.workerApi.enrBuf()));
} else {
throw new Error("Cannot get enr before module is started");
}
}

async setEnrValue(key: string, value: Uint8Array): Promise<void> {
if (this.status.status === "started") {
await this.status.workerApi.setEnrValue(key, value);
} else {
throw new Error("Cannot setEnrValue before module is started");
}
}

async kadValues(): Promise<ENR[]> {
if (this.status.status === "started") {
return (await this.status.workerApi.kadValuesBuf()).map((enrBuf) => ENR.decode(Buffer.from(enrBuf)));
} else {
return [];
}
}

async findRandomNode(): Promise<ENR[]> {
if (this.status.status === "started") {
return (await this.status.workerApi.findRandomNodeBuf()).map((enrBuf) => ENR.decode(Buffer.from(enrBuf)));
} else {
return [];
}
}

async metrics(): Promise<string> {
if (this.status.status === "started") {
return await this.status.workerApi.metrics();
} else {
return "";
}
}
}
39 changes: 39 additions & 0 deletions packages/beacon-node/src/network/discv5/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {Discv5} from "@chainsafe/discv5";
import {Observable} from "@chainsafe/threads/observable";

// TODO export IDiscv5Config so we don't need this convoluted type
type Discv5Config = Parameters<typeof Discv5["create"]>[0]["config"];

/** discv5 worker constructor data */
export interface Discv5WorkerData {
enrStr: string;
peerIdProto: Uint8Array;
bindAddr: string;
config: Discv5Config;
bootEnrs: string[];
metrics: boolean;
}

/**
* API exposed by the discv5 worker
*
* Note: ENRs are represented as bytes to facilitate message-passing
*/
export type Discv5WorkerApi = {
/** The current host ENR */
enrBuf(): Promise<Uint8Array>;
/** Set a key-value of the current host ENR */
setEnrValue(key: string, value: Uint8Array): Promise<void>;

/** Return the ENRs currently in the kad table */
kadValuesBuf(): Promise<Uint8Array[]>;
/** Begin a random search through the DHT, return discovered ENRs */
findRandomNodeBuf(): Promise<Uint8Array[]>;
/** Stream of discovered ENRs */
discoveredBuf(): Observable<Uint8Array>;

/** Prometheus metrics string */
metrics(): Promise<string>;
/** tear down discv5 resources */
close(): Promise<void>;
};
Loading

0 comments on commit c49ee3c

Please sign in to comment.