diff --git a/lib/Command.ts b/lib/Command.ts index 14eecfa3..4bec6346 100644 --- a/lib/Command.ts +++ b/lib/Command.ts @@ -42,15 +42,17 @@ export interface CommandNameFlags { "psubscribe", "unsubscribe", "punsubscribe", + "ssubscribe", + "sunsubscribe", "ping", "quit" ]; // Commands that are valid in monitor mode VALID_IN_MONITOR_MODE: ["monitor", "auth"]; // Commands that will turn current connection into subscriber mode - ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"]; + ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"]; // Commands that may make current connection quit subscriber mode - EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"]; + EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"]; // Commands that will make client disconnect from server TODO shutdown? WILL_DISCONNECT: ["quit"]; } @@ -84,12 +86,14 @@ export default class Command implements Respondable { "psubscribe", "unsubscribe", "punsubscribe", + "ssubscribe", + "sunsubscribe", "ping", "quit", ], VALID_IN_MONITOR_MODE: ["monitor", "auth"], - ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"], - EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"], + ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"], + EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"], WILL_DISCONNECT: ["quit"], }; diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index 9cc1c1ed..edf1f0ad 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -143,6 +143,18 @@ export default class DataHandler { this.redis.emit("pmessageBuffer", pattern, reply[2], reply[3]); break; } + case "smessage": { + if (this.redis.listeners("smessage").length > 0) { + this.redis.emit( + "smessage", + reply[1].toString(), + reply[2] ? reply[2].toString() : "" + ); + } + this.redis.emit("smessageBuffer", reply[1], reply[2]); + break; + } + case "ssubscribe": case "subscribe": case "psubscribe": { const channel = reply[1].toString(); @@ -156,6 +168,7 @@ export default class DataHandler { } break; } + case "sunsubscribe": case "unsubscribe": case "punsubscribe": { const channel = reply[1] ? reply[1].toString() : null; diff --git a/lib/SubscriptionSet.ts b/lib/SubscriptionSet.ts index 49bdad4d..98a92c62 100644 --- a/lib/SubscriptionSet.ts +++ b/lib/SubscriptionSet.ts @@ -10,6 +10,7 @@ export default class SubscriptionSet { private set: { [key: string]: { [channel: string]: boolean } } = { subscribe: {}, psubscribe: {}, + ssubscribe: {}, }; add(set: AddSet, channel: string) { @@ -27,7 +28,8 @@ export default class SubscriptionSet { isEmpty(): boolean { return ( this.channels("subscribe").length === 0 && - this.channels("psubscribe").length === 0 + this.channels("psubscribe").length === 0 && + this.channels("ssubscribe").length === 0 ); } } @@ -39,5 +41,8 @@ function mapSet(set: AddSet | DelSet): AddSet { if (set === "punsubscribe") { return "psubscribe"; } + if (set === "sunsubscribe") { + return "ssubscribe"; + } return set; } diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 09e8da36..e0ecd2e5 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -64,7 +64,9 @@ export default class ClusterSubscriber { private onSubscriberEnd = () => { if (!this.started) { - debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."); + debug( + "subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting." + ); return; } // If the subscriber closes whilst it's still the active connection, @@ -72,7 +74,7 @@ export default class ClusterSubscriber { // minimise the number of missed publishes. debug("subscriber has disconnected, selecting a new one..."); this.selectSubscriber(); - } + }; private selectSubscriber() { const lastActiveSubscriber = this.lastActiveSubscriber; @@ -122,7 +124,7 @@ export default class ClusterSubscriber { // Don't try to reconnect the subscriber connection. If the connection fails // we will get an end event (handled below), at which point we'll pick a new // node from the pool and try to connect to that as the subscriber connection. - retryStrategy: null + retryStrategy: null, }); // Ignore the errors since they're handled in the connection pool. @@ -136,7 +138,7 @@ export default class ClusterSubscriber { this.subscriber.once("end", this.onSubscriberEnd); // Re-subscribe previous channels - const previousChannels = { subscribe: [], psubscribe: [] }; + const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] }; if (lastActiveSubscriber) { const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition; @@ -144,14 +146,17 @@ export default class ClusterSubscriber { previousChannels.subscribe = condition.subscriber.channels("subscribe"); previousChannels.psubscribe = condition.subscriber.channels("psubscribe"); + previousChannels.ssubscribe = + condition.subscriber.channels("ssubscribe"); } } if ( previousChannels.subscribe.length || - previousChannels.psubscribe.length + previousChannels.psubscribe.length || + previousChannels.ssubscribe.length ) { let pending = 0; - for (const type of ["subscribe", "psubscribe"]) { + for (const type of ["subscribe", "psubscribe", "ssubscribe"]) { const channels = previousChannels[type]; if (channels.length) { pending += 1; @@ -171,7 +176,12 @@ export default class ClusterSubscriber { } else { this.lastActiveSubscriber = this.subscriber; } - for (const event of ["message", "messageBuffer"]) { + for (const event of [ + "message", + "messageBuffer", + "smessage", + "smessageBuffer", + ]) { this.subscriber.on(event, (arg1, arg2) => { this.emitter.emit(event, arg1, arg2); }); diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 82470e80..402c31c7 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -283,6 +283,11 @@ export function readyHandler(self) { debug("psubscribe %d channels", psubscribeChannels.length); self.psubscribe(psubscribeChannels); } + const ssubscribeChannels = condition.subscriber.channels("ssubscribe"); + if (ssubscribeChannels.length) { + debug("ssubscribe %d channels", ssubscribeChannels.length); + self.ssubscribe(ssubscribeChannels); + } } } diff --git a/test/functional/cluster/spub_ssub.ts b/test/functional/cluster/spub_ssub.ts new file mode 100644 index 00000000..b2b1a480 --- /dev/null +++ b/test/functional/cluster/spub_ssub.ts @@ -0,0 +1,108 @@ +import MockServer, { getConnectionName } from "../../helpers/mock_server"; +import { expect } from "chai"; +import { Cluster } from "../../../lib"; +import * as sinon from "sinon"; +import Redis from "../../../lib/Redis"; +import { noop } from "../../../lib/utils"; + +describe("cluster:spub/ssub", function () { + it("should receive messages", (done) => { + const handler = function (argv) { + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return [ + [0, 1, ["127.0.0.1", 30001]], + [2, 16383, ["127.0.0.1", 30002]], + ]; + } + }; + const node1 = new MockServer(30001, handler); + new MockServer(30002, handler); + + const options = [{ host: "127.0.0.1", port: "30001" }]; + const ssub = new Cluster(options); + + ssub.ssubscribe("test cluster", function () { + node1.write(node1.findClientByName("ioredis-cluster(subscriber)"), [ + "smessage", + "test shard channel", + "hi", + ]); + }); + ssub.on("smessage", function (channel, message) { + expect(channel).to.eql("test shard channel"); + expect(message).to.eql("hi"); + ssub.disconnect(); + done(); + }); + }); + + it("should works when sending regular commands", (done) => { + const handler = function (argv) { + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return [[0, 16383, ["127.0.0.1", 30001]]]; + } + }; + new MockServer(30001, handler); + + const ssub = new Cluster([{ port: "30001" }]); + + ssub.ssubscribe("test cluster", function () { + ssub.set("foo", "bar").then((res) => { + expect(res).to.eql("OK"); + ssub.disconnect(); + done(); + }); + }); + }); + + it("supports password", (done) => { + const handler = function (argv, c) { + if (argv[0] === "auth") { + c.password = argv[1]; + return; + } + if (argv[0] === "ssubscribe") { + expect(c.password).to.eql("abc"); + expect(getConnectionName(c)).to.eql("ioredis-cluster(subscriber)"); + } + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return [[0, 16383, ["127.0.0.1", 30001]]]; + } + }; + new MockServer(30001, handler); + + const ssub = new Redis.Cluster([{ port: "30001", password: "abc" }]); + + ssub.ssubscribe("test cluster", function () { + ssub.disconnect(); + done(); + }); + }); + + it("should re-ssubscribe after reconnection", (done) => { + new MockServer(30001, function (argv) { + if (argv[0] === "cluster" && argv[1] === "SLOTS") { + return [[0, 16383, ["127.0.0.1", 30001]]]; + } else if (argv[0] === "ssubscribe" || argv[0] === "psubscribe") { + return [argv[0], argv[1]]; + } + }); + const client = new Cluster([{ host: "127.0.0.1", port: "30001" }]); + + client.ssubscribe("test cluster", function () { + const stub = sinon + .stub(Redis.prototype, "ssubscribe") + .callsFake((channels) => { + expect(channels).to.eql(["test cluster"]); + stub.restore(); + client.disconnect(); + done(); + return Redis.prototype.ssubscribe.apply(this, arguments); + }); + client.once("end", function () { + client.connect().catch(noop); + }); + client.disconnect(); + }); + }); +}); diff --git a/test/functional/spub_ssub.ts b/test/functional/spub_ssub.ts new file mode 100644 index 00000000..138430e2 --- /dev/null +++ b/test/functional/spub_ssub.ts @@ -0,0 +1,146 @@ +import Redis from "../../lib/Redis"; +import { expect } from "chai"; + +describe("spub/ssub", function () { + it("should invoke the callback when subscribe successfully", (done) => { + const redis = new Redis(); + let pending = 1; + redis.ssubscribe("foo", "bar", function (err, count) { + expect(count).to.eql(2); + pending -= 1; + }); + redis.ssubscribe("foo", "zoo", function (err, count) { + expect(count).to.eql(3); + expect(pending).to.eql(0); + redis.disconnect(); + done(); + }); + }); + + it("should reject when issue a command in the subscriber mode", (done) => { + const redis = new Redis(); + redis.ssubscribe("foo", function () { + redis.set("foo", "bar", function (err) { + expect(err instanceof Error); + expect(err.message).to.match(/subscriber mode/); + redis.disconnect(); + done(); + }); + }); + }); + + it("should report being in 'subscriber' mode when subscribed", (done) => { + const redis = new Redis(); + redis.ssubscribe("foo", function () { + expect(redis.mode).to.equal("subscriber"); + redis.disconnect(); + done(); + }); + }); + + it("should exit subscriber mode using sunsubscribe", (done) => { + const redis = new Redis(); + redis.ssubscribe("foo", "bar", function () { + redis.sunsubscribe("foo", "bar", function (err, count) { + expect(count).to.eql(0); + redis.set("foo", "bar", function (err) { + expect(err).to.eql(null); + + redis.ssubscribe("zoo", "foo", function () { + redis.sunsubscribe(function (err, count) { + expect(count).to.eql(0); + redis.set("foo", "bar", function (err) { + expect(err).to.eql(null); + redis.disconnect(); + done(); + }); + }); + }); + }); + }); + }); + }); + + it("should report being in 'normal' mode after sunsubscribing", (done) => { + const redis = new Redis(); + redis.ssubscribe("foo", "bar", function () { + redis.sunsubscribe("foo", "bar", function (err, count) { + expect(redis.mode).to.equal("normal"); + redis.disconnect(); + done(); + }); + }); + }); + + it("should receive messages when subscribe a shard channel", (done) => { + const redis = new Redis(); + const pub = new Redis(); + let pending = 2; + redis.ssubscribe("foo", function () { + pub.spublish("foo", "bar"); + }); + redis.on("smessage", function (channel, message) { + expect(channel).to.eql("foo"); + expect(message).to.eql("bar"); + if (!--pending) { + redis.disconnect(); + done(); + } + }); + redis.on("smessageBuffer", function (channel, message) { + expect(channel).to.be.instanceof(Buffer); + expect(channel.toString()).to.eql("foo"); + expect(message).to.be.instanceof(Buffer); + expect(message.toString()).to.eql("bar"); + if (!--pending) { + redis.disconnect(); + done(); + } + }); + }); + + it("should be able to send quit command in the subscriber mode", (done) => { + const redis = new Redis(); + let pending = 1; + redis.ssubscribe("foo", function () { + redis.quit(function () { + pending -= 1; + }); + }); + redis.on("end", function () { + expect(pending).to.eql(0); + redis.disconnect(); + done(); + }); + }); + + // TODO ready reconnect in redis stand + it("should restore subscription after reconnecting(ssubscribe)", (done) => { + const redis = new Redis({ port: 6379, host: "127.0.0.1" }); + const pub = new Redis({ port: 6379, host: "127.0.0.1" }); + // redis.ping(function (err, result) { + // // redis.on("message", function (channel, message) { + // console.log(`${err}-${result}`); + // // }); + // }); + redis.ssubscribe("foo", "bar", function () { + redis.on("ready", function () { + // Execute a random command to make sure that `subscribe` + // is sent + redis.ping(function () { + let pending = 2; + redis.on("smessage", function (channel, message) { + if (!--pending) { + redis.disconnect(); + pub.disconnect(); + done(); + } + }); + pub.spublish("foo", "hi1"); + pub.spublish("bar", "hi2"); + }); + }); + redis.disconnect(true); + }); + }); +});