Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle MQ connection error events #5123

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/agents/router/src/errors/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,21 @@ export class SequencerPostFailed extends ExecuteError {
super("Sequencer POST request failed", context, SequencerPostFailed.name, undefined, true);
}
}

export class MQConnectionClosed extends ExecuteError {
constructor(context: any = {}) {
super("MQ connection was closed", context, MQConnectionClosed.name, undefined, true);
}
}

export class MQConnectionUnreachable extends ExecuteError {
constructor(context: any = {}) {
super("MQ is unreachable ", context, MQConnectionUnreachable.name, undefined, true);
}
}

export class MQConnectionFailed extends ExecuteError {
constructor(context: any = {}) {
super("MQ connection had a failure", context, MQConnectionFailed.name, undefined, true);
}
}
16 changes: 16 additions & 0 deletions packages/agents/router/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { SubgraphReader } from "@connext/nxtp-adapters-subgraph";
import { ChainData, createMethodContext, Logger, RequestContext } from "@connext/nxtp-utils";
import rabbit from "foo-foo-mq";

import { MQConnectionClosed, MQConnectionFailed, MQConnectionUnreachable } from "./errors";

export const XCALL_QUEUE = "xcalls";
export const MQ_EXCHANGE = "router";
export const XCALL_MESSAGE_TYPE = "xcall";
Expand Down Expand Up @@ -44,9 +46,23 @@ export const setupMq = async (
exchanges: [{ name: MQ_EXCHANGE, type: "direct" }],
bindings: [{ exchange: MQ_EXCHANGE, target: XCALL_QUEUE, keys: [XCALL_QUEUE] }],
});

await rabbit.on("closed", function () {
throw new MQConnectionClosed();
});

await rabbit.on("failed", function () {
throw new MQConnectionFailed();
});

await rabbit.on("unreachable", function () {
throw new MQConnectionUnreachable();
});

logger.info("Message queue setup is done!", requestContext, methodContext, {
uri,
});

return rabbit;
};

Expand Down