diff --git a/packages/brokers/README.md b/packages/brokers/README.md index e97a0f718f02..061e642243e2 100644 --- a/packages/brokers/README.md +++ b/packages/brokers/README.md @@ -40,7 +40,7 @@ pnpm add @discordjs/brokers import { PubSubRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new PubSubRedisBroker({ redisClient: new Redis() }); +const broker = new PubSubRedisBroker(new Redis()); await broker.publish('test', 'Hello World!'); await broker.destroy(); @@ -49,7 +49,7 @@ await broker.destroy(); import { PubSubRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new PubSubRedisBroker({ redisClient: new Redis() }); +const broker = new PubSubRedisBroker(new Redis()); broker.on('test', ({ data, ack }) => { console.log(data); void ack(); @@ -65,7 +65,7 @@ await broker.subscribe('subscribers', ['test']); import { RPCRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new RPCRedisBroker({ redisClient: new Redis() }); +const broker = new RPCRedisBroker(new Redis()); console.log(await broker.call('testcall', 'Hello World!')); await broker.destroy(); @@ -74,7 +74,7 @@ await broker.destroy(); import { RPCRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new RPCRedisBroker({ redisClient: new Redis() }); +const broker = new RPCRedisBroker(new Redis()); broker.on('testcall', ({ data, ack, reply }) => { console.log('responder', data); void ack(); diff --git a/packages/brokers/__tests__/index.test.ts b/packages/brokers/__tests__/index.test.ts index 40e83a288f72..f7af86d9d06f 100644 --- a/packages/brokers/__tests__/index.test.ts +++ b/packages/brokers/__tests__/index.test.ts @@ -17,7 +17,7 @@ const mockRedisClient = { test('pubsub with custom encoding', async () => { const encode = vi.fn((data) => data); - const broker = new PubSubRedisBroker({ redisClient: mockRedisClient, encode }); + const broker = new PubSubRedisBroker(mockRedisClient, { encode }); await broker.publish('test', 'test'); expect(encode).toHaveBeenCalledWith('test'); }); diff --git a/packages/brokers/src/brokers/Broker.ts b/packages/brokers/src/brokers/Broker.ts index 4e298725ea5c..49b0b223751f 100644 --- a/packages/brokers/src/brokers/Broker.ts +++ b/packages/brokers/src/brokers/Broker.ts @@ -1,5 +1,4 @@ import { Buffer } from 'node:buffer'; -import { randomBytes } from 'node:crypto'; import { encode, decode } from '@msgpack/msgpack'; import type { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; @@ -7,10 +6,6 @@ import type { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; * Base options for a broker implementation */ export interface BaseBrokerOptions { - /** - * How long to block for messages when polling - */ - blockTimeout?: number; /** * Function to use for decoding messages */ @@ -21,25 +16,12 @@ export interface BaseBrokerOptions { */ // eslint-disable-next-line @typescript-eslint/method-signature-style encode?: (data: unknown) => Buffer; - /** - * Max number of messages to poll at once - */ - maxChunk?: number; - /** - * Unique consumer name. - * - * @see {@link https://redis.io/commands/xreadgroup/} - */ - name?: string; } /** * Default broker options */ export const DefaultBrokerOptions = { - name: randomBytes(20).toString('hex'), - maxChunk: 10, - blockTimeout: 5_000, encode: (data): Buffer => { const encoded = encode(data); return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index e723f85506f4..e7c3bf51cef2 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -1,4 +1,5 @@ import type { Buffer } from 'node:buffer'; +import { randomBytes } from 'node:crypto'; import { readFileSync } from 'node:fs'; import { resolve } from 'node:path'; import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; @@ -19,11 +20,31 @@ declare module 'ioredis' { */ export interface RedisBrokerOptions extends BaseBrokerOptions { /** - * The Redis client to use + * How long to block for messages when polling */ - redisClient: Redis; + blockTimeout?: number; + /** + * Max number of messages to poll at once + */ + maxChunk?: number; + /** + * Unique consumer name. + * + * @see {@link https://redis.io/commands/xreadgroup/} + */ + name?: string; } +/** + * Default broker options for redis + */ +export const DefaultRedisBrokerOptions = { + ...DefaultBrokerOptions, + name: randomBytes(20).toString('hex'), + maxChunk: 10, + blockTimeout: 5_000, +} as const satisfies Required; + /** * Helper class with shared Redis logic */ @@ -56,14 +77,17 @@ export abstract class BaseRedisBroker> */ protected listening = false; - public constructor(options: RedisBrokerOptions) { + public constructor( + protected readonly redisClient: Redis, + options: RedisBrokerOptions, + ) { super(); - this.options = { ...DefaultBrokerOptions, ...options }; - options.redisClient.defineCommand('xcleangroup', { + this.options = { ...DefaultRedisBrokerOptions, ...options }; + redisClient.defineCommand('xcleangroup', { numberOfKeys: 1, lua: readFileSync(resolve(__dirname, '..', 'scripts', 'xcleangroup.lua'), 'utf8'), }); - this.streamReadClient = options.redisClient.duplicate(); + this.streamReadClient = redisClient.duplicate(); } /** @@ -75,7 +99,7 @@ export abstract class BaseRedisBroker> events.map(async (event) => { this.subscribedEvents.add(event as string); try { - return await this.options.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM'); + return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM'); } catch (error) { if (!(error instanceof ReplyError)) { throw error; @@ -97,7 +121,7 @@ export abstract class BaseRedisBroker> commands[idx + 1] = ['xcleangroup', event as string, group]; } - await this.options.redisClient.pipeline(commands).exec(); + await this.redisClient.pipeline(commands).exec(); for (const event of events) { this.subscribedEvents.delete(event as string); @@ -162,7 +186,7 @@ export abstract class BaseRedisBroker> */ public async destroy() { this.streamReadClient.disconnect(); - this.options.redisClient.disconnect(); + this.redisClient.disconnect(); } /** diff --git a/packages/brokers/src/brokers/redis/PubSubRedis.ts b/packages/brokers/src/brokers/redis/PubSubRedis.ts index ddba1e97df2d..c8371520d825 100644 --- a/packages/brokers/src/brokers/redis/PubSubRedis.ts +++ b/packages/brokers/src/brokers/redis/PubSubRedis.ts @@ -11,7 +11,7 @@ import { BaseRedisBroker } from './BaseRedis.js'; * import { PubSubRedisBroker } from '@discordjs/brokers'; * import Redis from 'ioredis'; * - * const broker = new PubSubRedisBroker({ redisClient: new Redis() }); + * const broker = new PubSubRedisBroker(new Redis()); * * await broker.publish('test', 'Hello World!'); * await broker.destroy(); @@ -20,7 +20,7 @@ import { BaseRedisBroker } from './BaseRedis.js'; * import { PubSubRedisBroker } from '@discordjs/brokers'; * import Redis from 'ioredis'; * - * const broker = new PubSubRedisBroker({ redisClient: new Redis() }); + * const broker = new PubSubRedisBroker(new Redis()); * broker.on('test', ({ data, ack }) => { * console.log(data); * void ack(); @@ -37,19 +37,14 @@ export class PubSubRedisBroker> * {@inheritDoc IPubSubBroker.publish} */ public async publish(event: Event, data: TEvents[Event]): Promise { - await this.options.redisClient.xadd( - event as string, - '*', - BaseRedisBroker.STREAM_DATA_KEY, - this.options.encode(data), - ); + await this.redisClient.xadd(event as string, '*', BaseRedisBroker.STREAM_DATA_KEY, this.options.encode(data)); } protected emitEvent(id: Buffer, group: string, event: string, data: unknown) { const payload: { ack(): Promise; data: unknown } = { data, ack: async () => { - await this.options.redisClient.xack(event, group, id); + await this.redisClient.xack(event, group, id); }, }; diff --git a/packages/brokers/src/brokers/redis/RPCRedis.ts b/packages/brokers/src/brokers/redis/RPCRedis.ts index f34608429331..0e7662ac735c 100644 --- a/packages/brokers/src/brokers/redis/RPCRedis.ts +++ b/packages/brokers/src/brokers/redis/RPCRedis.ts @@ -1,9 +1,9 @@ import type { Buffer } from 'node:buffer'; import { clearTimeout, setTimeout } from 'node:timers'; +import type Redis from 'ioredis/built/Redis.js'; import type { IRPCBroker } from '../Broker.js'; -import { DefaultBrokerOptions } from '../Broker.js'; import type { RedisBrokerOptions } from './BaseRedis.js'; -import { BaseRedisBroker } from './BaseRedis.js'; +import { BaseRedisBroker, DefaultRedisBrokerOptions } from './BaseRedis.js'; interface InternalPromise { reject(error: any): void; @@ -22,9 +22,9 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions { * Default values used for the {@link RPCRedisBrokerOptions} */ export const DefaultRPCRedisBrokerOptions = { - ...DefaultBrokerOptions, + ...DefaultRedisBrokerOptions, timeout: 5_000, -} as const satisfies Required>; +} as const satisfies Required; /** * RPC broker powered by Redis @@ -35,7 +35,7 @@ export const DefaultRPCRedisBrokerOptions = { * import { RPCRedisBroker } from '@discordjs/brokers'; * import Redis from 'ioredis'; * - * const broker = new RPCRedisBroker({ redisClient: new Redis() }); + * const broker = new RPCRedisBroker(new Redis()); * * console.log(await broker.call('testcall', 'Hello World!')); * await broker.destroy(); @@ -44,7 +44,7 @@ export const DefaultRPCRedisBrokerOptions = { * import { RPCRedisBroker } from '@discordjs/brokers'; * import Redis from 'ioredis'; * - * const broker = new RPCRedisBroker({ redisClient: new Redis() }); + * const broker = new RPCRedisBroker(new Redis()); * broker.on('testcall', ({ data, ack, reply }) => { * console.log('responder', data); * void ack(); @@ -65,8 +65,8 @@ export class RPCRedisBroker, TResponses exte protected readonly promises = new Map(); - public constructor(options: RPCRedisBrokerOptions) { - super(options); + public constructor(redisClient: Redis, options: RPCRedisBrokerOptions) { + super(redisClient, options); this.options = { ...DefaultRPCRedisBrokerOptions, ...options }; this.streamReadClient.on('messageBuffer', (channel: Buffer, message: Buffer) => { @@ -88,7 +88,7 @@ export class RPCRedisBroker, TResponses exte data: TEvents[Event], timeoutDuration: number = this.options.timeout, ): Promise { - const id = await this.options.redisClient.xadd( + const id = await this.redisClient.xadd( event as string, '*', BaseRedisBroker.STREAM_DATA_KEY, @@ -118,10 +118,10 @@ export class RPCRedisBroker, TResponses exte const payload: { ack(): Promise; data: unknown; reply(data: unknown): Promise } = { data, ack: async () => { - await this.options.redisClient.xack(event, group, id); + await this.redisClient.xack(event, group, id); }, reply: async (data) => { - await this.options.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data)); + await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data)); }, };