Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: allow release breaking change for stripe node module bump #714

Closed
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

strategy:
matrix:
node: [16, 18, 19]
node: [18, 19, 20]
fail-fast: false

services:
Expand Down
44 changes: 44 additions & 0 deletions integration/rabbitmq/e2e/rpc-controller-discovery.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ describe('Rabbit Controller RPC', () => {
expect(response).toEqual({ echo: '{a:' });
});

it('hash wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'hash-wildcard-rpc.some-end',
});

expect(response).toEqual({
message: 'success-hash-wildcard-rpc.#',
});
});

it('star wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'star-wildcard-rpc.middle.end',
});

expect(response).toEqual({
message: 'success-star-wildcard-rpc.*.end',
});
});

it('SUBMODULE: regular RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
Expand Down Expand Up @@ -281,4 +303,26 @@ describe('Rabbit Controller RPC', () => {

expect(response).toEqual({ echo: '{a:' });
});

it('SUBMODULE: hash wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'hash-wildcard-rpc-submodule.some-end',
});

expect(response).toEqual({
message: 'success-hash-wildcard-rpc-submodule.#',
});
});

it('SUBMODULE: star wildcard RPC handler should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange2',
routingKey: 'star-wildcard-rpc-submodule.middle.end',
});

expect(response).toEqual({
message: 'success-star-wildcard-rpc-submodule.*.end',
});
});
});
28 changes: 28 additions & 0 deletions integration/rabbitmq/e2e/rpc.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { AppModule } from '../src/app.module';
import { randomUUID } from 'crypto';

const nonJsonRpcRoutingKey = 'non-json-rpc';

Expand Down Expand Up @@ -39,6 +40,33 @@ describe('Rabbit RPC', () => {
expect(response).toEqual({ echo: { request: 'val' } });
});

it('multiple RPC handler should receive a valid RPC response in parallel', async () => {
const correlationId = randomUUID();
const firstResponse = await amqpConnection.request({
exchange: 'exchange1',
routingKey: 'delay-rpc',
correlationId,
headers: { 'X-Request-ID': randomUUID() },
payload: {
request: 'first request',
delay: 1000,
},
});
const secondResponse = await amqpConnection.request({
exchange: 'exchange1',
routingKey: 'delay-rpc',
correlationId,
headers: { 'X-Request-ID': randomUUID() },
payload: {
request: 'second request',
delay: 20,
},
});

expect(firstResponse).toEqual({ echo: { request: 'first request' } });
expect(secondResponse).toEqual({ echo: { request: 'second request' } });
});

it('intercepted RPC handler should receive a transformed RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,30 @@ export class ControllerDiscoveryController {
echo: nonJsonMessage,
};
}

@RabbitRPC({
routingKey: 'hash-wildcard-rpc.#',
exchange: 'exchange2',
queue: 'hash-wildcard-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
hashWildcardRpc() {
return {
message: 'success-hash-wildcard-rpc.#',
};
}

@RabbitRPC({
routingKey: 'star-wildcard-rpc.*.end',
exchange: 'exchange2',
queue: 'star-wildcard-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
wildcardRpc() {
return {
message: 'success-star-wildcard-rpc.*.end',
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,29 @@ export class SubmoduleController {
echo: nonJsonMessage,
};
}
@RabbitRPC({
routingKey: 'hash-wildcard-rpc-submodule.#',
exchange: 'exchange2',
queue: 'hash-wildcard-rpc-submodule',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
hashWildcardRpc() {
return {
message: 'success-hash-wildcard-rpc-submodule.#',
};
}

@RabbitRPC({
routingKey: 'star-wildcard-rpc-submodule.*.end',
exchange: 'exchange2',
queue: 'star-wildcard-rpc-submodule',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
wildcardRpc() {
return {
message: 'success-star-wildcard-rpc-submodule.*.end',
};
}
}
19 changes: 19 additions & 0 deletions integration/rabbitmq/src/rpc/rpc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import { TransformInterceptor } from '../transform.interceptor';
import { ReplyErrorCallback } from './reply.error.callback';
import { RpcException } from './rpc-exception';

async function delay(milliseconds = 0, returnValue) {
return new Promise((done) =>
setTimeout(() => done(returnValue), milliseconds),
);
}

@Injectable()
export class RpcService {
@RabbitRPC({
Expand All @@ -20,6 +26,19 @@ export class RpcService {
};
}

@RabbitRPC({
routingKey: 'delay-rpc',
exchange: 'exchange1',
queue: 'delay-rpc',
})
async delayRpc(message: any) {
await delay(message?.delay || 0, false);
delete message.delay;
return {
echo: message,
};
}

@UseInterceptors(TransformInterceptor)
@RabbitRPC({
routingKey: 'intercepted-rpc',
Expand Down
10 changes: 4 additions & 6 deletions packages/discovery/src/discovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class DiscoveryService {
constructor(
private readonly modulesContainer: ModulesContainer,
private readonly metadataScanner: MetadataScanner
) {}
) { }

/**
* Discovers all providers in a Nest App that match a filter
Expand Down Expand Up @@ -168,11 +168,9 @@ export class DiscoveryService {

const prototype = Object.getPrototypeOf(instance);

return this.metadataScanner
.scanFromPrototype(instance, prototype, (name) =>
this.extractMethodMetaAtKey<T>(metaKey, component, prototype, name)
)
.filter((x) => !isNil(x.meta));
return this.metadataScanner.getAllMethodNames(prototype).map((name) =>
this.extractMethodMetaAtKey<T>(metaKey, component, prototype, name)
).filter((x) => !isNil(x.meta));
}

/**
Expand Down
21 changes: 21 additions & 0 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [Requesting Data from an RPC](#requesting-data-from-an-rpc)
- [Type Inference](#type-inference)
- [Interop with other RPC Servers](#interop-with-other-rpc-servers)
- [In distributed systems](#in-distributed-systems)
- [Advanced Patterns](#advanced-patterns)
- [Competing Consumers](#competing-consumers)
- [Handling errors](#handling-errors)
Expand Down Expand Up @@ -499,6 +500,26 @@ The generic parameter used with the `request` method lets you specify the _expec

The RPC functionality included in `@golevelup/nestjs-rabbitmq` is based on the [Direct Reply-To Queue](https://www.rabbitmq.com/direct-reply-to.html) functionality of RabbitMQ. It is possible that because of this, the client library (`AmqpConnection.request`) could be used to interact with an RPC server implemented using a different language or framework. However, this functionality has not been verified.

#### In distributed systems

In a distributed system, transactions must be correlated by an `X-Correlation-ID`. You can use the `X-Request-ID` in the header to separate sub-requests that are contained in the main request chain.

```typescript
// To create a transaction in the distributed system,
// multiple request correlated by an correlationId
const correlationId = randomUUID();
const response = await amqpConnection.request<ExpectedReturnType>({
exchange: 'exchange1',
routingKey: 'rpc',
correlationId,
// Each request in the transaction has its own requestId
headers: { 'X-Request-ID': randomUUID() },
payload: {
request: 'val',
},
});
```

## Advanced Patterns

### Competing Consumers
Expand Down
20 changes: 17 additions & 3 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from './errorBehaviors';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';
import { isNull } from 'lodash';
import { matchesRoutingKey } from './utils';

const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';

Expand All @@ -49,6 +50,7 @@ export type SubscriberHandler<T = unknown> = (

export interface CorrelationMessage {
correlationId: string;
requestId?: string;
message: Record<string, unknown>;
}

Expand Down Expand Up @@ -358,6 +360,7 @@ export class AmqpConnection {

const correlationMessage: CorrelationMessage = {
correlationId: msg.properties.correlationId.toString(),
requestId: msg.properties?.headers?.['X-Request-ID']?.toString(),
message: message,
};

Expand All @@ -371,11 +374,16 @@ export class AmqpConnection {

public async request<T>(requestOptions: RequestOptions): Promise<T> {
const correlationId = requestOptions.correlationId || randomUUID();
const requestId = requestOptions?.headers?.['X-Request-ID'];
const timeout = requestOptions.timeout || this.config.defaultRpcTimeout;
const payload = requestOptions.payload || {};

const response$ = this.messageSubject.pipe(
filter((x) => x.correlationId === correlationId),
filter((x) =>
requestId
? x.correlationId === correlationId && x.requestId === requestId
: x.correlationId === correlationId
),
map((x) => x.message as T),
first()
);
Expand Down Expand Up @@ -558,8 +566,14 @@ export class AmqpConnection {
throw new Error('Received null message');
}

if (rpcOptions.routingKey !== msg.fields.routingKey) {
channel.nack(msg, false, true);
if (
!matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)
) {
channel.nack(msg, false, false);
this.logger.error(
'Received message with invalid routing key: ' +
msg.fields.routingKey
);
return;
}

Expand Down
25 changes: 25 additions & 0 deletions packages/rabbitmq/src/amqp/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export function matchesRoutingKey(
routingKey: string,
pattern: string[] | string | undefined
): boolean {
if (!pattern) return false;
const patterns = Array.isArray(pattern) ? pattern : [pattern];
for (const pattern of patterns) {
if (routingKey === pattern) return true;
const splitKey = routingKey.split('.');
const splitPattern = pattern.split('.');
let starFailed = false;
for (let i = 0; i < splitPattern.length; i++) {
if (splitPattern[i] === '#') return true;

if (splitPattern[i] !== '*' && splitPattern[i] !== splitKey[i]) {
starFailed = true;
break;
}
}

if (!starFailed && splitKey.length === splitPattern.length) return true;
}

return false;
}
Loading
Loading