Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix/rabbitmq-promises
Browse files Browse the repository at this point in the history
  • Loading branch information
ttshivers committed Jan 16, 2024
2 parents e0f9974 + 24757f3 commit c8c1a7d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 33 deletions.
62 changes: 62 additions & 0 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,68 @@ describe('Module Configuration', () => {
).toBeDefined();
await app.close();
});

it('should not assert queue with empty name', async () => {
const originalConnect = amqplib.connect;
let assertQueueSpy;

const connectSpy = jest
.spyOn(amqplib, 'connect')
.mockImplementation((...args) => {
const result = originalConnect(...args);
result.then((conn) => {
const originalCreateConfirmChannel = conn.createConfirmChannel;
jest
.spyOn(conn, 'createConfirmChannel')
.mockImplementation(function (...args) {
const result = originalCreateConfirmChannel.apply(this, args);
result.then((channel) => {
assertQueueSpy = jest.spyOn(channel, 'assertQueue');
});
return result;
});
});
return result;
});

app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: async () => {
return {
queues: [
{
name: nonExistingQueue,
createQueueIfNotExists: true,
options: {
durable: true,
},
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 3000,
},
};
},
}),
],
}).compile();

expect(app).toBeDefined();

expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined);
expect(assertQueueSpy).not.toHaveBeenCalledWith('', undefined);
expect(assertQueueSpy).toHaveBeenCalledWith(nonExistingQueue, {
durable: true,
});

await app.init();
await app.close();
});
});
});

Expand Down
47 changes: 14 additions & 33 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,39 +301,20 @@ export class AmqpConnection {
) {
await Promise.all(
queues.map(async (configuredQueue) => {
const { createQueueIfNotExists = true } = configuredQueue;
const { name, options, bindQueueArguments, ...rest } = configuredQueue;
const queueOptions = {
...options,
...(bindQueueArguments !== undefined && { bindQueueArguments }),
};

if (createQueueIfNotExists) {
await this.setupQueue(configuredQueue, channel);
}
await channel.assertQueue(
configuredQueue.name,
configuredQueue.options
await this.setupQueue(
{
...rest,
...(name !== undefined && { queue: name }),
queueOptions,
},
channel
);

let routingKeys: string[] = [];
if (Array.isArray(configuredQueue.routingKey)) {
routingKeys = configuredQueue.routingKey;
} else {
if (configuredQueue.routingKey != null) {
routingKeys = [configuredQueue.routingKey];
}
}

if (routingKeys.length > 0) {
await Promise.all(
routingKeys.map((routingKey) => {
if (configuredQueue.exchange != undefined) {
return channel.bindQueue(
configuredQueue.name,
configuredQueue.exchange,
routingKey,
configuredQueue.bindQueueArguments
);
}
})
);
}
})
);
}
Expand Down Expand Up @@ -702,8 +683,8 @@ export class AmqpConnection {
}

let bindQueueArguments: any;
if (subscriptionOptions.queueOptions) {
bindQueueArguments = subscriptionOptions.queueOptions.bindQueueArguments;
if (queueOptions) {
bindQueueArguments = queueOptions.bindQueueArguments;
}

const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];
Expand Down

0 comments on commit c8c1a7d

Please sign in to comment.