diff --git a/src/libexec/RelayerSpokePoolIndexer.ts b/src/libexec/RelayerSpokePoolIndexer.ts index 70f03bf35..2a1cfb630 100644 --- a/src/libexec/RelayerSpokePoolIndexer.ts +++ b/src/libexec/RelayerSpokePoolIndexer.ts @@ -1,6 +1,5 @@ import assert from "assert"; import minimist from "minimist"; -import { setTimeout } from "node:timers/promises"; import { Contract, Event, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers"; import { utils as sdkUtils } from "@across-protocol/sdk"; import * as utils from "../../scripts/utils"; @@ -36,7 +35,9 @@ type ScraperOpts = { const { NODE_SUCCESS, NODE_APP_ERR } = utils; -const INDEXER_POLLING_PERIOD = 2000; // ms; time to sleep between checking for exit request via SIGHUP. +const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP. +const WS_PING_INTERVAL = 20_000; // ms +const WS_PONG_TIMEOUT = WS_PING_INTERVAL / 2; let logger: winston.Logger; let chain: string; @@ -199,7 +200,7 @@ async function listen( }); do { - await setTimeout(INDEXER_POLLING_PERIOD); + await sdkUtils.delay(INDEXER_POLLING_PERIOD); } while (!stop); } @@ -291,31 +292,82 @@ async function run(argv: string[]): Promise { const providers = getWSProviders(chainId, quorum); let nProviders = providers.length; assert(providers.length > 0, `Insufficient providers for ${chain} (required ${quorum} by quorum)`); + providers.forEach((provider) => { - provider._websocket.on("error", (err) => { - const _provider = getOriginFromURL(provider.connection.url); - const at = "RelayerSpokePoolIndexer::run"; - let message = `Caught ${chain} provider error.`; - let log = logger.debug; - if (--nProviders < quorum) { + const { _websocket: ws } = provider; + const _provider = getOriginFromURL(provider.connection.url); + let interval: NodeJS.Timer | undefined; + let timeout: NodeJS.Timeout | undefined; + + const closeProvider = () => { + if (interval) { + clearInterval(interval); + interval = undefined; + } + + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + + if (!stop && --nProviders < quorum) { stop = true; - log = logger.warn; - message += " Insufficient providers to continue."; + logger.warn({ + at: "RelayerSpokePoolIndexer::run", + message: `Insufficient ${chain} providers to continue.`, + quorum, + nProviders, + }); } - log({ at, message, provider: _provider, quorum, nProviders, err }); + }; + + // On connection, start an interval timer to periodically ping the remote end. + ws.on("open", () => { + interval = setInterval(() => { + ws.ping(); + timeout = setTimeout(() => { + logger.warn({ + at: "RelayerSpokePoolIndexer::run", + message: `Timed out on ${chain} provider.`, + provider: _provider, + }); + ws.terminate(); + }, WS_PONG_TIMEOUT); + }, WS_PING_INTERVAL); }); - provider._websocket.on("close", () => { + // Pong received; cancel the timeout. + ws.on("pong", () => { + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + }); + + // Oops, something went wrong. + ws.on("error", (err) => { + const at = "RelayerSpokePoolIndexer::run"; + const message = `Caught ${chain} provider error.`; + logger.debug({ at, message, provider: _provider, quorum, nProviders, err }); + closeProvider(); + }); + + // Websocket is gone. + ws.on("close", () => { logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `${chain} provider connection closed.`, - provider: getOriginFromURL(provider.connection.url), + provider: _provider, }); + closeProvider(); }); }); logger.debug({ at: "RelayerSpokePoolIndexer::run", message: `Starting ${chain} listener.`, events, opts }); await listen(eventMgr, spokePool, events, providers, opts); + + // Cleanup where possible. + providers.forEach((provider) => provider._websocket.terminate()); } if (require.main === module) {