Skip to content

Commit

Permalink
fix: hard exit on mq disruption (#4600)
Browse files Browse the repository at this point in the history
* fix: hard exit on mq disruption

* fix: other places to exit on channel error
  • Loading branch information
Rahul Sethuram authored Jun 28, 2023
1 parent 3253a0a commit 1193bfd
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
16 changes: 15 additions & 1 deletion packages/agents/lighthouse/src/tasks/prover/prover.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ChainData, createLoggingContext, Logger, RelayerType, sendHeartbeat } from "@connext/nxtp-utils";
import { ChainData, createLoggingContext, jsonifyError, Logger, RelayerType, sendHeartbeat } from "@connext/nxtp-utils";
import { getContractInterfaces, ChainReader } from "@connext/nxtp-txservice";
import { closeDatabase, getDatabase, getDatabaseAndPool } from "@connext/nxtp-adapters-database";
import { setupConnextRelayer, setupGelatoRelayer } from "@connext/nxtp-adapters-relayer";
Expand Down Expand Up @@ -74,6 +74,20 @@ export const makeProver = async (config: NxtpLighthouseConfig, chainData: Map<st
: context.config.database.url;
context.adapters.databaseWriter = await getDatabaseAndPool(databaseWriter, context.logger);
context.adapters.mqClient = await Broker.connect(config.messageQueue.connection.uri);
// hard exit on errors or close, this will force a restart from AWS
context.adapters.mqClient.on("error", (err: unknown) => {
context.logger.error("MQ connection error", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

context.adapters.mqClient.on("close", (err: unknown) => {
context.logger.error("MQ connection closed", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});
context.adapters.cache = StoreManager.getInstance({
redis: { host: context.config.redis.host, port: context.config.redis.port, instance: undefined },
mock: !context.config.redis.host || !context.config.redis.port,
Expand Down
61 changes: 61 additions & 0 deletions packages/agents/sequencer/src/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ export const makePublisher = async (_configOverride?: SequencerConfig) => {
throw new Error("Sequencer publisher not configured");
}

// hard error on channel issues, will be restarted from higher level orchestrator
channel.on("error", (err: unknown) => {
context.logger.error("MQ channel error", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

channel.on("close", (err: unknown) => {
context.logger.error("MQ channel closed", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

context.logger.info("Sequencer boot complete!", requestContext, methodContext, {
port: {
pub: context.config.server.pub.port,
Expand Down Expand Up @@ -140,6 +155,21 @@ export const makeHTTPSubscriber = async () => {
throw new Error("Sequencer publisher not configured");
}

// hard error on channel issues, will be restarted from higher level orchestrator
channel.on("error", (err: unknown) => {
context.logger.error("MQ channel error", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

channel.on("close", (err: unknown) => {
context.logger.error("MQ channel closed", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

// Create health server, set up routes, and start listening.
await bindHealthServer(context.config.server.pub.host, context.config.server.pub.port);
} catch (error: any) {
Expand Down Expand Up @@ -212,6 +242,21 @@ export const makeSubscriber = async () => {
);
}

// hard error on channel issues, will be restarted from higher level orchestrator
channel.on("error", (err: unknown) => {
context.logger.error("MQ channel error", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

channel.on("close", (err: unknown) => {
context.logger.error("MQ channel closed", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

// Create health server, set up routes, and start listening.
await bindHealthServer(context.config.server.sub.host, context.config.server.sub.port);
} catch (error: any) {
Expand Down Expand Up @@ -466,6 +511,22 @@ export const setupMQ = async (requestContext: RequestContext): Promise<Broker.Co

logger.info("MQ setup in progress...", requestContext, methodContext, {});
const connection = await Broker.connect(config.messageQueue.connection.uri);

// hard exit on errors or close, this will force a restart from AWS
connection.on("error", (err: unknown) => {
logger.error("MQ connection error", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

connection.on("close", (err: unknown) => {
logger.error("MQ connection closed", requestContext, methodContext, undefined, {
error: jsonifyError(err as Error),
});
process.exit(1);
});

logger.info("MQ setup is done!", requestContext, methodContext, {});
return connection;
};

0 comments on commit 1193bfd

Please sign in to comment.