From dc90d5a4684c019761b6cf7790397beaf1d19f68 Mon Sep 17 00:00:00 2001 From: Conicaw Date: Mon, 15 Jan 2024 18:08:01 -0600 Subject: [PATCH] fix: add test for race condition Add test for race condition --- .../rabbitmq/e2e/configuration.e2e-spec.ts | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/integration/rabbitmq/e2e/configuration.e2e-spec.ts b/integration/rabbitmq/e2e/configuration.e2e-spec.ts index 993d5d16f..6aae70ec0 100644 --- a/integration/rabbitmq/e2e/configuration.e2e-spec.ts +++ b/integration/rabbitmq/e2e/configuration.e2e-spec.ts @@ -18,6 +18,8 @@ const logger = new ConsoleLogger('Custom logger'); const nonExistingExchange = 'non-existing-exchange'; const nonExistingQueue = 'non-existing-queue'; +const testRoutingKey = 'test'; + class RabbitConfig { createModuleConfig(): RabbitMQConfig { return { @@ -429,6 +431,93 @@ describe('Module Configuration', () => { // Clear non-existing exchange await amqpConnection.channel.deleteExchange(nonExistingExchange); }); + + it('should create an exchange successfully if `createExchangeIfNotExists` is true before binding queues', async () => { + const originalConnect = amqplib.connect; + + // Spy on the internals of amqplib to be able to introduce delays in some functions + // to expose race conditions or unawaited promises + const connectSpy = jest + .spyOn(amqplib, 'connect') + .mockImplementation((...args) => { + const result = originalConnect(...args); + result.then((conn) => { + const originalCreateConfirmChannel = conn.createConfirmChannel; + jest + .spyOn(conn, 'createConfirmChannel') + .mockImplementation(function (...args) { + const result = originalCreateConfirmChannel.apply(this, args); + result.then((channel) => { + const bindQueueSpy = jest.spyOn(channel, 'bindQueue'); + + const originalAssertExchange = channel.assertExchange; + jest + .spyOn(channel, 'assertExchange') + .mockImplementation(function (...args) { + // Delay for a long time to ensure queues are bound after exchanges are asserted + return new Promise((r) => setTimeout(r, 500)).then( + () => { + const result = originalAssertExchange.apply( + this, + args, + ); + result.then(() => { + expect(bindQueueSpy).not.toBeCalled(); + }); + return result; + }, + ) as any; + }); + }); + return result; + }); + }); + return result; + }); + + app = await Test.createTestingModule({ + imports: [ + RabbitMQModule.forRootAsync(RabbitMQModule, { + useFactory: async () => { + return { + exchanges: [ + { + name: nonExistingExchange, + type: 'topic', + createExchangeIfNotExists: true, + }, + ], + queues: [ + { + name: nonExistingQueue, + exchange: nonExistingExchange, + routingKey: testRoutingKey, + }, + ], + uri, + connectionInitOptions: { + wait: true, + reject: true, + timeout: 10000, + }, + }; + }, + }), + ], + }).compile(); + + const amqpConnection = app.get(AmqpConnection); + expect(app).toBeDefined(); + + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined); + + await app.init(); + expect( + await amqpConnection.channel.checkExchange(nonExistingExchange), + ).toBeDefined(); + await app.close(); + }); }); }); });