Skip to content

Commit

Permalink
fix(#348): assertQueueErrorHandler initial impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Christophe BLIN authored and underfisk committed Jan 12, 2022
1 parent 9bc3711 commit 5efedaf
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 7 deletions.
48 changes: 46 additions & 2 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ If the method signature of the consumer accepts `amqplib.ConsumeMessage` as a se
```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from "amqplib";
import { ConsumeMessage } from 'amqplib';

@Injectable()
export class MessagingService {
Expand Down Expand Up @@ -298,7 +298,7 @@ const response = await amqpConnection.request<ExpectedReturnType>({
payload: {
request: 'val',
},
timeout : 10000, // optional timeout for how long the request
timeout: 10000, // optional timeout for how long the request
// should wait before failing if no response is received
});
```
Expand Down Expand Up @@ -348,6 +348,50 @@ export class MessagingService {
}
```

### Handling errors

By default, the library tries to do its best to give you the control on errors if you want and to do something sensible by default.

This is done with the `errorHandler` property that is availble both in RPC and RabbitSubscribe.

```typescript
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route1',
queue: 'subscribe-queue',
errorHandler: myErrorHandler
})
```

> it should be used with `rpcOptions` for RPC
The default is `defaultNackErrorHandler` and it just nack the message without requeue (which is usually ok to avoid the message coming back in the queue again and again)

However, you can do more fancy stuff like inspecting the message properties to decide to requeue or not. Be aware that you should not requeue indefinitely...

Please note that nack will trigger the dead-letter mecanism of RabbitMQ (and so, you can use the deadLetterExchange in the queueOptions in order to send the message somewhere else).

A complete error handling strategy for RabbitMQ is out of the scope of this library.

### Handling errors during queue creation

Similarly to message errors, the library provide an error handler for failures during a queue creation (more exactly, during the assertQueue operation which will create the queue if it does not exist).

```typescript
@RabbitSubscribe({
exchange: 'exchange1',
routingKey: 'subscribe-route1',
queue: 'subscribe-queue',
assertQueueErrorHandler: myErrorHandler
})
```

The default is `defaultAssertQueueErrorHandler` which just rethrows the RabbitMq error (because there is no "one size fits all" for this situation).

You have the option to use `forceDeleteAssertQueueErrorHandler` which will try to delete the queue and recreate it with the provided queueOptions (if any)

Obviously, you can also provide your own function and do whatever is best for you, in this case the function must return the name of the created queue.

## Contribute

Contributions welcome! Read the [contribution guidelines](../../CONTRIBUTING.md) first.
Expand Down
20 changes: 15 additions & 5 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
timeoutWith,
} from 'rxjs/operators';
import * as uuid from 'uuid';
import { defaultAssertQueueErrorHandler } from '..';
import {
ConnectionInitOptions,
MessageHandlerOptions,
Expand Down Expand Up @@ -416,16 +417,25 @@ export class AmqpConnection {
exchange,
routingKey,
createQueueIfNotExists = true,
assertQueueErrorHandler = defaultAssertQueueErrorHandler,
} = subscriptionOptions;

let actualQueue: string;

if (createQueueIfNotExists) {
const { queue } = await channel.assertQueue(
subscriptionOptions.queue || '',
subscriptionOptions.queueOptions || undefined
);
actualQueue = queue;
const queueName = subscriptionOptions.queue || '';
const queueOptions = subscriptionOptions.queueOptions || undefined;
try {
const { queue } = await channel.assertQueue(queueName, queueOptions);
actualQueue = queue;
} catch (error) {
actualQueue = await assertQueueErrorHandler(
channel,
queueName,
queueOptions,
error
);
}
} else {
const { queue } = await channel.checkQueue(
subscriptionOptions.queue || ''
Expand Down
38 changes: 38 additions & 0 deletions packages/rabbitmq/src/amqp/errorBehaviors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as amqplib from 'amqplib';
import { QueueOptions } from '../rabbitmq.interfaces';

export enum MessageHandlerErrorBehavior {
ACK = 'ACK',
Expand Down Expand Up @@ -53,3 +54,40 @@ export const getHandlerForLegacyBehavior = (
return defaultNackErrorHandler;
}
};

export type AssertQueueErrorHandler = (
channel: amqplib.Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
) => Promise<string> | string;

/**
* Just rethrows the error
*/
export const defaultAssertQueueErrorHandler: AssertQueueErrorHandler = (
channel: amqplib.Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
) => {
throw error;
};

/**
* Tries to delete the queue and to redeclare it with the provided options
*/
export const forceDeleteAssertQueueErrorHandler: AssertQueueErrorHandler = async (
channel: amqplib.Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
) => {
if (error.code == 406) {
//406 == preconditions failed
await channel.deleteQueue(queueName);
const { queue } = await channel.assertQueue(queueName, queueOptions);
return queue;
}
throw error;
};
5 changes: 5 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as amqpConnectionManager from 'amqp-connection-manager';
import * as amqplib from 'amqplib';
import {
AssertQueueErrorHandler,
MessageErrorHandler,
MessageHandlerErrorBehavior,
} from './amqp/errorBehaviors';
Expand Down Expand Up @@ -52,6 +53,10 @@ export interface MessageHandlerOptions {
* A function that will be called if an error is thrown during processing of an incoming message
*/
errorHandler?: MessageErrorHandler;
/**
* A function that will be called if an error is thown during queue creation (i.e during channel.assertQueue)
*/
assertQueueErrorHandler: AssertQueueErrorHandler;
allowNonJsonMessages?: boolean;
createQueueIfNotExists?: boolean;
}
Expand Down

0 comments on commit 5efedaf

Please sign in to comment.