From a0eb36d81938e488b3bc5369faee74fe22f36949 Mon Sep 17 00:00:00 2001 From: Brian C Date: Thu, 13 Jul 2017 22:37:08 -0500 Subject: [PATCH] 2.0 (#67) * Initial work * Make progress on custom pool * Make all original tests pass * Fix test race * Fix test when DNS is missing * Test more error conditions * Add test for byop * Add BYOP tests for errors * Add test for idle client error expunging * Fix typo * Replace var with const/let * Remove var usage * Fix linting * Work on connection timeout * Work on error condition tests * Remove logging * Add connection timeout * Add idle timeout * Test for returning to client to pool after error fixes #48 * Add idleTimeout support to native client * Add pg as peer dependency fixes #45 * Rename properties * Fix lint * use strict * Add draining to pool.end * Ensure ending pools drain properly * Remove yarn.lock * Remove object-assign * Remove node 8 * Remove closure for waiter construction * Ensure client.connect is never sync * Fix lint * Change to es6 class * Code cleanup & lint fixes --- .travis.yml | 3 - index.js | 364 ++++++++++++++++++++------------- package.json | 9 +- test/bring-your-own-promise.js | 36 ++++ test/connection-strings.js | 12 +- test/connection-timeout.js | 62 ++++++ test/ending.js | 34 +++ test/error-handling.js | 135 ++++++++++++ test/events.js | 48 ++--- test/idle-timeout.js | 54 +++++ test/index.js | 173 ++++++++-------- test/logging.js | 14 +- test/mocha.opts | 2 + test/setup.js | 10 + test/sizing.js | 44 ++++ test/timeout.js | 0 test/verify.js | 25 +++ 17 files changed, 748 insertions(+), 277 deletions(-) create mode 100644 test/bring-your-own-promise.js create mode 100644 test/connection-timeout.js create mode 100644 test/ending.js create mode 100644 test/error-handling.js create mode 100644 test/idle-timeout.js create mode 100644 test/setup.js create mode 100644 test/sizing.js create mode 100644 test/timeout.js create mode 100644 test/verify.js diff --git a/.travis.yml b/.travis.yml index 47358a12a..e1a0ce70c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,3 @@ matrix: - node_js: "6" addons: postgresql: "9.4" - - node_js: "8" - addons: - postgresql: "9.4" diff --git a/index.js b/index.js index 7e0f0ce59..c317e5311 100644 --- a/index.js +++ b/index.js @@ -1,174 +1,248 @@ -var genericPool = require('generic-pool') -var util = require('util') -var EventEmitter = require('events').EventEmitter -var objectAssign = require('object-assign') - -// there is a bug in the generic pool where it will not recreate -// destroyed workers (even if there is waiting work to do) unless -// there is a min specified. Make sure we keep some connections -// SEE: https://github.com/coopernurse/node-pool/pull/186 -// SEE: https://github.com/brianc/node-pg-pool/issues/48 -// SEE: https://github.com/strongloop/loopback-connector-postgresql/issues/231 -function _ensureMinimum () { - var i, diff, waiting - if (this._draining) return - waiting = this._waitingClients.size() - if (this._factory.min > 0) { // we have positive specified minimum - diff = this._factory.min - this._count - } else if (waiting > 0) { // we have no minimum, but we do have work to do - diff = Math.min(waiting, this._factory.max - this._count) - } - for (i = 0; i < diff; i++) { - this._createResource() - } -}; - -var Pool = module.exports = function (options, Client) { - if (!(this instanceof Pool)) { - return new Pool(options, Client) - } - EventEmitter.call(this) - this.options = objectAssign({}, options) - this.log = this.options.log || function () { } - this.Client = this.options.Client || Client || require('pg').Client - this.Promise = this.options.Promise || global.Promise - - this.options.max = this.options.max || this.options.poolSize || 10 - this.options.create = this.options.create || this._create.bind(this) - this.options.destroy = this.options.destroy || this._destroy.bind(this) - this.pool = new genericPool.Pool(this.options) - // Monkey patch to ensure we always finish our work - // - There is a bug where callbacks go uncalled if min is not set - // - We might still not want a connection to *always* exist - // - but we do want to create up to max connections if we have work - // - still waiting - // This should be safe till the version of pg-pool is upgraded - // SEE: https://github.com/coopernurse/node-pool/pull/186 - this.pool._ensureMinimum = _ensureMinimum - this.onCreate = this.options.onCreate -} +'use strict' +const EventEmitter = require('events').EventEmitter -util.inherits(Pool, EventEmitter) +const NOOP = function () { } -Pool.prototype._promise = function (cb, executor) { - if (!cb) { - return new this.Promise(executor) +class IdleItem { + constructor (client, timeoutId) { + this.client = client + this.timeoutId = timeoutId } +} - function resolved (value) { - process.nextTick(function () { - cb(null, value) - }) +function throwOnRelease () { + throw new Error('Release called on client which has already been released to the pool.') +} + +function release (client, err) { + client.release = throwOnRelease + if (err) { + this._remove(client) + this._pulseQueue() + return } - function rejected (error) { - process.nextTick(function () { - cb(error) - }) + // idle timeout + let tid + if (this.options.idleTimeoutMillis) { + tid = setTimeout(() => { + this.log('remove idle client') + this._remove(client) + }, this.idleTimeoutMillis) } - executor(resolved, rejected) + if (this.ending) { + this._remove(client) + } else { + this._idle.push(new IdleItem(client, tid)) + } + this._pulseQueue() } -Pool.prototype._promiseNoCallback = function (callback, executor) { - return callback - ? executor() - : new this.Promise(executor) +function promisify (Promise, callback) { + if (callback) { + return { callback: callback, result: undefined } + } + let rej + let res + const cb = function (err, client) { + err ? rej(err) : res(client) + } + const result = new Promise(function (resolve, reject) { + res = resolve + rej = reject + }) + return { callback: cb, result: result } } -Pool.prototype._destroy = function (client) { - if (client._destroying) return - client._destroying = true - client.end() -} +class Pool extends EventEmitter { + constructor (options, Client) { + super() + this.options = Object.assign({}, options) + this.options.max = this.options.max || this.options.poolSize || 10 + this.log = this.options.log || function () { } + this.Client = this.options.Client || Client || require('pg').Client + this.Promise = this.options.Promise || global.Promise + + this._clients = [] + this._idle = [] + this._pendingQueue = [] + this._endCallback = undefined + this.ending = false + } -Pool.prototype._create = function (cb) { - this.log('connecting new client') - var client = new this.Client(this.options) - - client.on('error', function (e) { - this.log('connected client error:', e) - this.pool.destroy(client) - e.client = client - this.emit('error', e, client) - }.bind(this)) - - client.connect(function (err) { - if (err) { - this.log('client connection error:', err) - cb(err, null) - } else { - this.log('client connected') - this.emit('connect', client) - cb(null, client) - } - }.bind(this)) -} + _isFull () { + return this._clients.length >= this.options.max + } -Pool.prototype.connect = function (cb) { - return this._promiseNoCallback(cb, function (resolve, reject) { - this.log('acquire client begin') - this.pool.acquire(function (err, client) { - if (err) { - this.log('acquire client. error:', err) - if (cb) { - cb(err, null, function () {}) - } else { - reject(err) - } - return + _pulseQueue () { + this.log('pulse queue') + if (this.ending) { + this.log('pulse queue on ending') + if (this._idle.length) { + this._idle.map(item => { + this._remove(item.client) + }) } - - this.log('acquire client') + if (!this._clients.length) { + this._endCallback() + } + return + } + // if we don't have any waiting, do nothing + if (!this._pendingQueue.length) { + this.log('no queued requests') + return + } + // if we don't have any idle clients and we have no more room do nothing + if (!this._idle.length && this._isFull()) { + return + } + const waiter = this._pendingQueue.shift() + if (this._idle.length) { + const idleItem = this._idle.pop() + clearTimeout(idleItem.timeoutId) + const client = idleItem.client + client.release = release.bind(this, client) this.emit('acquire', client) + return waiter(undefined, client, client.release) + } + if (!this._isFull()) { + return this.connect(waiter) + } + throw new Error('unexpected condition') + } - client.release = function (err) { - delete client.release - if (err) { - this.log('destroy client. error:', err) - this.pool.destroy(client) - } else { - this.log('release client') - this.pool.release(client) - } - }.bind(this) + _remove (client) { + this._idle = this._idle.filter(item => item.client !== client) + this._clients = this._clients.filter(c => c !== client) + client.end() + this.emit('remove', client) + } - if (cb) { - cb(null, client, client.release) - } else { - resolve(client) + connect (cb) { + if (this.ending) { + const err = new Error('Cannot use a pool after calling end on the pool') + return cb ? cb(err) : this.Promise.reject(err) + } + if (this._clients.length >= this.options.max || this._idle.length) { + const response = promisify(this.Promise, cb) + const result = response.result + this._pendingQueue.push(response.callback) + // if we have idle clients schedule a pulse immediately + if (this._idle.length) { + process.nextTick(() => this._pulseQueue()) } - }.bind(this)) - }.bind(this)) -} + return result + } + + const client = new this.Client(this.options) + this._clients.push(client) + const idleListener = (err) => { + err.client = client + client.removeListener('error', idleListener) + client.on('error', () => { + this.log('additional client error after disconnection due to error', err) + }) + this._remove(client) + // TODO - document that once the pool emits an error + // the client has already been closed & purged and is unusable + this.emit('error', err, client) + } + + this.log('connecting new client') + + // connection timeout logic + let tid + let timeoutHit = false + if (this.options.connectionTimeoutMillis) { + tid = setTimeout(() => { + this.log('ending client due to timeout') + timeoutHit = true + // force kill the node driver, and let libpq do its teardown + client.connection ? client.connection.stream.destroy() : client.end() + }, this.options.connectionTimeoutMillis) + } -Pool.prototype.take = Pool.prototype.connect + const response = promisify(this.Promise, cb) + cb = response.callback -Pool.prototype.query = function (text, values, cb) { - if (typeof values === 'function') { - cb = values - values = undefined + this.log('connecting new client') + client.connect((err) => { + this.log('new client connected') + if (tid) { + clearTimeout(tid) + } + client.on('error', idleListener) + if (err) { + // remove the dead client from our list of clients + this._clients = this._clients.filter(c => c !== client) + if (timeoutHit) { + err.message = 'Connection terminiated due to connection timeout' + } + cb(err, undefined, NOOP) + } else { + client.release = release.bind(this, client) + this.emit('connect', client) + this.emit('acquire', client) + if (this.options.verify) { + this.options.verify(client, cb) + } else { + cb(undefined, client, client.release) + } + } + }) + return response.result } - return this._promise(cb, function (resolve, reject) { - this.connect(function (err, client, done) { + query (text, values, cb) { + if (typeof values === 'function') { + cb = values + values = undefined + } + const response = promisify(this.Promise, cb) + cb = response.callback + this.connect((err, client) => { if (err) { - return reject(err) + return cb(err) } - client.query(text, values, function (err, res) { - done(err) - err ? reject(err) : resolve(res) + this.log('dispatching query') + client.query(text, values, (err, res) => { + this.log('query dispatched') + client.release(err) + if (err) { + return cb(err) + } else { + return cb(undefined, res) + } }) }) - }.bind(this)) -} + return response.result + } -Pool.prototype.end = function (cb) { - this.log('draining pool') - return this._promise(cb, function (resolve, reject) { - this.pool.drain(function () { - this.log('pool drained, calling destroy all now') - this.pool.destroyAllNow(resolve) - }.bind(this)) - }.bind(this)) + end (cb) { + this.log('ending') + if (this.ending) { + const err = new Error('Called end on pool more than once') + return cb ? cb(err) : this.Promise.reject(err) + } + this.ending = true + const promised = promisify(this.Promise, cb) + this._endCallback = promised.callback + this._pulseQueue() + return promised.result + } + + get waitingCount () { + return this._pendingQueue.length + } + + get idleCount () { + return this._idle.length + } + + get totalCount () { + return this._clients.length + } } +module.exports = Pool diff --git a/package.json b/package.json index b3424f1e4..392375bbd 100644 --- a/package.json +++ b/package.json @@ -27,15 +27,16 @@ "homepage": "https://github.com/brianc/node-pg-pool#readme", "devDependencies": { "bluebird": "3.4.1", + "co": "4.6.0", "expect.js": "0.3.1", "lodash": "4.13.1", "mocha": "^2.3.3", - "pg": "5.1.0", + "pg": "*", "standard": "7.1.2", "standard-format": "2.2.1" }, - "dependencies": { - "generic-pool": "2.4.3", - "object-assign": "4.1.0" + "dependencies": {}, + "peerDependencies": { + "pg": ">5.0" } } diff --git a/test/bring-your-own-promise.js b/test/bring-your-own-promise.js new file mode 100644 index 000000000..f7fe3bde9 --- /dev/null +++ b/test/bring-your-own-promise.js @@ -0,0 +1,36 @@ +'use strict' +const co = require('co') +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it +const BluebirdPromise = require('bluebird') + +const Pool = require('../') + +const checkType = promise => { + expect(promise).to.be.a(BluebirdPromise) + return promise.catch(e => undefined) +} + +describe('Bring your own promise', function () { + it('uses supplied promise for operations', co.wrap(function * () { + const pool = new Pool({ Promise: BluebirdPromise }) + const client1 = yield checkType(pool.connect()) + client1.release() + yield checkType(pool.query('SELECT NOW()')) + const client2 = yield checkType(pool.connect()) + // TODO - make sure pg supports BYOP as well + client2.release() + yield checkType(pool.end()) + })) + + it('uses promises in errors', co.wrap(function * () { + const pool = new Pool({ Promise: BluebirdPromise, port: 48484 }) + yield checkType(pool.connect()) + yield checkType(pool.end()) + yield checkType(pool.connect()) + yield checkType(pool.query()) + yield checkType(pool.end()) + })) +}) diff --git a/test/connection-strings.js b/test/connection-strings.js index 8ee3d074b..7013f28da 100644 --- a/test/connection-strings.js +++ b/test/connection-strings.js @@ -1,13 +1,13 @@ -var expect = require('expect.js') -var describe = require('mocha').describe -var it = require('mocha').it -var Pool = require('../') +const expect = require('expect.js') +const describe = require('mocha').describe +const it = require('mocha').it +const Pool = require('../') describe('Connection strings', function () { it('pool delegates connectionString property to client', function (done) { - var connectionString = 'postgres://foo:bar@baz:1234/xur' + const connectionString = 'postgres://foo:bar@baz:1234/xur' - var pool = new Pool({ + const pool = new Pool({ // use a fake client so we can check we're passed the connectionString Client: function (args) { expect(args.connectionString).to.equal(connectionString) diff --git a/test/connection-timeout.js b/test/connection-timeout.js new file mode 100644 index 000000000..0671b1120 --- /dev/null +++ b/test/connection-timeout.js @@ -0,0 +1,62 @@ +'use strict' +const net = require('net') +const co = require('co') +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it +const before = require('mocha').before +const after = require('mocha').after + +const Pool = require('../') + +describe('connection timeout', () => { + before((done) => { + this.server = net.createServer((socket) => { + }) + + this.server.listen(() => { + this.port = this.server.address().port + done() + }) + }) + + after((done) => { + this.server.close(done) + }) + + it('should callback with an error if timeout is passed', (done) => { + const pool = new Pool({ connectionTimeoutMillis: 10, port: this.port }) + pool.connect((err, client, release) => { + expect(err).to.be.an(Error) + expect(err.message).to.contain('timeout') + expect(client).to.equal(undefined) + expect(pool.idleCount).to.equal(0) + done() + }) + }) + + it('should reject promise with an error if timeout is passed', (done) => { + const pool = new Pool({ connectionTimeoutMillis: 10, port: this.port }) + pool.connect().catch(err => { + expect(err).to.be.an(Error) + expect(err.message).to.contain('timeout') + expect(pool.idleCount).to.equal(0) + done() + }) + }) + + it('should handle multiple timeouts', co.wrap(function * () { + const errors = [] + const pool = new Pool({ connectionTimeoutMillis: 1, port: this.port }) + for (var i = 0; i < 15; i++) { + try { + yield pool.connect() + } catch (e) { + errors.push(e) + } + } + expect(errors).to.have.length(15) + }.bind(this))) +}) + diff --git a/test/ending.js b/test/ending.js new file mode 100644 index 000000000..1956b13f6 --- /dev/null +++ b/test/ending.js @@ -0,0 +1,34 @@ +'use strict' +const co = require('co') +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it + +const Pool = require('../') + +describe('pool ending', () => { + it('ends without being used', (done) => { + const pool = new Pool() + pool.end(done) + }) + + it('ends with a promise', () => { + return new Pool().end() + }) + + it('ends with clients', co.wrap(function * () { + const pool = new Pool() + const res = yield pool.query('SELECT $1::text as name', ['brianc']) + expect(res.rows[0].name).to.equal('brianc') + return pool.end() + })) + + it('allows client to finish', co.wrap(function * () { + const pool = new Pool() + const query = pool.query('SELECT $1::text as name', ['brianc']) + yield pool.end() + const res = yield query + expect(res.rows[0].name).to.equal('brianc') + })) +}) diff --git a/test/error-handling.js b/test/error-handling.js new file mode 100644 index 000000000..de68fad32 --- /dev/null +++ b/test/error-handling.js @@ -0,0 +1,135 @@ +'use strict' +const co = require('co') +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it + +const Pool = require('../') + +describe('pool error handling', function () { + it('Should complete these queries without dying', function (done) { + const pool = new Pool() + let errors = 0 + let shouldGet = 0 + function runErrorQuery () { + shouldGet++ + return new Promise(function (resolve, reject) { + pool.query("SELECT 'asd'+1 ").then(function (res) { + reject(res) // this should always error + }).catch(function (err) { + errors++ + resolve(err) + }) + }) + } + const ps = [] + for (let i = 0; i < 5; i++) { + ps.push(runErrorQuery()) + } + Promise.all(ps).then(function () { + expect(shouldGet).to.eql(errors) + pool.end(done) + }) + }) + + describe('calling release more than once', () => { + it('should throw each time', co.wrap(function * () { + const pool = new Pool() + const client = yield pool.connect() + client.release() + expect(() => client.release()).to.throwError() + expect(() => client.release()).to.throwError() + return yield pool.end() + })) + }) + + describe('calling connect after end', () => { + it('should return an error', function * () { + const pool = new Pool() + const res = yield pool.query('SELECT $1::text as name', ['hi']) + expect(res.rows[0].name).to.equal('hi') + const wait = pool.end() + pool.query('select now()') + yield wait + expect(() => pool.query('select now()')).to.reject() + }) + }) + + describe('using an ended pool', () => { + it('rejects all additional promises', (done) => { + const pool = new Pool() + const promises = [] + pool.end() + .then(() => { + const squash = promise => promise.catch(e => 'okay!') + promises.push(squash(pool.connect())) + promises.push(squash(pool.query('SELECT NOW()'))) + promises.push(squash(pool.end())) + Promise.all(promises).then(res => { + expect(res).to.eql(['okay!', 'okay!', 'okay!']) + done() + }) + }) + }) + + it('returns an error on all additional callbacks', (done) => { + const pool = new Pool() + pool.end(() => { + pool.query('SELECT *', (err) => { + expect(err).to.be.an(Error) + pool.connect((err) => { + expect(err).to.be.an(Error) + pool.end((err) => { + expect(err).to.be.an(Error) + done() + }) + }) + }) + }) + }) + }) + + describe('error from idle client', () => { + it('removes client from pool', co.wrap(function * () { + const pool = new Pool() + const client = yield pool.connect() + expect(pool.totalCount).to.equal(1) + expect(pool.waitingCount).to.equal(0) + expect(pool.idleCount).to.equal(0) + client.release() + yield new Promise((resolve, reject) => { + process.nextTick(() => { + pool.once('error', (err) => { + expect(err.message).to.equal('expected') + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(0) + pool.end().then(resolve, reject) + }) + client.emit('error', new Error('expected')) + }) + }) + })) + }) + + describe('pool with lots of errors', () => { + it('continues to work and provide new clients', co.wrap(function * () { + const pool = new Pool({ max: 1 }) + const errors = [] + for (var i = 0; i < 20; i++) { + try { + yield pool.query('invalid sql') + } catch (err) { + errors.push(err) + } + } + expect(errors).to.have.length(20) + expect(pool.idleCount).to.equal(0) + expect(pool.query).to.be.a(Function) + const res = yield pool.query('SELECT $1::text as name', ['brianc']) + expect(res.rows).to.have.length(1) + expect(res.rows[0].name).to.equal('brianc') + return pool.end() + })) + }) +}) diff --git a/test/events.js b/test/events.js index 759dff02d..3d4405e28 100644 --- a/test/events.js +++ b/test/events.js @@ -1,14 +1,15 @@ -var expect = require('expect.js') -var EventEmitter = require('events').EventEmitter -var describe = require('mocha').describe -var it = require('mocha').it -var objectAssign = require('object-assign') -var Pool = require('../') +'use strict' + +const expect = require('expect.js') +const EventEmitter = require('events').EventEmitter +const describe = require('mocha').describe +const it = require('mocha').it +const Pool = require('../') describe('events', function () { it('emits connect before callback', function (done) { - var pool = new Pool() - var emittedClient = false + const pool = new Pool() + let emittedClient = false pool.on('connect', function (client) { emittedClient = client }) @@ -23,48 +24,47 @@ describe('events', function () { }) it('emits "connect" only with a successful connection', function (done) { - var pool = new Pool({ + const pool = new Pool({ // This client will always fail to connect Client: mockClient({ connect: function (cb) { - process.nextTick(function () { cb(new Error('bad news')) }) + process.nextTick(() => { + cb(new Error('bad news')) + setImmediate(done) + }) } }) }) pool.on('connect', function () { throw new Error('should never get here') }) - pool._create(function (err) { - if (err) done() - else done(new Error('expected failure')) - }) + return pool.connect().catch(e => expect(e.message).to.equal('bad news')) }) it('emits acquire every time a client is acquired', function (done) { - var pool = new Pool() - var acquireCount = 0 + const pool = new Pool() + let acquireCount = 0 pool.on('acquire', function (client) { expect(client).to.be.ok() acquireCount++ }) - for (var i = 0; i < 10; i++) { + for (let i = 0; i < 10; i++) { pool.connect(function (err, client, release) { - err ? done(err) : release() - release() if (err) return done(err) + release() }) pool.query('SELECT now()') } setTimeout(function () { expect(acquireCount).to.be(20) pool.end(done) - }, 40) + }, 100) }) it('emits error and client if an idle client in the pool hits an error', function (done) { - var pool = new Pool() + const pool = new Pool() pool.connect(function (err, client) { - expect(err).to.equal(null) + expect(err).to.equal(undefined) client.release() setImmediate(function () { client.emit('error', new Error('problem')) @@ -80,8 +80,8 @@ describe('events', function () { function mockClient (methods) { return function () { - var client = new EventEmitter() - objectAssign(client, methods) + const client = new EventEmitter() + Object.assign(client, methods) return client } } diff --git a/test/idle-timeout.js b/test/idle-timeout.js new file mode 100644 index 000000000..68a2b78a0 --- /dev/null +++ b/test/idle-timeout.js @@ -0,0 +1,54 @@ +'use strict' +const co = require('co') +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it + +const Pool = require('../') + +const wait = time => new Promise((resolve) => setTimeout(resolve, time)) + +describe('idle timeout', () => { + it('should timeout and remove the client', (done) => { + const pool = new Pool({ idleTimeoutMillis: 10 }) + pool.query('SELECT NOW()') + pool.on('remove', () => { + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(0) + done() + }) + }) + + it('can remove idle clients and recreate them', co.wrap(function * () { + const pool = new Pool({ idleTimeoutMillis: 1 }) + const results = [] + for (var i = 0; i < 20; i++) { + let query = pool.query('SELECT NOW()') + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(1) + results.push(yield query) + yield wait(2) + expect(pool.idleCount).to.equal(0) + expect(pool.totalCount).to.equal(0) + } + expect(results).to.have.length(20) + })) + + it('does not time out clients which are used', co.wrap(function * () { + const pool = new Pool({ idleTimeoutMillis: 1 }) + const results = [] + for (var i = 0; i < 20; i++) { + let client = yield pool.connect() + expect(pool.totalCount).to.equal(1) + expect(pool.idleCount).to.equal(0) + yield wait(10) + results.push(yield client.query('SELECT NOW()')) + client.release() + expect(pool.idleCount).to.equal(1) + expect(pool.totalCount).to.equal(1) + } + expect(results).to.have.length(20) + return pool.end() + })) +}) diff --git a/test/index.js b/test/index.js index 3f2b99d95..010d99c56 100644 --- a/test/index.js +++ b/test/index.js @@ -1,26 +1,16 @@ -var expect = require('expect.js') -var _ = require('lodash') +'use strict' +const expect = require('expect.js') +const _ = require('lodash') -var describe = require('mocha').describe -var it = require('mocha').it -var Promise = require('bluebird') +const describe = require('mocha').describe +const it = require('mocha').it -var Pool = require('../') - -if (typeof global.Promise === 'undefined') { - global.Promise = Promise -} +const Pool = require('../') describe('pool', function () { - it('can be used as a factory function', function () { - var pool = Pool() - expect(pool instanceof Pool).to.be.ok() - expect(typeof pool.connect).to.be('function') - }) - describe('with callbacks', function () { it('works totally unconfigured', function (done) { - var pool = new Pool() + const pool = new Pool() pool.connect(function (err, client, release) { if (err) return done(err) client.query('SELECT NOW()', function (err, res) { @@ -33,7 +23,7 @@ describe('pool', function () { }) it('passes props to clients', function (done) { - var pool = new Pool({ binary: true }) + const pool = new Pool({ binary: true }) pool.connect(function (err, client, release) { release() if (err) return done(err) @@ -43,7 +33,7 @@ describe('pool', function () { }) it('can run a query with a callback without parameters', function (done) { - var pool = new Pool() + const pool = new Pool() pool.query('SELECT 1 as num', function (err, res) { expect(res.rows[0]).to.eql({ num: 1 }) pool.end(function () { @@ -53,7 +43,7 @@ describe('pool', function () { }) it('can run a query with a callback', function (done) { - var pool = new Pool() + const pool = new Pool() pool.query('SELECT $1::text as name', ['brianc'], function (err, res) { expect(res.rows[0]).to.eql({ name: 'brianc' }) pool.end(function () { @@ -63,18 +53,30 @@ describe('pool', function () { }) it('passes connection errors to callback', function (done) { - var pool = new Pool({host: 'no-postgres-server-here.com'}) + const pool = new Pool({ port: 53922 }) pool.query('SELECT $1::text as name', ['brianc'], function (err, res) { expect(res).to.be(undefined) expect(err).to.be.an(Error) + // a connection error should not polute the pool with a dead client + expect(pool.totalCount).to.equal(0) pool.end(function (err) { done(err) }) }) }) + it('does not pass client to error callback', function (done) { + const pool = new Pool({ port: 58242 }) + pool.connect(function (err, client, release) { + expect(err).to.be.an(Error) + expect(client).to.be(undefined) + expect(release).to.be.a(Function) + pool.end(done) + }) + }) + it('removes client if it errors in background', function (done) { - var pool = new Pool() + const pool = new Pool() pool.connect(function (err, client, release) { release() if (err) return done(err) @@ -94,8 +96,8 @@ describe('pool', function () { }) it('should not change given options', function (done) { - var options = { max: 10 } - var pool = new Pool(options) + const options = { max: 10 } + const pool = new Pool(options) pool.connect(function (err, client, release) { release() if (err) return done(err) @@ -105,8 +107,8 @@ describe('pool', function () { }) it('does not create promises when connecting', function (done) { - var pool = new Pool() - var returnValue = pool.connect(function (err, client, release) { + const pool = new Pool() + const returnValue = pool.connect(function (err, client, release) { release() if (err) return done(err) pool.end(done) @@ -115,8 +117,8 @@ describe('pool', function () { }) it('does not create promises when querying', function (done) { - var pool = new Pool() - var returnValue = pool.query('SELECT 1 as num', function (err) { + const pool = new Pool() + const returnValue = pool.query('SELECT 1 as num', function (err) { pool.end(function () { done(err) }) @@ -125,67 +127,99 @@ describe('pool', function () { }) it('does not create promises when ending', function (done) { - var pool = new Pool() - var returnValue = pool.end(done) + const pool = new Pool() + const returnValue = pool.end(done) expect(returnValue).to.be(undefined) }) + + it('never calls callback syncronously', function (done) { + const pool = new Pool() + pool.connect((err, client) => { + if (err) throw err + client.release() + setImmediate(() => { + let called = false + pool.connect((err, client) => { + if (err) throw err + called = true + client.release() + setImmediate(() => { + pool.end(done) + }) + }) + expect(called).to.equal(false) + }) + }) + }) }) describe('with promises', function () { - it('connects and disconnects', function () { - var pool = new Pool() + it('connects, queries, and disconnects', function () { + const pool = new Pool() return pool.connect().then(function (client) { - expect(pool.pool.availableObjectsCount()).to.be(0) return client.query('select $1::text as name', ['hi']).then(function (res) { expect(res.rows).to.eql([{ name: 'hi' }]) client.release() - expect(pool.pool.getPoolSize()).to.be(1) - expect(pool.pool.availableObjectsCount()).to.be(1) return pool.end() }) }) }) + it('executes a query directly', () => { + const pool = new Pool() + return pool + .query('SELECT $1::text as name', ['hi']) + .then(res => { + expect(res.rows).to.have.length(1) + expect(res.rows[0].name).to.equal('hi') + return pool.end() + }) + }) + it('properly pools clients', function () { - var pool = new Pool({ poolSize: 9 }) - return Promise.map(_.times(30), function () { + const pool = new Pool({ poolSize: 9 }) + const promises = _.times(30, function () { return pool.connect().then(function (client) { return client.query('select $1::text as name', ['hi']).then(function (res) { client.release() return res }) }) - }).then(function (res) { + }) + return Promise.all(promises).then(function (res) { expect(res).to.have.length(30) - expect(pool.pool.getPoolSize()).to.be(9) + expect(pool.totalCount).to.be(9) return pool.end() }) }) it('supports just running queries', function () { - var pool = new Pool({ poolSize: 9 }) - return Promise.map(_.times(30), function () { - return pool.query('SELECT $1::text as name', ['hi']) - }).then(function (queries) { + const pool = new Pool({ poolSize: 9 }) + const text = 'select $1::text as name' + const values = ['hi'] + const query = { text: text, values: values } + const promises = _.times(30, () => pool.query(query)) + return Promise.all(promises).then(function (queries) { expect(queries).to.have.length(30) - expect(pool.pool.getPoolSize()).to.be(9) - expect(pool.pool.availableObjectsCount()).to.be(9) return pool.end() }) }) - it('recovers from all errors', function () { - var pool = new Pool() + it('recovers from query errors', function () { + const pool = new Pool() - var errors = [] - return Promise.mapSeries(_.times(30), function () { + const errors = [] + const promises = _.times(30, () => { return pool.query('SELECT asldkfjasldkf') .catch(function (e) { errors.push(e) }) - }).then(function () { + }) + return Promise.all(promises).then(() => { + expect(errors).to.have.length(30) + expect(pool.totalCount).to.equal(0) + expect(pool.idleCount).to.equal(0) return pool.query('SELECT $1::text as name', ['hi']).then(function (res) { - expect(errors).to.have.length(30) expect(res.rows).to.eql([{ name: 'hi' }]) return pool.end() }) @@ -193,40 +227,3 @@ describe('pool', function () { }) }) }) - -describe('pool error handling', function () { - it('Should complete these queries without dying', function (done) { - var pgPool = new Pool() - var pool = pgPool.pool - pool._factory.max = 1 - pool._factory.min = null - var errors = 0 - var shouldGet = 0 - function runErrorQuery () { - shouldGet++ - return new Promise(function (resolve, reject) { - pgPool.query("SELECT 'asd'+1 ").then(function (res) { - reject(res) // this should always error - }).catch(function (err) { - errors++ - resolve(err) - }) - }) - } - var ps = [] - for (var i = 0; i < 5; i++) { - ps.push(runErrorQuery()) - } - Promise.all(ps).then(function () { - expect(shouldGet).to.eql(errors) - done() - }) - }) -}) - -process.on('unhandledRejection', function (e) { - console.error(e.message, e.stack) - setImmediate(function () { - throw e - }) -}) diff --git a/test/logging.js b/test/logging.js index 47389a5aa..839603b78 100644 --- a/test/logging.js +++ b/test/logging.js @@ -1,17 +1,17 @@ -var expect = require('expect.js') +const expect = require('expect.js') -var describe = require('mocha').describe -var it = require('mocha').it +const describe = require('mocha').describe +const it = require('mocha').it -var Pool = require('../') +const Pool = require('../') describe('logging', function () { it('logs to supplied log function if given', function () { - var messages = [] - var log = function (msg) { + const messages = [] + const log = function (msg) { messages.push(msg) } - var pool = new Pool({ log: log }) + const pool = new Pool({ log: log }) return pool.query('SELECT NOW()').then(function () { expect(messages.length).to.be.greaterThan(0) return pool.end() diff --git a/test/mocha.opts b/test/mocha.opts index 46e8e69d9..590fb8628 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -1,2 +1,4 @@ +--require test/setup.js --no-exit --bail +--timeout 10000 diff --git a/test/setup.js b/test/setup.js new file mode 100644 index 000000000..cf75b7a67 --- /dev/null +++ b/test/setup.js @@ -0,0 +1,10 @@ +const crash = reason => { + process.on(reason, err => { + console.error(reason, err.stack) + process.exit(-1) + }) +} + +crash('unhandledRejection') +crash('uncaughtError') +crash('warning') diff --git a/test/sizing.js b/test/sizing.js new file mode 100644 index 000000000..9e926877d --- /dev/null +++ b/test/sizing.js @@ -0,0 +1,44 @@ +const expect = require('expect.js') +const co = require('co') +const _ = require('lodash') + +const describe = require('mocha').describe +const it = require('mocha').it + +const Pool = require('../') + +describe('pool size of 1', () => { + it('can create a single client and use it once', co.wrap(function * () { + const pool = new Pool({ max: 1 }) + expect(pool.waitingCount).to.equal(0) + const client = yield pool.connect() + const res = yield client.query('SELECT $1::text as name', ['hi']) + expect(res.rows[0].name).to.equal('hi') + client.release() + pool.end() + })) + + it('can create a single client and use it multiple times', co.wrap(function * () { + const pool = new Pool({ max: 1 }) + expect(pool.waitingCount).to.equal(0) + const client = yield pool.connect() + const wait = pool.connect() + expect(pool.waitingCount).to.equal(1) + client.release() + const client2 = yield wait + expect(client).to.equal(client2) + client2.release() + return yield pool.end() + })) + + it('can only send 1 query at a time', co.wrap(function * () { + const pool = new Pool({ max: 1 }) + const queries = _.times(20, (i) => { + return pool.query('SELECT COUNT(*) as counts FROM pg_stat_activity') + }) + const results = yield Promise.all(queries) + const counts = results.map(res => parseInt(res.rows[0].counts), 10) + expect(counts).to.eql(_.times(20, i => 1)) + return yield pool.end() + })) +}) diff --git a/test/timeout.js b/test/timeout.js new file mode 100644 index 000000000..e69de29bb diff --git a/test/verify.js b/test/verify.js new file mode 100644 index 000000000..667dea9ff --- /dev/null +++ b/test/verify.js @@ -0,0 +1,25 @@ +'use strict' +const expect = require('expect.js') + +const describe = require('mocha').describe +const it = require('mocha').it + +const Pool = require('../') + +describe('verify', () => { + it('verifies a client with a callback', false, (done) => { + const pool = new Pool({ + verify: (client, cb) => { + client.release() + cb(new Error('nope')) + } + }) + + pool.connect((err, client) => { + expect(err).to.be.an(Error) + expect(err.message).to.be('nope') + pool.end() + done() + }) + }) +})