Skip to content

Commit

Permalink
improve(relayer): Support keepalives over websockets (#1771)
Browse files Browse the repository at this point in the history
As newer chains are added, we occasionally see websocket connections
being dropped, potentially because low activity on the chain leaves the
websocket sitting idle. This commit adds a simple periodic keepalive
(a websocket ping) that each listener process sends to each of its
configured providers.
  • Loading branch information
pxrl committed Sep 5, 2024
1 parent b55f874 commit 955d2da
Showing 1 changed file with 66 additions and 14 deletions.
80 changes: 66 additions & 14 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import assert from "assert";
import minimist from "minimist";
import { setTimeout } from "node:timers/promises";
import { Contract, Event, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { utils as sdkUtils } from "@across-protocol/sdk";
import * as utils from "../../scripts/utils";
Expand Down Expand Up @@ -36,7 +35,9 @@ type ScraperOpts = {

const { NODE_SUCCESS, NODE_APP_ERR } = utils;

const INDEXER_POLLING_PERIOD = 2000; // ms; time to sleep between checking for exit request via SIGHUP.
const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP.
const WS_PING_INTERVAL = 20_000; // ms
const WS_PONG_TIMEOUT = WS_PING_INTERVAL / 2;

let logger: winston.Logger;
let chain: string;
Expand Down Expand Up @@ -199,7 +200,7 @@ async function listen(
});

do {
await setTimeout(INDEXER_POLLING_PERIOD);
await sdkUtils.delay(INDEXER_POLLING_PERIOD);
} while (!stop);
}

Expand Down Expand Up @@ -291,31 +292,82 @@ async function run(argv: string[]): Promise<void> {
const providers = getWSProviders(chainId, quorum);
let nProviders = providers.length;
assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`);

providers.forEach((provider) => {
provider._websocket.on("error", (err) => {
const _provider = getOriginFromURL(provider.connection.url);
const at = "RelayerSpokePoolIndexer::run";
let message = `Caught ${chain} provider error.`;
let log = logger.debug;
if (--nProviders < quorum) {
const { _websocket: ws } = provider;
const _provider = getOriginFromURL(provider.connection.url);
let interval: NodeJS.Timer | undefined;
let timeout: NodeJS.Timeout | undefined;

const closeProvider = () => {
if (interval) {
clearInterval(interval);
interval = undefined;
}

if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}

if (!stop && --nProviders < quorum) {
stop = true;
log = logger.warn;
message += " Insufficient providers to continue.";
logger.warn({
at: "RelayerSpokePoolIndexer::run",
message: `Insufficient ${chain} providers to continue.`,
quorum,
nProviders,
});
}
log({ at, message, provider: _provider, quorum, nProviders, err });
};

// On connection, start an interval timer to periodically ping the remote end.
ws.on("open", () => {
interval = setInterval(() => {
ws.ping();
timeout = setTimeout(() => {
logger.warn({
at: "RelayerSpokePoolIndexer::run",
message: `Timed out on ${chain} provider.`,
provider: _provider,
});
ws.terminate();
}, WS_PONG_TIMEOUT);
}, WS_PING_INTERVAL);
});

provider._websocket.on("close", () => {
// Pong received; cancel the timeout.
ws.on("pong", () => {
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}
});

// Oops, something went wrong.
ws.on("error", (err) => {
const at = "RelayerSpokePoolIndexer::run";
const message = `Caught ${chain} provider error.`;
logger.debug({ at, message, provider: _provider, quorum, nProviders, err });
closeProvider();
});

// Websocket is gone.
ws.on("close", () => {
logger.debug({
at: "RelayerSpokePoolIndexer::run",
message: `${chain} provider connection closed.`,
provider: getOriginFromURL(provider.connection.url),
provider: _provider,
});
closeProvider();
});
});

logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts });
await listen(eventMgr, spokePool, events, providers, opts);

// Cleanup where possible.
providers.forEach((provider) => provider._websocket.terminate());
}

if (require.main === module) {
Expand Down

0 comments on commit 955d2da

Please sign in to comment.