Skip to content

Commit

Permalink
Merge branch 'refresh-slots' into v4
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Jun 23, 2018
2 parents 861eaf3 + b9d5a3e commit 229f264
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ but a few so that if one is unreachable the client will try the next one, and th
will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms).
* `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.
* `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`)
* `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`)

### Read-write splitting

Expand Down
24 changes: 22 additions & 2 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var DelayQueue = require('./delay_queue');
* if `retryDelayOnTryAgain` is valid delay time.
* @param {number} [options.slotsRefreshTimeout=1000] - The milliseconds before a timeout occurs while refreshing
* slots from the cluster.
* @param {number} [options.slotsRefreshInterval=5000] - The milliseconds between every automatic slots refresh.
* @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`.
* @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
* @extends Commander
Expand Down Expand Up @@ -106,7 +107,8 @@ Cluster.defaultOptions = {
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100,
retryDelayOnTryAgain: 100,
slotsRefreshTimeout: 1000
slotsRefreshTimeout: 1000,
slotsRefreshInterval: 5000
};

util.inherits(Cluster, EventEmitter);
Expand All @@ -116,6 +118,15 @@ Cluster.prototype.resetOfflineQueue = function () {
this.offlineQueue = new Deque();
};

Cluster.prototype.resetNodesRefreshInterval = function () {
if (this.slotsTimer) {
return;
}
this.slotsTimer = setInterval(function() {
this.refreshSlotsCache();
}.bind(this), this.options.slotsRefreshInterval);
};

/**
* Connect to a cluster
*
Expand All @@ -127,6 +138,7 @@ Cluster.prototype.connect = function () {
this.setStatus('ready');
this.retryAttempts = 0;
this.executeOfflineCommands();
this.resetNodesRefreshInterval();
}

return new Promise(function (resolve, reject) {
Expand Down Expand Up @@ -225,6 +237,10 @@ Cluster.prototype.disconnect = function (reconnect) {
this.reconnectTimeout = null;
debug('Canceled reconnecting attempts');
}
if (this.slotsTimer) {
clearInterval(this.slotsTimer);
this.slotsTimer = null;
}

if (status === 'wait') {
this.setStatus('close');
Expand All @@ -251,6 +267,10 @@ Cluster.prototype.quit = function (callback) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (this.slotsTimer) {
clearInterval(this.slotsTimer);
this.slotsTimer = null;
}
if (status === 'wait') {
var ret = Promise.resolve('OK').nodeify(callback);

Expand Down Expand Up @@ -352,7 +372,7 @@ Cluster.prototype.setStatus = function (status) {
/**
* Refresh the slot cache
*
* @param {function} callback
* @param {function} [callback]
* @private
*/
Cluster.prototype.refreshSlotsCache = function (callback) {
Expand Down
35 changes: 35 additions & 0 deletions test/functional/cluster/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,39 @@ describe('cluster:connect', function () {
{ host: '127.0.0.1', port: '30002', password: null }
], { redisOptions: { lazyConnect: false, password: 'default password' } });
});

it('should discover other nodes automatically every slotsRefreshInterval', function (done) {
var times = 0;
var argvHandler = function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
times++;
if (times === 1) {
return [
[0, 5460, ['127.0.0.1', 30001]],
[5461, 10922, ['127.0.0.1', 30001]],
[10923, 16383, ['127.0.0.1', 30001]]
];
}

return [
[0, 5460, ['127.0.0.1', 30001]],
[5461, 10922, ['127.0.0.1', 30001]],
[10923, 16383, ['127.0.0.1', 30002]]
];
}
};
var node1 = new MockServer(30001, argvHandler);
var node2 = new MockServer(30002, argvHandler);

node1.once('connect', function() {
node2.once('connect', function () {
cluster.disconnect();
disconnect([node1, node2], done);
});
});

var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { slotsRefreshInterval: 100, redisOptions: { lazyConnect: false } });
});
});

0 comments on commit 229f264

Please sign in to comment.