From 5a2221fa931a8827217969f25f447e04718dfcd2 Mon Sep 17 00:00:00 2001 From: ngot Date: Mon, 7 Aug 2023 11:29:56 +1000 Subject: [PATCH] feature-add-shard-support --- lib/index.ts | 39 ++++++++++++++++++++++++++++++++++----- lib/util.ts | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 lib/util.ts diff --git a/lib/index.ts b/lib/index.ts index 8bd29e1..ac4b83c 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -8,6 +8,7 @@ import type { EventsMap, TypedEventBroadcaster, } from "./typed-events"; +import { PUBLISH } from "./util"; const debug = debugModule("socket.io-emitter"); @@ -41,6 +42,11 @@ export interface EmitterOptions { * Defaults to notepack.io, a MessagePack implementation. */ parser?: Parser; + + /** + * Enable the sharded PubSub or not. Default to `false`. + */ + sharded?: false; } interface BroadcastOptions { @@ -48,6 +54,7 @@ interface BroadcastOptions { broadcastChannel: string; requestChannel: string; parser: Parser; + sharded: false; } interface BroadcastFlags { @@ -68,6 +75,7 @@ export class Emitter { { key: "socket.io", parser: msgpack, + sharded: false, }, opts ); @@ -76,6 +84,7 @@ export class Emitter { broadcastChannel: this.opts.key + "#" + nsp + "#", requestChannel: this.opts.key + "-request#" + nsp + "#", parser: this.opts.parser, + sharded: this.opts.sharded, }; } @@ -233,7 +242,12 @@ export class Emitter { data: args, }); - this.redisClient.publish(this.broadcastOptions.requestChannel, request); + PUBLISH( + this.redisClient, + this.broadcastOptions.requestChannel, + request, + this.opts.sharded + ); } } @@ -386,7 +400,7 @@ export class BroadcastOperator debug("publishing message to channel %s", channel); - this.redisClient.publish(channel, msg); + PUBLISH(this.redisClient, channel, msg, this.broadcastOptions.sharded); return true; } @@ -407,7 +421,12 @@ export class BroadcastOperator rooms: Array.isArray(rooms) ? rooms : [rooms], }); - this.redisClient.publish(this.broadcastOptions.requestChannel, request); + PUBLISH( + this.redisClient, + this.broadcastOptions.requestChannel, + request, + this.broadcastOptions.sharded + ); } /** @@ -426,7 +445,12 @@ export class BroadcastOperator rooms: Array.isArray(rooms) ? rooms : [rooms], }); - this.redisClient.publish(this.broadcastOptions.requestChannel, request); + PUBLISH( + this.redisClient, + this.broadcastOptions.requestChannel, + request, + this.broadcastOptions.sharded + ); } /** @@ -445,6 +469,11 @@ export class BroadcastOperator close, }); - this.redisClient.publish(this.broadcastOptions.requestChannel, request); + PUBLISH( + this.redisClient, + this.broadcastOptions.requestChannel, + request, + this.broadcastOptions.sharded + ); } } diff --git a/lib/util.ts b/lib/util.ts new file mode 100644 index 0000000..3726137 --- /dev/null +++ b/lib/util.ts @@ -0,0 +1,48 @@ +/** + * Whether the client comes from the `redis` package + * + * @param redisClient + * + * @see https://github.com/redis/node-redis + */ +function isRedisV4Client(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} + +/** + * Whether sharded publish using `redis` or `iosredis` package + * @param redisClient + * @param channel + * @param payload + */ +export function SPUBLISH( + redisClient: any, + channel: string, + payload: string | Uint8Array +) { + if (isRedisV4Client(redisClient)) { + redisClient.sPublish(channel, payload); + } else { + redisClient.spublish(channel, payload); + } +} + +/** + * Whether publish in sharded mode. + * @param redisClient + * @param channel + * @param payload + * @param sharded + */ +export function PUBLISH( + redisClient: any, + channel: string, + payload: string | Uint8Array, + sharded: boolean +) { + if (sharded) { + SPUBLISH(redisClient, channel, payload); + } else { + redisClient.publish(channel, payload); + } +}