diff --git a/API.md b/API.md index d4211785..477f6c46 100644 --- a/API.md +++ b/API.md @@ -2,8 +2,6 @@
[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
Default options
Default options
-[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
@@ -190,81 +185,6 @@ Define a custom command using lua script
Create a Redis instance
**Kind**: static method of [Redis](#Redis)
-
-## Cluster ⇐ [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
-**Kind**: global class
-**Extends:** [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
, [Commander](#Commander)
-
-* [Cluster](#Cluster) ⇐ [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
- * [new Cluster(startupNodes, options)](#new_Cluster_new)
- * [.disconnect()](#Cluster+disconnect)
- * [.getBuiltinCommands()](#Commander+getBuiltinCommands) ⇒ Array.<string>
- * [.createBuiltinCommand(commandName)](#Commander+createBuiltinCommand) ⇒ object
- * [.defineCommand(name, definition)](#Commander+defineCommand)
- * *[.sendCommand()](#Commander+sendCommand)*
-
-
-### new Cluster(startupNodes, options)
-Creates a Redis Cluster instance
-
-
-| Param | Type | Default | Description |
-| --- | --- | --- | --- |
-| startupNodes | Array.<Object>
| | An array of nodes in the cluster, [{ port: number, host: string }] |
-| options | Object
| | |
-| [options.enableOfflineQueue] | boolean
| true
| See Redis class |
-| [options.lazyConnect] | boolean
| false
| See Redis class |
-| [options.readOnly] | boolean
| false
| Connect in READONLY mode |
-| [options.maxRedirections] | number
| 16
| When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. |
-| [options.clusterRetryStrategy] | function
| | See "Quick Start" section |
-| [options.retryDelayOnFailover] | number
| 2000
| When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), |
-| [options.retryDelayOnClusterDown] | number
| 1000
| When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. |
-
-
-### cluster.disconnect()
-Disconnect from every node in the cluster.
-
-**Kind**: instance method of [Cluster](#Cluster)
-**Access:** public
-
-### cluster.getBuiltinCommands() ⇒ Array.<string>
-Return supported builtin commands
-
-**Kind**: instance method of [Cluster](#Cluster)
-**Returns**: Array.<string>
- command list
-**Access:** public
-
-### cluster.createBuiltinCommand(commandName) ⇒ object
-Create a builtin command
-
-**Kind**: instance method of [Cluster](#Cluster)
-**Returns**: object
- functions
-**Access:** public
-
-| Param | Type | Description |
-| --- | --- | --- |
-| commandName | string
| command name |
-
-
-### cluster.defineCommand(name, definition)
-Define a custom command using lua script
-
-**Kind**: instance method of [Cluster](#Cluster)
-
-| Param | Type | Default | Description |
-| --- | --- | --- | --- |
-| name | string
| | the command name |
-| definition | object
| | |
-| definition.lua | string
| | the lua code |
-| [definition.numberOfKeys] | number
|
| the number of keys. If omit, you have to pass the number of keys as the first argument every time you invoke the command |
-
-
-### *cluster.sendCommand()*
-Send a command
-
-**Kind**: instance abstract method of [Cluster](#Cluster)
-**Overrides:** [sendCommand](#Commander+sendCommand)
-**Access:** public
## Commander
**Kind**: global class
@@ -331,9 +251,3 @@ Default options
**Kind**: global variable
**Access:** protected
-
-## defaultOptions
-Default options
-
-**Kind**: global variable
-**Access:** protected
diff --git a/lib/cluster/connection_pool.js b/lib/cluster/connection_pool.js
new file mode 100644
index 00000000..26de8287
--- /dev/null
+++ b/lib/cluster/connection_pool.js
@@ -0,0 +1,89 @@
+'use strict';
+
+var util = require('util');
+var EventEmitter = require('events').EventEmitter;
+var _ = require('lodash');
+var Redis = require('../redis');
+
+function ConnectionPool(redisOptions) {
+ EventEmitter.call(this);
+ this.redisOptions = redisOptions;
+
+ // this.masters + this.slaves = this.nodes
+ this.nodes = {};
+ this.masters = {};
+ this.slaves = {};
+
+ this.specifiedOptions = {};
+}
+
+util.inherits(ConnectionPool, EventEmitter);
+
+ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
+ node.port = node.port || 6379;
+ node.host = node.host || '127.0.0.1';
+ node.key = node.key || node.host + ':' + node.port;
+
+ if (this.specifiedOptions[node.key]) {
+ _.assign(node, this.specifiedOptions[node.key]);
+ } else {
+ this.specifiedOptions[node.key] = node;
+ }
+
+ if (this.nodes[node.key] && this.nodes[node.key].options.readOnly !== readOnly) {
+ this.remove(node.key);
+ }
+
+ if (!this.nodes[node.key]) {
+ var redis = this.nodes[node.key] = new Redis(_.defaults({
+ retryStrategy: null,
+ enableOfflineQueue: true,
+ readOnly: readOnly
+ }, node, this.redisOptions, { lazyConnect: true }));
+ this[readOnly ? 'slaves' : 'masters'][node.key] = redis;
+
+ var _this = this;
+ redis.once('end', function () {
+ delete _this.nodes[node.key];
+ delete _this.masters[node.key];
+ delete _this.slaves[node.key];
+ _this.emit('-node', redis);
+ if (!Object.keys(_this.nodes).length) {
+ _this.emit('drain');
+ }
+ });
+
+ this.emit('+node', redis);
+ }
+
+ return this.nodes[node.key];
+};
+
+ConnectionPool.prototype.remove = function (key) {
+ if (this.nodes[key]) {
+ this.nodes[key].disconnect();
+ delete this.nodes[key];
+ delete this.masters[key];
+ delete this.slaves[key];
+ }
+};
+
+ConnectionPool.prototype.reset = function (nodes) {
+ var newNodes = {};
+ for (var i = 0; i < nodes.length; i++) {
+ var node = nodes[i];
+ node.key = node.host + ':' + node.port;
+ newNodes[node.key] = node;
+ }
+ var _this = this;
+ Object.keys(this.nodes).forEach(function (key) {
+ if (!newNodes[key]) {
+ _this.remove(key);
+ }
+ });
+ Object.keys(newNodes).forEach(function (key) {
+ _this.findOrCreate(newNodes[key], newNodes[key].readOnly);
+ });
+};
+
+module.exports = ConnectionPool;
diff --git a/lib/cluster.js b/lib/cluster/index.js
similarity index 67%
rename from lib/cluster.js
rename to lib/cluster/index.js
index bbc9b974..4759919d 100644
--- a/lib/cluster.js
+++ b/lib/cluster/index.js
@@ -2,15 +2,17 @@
var Promise = require('bluebird');
var Deque = require('double-ended-queue');
-var Redis = require('./redis');
-var utils = require('./utils');
+var Redis = require('../redis');
+var utils = require('../utils');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('debug')('ioredis:cluster');
var _ = require('lodash');
-var ScanStream = require('./scan_stream');
-var Commander = require('./commander');
-var Command = require('./command');
+var ScanStream = require('../scan_stream');
+var Commander = require('../commander');
+var Command = require('../command');
+var commands = require('../../commands');
+var ConnectionPool = require('./connection_pool');
/**
* Creates a Redis Cluster instance
@@ -20,7 +22,8 @@ var Command = require('./command');
* @param {Object} options
* @param {boolean} [options.enableOfflineQueue=true] - See Redis class
* @param {boolean} [options.lazyConnect=false] - See Redis class
- * @param {boolean} [options.readOnly=false] - Connect in READONLY mode
+ * @param {string} [options.scaleReads="masters"] - Scale reads to the node with the specified role.
+ * Available values are "masters", "slaves" and "all".
* @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the
* command to another node. This option limits the max redirections allowed to send a command.
* @param {function} [options.clusterRetryStrategy] - See "Quick Start" section
@@ -35,9 +38,19 @@ function Cluster(startupNodes, options) {
EventEmitter.call(this);
Commander.call(this);
+ this.options = _.defaults(this.options, options, Cluster.defaultOptions);
+
+ // validate options
+ if (['all', 'masters', 'slaves'].indexOf(this.options.scaleReads) === -1) {
+ throw new Error('Invalid option scaleReads "' + this.options.scaleReads +
+ '". Expected "all", "masters" or "slaves"');
+ }
+
if (!Array.isArray(startupNodes) || startupNodes.length === 0) {
throw new Error('`startupNodes` should contain at least one node.');
}
+
+ this.connectionPool = new ConnectionPool(_.defaults({}, this.options.redisOptions, Redis.defaultOptions));
this.startupNodes = startupNodes.map(function (node) {
var options = {};
if (typeof node === 'object') {
@@ -56,12 +69,23 @@ function Cluster(startupNodes, options) {
return options;
});
- this.nodes = {};
- this.masterNodes = {};
+ var _this = this;
+ this.connectionPool.on('-node', function (redis) {
+ _this.emit('-node');
+ if (_this.subscriber === redis) {
+ _this.selectSubscriber();
+ }
+ });
+ this.connectionPool.on('+node', function (redis) {
+ _this.emit('+node', redis);
+ });
+ this.connectionPool.on('drain', function () {
+ _this.setStatus('close');
+ });
+
this.slots = [];
this.retryAttempts = 0;
- this.options = _.defaults({}, options || {}, this.options || {}, Cluster.defaultOptions);
this.resetOfflineQueue();
this.resetFailoverQueue();
@@ -78,15 +102,16 @@ function Cluster(startupNodes, options) {
* @var defaultOptions
* @protected
*/
-Cluster.defaultOptions = _.assign({}, Redis.defaultOptions, {
+Cluster.defaultOptions = {
maxRedirections: 16,
retryDelayOnFailover: 2000,
retryDelayOnClusterDown: 1000,
- readOnly: false,
+ scaleReads: 'masters',
+ enableOfflineQueue: true,
clusterRetryStrategy: function (times) {
return Math.min(100 + times * 2, 2000);
}
-});
+};
util.inherits(Cluster, EventEmitter);
_.assign(Cluster.prototype, Commander.prototype);
@@ -111,6 +136,8 @@ Cluster.prototype.connect = function () {
}
this.setStatus('connecting');
+ this.connectionPool.reset(this.startupNodes);
+
var closeListener;
var refreshListener = function () {
this.removeListener('close', closeListener);
@@ -148,15 +175,12 @@ Cluster.prototype.connect = function () {
}
});
- this.startupNodes.forEach(function (options) {
- this.createNode(options.port, options.host);
- }, this);
this.refreshSlotsCache(function (err) {
if (err && err.message === 'Failed to refresh slots cache.') {
Redis.prototype.silentEmit.call(this, 'error', err);
- var keys = Object.keys(this.nodes);
+ var keys = Object.keys(this.connectionPool.nodes);
for (var i = 0; i < keys.length; ++i) {
- this.nodes[keys[i]].disconnect();
+ this.connectionPool.nodes[keys[i]].disconnect();
}
}
}.bind(this));
@@ -177,81 +201,24 @@ Cluster.prototype.disconnect = function (reconnect) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
- var keys = Object.keys(this.nodes);
- for (var i = 0; i < keys.length; ++i) {
- this.nodes[keys[i]].disconnect();
- }
+ this.nodes().forEach(function (node) {
+ node.disconnect();
+ });
};
-/**
- * Create a connection and add it to the connection list
- *
- * @param {number} port
- * @param {string} host
- * @return {Redis} A redis instance
- * @private
- */
-Cluster.prototype.createNode = function (port, host) {
- var nodeOpt = _.defaults({
- port: port,
- host: host || '127.0.0.1',
- retryStrategy: null
- }, Redis.defaultOptions);
-
- var key = nodeOpt.host + ':' + nodeOpt.port;
-
- if (!this.nodes[key]) {
- // Fetch password from startupNodes option
- delete nodeOpt.password;
- for (var i = 0; i < this.startupNodes.length; i++) {
- var node = this.startupNodes[i];
- if (node.port === nodeOpt.port && node.host === nodeOpt.host) {
- nodeOpt.password = node.password;
- break;
- }
- }
-
- this.nodes[key] = new Redis(_.assign({}, this.options, nodeOpt));
-
- var _this = this;
- if (this.options.readOnly) {
- this.nodes[key].once('ready', function () {
- debug('sending readonly to %s', key);
- _this.nodes[key].readonly();
- });
- }
- this.nodes[key].once('end', function () {
- var deadNode = _this.nodes[key];
- delete _this.nodes[key];
- delete _this.masterNodes[key];
- if (_this.subscriber === deadNode) {
- _this.selectSubscriber();
- }
- if (Object.keys(_this.nodes).length === 0) {
- _this.setStatus('close');
- }
- });
+Cluster.prototype.nodes = function (type) {
+ if (!type) {
+ type = 'all';
}
-
- return this.nodes[key];
-};
-
-Cluster.prototype.selectRandomMasterNode = function () {
- return this.nodes[_.sample(Object.keys(this.masterNodes))];
-};
-
-Cluster.prototype.selectRandomNode = function () {
- var keys = Object.keys(this.nodes);
- return (keys.length > 0) ? this.nodes[_.sample(keys)] : null;
-};
-
-Cluster.prototype.selectRandomNodeForSlot = function (targetSlot) {
- return _.sample(this.slots[targetSlot].allNodes);
+ if (type !== 'all' && type !== 'masters' && type !== 'slaves') {
+ throw new Error('Invalid type "' + type + '". Expected "all", "masters" or "slaves"');
+ }
+ return _.values(this.connectionPool[type === 'all' ? 'nodes' : type]);
};
Cluster.prototype.selectSubscriber = function () {
- this.subscriber = this.selectRandomNode();
- if (this.subscriber === null) {
+ this.subscriber = _.sample(this.connectionPool.nodes);
+ if (!this.subscriber) {
return;
}
// Re-subscribe previous channels
@@ -319,7 +286,7 @@ Cluster.prototype.refreshSlotsCache = function (callback) {
}
};
- var keys = _.shuffle(Object.keys(this.nodes));
+ var keys = _.shuffle(Object.keys(this.connectionPool.nodes));
var lastNodeError = null;
@@ -330,7 +297,7 @@ Cluster.prototype.refreshSlotsCache = function (callback) {
return wrapper(error);
}
debug('getting slot cache from %s', keys[index]);
- _this.getInfoFromNode(_this.nodes[keys[index]], function (err) {
+ _this.getInfoFromNode(_this.connectionPool.nodes[keys[index]], function (err) {
if (_this.status === 'end') {
return wrapper(new Error('Cluster is disconnected.'));
}
@@ -398,87 +365,47 @@ Cluster.prototype.executeClusterDownCommands = function () {
}
};
-Cluster.prototype.to = function (name) {
- var fnName = '_select' + name[0].toUpperCase() + name.slice(1);
- if (typeof this[fnName] !== 'function') {
- // programmatic error, can't happen in prod, so throw
- throw new Error('to ' + name + ' is not a valid group of nodes');
- }
-
- // could be 0 nodes just as well
- var nodes = this[fnName]();
- return {
- nodes: nodes,
- call: this._generateCallNodes(nodes, 'call'),
- callBuffer: this._generateCallNodes(nodes, 'callBuffer')
- };
-};
-
-Cluster.prototype._generateCallNodes = function (nodes, op, _opts) {
- var opts = _opts || {};
-
- return function callNode() {
- var argLength = arguments.length;
- var hasCb = typeof arguments[argLength - 1] === 'function';
- var args = new Array(argLength);
- for (var i = 0; i < argLength; ++i) {
- args[i] = arguments[i];
- }
-
- var callback = hasCb ? args.pop() : null;
- var promise = Promise.map(nodes, function (node) {
- return node[op].apply(node, args);
- }, opts);
-
- if (callback) {
- return promise.nodeify(callback);
- }
-
- return promise;
- };
-};
-
-Cluster.prototype._selectAll = function () {
- return _.values(this.nodes);
-};
-
-Cluster.prototype._selectMasters = function () {
- return _.values(this.masterNodes);
-};
-
-Cluster.prototype._selectSlaves = function () {
- return _.difference(this._selectAll(), this._selectMasters());
-};
-
Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'end') {
command.reject(new Error('Connection is closed.'));
return command.promise;
}
+ var to = this.options.scaleReads;
+ if (to !== 'masters') {
+ var flags = commands[command.name] && commands[command.name].flags;
+ var isCommandReadOnly = false;
+ for (var i = 0; i < flags.length; i++) {
+ if (flags[i] === 'readonly') {
+ isCommandReadOnly = true;
+ break;
+ }
+ }
+ if (!isCommandReadOnly) {
+ to = 'masters';
+ }
+ }
var targetSlot = node ? node.slot : command.getSlot();
var ttl = {};
- var reject = command.reject;
var _this = this;
if (!node) {
+ var reject = command.reject;
+ var partialTry = _.partial(tryConnection, true);
command.reject = function (err) {
- var partialTry = _.partial(tryConnection, true);
-
_this.handleError(err, ttl, {
- moved: function (node, slot, hostPort) {
- debug('command %s is moved to %s:%s', command.name, hostPort[0], hostPort[1]);
- var coveredSlot = _this.slots[slot];
- if (!coveredSlot) {
- _this.slots[slot] = { masterNode: node, allNodes: [node] };
+ moved: function (slot, key) {
+ debug('command %s is moved to %s', command.name, key);
+ if (_this.slots[slot]) {
+ _this.slots[slot][0] = key;
} else {
- coveredSlot.masterNode = node;
+ _this.slots[slot] = [key];
}
tryConnection();
_this.refreshSlotsCache();
},
- ask: function (node, slot, hostPort) {
- debug('command %s is required to ask %s:%s', command.name, hostPort[0], hostPort[1]);
- tryConnection(false, node);
+ ask: function (slot, key) {
+ debug('command %s is required to ask %s:%s', command.name, key);
+ tryConnection(false, key);
},
clusterDown: partialTry,
connectionClosed: partialTry,
@@ -506,19 +433,27 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
_.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name)) {
redis = _this.subscriber;
} else {
- if (typeof targetSlot === 'number' && _this.slots[targetSlot]) {
- if (_this.options.readOnly) {
- redis = _this.selectRandomNodeForSlot(targetSlot);
- } else {
- redis = _this.slots[targetSlot].masterNode;
+ if (!random) {
+ if (typeof targetSlot === 'number' && _this.slots[targetSlot]) {
+ var nodeKeys = _this.slots[targetSlot];
+ var key;
+ if (to === 'all') {
+ key = utils.sample(nodeKeys);
+ } else if (to === 'slaves' && nodeKeys.length > 1) {
+ key = utils.sample(nodeKeys, 1);
+ } else {
+ key = nodeKeys[0];
+ }
+ redis = _this.connectionPool.nodes[key];
+ }
+ if (asking) {
+ redis = _this.connectionPool.nodes[asking];
+ redis.asking();
}
}
- if (asking && !random) {
- redis = asking;
- redis.asking();
- }
- if (random || !redis) {
- redis = _this.selectRandomMasterNode();
+ if (!redis) {
+ redis = _.sample(_this.connectionPool[to === 'all' ? 'nodes' : to]) ||
+ _.sample(_this.connectionPool.nodes);
}
}
if (node && !node.redis) {
@@ -553,13 +488,7 @@ Cluster.prototype.handleError = function (error, ttl, handlers) {
}
var errv = error.message.split(' ');
if (errv[0] === 'MOVED' || errv[0] === 'ASK') {
- var hostPort = errv[2].split(':');
- var node = this.createNode(hostPort[1], hostPort[0]);
- if (errv[0] === 'MOVED') {
- handlers.moved(node, errv[1], hostPort);
- } else {
- handlers.ask(node, errv[1], hostPort);
- }
+ handlers[errv[0] === 'MOVED' ? 'moved' : 'ask'](errv[1], errv[2]);
} else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) {
this.clusterDownQueue.push(handlers.clusterDown);
if (!this.clusterDownTimeout) {
@@ -595,40 +524,27 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) {
redis.disconnect();
return callback(err);
}
- var i;
- var oldNodes = {};
- var keys = Object.keys(_this.nodes);
- for (i = 0; i < keys.length; ++i) {
- oldNodes[keys[i]] = true;
- }
- _this.masterNodes = {};
- for (i = 0; i < result.length; ++i) {
- var allNodes = [];
+ var nodes = [];
+
+ for (var i = 0; i < result.length; ++i) {
var items = result[i];
- var slotRangeStart = items.shift();
- var slotRangeEnd = items.shift();
- var master = items.shift();
- var masterNodeKey = master[0] + ':' + master[1];
- var masterNode = _this.createNode(master[1], master[0]);
- _this.masterNodes[masterNodeKey] = masterNode;
- allNodes.push(masterNode);
- delete oldNodes[masterNodeKey];
- if (_this.options.readOnly) {
- items.forEach(function (item) {
- var host = item[0];
- var port = item[1];
- allNodes.push(_this.createNode(port, host));
- delete oldNodes[host + ':' + port];
- });
+ var slotRangeStart = items[0];
+ var slotRangeEnd = items[1];
+
+ var keys = [];
+ for (var j = 2; j < items.length; j++) {
+ items[j] = { host: items[j][0], port: items[j][1] };
+ items[j].readOnly = j !== 2;
+ nodes.push(items[j]);
+ keys.push(items[j].host + ':' + items[j].port);
}
- for (var slot = slotRangeStart; slot <= slotRangeEnd; ++slot) {
- _this.slots[slot] = { masterNode: masterNode, allNodes: allNodes };
+
+ for (var slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
+ _this.slots[slot] = keys;
}
}
- Object.keys(oldNodes).forEach(function (key) {
- _this.nodes[key].disconnect();
- });
+ _this.connectionPool.reset(nodes);
callback();
}, 1000));
};
@@ -645,7 +561,7 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) {
};
});
-require('./transaction').addTransactionSupport(Cluster.prototype);
+require('../transaction').addTransactionSupport(Cluster.prototype);
function noop() {}
diff --git a/lib/commander.js b/lib/commander.js
index 88b4a9aa..9ef38d0c 100644
--- a/lib/commander.js
+++ b/lib/commander.js
@@ -9,7 +9,8 @@ var Script = require('./script');
*
* This is the base class of Redis, Redis.Cluster and Pipeline
*
- * @param {boolean} [options.showFriendlyErrorStack=false] - Whether to show a friendly error stack. Will decrease the performance significantly.
+ * @param {boolean} [options.showFriendlyErrorStack=false] - Whether to show a friendly error stack.
+ * Will decrease the performance significantly.
* @constructor
*/
function Commander() {
diff --git a/lib/pipeline.js b/lib/pipeline.js
index 963ced1d..37b9946b 100644
--- a/lib/pipeline.js
+++ b/lib/pipeline.js
@@ -104,14 +104,14 @@ Pipeline.prototype.fillResult = function (value, position) {
this.leftRedirections = {};
}
this.redis.handleError(commonError, this.leftRedirections, {
- moved: function (node) {
- _this.preferNode = node;
- _this.redis.slots[errv[1]] = node;
+ moved: function (slot, key) {
+ _this.preferKey = key;
+ _this.redis.slots[errv[1]] = [key];
_this.redis.refreshSlotsCache();
_this.exec();
},
- ask: function (node) {
- _this.preferNode = node;
+ ask: function (slot, key) {
+ _this.preferKey = key;
_this.exec();
},
clusterDown: function () {
@@ -254,7 +254,10 @@ Pipeline.prototype.exec = function (callback) {
var data = '';
var writePending = _this.replyPending = _this._queue.length;
- var node = { slot: pipelineSlot, redis: _this.preferNode };
+ var node;
+ if (_this.isCluster) {
+ node = { slot: pipelineSlot, redis: _this.redis.connectionPool.nodes[_this.preferKey] };
+ }
var bufferMode = false;
var stream = {
write: function (writable) {
diff --git a/lib/utils/index.js b/lib/utils/index.js
index 9022f166..00b4e97a 100644
--- a/lib/utils/index.js
+++ b/lib/utils/index.js
@@ -1,6 +1,5 @@
'use strict';
var urllib = require('url');
-var util = require('util');
var _ = require('lodash');
/**
@@ -290,3 +289,14 @@ exports.parseURL = function (url) {
return result;
};
+
+exports.sample = function (array, from) {
+ var length = array.length;
+ if (typeof from !== 'number') {
+ from = 0;
+ }
+ if (from >= length) {
+ return;
+ }
+ return array[from + Math.floor(Math.random() * (length - from))];
+};
diff --git a/test/functional/cluster.js b/test/functional/cluster.js
index ff40905c..9402eb6a 100644
--- a/test/functional/cluster.js
+++ b/test/functional/cluster.js
@@ -2,6 +2,7 @@
var utils = require('../../lib/utils');
var Promise = require('bluebird');
+var _ = require('lodash');
describe('cluster', function () {
describe('connect', function () {
@@ -291,8 +292,8 @@ describe('cluster', function () {
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001', password: 'other password' },
- { host: '127.0.0.1', port: '30002' }
- ], { lazyConnect: false, password: 'default password' });
+ { host: '127.0.0.1', port: '30002', password: null }
+ ], { lazyConnect: false, redisOptions: { password: 'default password' } });
});
});
@@ -933,59 +934,101 @@ describe('cluster', function () {
});
});
- describe('readonly', function () {
- it('should connect all nodes and issue a readonly', function (done) {
- var setReadOnlyNode1 = false;
- var setReadOnlyNode2 = false;
- var setReadOnlyNode3 = false;
- var slotTable = [
- [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
- [5461, 10922, ['127.0.0.1', 30002]]
- ];
- var node1 = new MockServer(30001, function (argv) {
- if (argv[0] === 'cluster' && argv[1] === 'slots') {
- return slotTable;
- }
- if (argv[0] === 'readonly') {
- setReadOnlyNode1 = true;
- return 'OK';
- }
- });
- var node2 = new MockServer(30002, function (argv) {
+ describe('scaleReads', function () {
+ beforeEach(function () {
+ function handler(port, argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
- return slotTable;
- }
- if (argv[0] === 'readonly') {
- setReadOnlyNode2 = true;
- return 'OK';
+ return [
+ [0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
+ [16382, 16383, ['127.0.0.1', 30002]]
+ ];
}
+ return port;
+ }
+ this.node1 = new MockServer(30001, handler.bind(null, 30001));
+ this.node2 = new MockServer(30002, handler.bind(null, 30002));
+ this.node3 = new MockServer(30003, handler.bind(null, 30003));
+ });
+
+ afterEach(function (done) {
+ disconnect([this.node1, this.node2, this.node3], done);
+ });
+
+ context('masters', function () {
+ it('should only send reads to masters', function (done) {
+ var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
+ cluster.on('ready', function () {
+ stub(utils, 'sample').throws('sample is called');
+ cluster.get('foo', function (err, res) {
+ utils.sample.restore();
+ expect(res).to.eql(30001);
+ cluster.disconnect();
+ done();
+ });
+ });
});
+ });
- var node3 = new MockServer(30003, function (argv) {
- if (argv[0] === 'cluster' && argv[1] === 'slots') {
- return slotTable;
- }
- if (argv[0] === 'readonly') {
- setReadOnlyNode3 = true;
- return 'OK';
- }
+ context('slaves', function () {
+ it('should only send reads to slaves', function (done) {
+ var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
+ scaleReads: 'slaves'
+ });
+ cluster.on('ready', function () {
+ stub(utils, 'sample', function (array, from) {
+ expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
+ expect(from).to.eql(1);
+ return '127.0.0.1:30003';
+ });
+ cluster.get('foo', function (err, res) {
+ utils.sample.restore();
+ expect(res).to.eql(30003);
+ cluster.disconnect();
+ done();
+ });
+ });
});
- var cluster = new Redis.Cluster(
- [{ host: '127.0.0.1', port: '30001' }],
- { readOnly: true }
- );
- cluster.on('ready', function () {
- expect(setReadOnlyNode1 || setReadOnlyNode2 || setReadOnlyNode3).to.eql(true);
- cluster.disconnect();
- disconnect([node1, node2, node3], done);
+ it('should send writes to masters', function (done) {
+ var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
+ scaleReads: 'slaves'
+ });
+ cluster.on('ready', function () {
+ stub(utils, 'sample').throws('sample is called');
+ cluster.set('foo', 'bar', function (err, res) {
+ utils.sample.restore();
+ expect(res).to.eql(30001);
+ cluster.disconnect();
+ done();
+ });
+ });
});
+ });
+ context('all', function () {
+ it('should send reads to all nodes randomly', function (done) {
+ var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
+ scaleReads: 'all'
+ });
+ cluster.on('ready', function () {
+ stub(utils, 'sample', function (array, from) {
+ expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']);
+ expect(from).to.eql(undefined);
+ return '127.0.0.1:30003';
+ });
+ cluster.get('foo', function (err, res) {
+ utils.sample.restore();
+ expect(res).to.eql(30003);
+ cluster.disconnect();
+ done();
+ });
+ });
+ });
});
});
- describe('#masterNodes', function () {
- it('should contains master nodes', function (done) {
+ describe('#nodes()', function () {
+ it('should return the corrent nodes', function (done) {
var slotTable = [
[0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
[5461, 10922, ['127.0.0.1', 30002]]
@@ -1009,8 +1052,16 @@ describe('cluster', function () {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
cluster.on('ready', function () {
- cluster.nodes['127.0.0.1:30001'].on('end', function () {
- expect(Object.keys(cluster.masterNodes).length).to.eql(1);
+ expect(cluster.nodes()).to.have.lengthOf(3);
+ expect(cluster.nodes('all')).to.have.lengthOf(3);
+ expect(cluster.nodes('masters')).to.have.lengthOf(2);
+ expect(cluster.nodes('slaves')).to.have.lengthOf(1);
+
+ cluster.on('-node', function () {
+ expect(cluster.nodes()).to.have.lengthOf(2);
+ expect(cluster.nodes('all')).to.have.lengthOf(2);
+ expect(cluster.nodes('masters')).to.have.lengthOf(1);
+ expect(cluster.nodes('slaves')).to.have.lengthOf(1);
cluster.disconnect();
disconnect([node2, node3], done);
});
@@ -1043,73 +1094,27 @@ describe('cluster', function () {
}
});
- var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
+ var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
+ redisOptions: { showFriendlyErrorStack: true }
+ });
cluster.on('ready', function () {
- expect(Object.keys(cluster.masterNodes).length).to.eql(2);
+ expect(cluster.nodes('masters')).to.have.lengthOf(2);
slotTable = [
[0, 5460, ['127.0.0.1', 30003]],
[5461, 10922, ['127.0.0.1', 30002]]
];
cluster.refreshSlotsCache(function () {
- expect(Object.keys(cluster.masterNodes).length).to.eql(2);
- expect(cluster.masterNodes).to.have.property('127.0.0.1:30003');
- expect(cluster.masterNodes).to.have.property('127.0.0.1:30002');
+ expect(cluster.nodes('masters')).to.have.lengthOf(2);
+ expect([
+ cluster.nodes('masters')[0].options.port,
+ cluster.nodes('masters')[1].options.port
+ ].sort()).to.eql([30002, 30003]);
cluster.disconnect();
disconnect([node1, node2, node3], done);
});
});
});
});
-
- describe('#to', function () {
- it('should throw when the group does not exist', function () {
- stub(Redis.Cluster.prototype, 'connect', function () {
- return Promise.resolve();
- });
- var cluster = new Redis.Cluster([{}]);
- expect(function () {
- cluster.to('non-exist');
- }).to.throw(/is not a valid group of nodes/);
- Redis.Cluster.prototype.connect.restore();
- });
-
- it('should return the correct nodes', function (done) {
- var slotTable = [
- [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
- [5461, 16383, ['127.0.0.1', 30002]]
- ];
- var argvHandler = function (argv) {
- if (argv[0] === 'cluster' && argv[1] === 'slots') {
- return slotTable;
- } else if (argv[0] === 'keys') {
- return ['key' + this.port];
- }
- };
- var node1 = new MockServer(30001, argvHandler);
- var node2 = new MockServer(30002, argvHandler);
- var node3 = new MockServer(30003, argvHandler);
- var pending = 3;
- [node1, node2, node3].forEach(function (node) {
- node.on('connect', function () {
- if (!--pending) {
- run();
- }
- });
- });
- var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { readOnly: true });
- function run() {
- expect(cluster.to('masters').nodes).to.have.lengthOf(2);
- expect(cluster.to('slaves').nodes).to.have.lengthOf(1);
- expect(cluster.to('all').nodes).to.have.lengthOf(3);
- cluster.to('masters').call('keys', function (err, keys) {
- expect(keys).to.have.lengthOf(2);
- expect([].concat.apply([], keys).sort()).to.eql(['key30001', 'key30002']);
- cluster.disconnect();
- disconnect([node1, node2, node3], done);
- });
- }
- });
- });
});
function disconnect(clients, callback) {