Skip to content

Commit

Permalink
feat(rabbitmq): cleanup for error handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
WonderPanda committed Apr 16, 2020
1 parent 85b1b67 commit ddd4707
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 122 deletions.
18 changes: 0 additions & 18 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,4 @@ describe('Rabbit Subscribe', () => {
done();
}, 50);
});

// it('should receive undefined argument when subscriber allows non-json messages and message is empty', async (done) => {

// setTimeout(() => {
// expect(testHandler).toHaveBeenCalledTimes(1);
// expect(testHandler).toHaveBeenCalledWith(undefined);
// done();
// }, 50);
// });

// it('should receive undefined argument when subscriber allows non-json messages and message is unparseable by JSON', async (done) => {

// setTimeout(() => {
// expect(testHandler).toHaveBeenCalledTimes(1);
// expect(testHandler).toHaveBeenCalledWith(undefined);
// done();
// }, 50);
// });
});
1 change: 1 addition & 0 deletions integration/rabbitmq/src/rpc/reply.error.callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ export function ReplyErrorCallback(
error = Buffer.from(JSON.stringify({ status: 'error', message: error }));

channel.publish('', replyTo, error, { correlationId });
channel.ack(msg);
}
}
6 changes: 3 additions & 3 deletions integration/rabbitmq/src/rpc/rpc.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
RabbitRPC,
MessageHandlerErrorBehavior,
RabbitRPC,
} from '@golevelup/nestjs-rabbitmq';
import { Injectable, UseInterceptors } from '@nestjs/common';
import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '@nestjs/microservices';
import { TransformInterceptor } from '../transform.interceptor';
import { ReplyErrorCallback } from './reply.error.callback';

@Injectable()
Expand Down Expand Up @@ -37,7 +37,7 @@ export class RpcService {
exchange: 'exchange1',
queue: 'error-reply-rpc',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorCallbacks: [ReplyErrorCallback],
errorHandler: ReplyErrorCallback,
})
errorReplyRpc(message: object) {
throw new RpcException(message);
Expand Down
85 changes: 40 additions & 45 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,28 @@
<img alt="license" src="https://img.shields.io/npm/l/@golevelup/nestjs-rabbitmq.svg">
</p>

Table of Contents
=================

* [Description](#description)
* [Motivation](#motivation)
* [Connection Management](#connection-management)
* [Usage](#usage)
* [Install](#install)
* [Module Initialization](#module-initialization)
* [Receiving Messages](#receiving-messages)
* [Exposing RPC Handlers](#exposing-rpc-handlers)
* [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers)
* [Message Handling](#message-handling)
* [Conditional Handler Registration](#conditional-handler-registration)
* [Sending Messages](#sending-messages)
* [Inject the AmqpConnection](#inject-the-amqpconnection)
* [Publising Messages (Fire and Forget)](#publising-messages-fire-and-forget)
* [Requesting Data from an RPC](#requesting-data-from-an-rpc)
* [Type Inference](#type-inference)
* [Interop with other RPC Servers](#interop-with-other-rpc-servers)
* [Advanced Patterns](#advanced-patterns)
* [Competing Consumers](#competing-consumers)
* [TODO](#todo)
# Table of Contents

- [Description](#description)
- [Motivation](#motivation)
- [Connection Management](#connection-management)
- [Usage](#usage)
- [Install](#install)
- [Module Initialization](#module-initialization)
- [Receiving Messages](#receiving-messages)
- [Exposing RPC Handlers](#exposing-rpc-handlers)
- [Exposing Pub/Sub Handlers](#exposing-pubsub-handlers)
- [Message Handling](#message-handling)
- [Conditional Handler Registration](#conditional-handler-registration)
- [Sending Messages](#sending-messages)
- [Inject the AmqpConnection](#inject-the-amqpconnection)
- [Publising Messages (Fire and Forget)](#publising-messages-fire-and-forget)
- [Requesting Data from an RPC](#requesting-data-from-an-rpc)
- [Type Inference](#type-inference)
- [Interop with other RPC Servers](#interop-with-other-rpc-servers)
- [Advanced Patterns](#advanced-patterns)
- [Competing Consumers](#competing-consumers)
- [TODO](#todo)

## Description

Expand Down Expand Up @@ -61,13 +60,13 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
exchanges: [
{
name: 'exchange1',
type: 'topic'
}
type: 'topic',
},
],
uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
connectionInitOptions: { wait: false }
})
]
connectionInitOptions: { wait: false },
}),
],
})
export class RabbitExampleModule {}
```
Expand Down Expand Up @@ -104,15 +103,15 @@ import { MessagingService } from './messaging/messaging.service';
exchanges: [
{
name: 'exchange1',
type: 'topic'
}
type: 'topic',
},
],
uri: 'amqp://rabbitmq:rabbitmq@localhost:5672'
uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
}),
RabbitExampleModule
RabbitExampleModule,
],
providers: [MessagingService],
controllers: [MessagingController]
controllers: [MessagingController],
})
export class RabbitExampleModule {}
```
Expand All @@ -132,11 +131,11 @@ export class MessagingService {
@RabbitRPC({
exchange: 'exchange1',
routingKey: 'rpc-route',
queue: 'rpc-queue'
queue: 'rpc-queue',
})
public async rpcHandler(msg: {}) {
return {
response: 42
response: 42,
};
}
}
Expand All @@ -155,7 +154,7 @@ export class MessagingService {
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route',
queue: 'subscribe-queue'
queue: 'subscribe-queue',
})
public async pubSubHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
Expand Down Expand Up @@ -194,6 +193,7 @@ export class MessagingService {
}
}
```

### Conditional Handler Registration

In some scenarios, it may not be desirable for all running instances of a NestJS application to register RabbitMQ message handlers. For example, if leveraging the same application code base to expose API instances and worker roles separately it may be desirable to have only the worker instances attach handlers to manage queue subscriptions or RPC requests.
Expand Down Expand Up @@ -245,9 +245,9 @@ const response = await amqpConnection.request<ExpectedReturnType>({
exchange: 'exchange1',
routingKey: 'rpc',
payload: {
request: 'val'
request: 'val',
},
timeout = 10000 // optional timeout for how long the request
timeout = 10000, // optional timeout for how long the request
// should wait before failing if no response is received
});
```
Expand Down Expand Up @@ -281,23 +281,18 @@ export class MessagingService {
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route1',
queue: 'subscribe-queue'
queue: 'subscribe-queue',
})
public async competingPubSubHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
}

@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route2'
routingKey: 'subscribe-route2',
})
public async messagePerInstanceHandler(msg: {}) {
console.log(`Received message: ${JSON.stringify(msg)}`);
}
}
```

## TODO

- Possible validation pipeline using class-validator and class-transformer to ensure messages are well formatted
- Integrate hooks for things like logging, metrics, or custom error handling
63 changes: 20 additions & 43 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import {
import * as uuid from 'uuid';
import {
ConnectionInitOptions,
MessageHandlerErrorBehavior,
MessageHandlerOptions,
RabbitMQConfig,
RequestOptions,
} from '../rabbitmq.interfaces';
import {
getHandlerForLegacyBehavior,
MessageHandlerErrorBehavior,
} from './errorBehaviors';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';

const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';
Expand Down Expand Up @@ -274,11 +277,14 @@ export class AmqpConnection {
if (msg == null) {
return;
} else {
const errorBehavior =
msgOptions.errorBehavior ||
this.config.defaultSubscribeErrorBehavior;

await this.handleError(channel, msgOptions, errorBehavior, msg, e);
const errorHandler =
msgOptions.errorHandler ||
getHandlerForLegacyBehavior(
msgOptions.errorBehavior ||
this.config.defaultSubscribeErrorBehavior
);

await errorHandler(channel, msg, e);
}
}
});
Expand Down Expand Up @@ -343,10 +349,14 @@ export class AmqpConnection {
if (msg == null) {
return;
} else {
const errorBehavior =
rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior;

await this.handleError(channel, rpcOptions, errorBehavior, msg, e);
const errorHandler =
rpcOptions.errorHandler ||
getHandlerForLegacyBehavior(
rpcOptions.errorBehavior ||
this.config.defaultSubscribeErrorBehavior
);

await errorHandler(channel, msg, e);
}
}
});
Expand Down Expand Up @@ -401,37 +411,4 @@ export class AmqpConnection {

return handler(message, msg);
}

private async handleError(
channel: amqplib.Channel,
msgOptions: MessageHandlerOptions,
errorBehavior: MessageHandlerErrorBehavior,
msg: amqplib.Message,
error: any
) {
if (msg == null) {
return;
} else {
try {
if (msgOptions.errorCallbacks) {
for (const callback of msgOptions.errorCallbacks) {
await callback(channel, msg, error);
}
}
} finally {
switch (errorBehavior) {
case MessageHandlerErrorBehavior.ACK: {
channel.ack(msg);
break;
}
case MessageHandlerErrorBehavior.REQUEUE: {
channel.nack(msg, false, true);
break;
}
default:
channel.nack(msg, false, false);
}
}
}
}
}
55 changes: 55 additions & 0 deletions packages/rabbitmq/src/amqp/errorBehaviors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import * as amqplib from 'amqplib';

export enum MessageHandlerErrorBehavior {
ACK,
NACK,
REQUEUE,
}

export type MessageErrorHandler = (
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
error: any
) => Promise<void> | void;

/**
* An error handler that will ack the message which caused an error during processing
*/
export const ackErrorHandler: MessageErrorHandler = (channel, msg, error) => {
channel.ack(msg);
};

/**
* An error handler that will nack and requeue a message which created an error during processing
*/
export const requeueErrorHandler: MessageErrorHandler = (
channel,
msg,
error
) => {
channel.nack(msg, false, true);
};

/**
* An error handler that will nack a message which created an error during processing
*/
export const defaultNackErrorHandler: MessageErrorHandler = (
channel,
msg,
error
) => {
channel.nack(msg, false, false);
};

export const getHandlerForLegacyBehavior = (
behavior: MessageHandlerErrorBehavior
) => {
switch (behavior) {
case MessageHandlerErrorBehavior.ACK:
return ackErrorHandler;
case MessageHandlerErrorBehavior.REQUEUE:
return requeueErrorHandler;
default:
return defaultNackErrorHandler;
}
};
1 change: 1 addition & 0 deletions packages/rabbitmq/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './amqp/connection';
export * from './amqp/errorBehaviors';
export * from './amqp/handlerResponses';
export * from './rabbitmq.constants';
export * from './rabbitmq.decorators';
Expand Down
Loading

0 comments on commit ddd4707

Please sign in to comment.