diff --git a/API.md b/API.md index d8b59175..cabb39c4 100644 --- a/API.md +++ b/API.md @@ -215,11 +215,13 @@ Creates a Redis Cluster instance | startupNodes | Array.<Object> | | An array of nodes in the cluster, [{ port: number, host: string }] | | options | Object | | | | [options.enableOfflineQueue] | boolean | true | See Redis class | +| [options.enableReadyCheck] | boolean | true | When enabled, ioredis only emits "ready" event when `CLUSTER INFO` command reporting the cluster is ready for handling commands. | | [options.scaleReads] | string | "master" | Scale reads to the node with the specified role. Available values are "master", "slave" and "all". | | [options.maxRedirections] | number | 16 | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. | | [options.clusterRetryStrategy] | function | | See "Quick Start" section | | [options.retryDelayOnFailover] | number | 100 | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), | | [options.retryDelayOnClusterDown] | number | 100 | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. | +| [options.redisOptions] | Object | | Passed to the constructor of `Redis`. | ### cluster.connect() ⇒ Promise diff --git a/README.md b/README.md index 5146449c..b0a89b56 100644 --- a/README.md +++ b/README.md @@ -751,7 +751,7 @@ sub.subscribe('news', function () { Event | Description :------------- | :------------- connect | emits when a connection is established to the Redis server. -ready | emits immediately after `connect` event. +ready | emits when `CLUSTER INFO` reporting the cluster is able to receive commands (if `enableReadyCheck` is `true`) or immediately after `connect` event (if `enableReadyCheck` is false). error | emits when an error occurs while connecting with a property of `lastNodeError` representing the last node error received. This event is emitted silently (only emitting if there's at least one listener). close | emits when an established Redis server connection has closed. reconnecting | emits after `close` when a reconnection will be made. The argument of the event is the time (in ms) before reconnecting. diff --git a/lib/cluster/index.js b/lib/cluster/index.js index 7181a7f0..063ed601 100644 --- a/lib/cluster/index.js +++ b/lib/cluster/index.js @@ -21,6 +21,8 @@ var ConnectionPool = require('./connection_pool'); * @param {Object[]} startupNodes - An array of nodes in the cluster, [{ port: number, host: string }] * @param {Object} options * @param {boolean} [options.enableOfflineQueue=true] - See Redis class + * @param {boolean} [options.enableReadyCheck=true] - When enabled, ioredis only emits "ready" event when `CLUSTER INFO` + * command reporting the cluster is ready for handling commands. * @param {string} [options.scaleReads=master] - Scale reads to the node with the specified role. * Available values are "master", "slave" and "all". * @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the @@ -30,6 +32,7 @@ var ConnectionPool = require('./connection_pool'); * "Connection is closed." when the target Redis node is down), * @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry * if `retryDelayOnClusterDown` is valid delay time. + * @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`. * @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) * @extends Commander */ @@ -103,10 +106,11 @@ function Cluster(startupNodes, options) { */ Cluster.defaultOptions = { maxRedirections: 16, + enableOfflineQueue: true, + enableReadyCheck: true, retryDelayOnFailover: 100, retryDelayOnClusterDown: 100, scaleReads: 'master', - enableOfflineQueue: true, clusterRetryStrategy: function (times) { return Math.min(100 + times * 2, 2000); } @@ -134,6 +138,12 @@ Cluster.prototype.resetClusterDownQueue = function () { * @public */ Cluster.prototype.connect = function () { + function readyHandler() { + this.setStatus('ready'); + this.retryAttempts = 0; + this.executeOfflineCommands(); + } + return new Promise(function (resolve, reject) { if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') { reject(new Error('Redis is already connecting/connected')); @@ -146,11 +156,19 @@ Cluster.prototype.connect = function () { var closeListener; var refreshListener = function () { this.removeListener('close', closeListener); - this.retryAttempts = 0; this.manuallyClosing = false; this.setStatus('connect'); - this.setStatus('ready'); - this.executeOfflineCommands(); + if (this.options.enableReadyCheck) { + this._readyCheck(function (err, fail) { + if (err || fail) { + this.disconnect(true); + } else { + readyHandler.call(this); + } + }.bind(this)); + } else { + readyHandler.call(this); + } resolve(); }; @@ -444,7 +462,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) { return; } var redis; - if (_this.status === 'ready') { + if (_this.status === 'ready' || (command.name === 'cluster')) { if (node && node.redis) { redis = node.redis; } else if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name) || @@ -457,7 +475,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) { if (typeof to === 'function') { var nodes = nodeKeys - .map(function(key) { + .map(function (key) { return _this.connectionPool.nodes.all[key]; }); redis = to(nodes, command); @@ -582,6 +600,40 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { }, 1000)); }; +/** + * Check whether Cluster is able to process commands + * + * @param {Function} callback + * @private + */ +Cluster.prototype._readyCheck = function (callback) { + this.cluster('info', function (err, res) { + if (err) { + return callback(err); + } + if (typeof res !== 'string') { + return callback(); + } + + var state; + var lines = res.split('\r\n'); + for (var i = 0; i < lines.length; ++i) { + var parts = lines[i].split(':'); + if (parts[0] === 'cluster_state') { + state = parts[1]; + break; + } + } + + if (state === 'fail') { + debug('cluster state not ok (%s)', state); + callback(null, state); + } else { + callback(); + } + }); +}; + ['sscan', 'hscan', 'zscan', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer'] .forEach(function (command) { Cluster.prototype[command + 'Stream'] = function (key, options) { diff --git a/test/functional/cluster.js b/test/functional/cluster.js index 18160a1b..dab34407 100644 --- a/test/functional/cluster.js +++ b/test/functional/cluster.js @@ -935,6 +935,37 @@ describe('cluster', function () { }); }); + describe('enableReadyCheck', function () { + it('should reconnect when cluster state is not ok', function (done) { + var state = 'fail'; + var server = new MockServer(30001, function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + return [ + [0, 16383, ['127.0.0.1', 30001]] + ]; + } else if (argv[0] === 'cluster' && argv[1] === 'info') { + return 'cluster_state:' + state; + } + }); + var count = 0; + var client = new Redis.Cluster([{ + host: '127.0.0.1', port: '30001' + }], { + clusterRetryStrategy: function (times) { + expect(++count).to.eql(times); + if (count === 3) { + state = 'ok'; + } + return 0; + } + }); + client.on('ready', function () { + client.disconnect(); + disconnect([server], done); + }); + }); + }); + describe('scaleReads', function () { beforeEach(function () { function handler(port, argv) { @@ -1010,12 +1041,11 @@ describe('cluster', function () { context('custom', function () { it('should send to selected slave', function (done) { var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { - scaleReads: function(node, command) { + scaleReads: function (node, command) { if (command.name === 'get') { return node[1]; - } else { - return node[2]; } + return node[2]; } }); cluster.on('ready', function () { @@ -1035,12 +1065,11 @@ describe('cluster', function () { it('should send writes to masters', function (done) { var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { - scaleReads: function(node, command) { + scaleReads: function (node, command) { if (command.name === 'get') { return node[1]; - } else { - return node[2]; } + return node[2]; } }); cluster.on('ready', function () {