diff --git a/index.js b/index.js index aec3a1e..b8ccd47 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,7 @@ var genericPool = require('generic-pool') var util = require('util') var EventEmitter = require('events').EventEmitter var objectAssign = require('object-assign') +var SafeClient = require('./safe-client') // there is a bug in the generic pool where it will not recreate // destroyed workers (even if there is waiting work to do) unless @@ -119,8 +120,10 @@ Pool.prototype.connect = function (cb) { return } + // Wrap the client instance with a control layer. + var safeClient = new SafeClient(client, this.options) this.log('acquire client') - this.emit('acquire', client) + this.emit('acquire', safeClient) client.release = function (err) { delete client.release @@ -134,9 +137,9 @@ Pool.prototype.connect = function (cb) { }.bind(this) if (cb) { - cb(null, client, client.release) + cb(null, safeClient, safeClient.release.bind(safeClient)) } else { - resolve(client) + resolve(safeClient) } }.bind(this)) }.bind(this)) diff --git a/safe-client.js b/safe-client.js new file mode 100644 index 0000000..5980f12 --- /dev/null +++ b/safe-client.js @@ -0,0 +1,39 @@ +var objectAssign = require('object-assign') + +module.exports = SafeClient + +function SafeClient (client, options) { + if (!(this instanceof SafeClient)) { + return new SafeClient(client) + } + + this.options = objectAssign({}, options) + this.Promise = this.options.Promise || global.Promise + this.log = this.options.log || function () { } + + this._client = client + this._released = false + this._releasedError = 'This connection has already been released!' +} + +SafeClient.prototype.query = function (text, values, cb) { + if (this._released) { + var err = new Error(this._releasedError) + if (cb) { + return cb(err) + } else { + return this.Promise.reject(err) + } + } else { + return this._client.query(text, values, cb) + } +} + +SafeClient.prototype.release = function (err) { + if (this._released) { + throw new Error(this._releasedError) + } else { + this._released = true + return this._client.release(err) + } +} diff --git a/test/events.js b/test/events.js index bcb523f..53b3b64 100644 --- a/test/events.js +++ b/test/events.js @@ -17,7 +17,8 @@ describe('events', function () { if (err) return done(err) release() pool.end() - expect(client).to.be(emittedClient) + // "Real" client is wrapped by SafeClient + expect(client._client).to.be(emittedClient) done() }) }) @@ -50,7 +51,6 @@ describe('events', function () { for (var i = 0; i < 10; i++) { pool.connect(function (err, client, release) { err ? done(err) : release() - release() if (err) return done(err) }) pool.query('SELECT now()') diff --git a/test/index.js b/test/index.js index 3f2b99d..14f2040 100644 --- a/test/index.js +++ b/test/index.js @@ -37,7 +37,7 @@ describe('pool', function () { pool.connect(function (err, client, release) { release() if (err) return done(err) - expect(client.binary).to.eql(true) + expect(client._client.binary).to.eql(true) pool.end(done) }) }) @@ -75,18 +75,23 @@ describe('pool', function () { it('removes client if it errors in background', function (done) { var pool = new Pool() + var safeClient pool.connect(function (err, client, release) { - release() - if (err) return done(err) - client.testString = 'foo' + if (err) { + release() + return done(err) + } + safeClient = client + client._client.testString = 'foo' setTimeout(function () { - client.emit('error', new Error('on purpose')) + client._client.emit('error', new Error('on purpose')) }, 10) }) pool.on('error', function (err) { expect(err.message).to.be('on purpose') expect(err.client).to.not.be(undefined) expect(err.client.testString).to.be('foo') + expect(safeClient).to.not.be(undefined) err.client.connection.stream.on('end', function () { pool.end(done) }) diff --git a/test/safe-client.js b/test/safe-client.js new file mode 100644 index 0000000..a8fe530 --- /dev/null +++ b/test/safe-client.js @@ -0,0 +1,63 @@ +var expect = require('expect.js') + +var describe = require('mocha').describe +var it = require('mocha').it +var Promise = require('bluebird') + +var Pool = require('../') + +if (typeof global.Promise === 'undefined') { + global.Promise = Promise +} + +describe('safe-client', function () { + var query = 'SELECT $1::text as name' + describe('with callbacks', function () { + it('does not allow queries after release', function (done) { + var pool = new Pool() + pool.connect(function (err, client, release) { + if (err) { + pool.end(function () { + done(err) + }) + } + client.query(query, ['brianc'], function (err, res) { + if (err) { + return done(err) + } + expect(res.rows[0]).to.eql({ name: 'brianc' }) + release() + client.query(query, ['brianc'], function (err, res) { + if (!err) { + return done(new Error('Did not receive error!')) + } + expect(err.message).to.not.be(undefined) + expect(err.message).to.contain(client._releasedError) + pool.end(done) + }) + }) + }) + }) + }) + + describe('with promises', function () { + it('does not allow queries after release', function (done) { + var pool = new Pool() + var clientHandle + pool.connect().then(function (client) { + clientHandle = client // Handle for examination and brevity + return clientHandle.query(query, ['brianc']).then(function () { + return clientHandle.release() + }).then(function () { + return clientHandle.query(query, ['brianc']) + }) + }).then(function () { + return done(new Error('Should have thrown an error!')) + }).catch(function (err) { + expect(err.message).to.not.be(undefined) + expect(err.message).to.contain(clientHandle._releasedError) + return pool.end(done) + }) + }) + }) +})