Skip to content

Commit

Permalink
Feat/659 unbound queues (#662)
Browse files Browse the repository at this point in the history
* feat(rabbitmq): enable module definition to include queues to be created without handler

fix #659

* fix(rabbitmq): fix check assert for queues

fix 659

* chore(rabbitmq): add test for queue assertion

* fix: add default value for queues array

* chore: add myself to contributors

* feat(rabbitmq): bind handlerless queues to exchange
  • Loading branch information
fermentfan committed Nov 17, 2023
1 parent c62fa4a commit c3baf7d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 1 deletion.
9 changes: 9 additions & 0 deletions .all-contributorsrc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@
"contributions": [
"code"
]
},
{
"login": "fermentfan",
"name": "Dennis von der Bey",
"avatar_url": "https://avatars.githubusercontent.com/u/19498613?v=4",
"profile": "https://ferment.fan",
"contributions": [
"code"
]
}
],
"contributorsPerLine": 7
Expand Down
67 changes: 67 additions & 0 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const amqplibUri = `${uri}?heartbeat=5`;
const logger = new ConsoleLogger('Custom logger');

const nonExistingExchange = 'non-existing-exchange';
const nonExistingQueue = 'non-existing-queue';

class RabbitConfig {
createModuleConfig(): RabbitMQConfig {
Expand Down Expand Up @@ -160,6 +161,72 @@ describe('Module Configuration', () => {
// Clear non-existing exchange
await amqpConnection.channel.deleteExchange(nonExistingExchange);
});

it("should throw an error if queue doesn't exist and `createQueueIfNotExists` is false", async () => {
try {
app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
queues: [
{
name: nonExistingQueue,
createExchangeIfNotExists: false,
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 3000,
},
logger,
}),
],
}).compile();

fail(
`Queue "${nonExistingQueue}" should not exist before running this test`,
);
} catch (error) {
expect(error).toBeDefined();
}
});

it('should create a queue successfully if `createQueueIfNotExists` is true', async () => {
const spy = jest.spyOn(amqplib, 'connect');

app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
queues: [
{
name: nonExistingQueue,
createExchangeIfNotExists: true,
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 3000,
},
logger,
}),
],
}).compile();

const amqpConnection = app.get<AmqpConnection>(AmqpConnection);
expect(app).toBeDefined();

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(amqplibUri, undefined);

await app.init();
expect(
await amqpConnection.channel.checkQueue(nonExistingQueue),
).toBeDefined();
await app.close();
});
});
});

Expand Down
43 changes: 42 additions & 1 deletion packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
} from 'rxjs';
import { catchError, filter, first, map, take, timeout } from 'rxjs/operators';
import { randomUUID } from 'crypto';
import { defaultAssertQueueErrorHandler } from '..';
import { defaultAssertQueueErrorHandler, RabbitMQQueueConfig } from '..';
import {
ConnectionInitOptions,
MessageHandlerOptions,
Expand Down Expand Up @@ -91,6 +91,7 @@ const defaultConfig = {
),
defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.REQUEUE,
exchanges: [],
queues: [],
defaultRpcTimeout: 10000,
connectionInitOptions: {
wait: true,
Expand Down Expand Up @@ -280,6 +281,8 @@ export class AmqpConnection {
return channel.checkExchange(x.name);
});

this.setupQueuesWithBindings(channel, this.config.queues);

if (this.config.enableDirectReplyTo) {
await this.initDirectReplyQueue(channel);
}
Expand All @@ -288,6 +291,44 @@ export class AmqpConnection {
}
}

private async setupQueuesWithBindings(
channel: ConfirmChannel,
queues: RabbitMQQueueConfig[]
) {
for (const configuredQueue of queues) {
const { createQueueIfNotExists = true } = configuredQueue;

if (createQueueIfNotExists) {
this.setupQueue(configuredQueue, channel);
}
await channel.assertQueue(configuredQueue.name, configuredQueue.options);

let routingKeys: string[] = [];
if (Array.isArray(configuredQueue.routingKey)) {
routingKeys = configuredQueue.routingKey;
} else {
if (configuredQueue.routingKey != null) {
routingKeys = [configuredQueue.routingKey];
}
}

if (routingKeys.length > 0) {
await Promise.all(
routingKeys.map((routingKey) => {
if (configuredQueue.exchange != undefined) {
channel.bindQueue(
configuredQueue.name,
configuredQueue.exchange,
routingKey,
configuredQueue.bindQueueArguments
);
}
})
);
}
}
}

private async initDirectReplyQueue(channel: ConfirmChannel) {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await channel.consume(
Expand Down
10 changes: 10 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ export interface RabbitMQExchangeConfig {
options?: Options.AssertExchange;
}

export interface RabbitMQQueueConfig {
name: string;
createQueueIfNotExists?: boolean;
options?: Options.AssertQueue;
exchange?: string;
routingKey?: string | string[];
bindQueueArguments?: any;
}

export type ConsumeOptions = Options.Consume;

export interface MessageOptions {
Expand Down Expand Up @@ -98,6 +107,7 @@ export interface RabbitMQConfig {
*/
prefetchCount?: number;
exchanges?: RabbitMQExchangeConfig[];
queues?: RabbitMQQueueConfig[];
defaultRpcTimeout?: number;
defaultExchangeType?: string;
defaultRpcErrorHandler?: MessageErrorHandler;
Expand Down

0 comments on commit c3baf7d

Please sign in to comment.