Skip to content

Commit

Permalink
Merge branch 'master' into fix/createQueueIfNotExists-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
underfisk committed Jan 16, 2024
2 parents b80ab65 + 4b54afb commit 2e5084a
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 44 deletions.
89 changes: 89 additions & 0 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const logger = new ConsoleLogger('Custom logger');
const nonExistingExchange = 'non-existing-exchange';
const nonExistingQueue = 'non-existing-queue';

const testRoutingKey = 'test';

class RabbitConfig {
createModuleConfig(): RabbitMQConfig {
return {
Expand Down Expand Up @@ -492,6 +494,93 @@ describe('Module Configuration', () => {
// Clear non-existing exchange
await amqpConnection.channel.deleteExchange(nonExistingExchange);
});

it('should create an exchange successfully if `createExchangeIfNotExists` is true before binding queues', async () => {
const originalConnect = amqplib.connect;

// Spy on the internals of amqplib to be able to introduce delays in some functions
// to expose race conditions or unawaited promises
const connectSpy = jest
.spyOn(amqplib, 'connect')
.mockImplementation((...args) => {
const result = originalConnect(...args);
result.then((conn) => {
const originalCreateConfirmChannel = conn.createConfirmChannel;
jest
.spyOn(conn, 'createConfirmChannel')
.mockImplementation(function (...args) {
const result = originalCreateConfirmChannel.apply(this, args);
result.then((channel) => {
const bindQueueSpy = jest.spyOn(channel, 'bindQueue');

const originalAssertExchange = channel.assertExchange;
jest
.spyOn(channel, 'assertExchange')
.mockImplementation(function (...args) {
// Delay for a long time to ensure queues are bound after exchanges are asserted
return new Promise((r) => setTimeout(r, 500)).then(
() => {
const result = originalAssertExchange.apply(
this,
args,
);
result.then(() => {
expect(bindQueueSpy).not.toBeCalled();
});
return result;
},
) as any;
});
});
return result;
});
});
return result;
});

app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: async () => {
return {
exchanges: [
{
name: nonExistingExchange,
type: 'topic',
createExchangeIfNotExists: true,
},
],
queues: [
{
name: nonExistingQueue,
exchange: nonExistingExchange,
routingKey: testRoutingKey,
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 10000,
},
};
},
}),
],
}).compile();

const amqpConnection = app.get<AmqpConnection>(AmqpConnection);
expect(app).toBeDefined();

expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined);

await app.init();
expect(
await amqpConnection.channel.checkExchange(nonExistingExchange),
).toBeDefined();
await app.close();
});
});
});
});
94 changes: 50 additions & 44 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,23 @@ export class AmqpConnection {
};

await Promise.all([
Object.keys(this.config.channels).map(async (channelName) => {
const config = this.config.channels[channelName];
// Only takes the first channel specified as default so other ones get created.
if (defaultChannel.name === AmqpConnection.name && config.default) {
defaultChannel.name = channelName;
defaultChannel.config.prefetchCount =
config.prefetchCount || this.config.prefetchCount;
return;
}
Promise.all(
Object.keys(this.config.channels).map(async (channelName) => {
const config = this.config.channels[channelName];
// Only takes the first channel specified as default so other ones get created.
if (defaultChannel.name === AmqpConnection.name && config.default) {
defaultChannel.name = channelName;
defaultChannel.config.prefetchCount =
config.prefetchCount || this.config.prefetchCount;
return;
}

return this.setupManagedChannel(channelName, {
...config,
default: false,
});
}),
return this.setupManagedChannel(channelName, {
...config,
default: false,
});
})
),
this.setupManagedChannel(defaultChannel.name, defaultChannel.config),
]);
}
Expand All @@ -268,20 +270,22 @@ export class AmqpConnection {
this._channel = channel;

// Always assert exchanges & rpc queue in default channel.
this.config.exchanges.forEach((x) => {
const { createExchangeIfNotExists = true } = x;

if (createExchangeIfNotExists) {
return channel.assertExchange(
x.name,
x.type || this.config.defaultExchangeType,
x.options
);
}
return channel.checkExchange(x.name);
});
await Promise.all(
this.config.exchanges.map((x) => {
const { createExchangeIfNotExists = true } = x;

if (createExchangeIfNotExists) {
return channel.assertExchange(
x.name,
x.type || this.config.defaultExchangeType,
x.options
);
}
return channel.checkExchange(x.name);
})
);

this.setupQueuesWithBindings(channel, this.config.queues);
await this.setupQueuesWithBindings(channel, this.config.queues);

if (this.config.enableDirectReplyTo) {
await this.initDirectReplyQueue(channel);
Expand All @@ -295,22 +299,24 @@ export class AmqpConnection {
channel: ConfirmChannel,
queues: RabbitMQQueueConfig[]
) {
for (const configuredQueue of queues) {
const { name, options, bindQueueArguments, ...rest } = configuredQueue;
const queueOptions = {
...options,
...(bindQueueArguments !== undefined && { bindQueueArguments }),
};

await this.setupQueue(
{
...rest,
...(name !== undefined && { queue: name }),
queueOptions,
},
channel
);
}
await Promise.all(
queues.map(async (configuredQueue) => {
const { name, options, bindQueueArguments, ...rest } = configuredQueue;
const queueOptions = {
...options,
...(bindQueueArguments !== undefined && { bindQueueArguments }),
};

await this.setupQueue(
{
...rest,
...(name !== undefined && { queue: name }),
queueOptions,
},
channel
);
})
);
}

private async initDirectReplyQueue(channel: ConfirmChannel) {
Expand Down Expand Up @@ -687,7 +693,7 @@ export class AmqpConnection {
await Promise.all(
routingKeys.map((routingKey) => {
if (routingKey != null) {
channel.bindQueue(
return channel.bindQueue(
actualQueue as string,
exchange,
routingKey,
Expand Down

0 comments on commit 2e5084a

Please sign in to comment.