From 4b54afbb3ca6bfaf5b060e6f0ec49edf20c66238 Mon Sep 17 00:00:00 2001 From: ttshivers Date: Tue, 16 Jan 2024 11:11:01 -0600 Subject: [PATCH] fix(rabbitmq): fix unawaited promises (#674) * fix(rabbitmq): fix unawaited promises I noticed some floating promises in the code when trying to debug an issue due to ordering. Ensuring all the promises are awaited where expected should help prevent possible issues like ensureing the assertExchange is done before trying to bind a queue to it. * fix: add test for race condition Add test for race condition --- .../rabbitmq/e2e/configuration.e2e-spec.ts | 89 ++++++++++++++++++ packages/rabbitmq/src/amqp/connection.ts | 94 ++++++++++--------- 2 files changed, 139 insertions(+), 44 deletions(-) diff --git a/integration/rabbitmq/e2e/configuration.e2e-spec.ts b/integration/rabbitmq/e2e/configuration.e2e-spec.ts index b78b85ad8..d81ad0f43 100644 --- a/integration/rabbitmq/e2e/configuration.e2e-spec.ts +++ b/integration/rabbitmq/e2e/configuration.e2e-spec.ts @@ -18,6 +18,8 @@ const logger = new ConsoleLogger('Custom logger'); const nonExistingExchange = 'non-existing-exchange'; const nonExistingQueue = 'non-existing-queue'; +const testRoutingKey = 'test'; + class RabbitConfig { createModuleConfig(): RabbitMQConfig { return { @@ -492,6 +494,93 @@ describe('Module Configuration', () => { // Clear non-existing exchange await amqpConnection.channel.deleteExchange(nonExistingExchange); }); + + it('should create an exchange successfully if `createExchangeIfNotExists` is true before binding queues', async () => { + const originalConnect = amqplib.connect; + + // Spy on the internals of amqplib to be able to introduce delays in some functions + // to expose race conditions or unawaited promises + const connectSpy = jest + .spyOn(amqplib, 'connect') + .mockImplementation((...args) => { + const result = originalConnect(...args); + result.then((conn) => { + const originalCreateConfirmChannel = conn.createConfirmChannel; + jest + .spyOn(conn, 'createConfirmChannel') + .mockImplementation(function (...args) { + const result = originalCreateConfirmChannel.apply(this, args); + result.then((channel) => { + const bindQueueSpy = jest.spyOn(channel, 'bindQueue'); + + const originalAssertExchange = channel.assertExchange; + jest + .spyOn(channel, 'assertExchange') + .mockImplementation(function (...args) { + // Delay for a long time to ensure queues are bound after exchanges are asserted + return new Promise((r) => setTimeout(r, 500)).then( + () => { + const result = originalAssertExchange.apply( + this, + args, + ); + result.then(() => { + expect(bindQueueSpy).not.toBeCalled(); + }); + return result; + }, + ) as any; + }); + }); + return result; + }); + }); + return result; + }); + + app = await Test.createTestingModule({ + imports: [ + RabbitMQModule.forRootAsync(RabbitMQModule, { + useFactory: async () => { + return { + exchanges: [ + { + name: nonExistingExchange, + type: 'topic', + createExchangeIfNotExists: true, + }, + ], + queues: [ + { + name: nonExistingQueue, + exchange: nonExistingExchange, + routingKey: testRoutingKey, + }, + ], + uri, + connectionInitOptions: { + wait: true, + reject: true, + timeout: 10000, + }, + }; + }, + }), + ], + }).compile(); + + const amqpConnection = app.get(AmqpConnection); + expect(app).toBeDefined(); + + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined); + + await app.init(); + expect( + await amqpConnection.channel.checkExchange(nonExistingExchange), + ).toBeDefined(); + await app.close(); + }); }); }); }); diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index 9247d9e6f..9c31569cc 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -236,21 +236,23 @@ export class AmqpConnection { }; await Promise.all([ - Object.keys(this.config.channels).map(async (channelName) => { - const config = this.config.channels[channelName]; - // Only takes the first channel specified as default so other ones get created. - if (defaultChannel.name === AmqpConnection.name && config.default) { - defaultChannel.name = channelName; - defaultChannel.config.prefetchCount = - config.prefetchCount || this.config.prefetchCount; - return; - } + Promise.all( + Object.keys(this.config.channels).map(async (channelName) => { + const config = this.config.channels[channelName]; + // Only takes the first channel specified as default so other ones get created. + if (defaultChannel.name === AmqpConnection.name && config.default) { + defaultChannel.name = channelName; + defaultChannel.config.prefetchCount = + config.prefetchCount || this.config.prefetchCount; + return; + } - return this.setupManagedChannel(channelName, { - ...config, - default: false, - }); - }), + return this.setupManagedChannel(channelName, { + ...config, + default: false, + }); + }) + ), this.setupManagedChannel(defaultChannel.name, defaultChannel.config), ]); } @@ -268,20 +270,22 @@ export class AmqpConnection { this._channel = channel; // Always assert exchanges & rpc queue in default channel. - this.config.exchanges.forEach((x) => { - const { createExchangeIfNotExists = true } = x; - - if (createExchangeIfNotExists) { - return channel.assertExchange( - x.name, - x.type || this.config.defaultExchangeType, - x.options - ); - } - return channel.checkExchange(x.name); - }); + await Promise.all( + this.config.exchanges.map((x) => { + const { createExchangeIfNotExists = true } = x; + + if (createExchangeIfNotExists) { + return channel.assertExchange( + x.name, + x.type || this.config.defaultExchangeType, + x.options + ); + } + return channel.checkExchange(x.name); + }) + ); - this.setupQueuesWithBindings(channel, this.config.queues); + await this.setupQueuesWithBindings(channel, this.config.queues); if (this.config.enableDirectReplyTo) { await this.initDirectReplyQueue(channel); @@ -295,22 +299,24 @@ export class AmqpConnection { channel: ConfirmChannel, queues: RabbitMQQueueConfig[] ) { - for (const configuredQueue of queues) { - const { name, options, bindQueueArguments, ...rest } = configuredQueue; - const queueOptions = { - ...options, - ...(bindQueueArguments !== undefined && { bindQueueArguments }), - }; - - await this.setupQueue( - { - ...rest, - ...(name !== undefined && { queue: name }), - queueOptions, - }, - channel - ); - } + await Promise.all( + queues.map(async (configuredQueue) => { + const { name, options, bindQueueArguments, ...rest } = configuredQueue; + const queueOptions = { + ...options, + ...(bindQueueArguments !== undefined && { bindQueueArguments }), + }; + + await this.setupQueue( + { + ...rest, + ...(name !== undefined && { queue: name }), + queueOptions, + }, + channel + ); + }) + ); } private async initDirectReplyQueue(channel: ConfirmChannel) { @@ -687,7 +693,7 @@ export class AmqpConnection { await Promise.all( routingKeys.map((routingKey) => { if (routingKey != null) { - channel.bindQueue( + return channel.bindQueue( actualQueue as string, exchange, routingKey,