Skip to content

Commit

Permalink
feat(rabbitmq): bind handlerless queues to exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
fermentfan committed Nov 7, 2023
1 parent 086bf69 commit e711bdc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
49 changes: 40 additions & 9 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
} from 'rxjs';
import { catchError, filter, first, map, take, timeout } from 'rxjs/operators';
import { randomUUID } from 'crypto';
import { defaultAssertQueueErrorHandler } from '..';
import { defaultAssertQueueErrorHandler, RabbitMQQueueConfig } from '..';
import {
ConnectionInitOptions,
MessageHandlerOptions,
Expand Down Expand Up @@ -281,14 +281,7 @@ export class AmqpConnection {
return channel.checkExchange(x.name);
});

this.config.queues.forEach((queue) => {
const { createQueueIfNotExists = true } = queue;

if (createQueueIfNotExists) {
return channel.assertQueue(queue.name, queue.options);
}
return channel.checkQueue(queue.name);
});
this.setupQueuesWithBindings(channel, this.config.queues);

if (this.config.enableDirectReplyTo) {
await this.initDirectReplyQueue(channel);
Expand All @@ -298,6 +291,44 @@ export class AmqpConnection {
}
}

private async setupQueuesWithBindings(
channel: ConfirmChannel,
queues: RabbitMQQueueConfig[]
) {
for (const configuredQueue of queues) {
const { createQueueIfNotExists = true } = configuredQueue;

if (createQueueIfNotExists) {
this.setupQueue(configuredQueue, channel);
}
await channel.assertQueue(configuredQueue.name, configuredQueue.options);

let routingKeys: string[] = [];
if (Array.isArray(configuredQueue.routingKey)) {
routingKeys = configuredQueue.routingKey;
} else {
if (configuredQueue.routingKey != null) {
routingKeys = [configuredQueue.routingKey];
}
}

if (routingKeys.length > 0) {
await Promise.all(
routingKeys.map((routingKey) => {
if (configuredQueue.exchange != undefined) {
channel.bindQueue(
configuredQueue.name,
configuredQueue.exchange,
routingKey,
configuredQueue.bindQueueArguments
);
}
})
);
}
}
}

private async initDirectReplyQueue(channel: ConfirmChannel) {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await channel.consume(
Expand Down
3 changes: 3 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export interface RabbitMQQueueConfig {
name: string;
createQueueIfNotExists?: boolean;
options?: Options.AssertQueue;
exchange?: string;
routingKey?: string | string[];
bindQueueArguments?: any;
}

export type ConsumeOptions = Options.Consume;
Expand Down

0 comments on commit e711bdc

Please sign in to comment.