Skip to content

Commit

Permalink
feat(cluster): support scaling reads to slaves
Browse files Browse the repository at this point in the history
The new option scaleReads is used to specify where to send the reads.

Add two new events:

    1. "+node": a new node is discovered.
    2. "-node": a node is disconnected.

BREAKING CHANGE:
    1. Cluster#masterNodes and Cluster#nodes is removed. Use Cluster#nodes('masters') and Cluster#nodes('all') instead.
    2. Cluster#to() is removed. Use
Promise.all(Cluster#nodes().map(function (node) {})) instead.

Closes #170.
  • Loading branch information
luin committed Feb 7, 2016
1 parent 0a4186e commit 98bdec2
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 395 deletions.
86 changes: 0 additions & 86 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
<dl>
<dt><a href="#Redis">Redis</a> ⇐ <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code></dt>
<dd></dd>
<dt><a href="#Cluster">Cluster</a> ⇐ <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code></dt>
<dd></dd>
<dt><a href="#Commander">Commander</a></dt>
<dd></dd>
</dl>
Expand All @@ -12,9 +10,6 @@
<dt><a href="#defaultOptions">defaultOptions</a></dt>
<dd><p>Default options</p>
</dd>
<dt><a href="#defaultOptions">defaultOptions</a></dt>
<dd><p>Default options</p>
</dd>
</dl>
<a name="Redis"></a>
## Redis ⇐ <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code>
Expand Down Expand Up @@ -190,81 +185,6 @@ Define a custom command using lua script
Create a Redis instance

**Kind**: static method of <code>[Redis](#Redis)</code>
<a name="Cluster"></a>
## Cluster ⇐ <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code>
**Kind**: global class
**Extends:** <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code>, <code>[Commander](#Commander)</code>

* [Cluster](#Cluster) ⇐ <code>[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)</code>
* [new Cluster(startupNodes, options)](#new_Cluster_new)
* [.disconnect()](#Cluster+disconnect)
* [.getBuiltinCommands()](#Commander+getBuiltinCommands) ⇒ <code>Array.&lt;string&gt;</code>
* [.createBuiltinCommand(commandName)](#Commander+createBuiltinCommand) ⇒ <code>object</code>
* [.defineCommand(name, definition)](#Commander+defineCommand)
* *[.sendCommand()](#Commander+sendCommand)*

<a name="new_Cluster_new"></a>
### new Cluster(startupNodes, options)
Creates a Redis Cluster instance


| Param | Type | Default | Description |
| --- | --- | --- | --- |
| startupNodes | <code>Array.&lt;Object&gt;</code> | | An array of nodes in the cluster, [{ port: number, host: string }] |
| options | <code>Object</code> | | |
| [options.enableOfflineQueue] | <code>boolean</code> | <code>true</code> | See Redis class |
| [options.lazyConnect] | <code>boolean</code> | <code>false</code> | See Redis class |
| [options.readOnly] | <code>boolean</code> | <code>false</code> | Connect in READONLY mode |
| [options.maxRedirections] | <code>number</code> | <code>16</code> | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. |
| [options.clusterRetryStrategy] | <code>function</code> | | See "Quick Start" section |
| [options.retryDelayOnFailover] | <code>number</code> | <code>2000</code> | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), |
| [options.retryDelayOnClusterDown] | <code>number</code> | <code>1000</code> | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. |

<a name="Cluster+disconnect"></a>
### cluster.disconnect()
Disconnect from every node in the cluster.

**Kind**: instance method of <code>[Cluster](#Cluster)</code>
**Access:** public
<a name="Commander+getBuiltinCommands"></a>
### cluster.getBuiltinCommands() ⇒ <code>Array.&lt;string&gt;</code>
Return supported builtin commands

**Kind**: instance method of <code>[Cluster](#Cluster)</code>
**Returns**: <code>Array.&lt;string&gt;</code> - command list
**Access:** public
<a name="Commander+createBuiltinCommand"></a>
### cluster.createBuiltinCommand(commandName) ⇒ <code>object</code>
Create a builtin command

**Kind**: instance method of <code>[Cluster](#Cluster)</code>
**Returns**: <code>object</code> - functions
**Access:** public

| Param | Type | Description |
| --- | --- | --- |
| commandName | <code>string</code> | command name |

<a name="Commander+defineCommand"></a>
### cluster.defineCommand(name, definition)
Define a custom command using lua script

**Kind**: instance method of <code>[Cluster](#Cluster)</code>

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| name | <code>string</code> | | the command name |
| definition | <code>object</code> | | |
| definition.lua | <code>string</code> | | the lua code |
| [definition.numberOfKeys] | <code>number</code> | <code></code> | the number of keys. If omit, you have to pass the number of keys as the first argument every time you invoke the command |

<a name="Commander+sendCommand"></a>
### *cluster.sendCommand()*
Send a command

**Kind**: instance abstract method of <code>[Cluster](#Cluster)</code>
**Overrides:** <code>[sendCommand](#Commander+sendCommand)</code>
**Access:** public
<a name="Commander"></a>
## Commander
**Kind**: global class
Expand Down Expand Up @@ -331,9 +251,3 @@ Default options

**Kind**: global variable
**Access:** protected
<a name="defaultOptions"></a>
## defaultOptions
Default options

**Kind**: global variable
**Access:** protected
89 changes: 89 additions & 0 deletions lib/cluster/connection_pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict';

var util = require('util');
var EventEmitter = require('events').EventEmitter;
var _ = require('lodash');
var Redis = require('../redis');

function ConnectionPool(redisOptions) {
EventEmitter.call(this);
this.redisOptions = redisOptions;

// this.masters + this.slaves = this.nodes
this.nodes = {};
this.masters = {};
this.slaves = {};

this.specifiedOptions = {};
}

util.inherits(ConnectionPool, EventEmitter);

ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
node.port = node.port || 6379;
node.host = node.host || '127.0.0.1';
node.key = node.key || node.host + ':' + node.port;

if (this.specifiedOptions[node.key]) {
_.assign(node, this.specifiedOptions[node.key]);
} else {
this.specifiedOptions[node.key] = node;
}

if (this.nodes[node.key] && this.nodes[node.key].options.readOnly !== readOnly) {
this.remove(node.key);
}

if (!this.nodes[node.key]) {
var redis = this.nodes[node.key] = new Redis(_.defaults({
retryStrategy: null,
enableOfflineQueue: true,
readOnly: readOnly
}, node, this.redisOptions, { lazyConnect: true }));
this[readOnly ? 'slaves' : 'masters'][node.key] = redis;

var _this = this;
redis.once('end', function () {
delete _this.nodes[node.key];
delete _this.masters[node.key];
delete _this.slaves[node.key];
_this.emit('-node', redis);
if (!Object.keys(_this.nodes).length) {
_this.emit('drain');
}
});

this.emit('+node', redis);
}

return this.nodes[node.key];
};

ConnectionPool.prototype.remove = function (key) {
if (this.nodes[key]) {
this.nodes[key].disconnect();
delete this.nodes[key];
delete this.masters[key];
delete this.slaves[key];
}
};

ConnectionPool.prototype.reset = function (nodes) {
var newNodes = {};
for (var i = 0; i < nodes.length; i++) {
var node = nodes[i];
node.key = node.host + ':' + node.port;
newNodes[node.key] = node;
}
var _this = this;
Object.keys(this.nodes).forEach(function (key) {
if (!newNodes[key]) {
_this.remove(key);
}
});
Object.keys(newNodes).forEach(function (key) {
_this.findOrCreate(newNodes[key], newNodes[key].readOnly);
});
};

module.exports = ConnectionPool;
Loading

0 comments on commit 98bdec2

Please sign in to comment.