diff --git a/integration/rabbitmq/e2e/configuration.e2e-spec.ts b/integration/rabbitmq/e2e/configuration.e2e-spec.ts index 4407551c2..22cd29e6d 100644 --- a/integration/rabbitmq/e2e/configuration.e2e-spec.ts +++ b/integration/rabbitmq/e2e/configuration.e2e-spec.ts @@ -18,6 +18,8 @@ const logger = new ConsoleLogger('Custom logger'); const nonExistingExchange = 'non-existing-exchange'; const nonExistingQueue = 'non-existing-queue'; +const testRoutingKey = 'test'; + class RabbitConfig { createModuleConfig(): RabbitMQConfig { return { @@ -492,6 +494,93 @@ describe('Module Configuration', () => { // Clear non-existing exchange await amqpConnection.channel.deleteExchange(nonExistingExchange); }); + + it('should create an exchange successfully if `createExchangeIfNotExists` is true before binding queues', async () => { + const originalConnect = amqplib.connect; + + // Spy on the internals of amqplib to be able to introduce delays in some functions + // to expose race conditions or unawaited promises + const connectSpy = jest + .spyOn(amqplib, 'connect') + .mockImplementation((...args) => { + const result = originalConnect(...args); + result.then((conn) => { + const originalCreateConfirmChannel = conn.createConfirmChannel; + jest + .spyOn(conn, 'createConfirmChannel') + .mockImplementation(function (...args) { + const result = originalCreateConfirmChannel.apply(this, args); + result.then((channel) => { + const bindQueueSpy = jest.spyOn(channel, 'bindQueue'); + + const originalAssertExchange = channel.assertExchange; + jest + .spyOn(channel, 'assertExchange') + .mockImplementation(function (...args) { + // Delay for a long time to ensure queues are bound after exchanges are asserted + return new Promise((r) => setTimeout(r, 500)).then( + () => { + const result = originalAssertExchange.apply( + this, + args, + ); + result.then(() => { + expect(bindQueueSpy).not.toBeCalled(); + }); + return result; + }, + ) as any; + }); + }); + return result; + }); + }); + return result; + }); + + app = await Test.createTestingModule({ + imports: [ + RabbitMQModule.forRootAsync(RabbitMQModule, { + useFactory: async () => { + return { + exchanges: [ + { + name: nonExistingExchange, + type: 'topic', + createExchangeIfNotExists: true, + }, + ], + queues: [ + { + name: nonExistingQueue, + exchange: nonExistingExchange, + routingKey: testRoutingKey, + }, + ], + uri, + connectionInitOptions: { + wait: true, + reject: true, + timeout: 10000, + }, + }; + }, + }), + ], + }).compile(); + + const amqpConnection = app.get(AmqpConnection); + expect(app).toBeDefined(); + + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined); + + await app.init(); + expect( + await amqpConnection.channel.checkExchange(nonExistingExchange), + ).toBeDefined(); + await app.close(); + }); }); }); }); diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index 9247d9e6f..9c31569cc 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -236,21 +236,23 @@ export class AmqpConnection { }; await Promise.all([ - Object.keys(this.config.channels).map(async (channelName) => { - const config = this.config.channels[channelName]; - // Only takes the first channel specified as default so other ones get created. - if (defaultChannel.name === AmqpConnection.name && config.default) { - defaultChannel.name = channelName; - defaultChannel.config.prefetchCount = - config.prefetchCount || this.config.prefetchCount; - return; - } + Promise.all( + Object.keys(this.config.channels).map(async (channelName) => { + const config = this.config.channels[channelName]; + // Only takes the first channel specified as default so other ones get created. + if (defaultChannel.name === AmqpConnection.name && config.default) { + defaultChannel.name = channelName; + defaultChannel.config.prefetchCount = + config.prefetchCount || this.config.prefetchCount; + return; + } - return this.setupManagedChannel(channelName, { - ...config, - default: false, - }); - }), + return this.setupManagedChannel(channelName, { + ...config, + default: false, + }); + }) + ), this.setupManagedChannel(defaultChannel.name, defaultChannel.config), ]); } @@ -268,20 +270,22 @@ export class AmqpConnection { this._channel = channel; // Always assert exchanges & rpc queue in default channel. - this.config.exchanges.forEach((x) => { - const { createExchangeIfNotExists = true } = x; - - if (createExchangeIfNotExists) { - return channel.assertExchange( - x.name, - x.type || this.config.defaultExchangeType, - x.options - ); - } - return channel.checkExchange(x.name); - }); + await Promise.all( + this.config.exchanges.map((x) => { + const { createExchangeIfNotExists = true } = x; + + if (createExchangeIfNotExists) { + return channel.assertExchange( + x.name, + x.type || this.config.defaultExchangeType, + x.options + ); + } + return channel.checkExchange(x.name); + }) + ); - this.setupQueuesWithBindings(channel, this.config.queues); + await this.setupQueuesWithBindings(channel, this.config.queues); if (this.config.enableDirectReplyTo) { await this.initDirectReplyQueue(channel); @@ -295,22 +299,24 @@ export class AmqpConnection { channel: ConfirmChannel, queues: RabbitMQQueueConfig[] ) { - for (const configuredQueue of queues) { - const { name, options, bindQueueArguments, ...rest } = configuredQueue; - const queueOptions = { - ...options, - ...(bindQueueArguments !== undefined && { bindQueueArguments }), - }; - - await this.setupQueue( - { - ...rest, - ...(name !== undefined && { queue: name }), - queueOptions, - }, - channel - ); - } + await Promise.all( + queues.map(async (configuredQueue) => { + const { name, options, bindQueueArguments, ...rest } = configuredQueue; + const queueOptions = { + ...options, + ...(bindQueueArguments !== undefined && { bindQueueArguments }), + }; + + await this.setupQueue( + { + ...rest, + ...(name !== undefined && { queue: name }), + queueOptions, + }, + channel + ); + }) + ); } private async initDirectReplyQueue(channel: ConfirmChannel) { @@ -687,7 +693,7 @@ export class AmqpConnection { await Promise.all( routingKeys.map((routingKey) => { if (routingKey != null) { - channel.bindQueue( + return channel.bindQueue( actualQueue as string, exchange, routingKey,