Skip to content

Commit

Permalink
Styling and a new unit test on pool size
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgebay committed Nov 14, 2013
1 parent eae2eb6 commit a779029
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 21 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ client.execute('UPDATE user_profiles SET birth=? WHERE key=?', [new Date(1950, 5
);

// Streaming query rows
client.streamRows('SELECT event_time, temperature FROM temperature WHERE weatherstation_id=', ['abc'],
client.streamRows('SELECT event_time, temperature FROM temperature WHERE station_id=', ['abc'],
function(err, row) {
//the callback will be invoked per each row as soon as they are received
if (err) console.log("Oh dear...");
Expand All @@ -52,7 +52,7 @@ client.streamRows('SELECT event_time, temperature FROM temperature WHERE weather
);

// Streaming field
client.streamField('SELECT key, photo FROM user_profiles WHERE key=', ['jbay],
client.streamField('SELECT key, photo FROM user_profiles WHERE key=', ['jbay'],
function(err, row, photoStream) {
//the callback will be invoked per each row as soon as they are received.
if (err) console.log("Shame...");
Expand Down
38 changes: 20 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,39 @@ var optionsDefault = {
poolSize: 1
};
//Represents a pool of connection to multiple hosts
function Client (options) {
Client.super_.call(this);
function Client(options) {
events.EventEmitter.call(this);
//Unlimited amount of listeners for internal event queues by default
this.setMaxListeners(0);
//create a connection for each each host
this.connections = [];
this.options = utils.extend({}, optionsDefault, options);
//current connection index
this.connectionIndex = 0;
//current connection index for prepared queries
this.prepareConnectionIndex = 0;
this.preparedQueries = {};

var self = this;
var connCount = 0;
var poolSize = self.options.poolSize;
while (connCount++ < poolSize) {
options.hosts.forEach(function (hostPort, index){
var host = hostPort.split(':');
var connOptions = utils.extend({}, self.options, {host: host[0], port: isNaN(host[1]) ? 9042 : host[1]});
this._createPool();
}

util.inherits(Client, events.EventEmitter);

/**
* Creates the pool of connections suitable for round robin
*/
Client.prototype._createPool = function () {
this.connections = [];
for (var poolIndex = 0; poolIndex < this.options.poolSize; poolIndex++) {
for (var i = 0; i < this.options.hosts.length; i++) {
var host = this.options.hosts[i].split(':');
var connOptions = utils.extend({}, this.options, {host: host[0], port: host[1] || 9042});
var c = new Connection(connOptions);
c.indexInPool = ( (connCount-1) * poolSize) + index;
self.connections.push(c);
});
c.indexInPool = (this.options.poolSize * poolIndex) + i;
this.connections.push(c);
}
}

this.emit('log', 'info', this.connections.length + ' connections created across ' + options.hosts.length + ' hosts.');
}

util.inherits(Client, events.EventEmitter);
this.emit('log', 'info', this.connections.length + ' connections created across ' + this.options.hosts.length + ' hosts.');
};

/**
* Connects to each host
Expand Down
2 changes: 1 addition & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var optionsDefault = {
maxRequestsRetry: 100
};
function Connection(options) {
Connection.super_.call(this);
events.EventEmitter.call(this);

this.streamHandlers = {};
this.options = utils.extend({}, optionsDefault, options);
Expand Down
14 changes: 14 additions & 0 deletions test/clientTests.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ module.exports = {
});
}
},
'pool size test': function (test) {
var poolSize = 3;
var localClient = getANewClient({hosts: ['host1', 'host2', 'host3'], poolSize: poolSize});
var connections = localClient.connections;
//The pool should created like [conHost1, conHost2, conHost3, conHost1, conHost2, conHost3, conHost1, ...]
test.ok(connections.length, 9, 'There must be 9 connections (amount hosts * pool size)');
for (var i = 0; i < poolSize; i++) {
test.ok(
connections[0 + (i * poolSize)].options.host === 'host1' &&
connections[1 + (i * poolSize)].options.host === 'host2' &&
connections[2 + (i * poolSize)].options.host === 'host3', 'The connections inside the pool are not correctly ordered');
}
test.done();
},
'execute params': function (test) {
async.series([
function (callback) {
Expand Down

0 comments on commit a779029

Please sign in to comment.