Skip to content

Commit

Permalink
feat: wait for ready state before resolving cluster.connect()
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
`Cluster#connect()` will be resolved when the connection
status become `ready` instead of `connect`.
  • Loading branch information
shaharmor authored and luin committed Jun 29, 2018
1 parent 1babc13 commit 7517a73
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
52 changes: 28 additions & 24 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,6 @@ Cluster.prototype.resetNodesRefreshInterval = function () {
* @public
*/
Cluster.prototype.connect = function () {
function readyHandler() {
this.setStatus('ready');
this.retryAttempts = 0;
this.executeOfflineCommands();
this.resetNodesRefreshInterval();
}

var Promise = PromiseContainer.get();
return new Promise(function (resolve, reject) {
if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') {
Expand All @@ -156,6 +149,14 @@ Cluster.prototype.connect = function () {

this.connectionPool.reset(this.startupNodes);

function readyHandler() {
this.setStatus('ready');
this.retryAttempts = 0;
this.executeOfflineCommands();
this.resetNodesRefreshInterval();
resolve();
}

var closeListener;
var refreshListener = function () {
this.removeListener('close', closeListener);
Expand All @@ -164,7 +165,7 @@ Cluster.prototype.connect = function () {
if (this.options.enableReadyCheck) {
this._readyCheck(function (err, fail) {
if (err || fail) {
debug('Ready check failed (%s). Reconnecting...', err || fail)
debug('Ready check failed (%s). Reconnecting...', err || fail);
if (this.status === 'connect') {
this.disconnect(true);
}
Expand All @@ -175,7 +176,6 @@ Cluster.prototype.connect = function () {
} else {
readyHandler.call(this);
}
resolve();
};

closeListener = function () {
Expand Down Expand Up @@ -276,7 +276,7 @@ Cluster.prototype.quit = function (callback) {

var Promise = PromiseContainer.get();
if (status === 'wait') {
var ret = asCallback(Promise.resolve('OK'), callback)
var ret = asCallback(Promise.resolve('OK'), callback);

// use setImmediate to make sure "close" event
// being emitted after quit() is returned
Expand Down Expand Up @@ -530,9 +530,9 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
if (typeof to === 'function') {
var nodes =
nodeKeys
.map(function (key) {
return _this.connectionPool.nodes.all[key];
});
.map(function (key) {
return _this.connectionPool.nodes.all[key];
});
redis = to(nodes, command);
if (Array.isArray(redis)) {
redis = utils.sample(redis);
Expand Down Expand Up @@ -603,7 +603,11 @@ Cluster.prototype.handleError = function (error, ttl, handlers) {
timeout: this.options.retryDelayOnClusterDown,
callback: this.refreshSlotsCache.bind(this)
});
} else if (error.message === utils.CONNECTION_CLOSED_ERROR_MSG && this.options.retryDelayOnFailover > 0 && this.status === 'ready') {
} else if (
error.message === utils.CONNECTION_CLOSED_ERROR_MSG &&
this.options.retryDelayOnFailover > 0 &&
this.status === 'ready'
) {
this.delayQueue.push('failover', handlers.connectionClosed, {
timeout: this.options.retryDelayOnFailover,
callback: this.refreshSlotsCache.bind(this)
Expand Down Expand Up @@ -683,16 +687,16 @@ Cluster.prototype._readyCheck = function (callback) {
};

['sscan', 'hscan', 'zscan', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer']
.forEach(function (command) {
Cluster.prototype[command + 'Stream'] = function (key, options) {
return new ScanStream(_.defaults({
objectMode: true,
key: key,
redis: this,
command: command
}, options));
};
});
.forEach(function (command) {
Cluster.prototype[command + 'Stream'] = function (key, options) {
return new ScanStream(_.defaults({
objectMode: true,
key: key,
redis: this,
command: command
}, options));
};
});

require('../transaction').addTransactionSupport(Cluster.prototype);

Expand Down
29 changes: 29 additions & 0 deletions test/functional/cluster/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,34 @@ describe('cluster:connect', function () {
});
});

it('should wait for ready state before resolving', function (done) {
var slotTable = [
[0, 16383, ['127.0.0.1', 30001]]
];
var argvHandler = function (argv) {
if (argv[0] === 'info') {
// return 'role:master'
}
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return slotTable;
}
if (argv[0] === 'cluster' && argv[1] === 'info') {
return 'cluster_state:ok';
}
};
var node = new MockServer(30001, argvHandler);

var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { lazyConnect: true });

cluster.connect().then(function () {
expect(cluster.status).to.eql('ready');
cluster.disconnect();
disconnect([node], done);
});
});

it('should support url schema', function (done) {
var node = new MockServer(30001);

Expand Down Expand Up @@ -249,6 +277,7 @@ describe('cluster:connect', function () {
expect(err.message).to.eql(errorMessage);
checkDone();
});

function checkDone() {
if (!--pending) {
cluster.disconnect();
Expand Down

0 comments on commit 7517a73

Please sign in to comment.