Skip to content

Commit

Permalink
fix(rabbitmq): a fix for routing key check. The check should respect …
Browse files Browse the repository at this point in the history
…wildcards (#713)

* fix(rabbitmq): a fix for routing key check. The check should respect wildcards

fix #712

* test(rabbitmq): added tests for matchesRoutingKey and logic improved

re #712

---------

Co-authored-by: andriusm <andriusm@softpauer.com>
  • Loading branch information
Andrius0124 and andriusm committed Mar 25, 2024
1 parent bc86da9 commit 450d420
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 2 deletions.
44 changes: 44 additions & 0 deletions integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ describe('Rabbit Controller RPC', () => {
expect(response).toEqual({ echo: '{a:' });
});

it('hash wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'hash-wildcard-rpc.some-end',
});

expect(response).toEqual({
message: 'success-hash-wildcard-rpc.#',
});
});

it('star wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'star-wildcard-rpc.middle.end',
});

expect(response).toEqual({
message: 'success-star-wildcard-rpc.*.end',
});
});

it('SUBMODULE: regular RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
Expand Down Expand Up @@ -281,4 +303,26 @@ describe('Rabbit Controller RPC', () => {

expect(response).toEqual({ echo: '{a:' });
});

it('SUBMODULE: hash wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'hash-wildcard-rpc-submodule.some-end',
});

expect(response).toEqual({
message: 'success-hash-wildcard-rpc-submodule.#',
});
});

it('SUBMODULE: star wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'star-wildcard-rpc-submodule.middle.end',
});

expect(response).toEqual({
message: 'success-star-wildcard-rpc-submodule.*.end',
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,30 @@ export class ControllerDiscoveryController {
echo: nonJsonMessage,
};
}

@RabbitRPC({
routingKey: 'hash-wildcard-rpc.#',
exchange: 'exchange2',
queue: 'hash-wildcard-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
hashWildcardRpc() {
return {
message: 'success-hash-wildcard-rpc.#',
};
}

@RabbitRPC({
routingKey: 'star-wildcard-rpc.*.end',
exchange: 'exchange2',
queue: 'star-wildcard-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
wildcardRpc() {
return {
message: 'success-star-wildcard-rpc.*.end',
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,29 @@ export class SubmoduleController {
echo: nonJsonMessage,
};
}
@RabbitRPC({
routingKey: 'hash-wildcard-rpc-submodule.#',
exchange: 'exchange2',
queue: 'hash-wildcard-rpc-submodule',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
hashWildcardRpc() {
return {
message: 'success-hash-wildcard-rpc-submodule.#',
};
}

@RabbitRPC({
routingKey: 'star-wildcard-rpc-submodule.*.end',
exchange: 'exchange2',
queue: 'star-wildcard-rpc-submodule',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
wildcardRpc() {
return {
message: 'success-star-wildcard-rpc-submodule.*.end',
};
}
}
11 changes: 9 additions & 2 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from './errorBehaviors';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';
import { isNull } from 'lodash';
import { matchesRoutingKey } from './utils';

const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';

Expand Down Expand Up @@ -565,8 +566,14 @@ export class AmqpConnection {
throw new Error('Received null message');
}

if (rpcOptions.routingKey !== msg.fields.routingKey) {
channel.nack(msg, false, true);
if (
!matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)
) {
channel.nack(msg, false, false);
this.logger.error(
'Received message with invalid routing key: ' +
msg.fields.routingKey
);
return;
}

Expand Down
25 changes: 25 additions & 0 deletions packages/rabbitmq/src/amqp/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export function matchesRoutingKey(
routingKey: string,
pattern: string[] | string | undefined
): boolean {
if (!pattern) return false;
const patterns = Array.isArray(pattern) ? pattern : [pattern];
for (const pattern of patterns) {
if (routingKey === pattern) return true;
const splitKey = routingKey.split('.');
const splitPattern = pattern.split('.');
let starFailed = false;
for (let i = 0; i < splitPattern.length; i++) {
if (splitPattern[i] === '#') return true;

if (splitPattern[i] !== '*' && splitPattern[i] !== splitKey[i]) {
starFailed = true;
break;
}
}

if (!starFailed && splitKey.length === splitPattern.length) return true;
}

return false;
}
57 changes: 57 additions & 0 deletions packages/rabbitmq/src/tests/rabbitmq.utils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { matchesRoutingKey } from '../amqp/utils';

describe('matchesRoutingKey', () => {
const userCreated = 'user.created';
it('should return true when routing key matches star pattern', () => {
const routingKey = 'user.created.new';
const pattern = 'user.*.new';
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(true);
});
it('should return true when routing key matches hash pattern', () => {
const routingKey = 'user.updated.new';
const pattern = 'user.#';
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(true);
});
it('should return false when routing key does not match pattern', () => {
const routingKey = 'user.updated';
const pattern = userCreated;
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(false);
});

it('should return true when routing key matches any star pattern in the array', () => {
const routingKey = userCreated;
const pattern = ['event.*', 'user.*'];
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(true);
});

it('should return true when routing key matches precise pattern in the array with wildcards', () => {
const routingKey = userCreated;
const pattern = ['user.*.new', userCreated, 'event.#'];
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(true);
});

it('should return true when routing key matches any hash pattern in the array', () => {
const routingKey = 'user.created.new';
const pattern = ['event.#', 'user.#'];
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(true);
});
it('should return false when routing key does not match any pattern in the array', () => {
const routingKey = 'user.updated';
const pattern = [userCreated, 'event.created'];
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(false);
});

it('should return false when pattern is undefined', () => {
const routingKey = userCreated;
const pattern = undefined;
const result = matchesRoutingKey(routingKey, pattern);
expect(result).toBe(false);
});
});

0 comments on commit 450d420

Please sign in to comment.