Skip to content

Commit

Permalink
feat: amqlib types refactoring
Browse files Browse the repository at this point in the history
improves the import of 3rd party types

365
  • Loading branch information
underfisk committed Jan 20, 2022
1 parent ae8d124 commit 319ce18
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 51 deletions.
65 changes: 36 additions & 29 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { Logger } from '@nestjs/common';
import * as amqpcon from 'amqp-connection-manager';
import * as amqplib from 'amqplib';
import {
ChannelWrapper,
AmqpConnectionManager,
connect,
} from 'amqp-connection-manager';
import {
ConsumeMessage,
Channel,
Connection,
ConfirmChannel,
Options,
} from 'amqplib';
import { EMPTY, interval, race, Subject, throwError } from 'rxjs';
import {
catchError,
Expand Down Expand Up @@ -55,39 +65,39 @@ export class AmqpConnection {
private readonly config: Required<RabbitMQConfig>;
private readonly logger: Logger;
private readonly initialized = new Subject();
private _managedConnection!: amqpcon.AmqpConnectionManager;
private _managedConnection!: AmqpConnectionManager;
/**
* Will now specify the default managed channel.
*/
private _managedChannel!: amqpcon.ChannelWrapper;
private _managedChannels: Record<string, amqpcon.ChannelWrapper> = {};
private _managedChannel!: ChannelWrapper;
private _managedChannels: Record<string, ChannelWrapper> = {};
/**
* Will now specify the default channel.
*/
private _channel!: amqplib.Channel;
private _channels: Record<string, amqplib.Channel> = {};
private _connection?: amqplib.Connection;
private _channel!: Channel;
private _channels: Record<string, Channel> = {};
private _connection?: Connection;

constructor(config: RabbitMQConfig) {
this.config = { ...defaultConfig, ...config };
this.logger = new Logger(AmqpConnection.name);
}

get channel(): amqplib.Channel {
get channel(): Channel {
if (!this._channel) throw new Error('channel is not available');
return this._channel;
}

get connection(): amqplib.Connection {
get connection(): Connection {
if (!this._connection) throw new Error('connection is not available');
return this._connection;
}

get managedChannel(): amqpcon.ChannelWrapper {
get managedChannel(): ChannelWrapper {
return this._managedChannel;
}

get managedConnection(): amqpcon.AmqpConnectionManager {
get managedConnection(): AmqpConnectionManager {
return this._managedConnection;
}

Expand Down Expand Up @@ -133,7 +143,7 @@ export class AmqpConnection {
private async initCore(): Promise<void> {
this.logger.log('Trying to connect to a RabbitMQ broker');

this._managedConnection = amqpcon.connect(
this._managedConnection = connect(
Array.isArray(this.config.uri) ? this.config.uri : [this.config.uri],
this.config.connectionManagerOptions
);
Expand Down Expand Up @@ -177,7 +187,7 @@ export class AmqpConnection {
}

private async setupInitChannel(
channel: amqplib.ConfirmChannel,
channel: ConfirmChannel,
name: string,
config: RabbitMQChannelConfig
): Promise<void> {
Expand Down Expand Up @@ -205,7 +215,7 @@ export class AmqpConnection {
}
}

private async initDirectReplyQueue(channel: amqplib.ConfirmChannel) {
private async initDirectReplyQueue(channel: ConfirmChannel) {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await channel.consume(
DIRECT_REPLY_QUEUE,
Expand Down Expand Up @@ -271,7 +281,7 @@ export class AmqpConnection {
public async createSubscriber<T>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
rawMessage?: ConsumeMessage
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions
) {
Expand All @@ -285,10 +295,10 @@ export class AmqpConnection {
private async setupSubscriberChannel<T>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
rawMessage?: ConsumeMessage
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
channel: ConfirmChannel
): Promise<void> {
const queue = await this.setupQueue(msgOptions, channel);

Expand Down Expand Up @@ -336,7 +346,7 @@ export class AmqpConnection {
public async createRpc<T, U>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
rawMessage?: ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions
) {
Expand All @@ -350,10 +360,10 @@ export class AmqpConnection {
public async setupRpcChannel<T, U>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
rawMessage?: ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
channel: ConfirmChannel
) {
const queue = await this.setupQueue(rpcOptions, channel);

Expand Down Expand Up @@ -400,7 +410,7 @@ export class AmqpConnection {
exchange: string,
routingKey: string,
message: any,
options?: amqplib.Options.Publish
options?: Options.Publish
) {
// source amqplib channel is used directly to keep the behavior of throwing connection related errors
if (!this.managedConnection.isConnected() || !this._channel) {
Expand All @@ -422,11 +432,8 @@ export class AmqpConnection {
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<U>,
msg: amqplib.ConsumeMessage,
handler: (msg: T | undefined, rawMessage?: ConsumeMessage) => Promise<U>,
msg: ConsumeMessage,
allowNonJsonMessages?: boolean
) {
let message: T | undefined = undefined;
Expand All @@ -448,7 +455,7 @@ export class AmqpConnection {

private async setupQueue(
subscriptionOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
channel: ConfirmChannel
): Promise<string> {
const {
exchange,
Expand Down Expand Up @@ -538,7 +545,7 @@ export class AmqpConnection {
* @param name name of the channel
* @returns channel wrapper
*/
private selectManagedChannel(name?: string): amqpcon.ChannelWrapper {
private selectManagedChannel(name?: string): ChannelWrapper {
if (!name) return this._managedChannel;
const channel = this._managedChannels[name];
if (!channel) {
Expand Down
26 changes: 9 additions & 17 deletions packages/rabbitmq/src/amqp/errorBehaviors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as amqplib from 'amqplib';
import { Channel, ConsumeMessage } from 'amqplib';
import { QueueOptions } from '../rabbitmq.interfaces';

export enum MessageHandlerErrorBehavior {
Expand All @@ -8,37 +8,29 @@ export enum MessageHandlerErrorBehavior {
}

export type MessageErrorHandler = (
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
channel: Channel,
msg: ConsumeMessage,
error: any
) => Promise<void> | void;

/**
* An error handler that will ack the message which caused an error during processing
*/
export const ackErrorHandler: MessageErrorHandler = (channel, msg, error) => {
export const ackErrorHandler: MessageErrorHandler = (channel, msg) => {
channel.ack(msg);
};

/**
* An error handler that will nack and requeue a message which created an error during processing
*/
export const requeueErrorHandler: MessageErrorHandler = (
channel,
msg,
error
) => {
export const requeueErrorHandler: MessageErrorHandler = (channel, msg) => {
channel.nack(msg, false, true);
};

/**
* An error handler that will nack a message which created an error during processing
*/
export const defaultNackErrorHandler: MessageErrorHandler = (
channel,
msg,
error
) => {
export const defaultNackErrorHandler: MessageErrorHandler = (channel, msg) => {
channel.nack(msg, false, false);
};

Expand All @@ -56,7 +48,7 @@ export const getHandlerForLegacyBehavior = (
};

export type AssertQueueErrorHandler = (
channel: amqplib.Channel,
channel: Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
Expand All @@ -66,7 +58,7 @@ export type AssertQueueErrorHandler = (
* Just rethrows the error
*/
export const defaultAssertQueueErrorHandler: AssertQueueErrorHandler = (
channel: amqplib.Channel,
channel: Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
Expand All @@ -78,7 +70,7 @@ export const defaultAssertQueueErrorHandler: AssertQueueErrorHandler = (
* Tries to delete the queue and to redeclare it with the provided options
*/
export const forceDeleteAssertQueueErrorHandler: AssertQueueErrorHandler = async (
channel: amqplib.Channel,
channel: Channel,
queueName: string,
queueOptions: QueueOptions | undefined,
error: any
Expand Down
12 changes: 7 additions & 5 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as amqpConnectionManager from 'amqp-connection-manager';
import * as amqplib from 'amqplib';
import { AmqpConnectionManagerOptions } from 'amqp-connection-manager';
import { Options } from 'amqplib';
import {
AssertQueueErrorHandler,
MessageErrorHandler,
Expand All @@ -9,7 +9,7 @@ import {
export interface RabbitMQExchangeConfig {
name: string;
type?: string;
options?: amqplib.Options.AssertExchange;
options?: Options.AssertExchange;
}

export interface MessageOptions {
Expand Down Expand Up @@ -75,6 +75,8 @@ export interface ConnectionInitOptions {
reject?: boolean;
}

export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;

export interface RabbitMQConfig {
uri: string | string[];
/**
Expand All @@ -87,15 +89,15 @@ export interface RabbitMQConfig {
defaultRpcErrorBehavior?: MessageHandlerErrorBehavior;
defaultSubscribeErrorBehavior?: MessageHandlerErrorBehavior;
connectionInitOptions?: ConnectionInitOptions;
connectionManagerOptions?: amqpConnectionManager.AmqpConnectionManagerOptions;
connectionManagerOptions?: AmqpConnectionManagerOptions;
registerHandlers?: boolean;
enableDirectReplyTo?: boolean;
/**
* You can optionally create channels which you consume messages from.
*
* By setting `prefetchCount` for a channel, you can manage message speeds of your various handlers on the same connection.
*/
channels?: Record<string, RabbitMQChannelConfig>;
channels?: RabbitMQChannels;
}

export type RabbitHandlerType = 'rpc' | 'subscribe';
Expand Down

0 comments on commit 319ce18

Please sign in to comment.