From f92b2b6668e6895ebe68c39487ab951faafe1a6c Mon Sep 17 00:00:00 2001 From: shaharmor Date: Mon, 18 Jun 2018 18:10:47 +0300 Subject: [PATCH] refresh slots automatically every slotsRefreshInterval (default 5s) --- README.md | 1 + lib/cluster/index.js | 24 ++++++++++++++++++++++-- test/functional/cluster.js | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 831f9c19..9e68f5d9 100644 --- a/README.md +++ b/README.md @@ -744,6 +744,7 @@ but a few so that if one is unreachable the client will try the next one, and th will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms). * `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node. * `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`) + * `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`) ### Read-write splitting diff --git a/lib/cluster/index.js b/lib/cluster/index.js index c268c888..483d978d 100644 --- a/lib/cluster/index.js +++ b/lib/cluster/index.js @@ -37,6 +37,7 @@ var DelayQueue = require('./delay_queue'); * if `retryDelayOnTryAgain` is valid delay time. * @param {number} [options.slotsRefreshTimeout=1000] - The milliseconds before a timeout occurs while refreshing * slots from the cluster. + * @param {number} [options.slotsRefreshInterval=5000] - The milliseconds between every automatic slots refresh. * @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`. * @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) * @extends Commander @@ -106,7 +107,8 @@ Cluster.defaultOptions = { retryDelayOnFailover: 100, retryDelayOnClusterDown: 100, retryDelayOnTryAgain: 100, - slotsRefreshTimeout: 1000 + slotsRefreshTimeout: 1000, + slotsRefreshInterval: 5000 }; util.inherits(Cluster, EventEmitter); @@ -116,6 +118,15 @@ Cluster.prototype.resetOfflineQueue = function () { this.offlineQueue = new Deque(); }; +Cluster.prototype.resetNodesRefreshInterval = function () { + if (this.slotsTimer) { + return; + } + this.slotsTimer = setInterval(function() { + this.refreshSlotsCache(); + }.bind(this), this.options.slotsRefreshInterval); +}; + /** * Connect to a cluster * @@ -127,6 +138,7 @@ Cluster.prototype.connect = function () { this.setStatus('ready'); this.retryAttempts = 0; this.executeOfflineCommands(); + this.resetNodesRefreshInterval(); } return new Promise(function (resolve, reject) { @@ -219,6 +231,10 @@ Cluster.prototype.disconnect = function (reconnect) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } + if (this.slotsTimer) { + clearInterval(this.slotsTimer); + this.slotsTimer = null; + } if (status === 'wait') { this.setStatus('close'); @@ -245,6 +261,10 @@ Cluster.prototype.quit = function (callback) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } + if (this.slotsTimer) { + clearInterval(this.slotsTimer); + this.slotsTimer = null; + } if (status === 'wait') { var ret = Promise.resolve('OK').nodeify(callback); @@ -346,7 +366,7 @@ Cluster.prototype.setStatus = function (status) { /** * Refresh the slot cache * - * @param {function} callback + * @param {function} [callback] * @private */ Cluster.prototype.refreshSlotsCache = function (callback) { diff --git a/test/functional/cluster.js b/test/functional/cluster.js index e2ef6fed..c85f0717 100644 --- a/test/functional/cluster.js +++ b/test/functional/cluster.js @@ -199,6 +199,41 @@ describe('cluster', function () { } }); + it('should discover other nodes automatically every slotsRefreshInterval', function (done) { + var times = 0; + var argvHandler = function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + times++; + if (times === 1) { + return [ + [0, 5460, ['127.0.0.1', 30001]], + [5461, 10922, ['127.0.0.1', 30001]], + [10923, 16383, ['127.0.0.1', 30001]] + ]; + } + + return [ + [0, 5460, ['127.0.0.1', 30001]], + [5461, 10922, ['127.0.0.1', 30001]], + [10923, 16383, ['127.0.0.1', 30002]] + ]; + } + }; + var node1 = new MockServer(30001, argvHandler); + var node2 = new MockServer(30002, argvHandler); + + node1.once('connect', function() { + node2.once('connect', function () { + cluster.disconnect(); + disconnect([node1, node2], done); + }); + }); + + var cluster = new Redis.Cluster([ + { host: '127.0.0.1', port: '30001' } + ], { slotsRefreshInterval: 100, redisOptions: { lazyConnect: false } }); + }); + it('should send command to the correct node', function (done) { var node1 = new MockServer(30001, function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') {