Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve(relayer): Support dynamic handover in production #1781

Merged
merged 13 commits into from
Sep 11, 2024
4 changes: 4 additions & 0 deletions src/clients/SpokePoolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
}

protected async _update(eventsToQuery: string[]): Promise<clients.SpokePoolUpdate> {
if (this.pendingBlockNumber === this.deploymentBlock) {
return { success: false, reason: clients.UpdateFailureReason.AlreadyUpdated }; // @todo: Update reason
pxrl marked this conversation as resolved.
Show resolved Hide resolved
}

// If any events have been removed upstream, remove them first.
this.pendingEventsRemoved = this.pendingEventsRemoved.filter((event) => !this.removeEvent(event));

Expand Down
5 changes: 4 additions & 1 deletion src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ export class Relayer {

/**
* @description Perform per-loop updates.
* @return True if all SpokePoolClients updated successfully, otherwise false.
*/
async update(): Promise<void> {
async update(): Promise<boolean> {
const {
acrossApiClient,
configStoreClient,
Expand Down Expand Up @@ -133,6 +134,8 @@ export class Relayer {
inventoryClient.update(this.inventoryChainIds),
tokenClient.update(),
]);

return Object.values(spokePoolClients).every((spokePoolClient) => spokePoolClient.isUpdated);
}

/**
Expand Down
72 changes: 57 additions & 15 deletions src/relayer/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
import { utils as sdkUtils } from "@across-protocol/sdk";
import { config, delay, disconnectRedisClients, getCurrentTime, getNetworkName, Signer, winston } from "../utils";
import {
config,
delay,
disconnectRedisClients,
getCurrentTime,
getNetworkName,
getRedisCache,
Signer,
winston,
} from "../utils";
import { Relayer } from "./Relayer";
import { RelayerConfig } from "./RelayerConfig";
import { constructRelayerClients } from "./RelayerClientHelper";
config();
let logger: winston.Logger;

const ACTIVE_RELAYER_EXPIRY = 120; // 2 minutes.
const { RUN_IDENTIFIER: runIdentifier, BOT_IDENTIFIER: botIdentifier } = process.env;
const randomNumber = () => Math.floor(Math.random() * 1_000_000);

export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
Expand All @@ -14,8 +25,9 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P

logger = _logger;
const config = new RelayerConfig(process.env);
const { externalIndexer, pollingDelay, sendingRelaysEnabled, sendingSlowRelaysEnabled } = config;

const loop = config.pollingDelay > 0;
const loop = pollingDelay > 0;
let stop = !loop;
process.on("SIGHUP", () => {
logger.debug({
Expand All @@ -25,46 +37,76 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
stop = true;
});

const redis = await getRedisCache(logger);
let setActiveRelayer = false;

// Explicitly don't log ignoredAddresses because it can be huge and can overwhelm log transports.
const { ignoredAddresses: _ignoredConfig, ...loggedConfig } = config;
logger.debug({ at: "Relayer#run", message: "Relayer started 🏃‍♂️", loggedConfig, relayerRun });
const relayerClients = await constructRelayerClients(logger, config, baseSigner);
const relayer = new Relayer(await baseSigner.getAddress(), logger, relayerClients, config);
const simulate = !config.sendingRelaysEnabled;
const enableSlowFills = config.sendingSlowRelaysEnabled;
const simulate = !sendingRelaysEnabled;

const { spokePoolClients } = relayerClients;
let txnReceipts: { [chainId: number]: Promise<string[]> };
let run = 1;

try {
do {
for (let run = 1; !stop; ++run) {
if (loop) {
logger.debug({ at: "relayer#run", message: `Starting relayer execution loop ${run}.` });
}

const tLoopStart = performance.now();
const ready = await relayer.update();
const activeRelayer = await redis.get(botIdentifier);

await relayer.update();
txnReceipts = await relayer.checkForUnfilledDepositsAndFill(enableSlowFills, simulate);
await relayer.runMaintenance();
// If there is another active relayer, allow up to 10 update cycles for this instance to be ready,
// then proceed unconditionally to protect against any RPC outages blocking the relayer.
if (!ready && activeRelayer && run < 10) {
pxrl marked this conversation as resolved.
Show resolved Hide resolved
const runTime = Math.round((performance.now() - tLoopStart) / 1000);
const delta = pollingDelay - runTime;
logger.debug({ at: "Relayer#run", message: `Not ready to relay, waiting ${delta} seconds.` });
await delay(delta);
continue;
}

// Signal to any existing relayer that a handover is underway, or alternatively check for a handover initiated by
// another (newer) relayer instance. The active relayer can also be expired, in which case this relayer should
// reconfirm its status as the active relayer.
if (loop && botIdentifier && runIdentifier) {
if (activeRelayer !== runIdentifier) {
if (!setActiveRelayer && activeRelayer) {
pxrl marked this conversation as resolved.
Show resolved Hide resolved
await redis.set(botIdentifier, runIdentifier, ACTIVE_RELAYER_EXPIRY);
setActiveRelayer = true;
} else {
logger.debug({ at: "Relayer#run", message: `Handing over to ${botIdentifier} instance ${activeRelayer}.` });
stop = true;
}
}
}

if (!stop) {
txnReceipts = await relayer.checkForUnfilledDepositsAndFill(sendingSlowRelaysEnabled, simulate);
await relayer.runMaintenance();
}

if (loop) {
const runTime = Math.round((performance.now() - tLoopStart) / 1000);
logger.debug({
at: "Relayer#run",
message: `Completed relayer execution loop ${run++} in ${runTime} seconds.`,
message: `Completed relayer execution loop ${run} in ${runTime} seconds.`,
});

if (!stop && runTime < config.pollingDelay) {
const delta = config.pollingDelay - runTime;
if (!stop && runTime < pollingDelay) {
const delta = pollingDelay - runTime;
logger.debug({
at: "relayer#run",
message: `Waiting ${delta} s before next loop.`,
});
await delay(delta);
}
}
} while (!stop);
}

// Before exiting, wait for transaction submission to complete.
for (const [chainId, submission] of Object.entries(txnReceipts)) {
Expand All @@ -80,8 +122,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
} finally {
await disconnectRedisClients(logger);

if (config.externalIndexer) {
Object.values(relayerClients.spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker());
if (externalIndexer) {
Object.values(spokePoolClients).map((spokePoolClient) => spokePoolClient.stopWorker());
}
}

Expand Down