diff --git a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts
index 90a525bbf..155529f83 100644
--- a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts
+++ b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts
@@ -93,22 +93,4 @@ describe('Rabbit Subscribe', () => {
done();
}, 50);
});
-
- // it('should receive undefined argument when subscriber allows non-json messages and message is empty', async (done) => {
-
- // setTimeout(() => {
- // expect(testHandler).toHaveBeenCalledTimes(1);
- // expect(testHandler).toHaveBeenCalledWith(undefined);
- // done();
- // }, 50);
- // });
-
- // it('should receive undefined argument when subscriber allows non-json messages and message is unparseable by JSON', async (done) => {
-
- // setTimeout(() => {
- // expect(testHandler).toHaveBeenCalledTimes(1);
- // expect(testHandler).toHaveBeenCalledWith(undefined);
- // done();
- // }, 50);
- // });
});
diff --git a/integration/rabbitmq/src/rpc/reply.error.callback.ts b/integration/rabbitmq/src/rpc/reply.error.callback.ts
index 409b0b5d3..baae6aaf9 100644
--- a/integration/rabbitmq/src/rpc/reply.error.callback.ts
+++ b/integration/rabbitmq/src/rpc/reply.error.callback.ts
@@ -16,5 +16,6 @@ export function ReplyErrorCallback(
error = Buffer.from(JSON.stringify({ status: 'error', message: error }));
channel.publish('', replyTo, error, { correlationId });
+ channel.ack(msg);
}
}
diff --git a/integration/rabbitmq/src/rpc/rpc.service.ts b/integration/rabbitmq/src/rpc/rpc.service.ts
index 46c4ec5ab..09afb80b1 100644
--- a/integration/rabbitmq/src/rpc/rpc.service.ts
+++ b/integration/rabbitmq/src/rpc/rpc.service.ts
@@ -1,10 +1,10 @@
import {
- RabbitRPC,
MessageHandlerErrorBehavior,
+ RabbitRPC,
} from '@golevelup/nestjs-rabbitmq';
import { Injectable, UseInterceptors } from '@nestjs/common';
-import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '@nestjs/microservices';
+import { TransformInterceptor } from '../transform.interceptor';
import { ReplyErrorCallback } from './reply.error.callback';
@Injectable()
@@ -37,7 +37,7 @@ export class RpcService {
exchange: 'exchange1',
queue: 'error-reply-rpc',
errorBehavior: MessageHandlerErrorBehavior.ACK,
- errorCallbacks: [ReplyErrorCallback],
+ errorHandler: ReplyErrorCallback,
})
errorReplyRpc(message: object) {
throw new RpcException(message);
diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md
index 6ac5d1aab..d1a3350b6 100644
--- a/packages/rabbitmq/README.md
+++ b/packages/rabbitmq/README.md
@@ -6,29 +6,28 @@
-Table of Contents
-=================
-
- * [Description](#description)
- * [Motivation](#motivation)
- * [Connection Management](#connection-management)
- * [Usage](#usage)
- * [Install](#install)
- * [Module Initialization](#module-initialization)
- * [Receiving Messages](#receiving-messages)
- * [Exposing RPC Handlers](#exposing-rpc-handlers)
- * [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers)
- * [Message Handling](#message-handling)
- * [Conditional Handler Registration](#conditional-handler-registration)
- * [Sending Messages](#sending-messages)
- * [Inject the AmqpConnection](#inject-the-amqpconnection)
- * [Publising Messages (Fire and Forget)](#publising-messages-fire-and-forget)
- * [Requesting Data from an RPC](#requesting-data-from-an-rpc)
- * [Type Inference](#type-inference)
- * [Interop with other RPC Servers](#interop-with-other-rpc-servers)
- * [Advanced Patterns](#advanced-patterns)
- * [Competing Consumers](#competing-consumers)
- * [TODO](#todo)
+# Table of Contents
+
+- [Description](#description)
+- [Motivation](#motivation)
+- [Connection Management](#connection-management)
+- [Usage](#usage)
+ - [Install](#install)
+ - [Module Initialization](#module-initialization)
+- [Receiving Messages](#receiving-messages)
+ - [Exposing RPC Handlers](#exposing-rpc-handlers)
+ - [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers)
+ - [Message Handling](#message-handling)
+ - [Conditional Handler Registration](#conditional-handler-registration)
+- [Sending Messages](#sending-messages)
+ - [Inject the AmqpConnection](#inject-the-amqpconnection)
+ - [Publising Messages (Fire and Forget)](#publising-messages-fire-and-forget)
+ - [Requesting Data from an RPC](#requesting-data-from-an-rpc)
+ - [Type Inference](#type-inference)
+ - [Interop with other RPC Servers](#interop-with-other-rpc-servers)
+- [Advanced Patterns](#advanced-patterns)
+ - [Competing Consumers](#competing-consumers)
+- [TODO](#todo)
## Description
@@ -61,13 +60,13 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
exchanges: [
{
name: 'exchange1',
- type: 'topic'
- }
+ type: 'topic',
+ },
],
uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
- connectionInitOptions: { wait: false }
- })
- ]
+ connectionInitOptions: { wait: false },
+ }),
+ ],
})
export class RabbitExampleModule {}
```
@@ -104,15 +103,15 @@ import { MessagingService } from './messaging/messaging.service';
exchanges: [
{
name: 'exchange1',
- type: 'topic'
- }
+ type: 'topic',
+ },
],
- uri: 'amqp://rabbitmq:rabbitmq@localhost:5672'
+ uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
}),
- RabbitExampleModule
+ RabbitExampleModule,
],
providers: [MessagingService],
- controllers: [MessagingController]
+ controllers: [MessagingController],
})
export class RabbitExampleModule {}
```
@@ -132,11 +131,11 @@ export class MessagingService {
@RabbitRPC({
exchange: 'exchange1',
routingKey: 'rpc-route',
- queue: 'rpc-queue'
+ queue: 'rpc-queue',
})
public async rpcHandler(msg: {}) {
return {
- response: 42
+ response: 42,
};
}
}
@@ -155,7 +154,7 @@ export class MessagingService {
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route',
- queue: 'subscribe-queue'
+ queue: 'subscribe-queue',
})
public async pubSubHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
@@ -194,6 +193,7 @@ export class MessagingService {
}
}
```
+
### Conditional Handler Registration
In some scenarios, it may not be desirable for all running instances of a NestJS application to register RabbitMQ message handlers. For example, if leveraging the same application code base to expose API instances and worker roles separately it may be desirable to have only the worker instances attach handlers to manage queue subscriptions or RPC requests.
@@ -245,9 +245,9 @@ const response = await amqpConnection.request({
exchange: 'exchange1',
routingKey: 'rpc',
payload: {
- request: 'val'
+ request: 'val',
},
- timeout = 10000 // optional timeout for how long the request
+ timeout = 10000, // optional timeout for how long the request
// should wait before failing if no response is received
});
```
@@ -281,7 +281,7 @@ export class MessagingService {
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route1',
- queue: 'subscribe-queue'
+ queue: 'subscribe-queue',
})
public async competingPubSubHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
@@ -289,15 +289,10 @@ export class MessagingService {
@RabbitSubscribe({
exchange: 'exchange1',
- routingKey: 'subscribe-route2'
+ routingKey: 'subscribe-route2',
})
public async messagePerInstanceHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
}
}
```
-
-## TODO
-
-- Possible validation pipeline using class-validator and class-transformer to ensure messages are well formatted
-- Integrate hooks for things like logging, metrics, or custom error handling
diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts
index b0aaf43fc..d972aea08 100644
--- a/packages/rabbitmq/src/amqp/connection.ts
+++ b/packages/rabbitmq/src/amqp/connection.ts
@@ -13,11 +13,14 @@ import {
import * as uuid from 'uuid';
import {
ConnectionInitOptions,
- MessageHandlerErrorBehavior,
MessageHandlerOptions,
RabbitMQConfig,
RequestOptions,
} from '../rabbitmq.interfaces';
+import {
+ getHandlerForLegacyBehavior,
+ MessageHandlerErrorBehavior,
+} from './errorBehaviors';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';
const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';
@@ -274,11 +277,14 @@ export class AmqpConnection {
if (msg == null) {
return;
} else {
- const errorBehavior =
- msgOptions.errorBehavior ||
- this.config.defaultSubscribeErrorBehavior;
-
- await this.handleError(channel, msgOptions, errorBehavior, msg, e);
+ const errorHandler =
+ msgOptions.errorHandler ||
+ getHandlerForLegacyBehavior(
+ msgOptions.errorBehavior ||
+ this.config.defaultSubscribeErrorBehavior
+ );
+
+ await errorHandler(channel, msg, e);
}
}
});
@@ -343,10 +349,14 @@ export class AmqpConnection {
if (msg == null) {
return;
} else {
- const errorBehavior =
- rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior;
-
- await this.handleError(channel, rpcOptions, errorBehavior, msg, e);
+ const errorHandler =
+ rpcOptions.errorHandler ||
+ getHandlerForLegacyBehavior(
+ rpcOptions.errorBehavior ||
+ this.config.defaultSubscribeErrorBehavior
+ );
+
+ await errorHandler(channel, msg, e);
}
}
});
@@ -401,37 +411,4 @@ export class AmqpConnection {
return handler(message, msg);
}
-
- private async handleError(
- channel: amqplib.Channel,
- msgOptions: MessageHandlerOptions,
- errorBehavior: MessageHandlerErrorBehavior,
- msg: amqplib.Message,
- error: any
- ) {
- if (msg == null) {
- return;
- } else {
- try {
- if (msgOptions.errorCallbacks) {
- for (const callback of msgOptions.errorCallbacks) {
- await callback(channel, msg, error);
- }
- }
- } finally {
- switch (errorBehavior) {
- case MessageHandlerErrorBehavior.ACK: {
- channel.ack(msg);
- break;
- }
- case MessageHandlerErrorBehavior.REQUEUE: {
- channel.nack(msg, false, true);
- break;
- }
- default:
- channel.nack(msg, false, false);
- }
- }
- }
- }
}
diff --git a/packages/rabbitmq/src/amqp/errorBehaviors.ts b/packages/rabbitmq/src/amqp/errorBehaviors.ts
new file mode 100644
index 000000000..11eec59d4
--- /dev/null
+++ b/packages/rabbitmq/src/amqp/errorBehaviors.ts
@@ -0,0 +1,55 @@
+import * as amqplib from 'amqplib';
+
+export enum MessageHandlerErrorBehavior {
+ ACK,
+ NACK,
+ REQUEUE,
+}
+
+export type MessageErrorHandler = (
+ channel: amqplib.Channel,
+ msg: amqplib.ConsumeMessage,
+ error: any
+) => Promise | void;
+
+/**
+ * An error handler that will ack the message which caused an error during processing
+ */
+export const ackErrorHandler: MessageErrorHandler = (channel, msg, error) => {
+ channel.ack(msg);
+};
+
+/**
+ * An error handler that will nack and requeue a message which created an error during processing
+ */
+export const requeueErrorHandler: MessageErrorHandler = (
+ channel,
+ msg,
+ error
+) => {
+ channel.nack(msg, false, true);
+};
+
+/**
+ * An error handler that will nack a message which created an error during processing
+ */
+export const defaultNackErrorHandler: MessageErrorHandler = (
+ channel,
+ msg,
+ error
+) => {
+ channel.nack(msg, false, false);
+};
+
+export const getHandlerForLegacyBehavior = (
+ behavior: MessageHandlerErrorBehavior
+) => {
+ switch (behavior) {
+ case MessageHandlerErrorBehavior.ACK:
+ return ackErrorHandler;
+ case MessageHandlerErrorBehavior.REQUEUE:
+ return requeueErrorHandler;
+ default:
+ return defaultNackErrorHandler;
+ }
+};
diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts
index 6d8a779ac..1efa69e0b 100644
--- a/packages/rabbitmq/src/index.ts
+++ b/packages/rabbitmq/src/index.ts
@@ -1,4 +1,5 @@
export * from './amqp/connection';
+export * from './amqp/errorBehaviors';
export * from './amqp/handlerResponses';
export * from './rabbitmq.constants';
export * from './rabbitmq.decorators';
diff --git a/packages/rabbitmq/src/rabbitmq.interfaces.ts b/packages/rabbitmq/src/rabbitmq.interfaces.ts
index bcc89565b..b38f6143f 100644
--- a/packages/rabbitmq/src/rabbitmq.interfaces.ts
+++ b/packages/rabbitmq/src/rabbitmq.interfaces.ts
@@ -1,5 +1,9 @@
import * as amqpConnectionManager from 'amqp-connection-manager';
import * as amqplib from 'amqplib';
+import {
+ MessageErrorHandler,
+ MessageHandlerErrorBehavior,
+} from './amqp/errorBehaviors';
export interface RabbitMQExchangeConfig {
name: string;
@@ -33,28 +37,23 @@ export interface QueueOptions {
maxPriority?: number;
}
-export enum MessageHandlerErrorBehavior {
- ACK,
- NACK,
- REQUEUE,
-}
-
export interface MessageHandlerOptions {
exchange: string;
routingKey: string | string[];
queue?: string;
queueOptions?: QueueOptions;
+ /**
+ * @deprecated()
+ * Legacy error handling behaviors. This will be overridden if the errorHandler property is set
+ */
errorBehavior?: MessageHandlerErrorBehavior;
- errorCallbacks?: IMessageErrorCallback[];
+ /**
+ * A function that will be called if an error is thrown during processing of an incoming message
+ */
+ errorHandler?: MessageErrorHandler;
allowNonJsonMessages?: boolean;
}
-export type IMessageErrorCallback = (
- channel: amqplib.Channel,
- msg: amqplib.ConsumeMessage,
- error: any
-) => Promise | any;
-
export interface ConnectionInitOptions {
wait?: boolean;
timeout?: number;