Skip to content

Commit

Permalink
Add option to time out pg.connect() call
Browse files Browse the repository at this point in the history
Currently if you call pg.connect(), the call will block indefinitely until a
connection becomes available. In many cases, if a connection is not available
after some period of time, it's preferable to return an error (and call
control) to the client, instead of tying up resources forever.

Blocking on resource checkout also makes it easier for clients to deadlock -
recently at Shyp, we had a situation where a row got locked and the thread
that could unlock it was blocked waiting for a connection to become available,
leading to deadlock. In that situation, it would be better to abort the
checkout, which would have errored, but also broken the deadlock.

Add a new setting to defaults: `acquireTimeout`, which will wait for
`acquireTimeout` milliseconds before giving up and returning an error. If the
value is undefined (the default), `node-postgres` will continue to wait
indefinitely for a connection to become available.

This builds on a pull request against `generic-pool`, support options.timeout:
coopernurse/node-pool#127. Review has been slow going,
so I published a new package with that change as `generic-pool-timeout`, and
updated the reference in this codebase.

Adds semicolons in many places that omitted them and fixes several typos. I'm
happy to pull those out into a different commit.

Sets the TZ=GMT environment variable before running the tests; without this
value set, and with a Postgres server set to the America/Los_Angeles timezone,
a timezone test failed.

Fixes #782 and #805. Will help alleviate #902. May help with #397.
  • Loading branch information
Kevin Burke committed May 27, 2016
1 parent 55abbaa commit e795d17
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 28 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ connectionString=postgres://

params := $(connectionString)

node-command := xargs -n 1 -I file node file $(params)
node-command := TZ=GMT xargs -n 1 -I file node file $(params)

.PHONY : test test-connection test-integration bench test-native \
jshint publish test-missing-native update-npm
jshint publish test-missing-native update-npm clean

all:
npm install
Expand Down Expand Up @@ -67,3 +67,6 @@ prepare-test-db:
jshint:
@echo "***Starting jshint***"
@./node_modules/.bin/jshint lib

clean:
@rm -rf node_modules
11 changes: 11 additions & 0 deletions lib/defaults.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ var defaults = module.exports = {
//from the pool and destroyed
poolIdleTimeout: 30000,

// acquireTimeout is the amount of time in ms to wait to check out a database
// connection. By default, node-postgres will wait indefinitely for a
// connection to become available. If the acquireTimeout is negative,
// or NaN, pg.connect will return an error immediately. If the acquireTimeout
// is zero, node-postgres will return an error in the next tick if the pool
// is full.
//
// The error message on timeout will be "Cannot acquire resource because the
// pool is full".
acquireTimeout: undefined,

//frequency to check for idle clients within the client pool
reapIntervalMillis: 1000,

Expand Down
17 changes: 14 additions & 3 deletions lib/pool.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var EventEmitter = require('events').EventEmitter;

var defaults = require('./defaults');
var genericPool = require('generic-pool');
var genericPool = require('generic-pool-timeout');


module.exports = function(Client) {
Expand Down Expand Up @@ -72,6 +72,17 @@ module.exports = function(Client) {
pool[key] = EventEmitter.prototype[key];
}
}
var block = true;
if (defaults.block === false || clientConfig.block === false) {
block = false;
}
var timeout;
if (typeof defaults.acquireTimeout === "number" && !isNaN(defaults.acquireTimeout)) {
timeout = defaults.acquireTimeout;
} else if (typeof clientConfig.acquireTimeout === "number" && !isNaN(clientConfig.acquireTimeout)) {
timeout = clientConfig.acquireTimeout;
}

//monkey-patch with connect method
pool.connect = function(cb) {
var domain = process.domain;
Expand All @@ -88,10 +99,10 @@ module.exports = function(Client) {
pool.release(client);
}
});
});
}, 0, {timeout: timeout});
};
return pool;
}
},
};

return pools;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"main": "./lib",
"dependencies": {
"buffer-writer": "1.0.1",
"generic-pool": "2.1.1",
"generic-pool-timeout": "2.7.1",
"packet-reader": "0.2.0",
"pg-connection-string": "0.1.3",
"pg-types": "1.*",
Expand Down
2 changes: 1 addition & 1 deletion script/test-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pg.connect(helper.config, function(err, client, done) {
console.error(err);
process.exit(255);
}
console.log("Checking for existance of required test table 'person'")
console.log("Checking for existence of required test table 'person'")
client.query("SELECT COUNT(name) FROM person", function(err, callback) {
if(err != null) {
console.error("Recieved error when executing query 'SELECT COUNT(name) FROM person'")
Expand Down
4 changes: 2 additions & 2 deletions test/integration/connection-pool/error-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pg.connect(helper.config, assert.success(function(client, done) {
var params = ['idle'];
if(!isGreater) {
killIdleQuery = 'SELECT procpid, (SELECT pg_terminate_backend(procpid)) AS killed FROM pg_stat_activity WHERE current_query LIKE $1';
params = ['%IDLE%']
params = ['%IDLE%'];
}

//subscribe to the pg error event
Expand All @@ -37,5 +37,5 @@ pg.connect(helper.config, assert.success(function(client, done) {
}));
}));

})
});
}));
8 changes: 3 additions & 5 deletions test/test-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ assert = require('assert');

var EventEmitter = require('events').EventEmitter;
var sys = require('util');
var BufferList = require(__dirname+'/buffer-list')
var BufferList = require(__dirname+'/buffer-list');

var Connection = require(__dirname + '/../lib/connection');

Expand Down Expand Up @@ -160,7 +160,7 @@ var expect = function(callback, timeout) {
callback.apply(this, arguments)
}
} else {
throw new Error("Unsupported arrity " + callback.length);
throw new Error("Unsupported arity " + callback.length);
}

}
Expand Down Expand Up @@ -193,7 +193,7 @@ process.on('exit', function() {
})

process.on('uncaughtException', function(err) {
console.error("\n %s", err.stack || err.toString())
console.error("\n %s", err.stack || err.toString());
//causes xargs to abort right away
process.exit(255);
});
Expand Down Expand Up @@ -248,5 +248,3 @@ module.exports = {
setTimezoneOffset: setTimezoneOffset,
resetTimezoneOffset: resetTimezoneOffset
};


2 changes: 1 addition & 1 deletion test/unit/client/query-queue-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ test('drain', function() {
test("emits drain", function() {
process.nextTick(function() {
assert.ok(raisedDrain);
})
});
});
});
});
68 changes: 58 additions & 10 deletions test/unit/pool/basic-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ var libDir = __dirname + '/../../../lib';
var poolsFactory = require(libDir + '/pool')
var defaults = require(libDir + '/defaults');
var poolId = 0;
var pg = require(libDir);

require(__dirname + '/../../test-helper');

var FakeClient = function() {
EventEmitter.call(this);
}
};

util.inherits(FakeClient, EventEmitter);

FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
};

FakeClient.prototype.end = function() {
this.endCalled = true;
}
};
var pools = poolsFactory(FakeClient);

//Hangs the event loop until 'end' is called on client
var HangingClient = function(config) {
EventEmitter.call(this);
this.config = config;
}
};

util.inherits(HangingClient, EventEmitter);

Expand All @@ -36,11 +37,11 @@ HangingClient.prototype.connect = function(cb) {
console.log('hung client...');
}, 1000);
process.nextTick(cb);
}
};

HangingClient.prototype.end = function() {
clearInterval(this.intervalId);
}
};

test('no pools exist', function() {
assert.empty(Object.keys(pools.all));
Expand Down Expand Up @@ -153,7 +154,7 @@ test('pool with connection error on connection', function() {
});
});

test('returnning an error to done()', function() {
test('returning an error to done()', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done) {
assert.equal(err, null);
Expand All @@ -178,7 +179,6 @@ test('fetching pool by object', function() {
assert.equal(p, p2);
});


test('pool#connect client.poolCount', function() {
var p = pools.getOrCreate(poolId++);
var tid;
Expand All @@ -187,7 +187,7 @@ test('pool#connect client.poolCount', function() {
tid = setTimeout(function() {
throw new Error("Connection callback was never called");
}, 100);
}
};

setConnectTimeout();
p.connect(function(err, client, done) {
Expand All @@ -209,6 +209,54 @@ test('pool#connect client.poolCount', function() {
assert.equal(client.poolCount, undefined,
'after pool is destroyed, count should be undefined');
});
})
});
});
});

pg.defaults.poolSize = 1;

test('pool#connect acquire errors if the pool is full', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done1) {
p.connect(function(err, client, done2) {
assert.equal(err.message, "Cannot acquire resource because the pool is full");
});
});
});

pg.defaults.poolSize = 1;

pg.defaults.poolSize = 1;
pg.defaults.acquireTimeout = 10;
pg.defaults.poolIdleTimeout = 10;

test('pool#connect acquire errors if acquisition times out', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done1) {
setTimeout(function() {
done1();
}, 40);

var start = Date.now();
p.connect(function(err, client, done2) {
assert.equal(err.message, "Cannot acquire resource because the pool is full");
assert.ok((Date.now() - start) < 15);
});
});
});

pg.defaults.acquireTimeout = 20;

test('pool#connect acquire returns a resource if one becomes available before the timeout', function() {
var p = pools.getOrCreate(poolId++);
p.connect(function(err, client, done1) {
setTimeout(function() {
done1();
}, 10);

p.connect(function(err, client, done2) {
assert.equal(err, null);
done2();
});
});
});
6 changes: 3 additions & 3 deletions test/unit/pool/timeout-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ require(__dirname + '/../../test-helper');

var FakeClient = function() {
EventEmitter.call(this);
}
};

util.inherits(FakeClient, EventEmitter);

FakeClient.prototype.connect = function(cb) {
process.nextTick(cb);
}
};

FakeClient.prototype.end = function() {
this.endCalled = true;
}
};

defaults.poolIdleTimeout = 10;
defaults.reapIntervalMillis = 10;
Expand Down

0 comments on commit e795d17

Please sign in to comment.