diff --git a/.travis.yml b/.travis.yml index 9629156e2..7987f761b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,20 +7,18 @@ before_script: | env: - CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres - # test w/ new faster parsing code - - CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres PG_FAST_CONNECTION=true node_js: - lts/dubnium - lts/erbium # node 13.7 seems to have changed behavior of async iterators exiting early on streams # if 13.8 still has this problem when it comes down I'll talk to the node team about the change - # in the mean time...peg to 13.6 + # in the mean time...peg to 13.6 - 13.6 - 14 addons: - postgresql: "10" + postgresql: '10' matrix: include: @@ -42,25 +40,25 @@ matrix: - node_js: lts/carbon addons: - postgresql: "9.5" + postgresql: '9.5' dist: precise # different PostgreSQL versions on Node LTS - node_js: lts/erbium addons: - postgresql: "9.3" + postgresql: '9.3' - node_js: lts/erbium addons: - postgresql: "9.4" + postgresql: '9.4' - node_js: lts/erbium addons: - postgresql: "9.5" + postgresql: '9.5' - node_js: lts/erbium addons: - postgresql: "9.6" + postgresql: '9.6' # PostgreSQL 9.2 only works on precise - node_js: lts/carbon addons: - postgresql: "9.2" + postgresql: '9.2' dist: precise diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 76906712b..2c12f2cce 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -18,9 +18,6 @@ var ConnectionParameters = require('./connection-parameters') var Query = require('./query') var defaults = require('./defaults') var Connection = require('./connection') -if (process.env.PG_FAST_CONNECTION) { - Connection = require('./connection-fast') -} var Client = function (config) { EventEmitter.call(this) diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js deleted file mode 100644 index 7cc2ed8cf..000000000 --- a/packages/pg/lib/connection-fast.js +++ /dev/null @@ -1,214 +0,0 @@ -'use strict' -/** - * Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com) - * All rights reserved. - * - * This source code is licensed under the MIT license found in the - * README.md file in the root directory of this source tree. - */ - -var net = require('net') -var EventEmitter = require('events').EventEmitter -var util = require('util') - -const { parse, serialize } = require('pg-protocol') - -// TODO(bmc) support binary mode at some point -console.log('***using faster connection***') -var Connection = function (config) { - EventEmitter.call(this) - config = config || {} - this.stream = config.stream || new net.Socket() - this.stream.setNoDelay(true) - this._keepAlive = config.keepAlive - this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis - this.lastBuffer = false - this.parsedStatements = {} - this.ssl = config.ssl || false - this._ending = false - this._emitMessage = false - var self = this - this.on('newListener', function (eventName) { - if (eventName === 'message') { - self._emitMessage = true - } - }) -} - -util.inherits(Connection, EventEmitter) - -Connection.prototype.connect = function (port, host) { - var self = this - - this._connecting = true - this.stream.connect(port, host) - - this.stream.once('connect', function () { - if (self._keepAlive) { - self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) - } - self.emit('connect') - }) - - const reportStreamError = function (error) { - // errors about disconnections should be ignored during disconnect - if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { - return - } - self.emit('error', error) - } - this.stream.on('error', reportStreamError) - - this.stream.on('close', function () { - self.emit('end') - }) - - if (!this.ssl) { - return this.attachListeners(this.stream) - } - - this.stream.once('data', function (buffer) { - var responseCode = buffer.toString('utf8') - switch (responseCode) { - case 'S': // Server supports SSL connections, continue with a secure connection - break - case 'N': // Server does not support SSL connections - self.stream.end() - return self.emit('error', new Error('The server does not support SSL connections')) - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error - self.stream.end() - return self.emit('error', new Error('There was an error establishing an SSL connection')) - } - var tls = require('tls') - const options = Object.assign( - { - socket: self.stream, - }, - self.ssl - ) - if (net.isIP(host) === 0) { - options.servername = host - } - self.stream = tls.connect(options) - self.attachListeners(self.stream) - self.stream.on('error', reportStreamError) - - self.emit('sslconnect') - }) -} - -Connection.prototype.attachListeners = function (stream) { - stream.on('end', () => { - this.emit('end') - }) - parse(stream, (msg) => { - var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (this._emitMessage) { - this.emit('message', msg) - } - this.emit(eventName, msg) - }) -} - -Connection.prototype.requestSsl = function () { - this.stream.write(serialize.requestSsl()) -} - -Connection.prototype.startup = function (config) { - this.stream.write(serialize.startup(config)) -} - -Connection.prototype.cancel = function (processID, secretKey) { - this._send(serialize.cancel(processID, secretKey)) -} - -Connection.prototype.password = function (password) { - this._send(serialize.password(password)) -} - -Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) -} - -Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) -} - -Connection.prototype._send = function (buffer) { - if (!this.stream.writable) { - return false - } - return this.stream.write(buffer) -} - -Connection.prototype.query = function (text) { - this._send(serialize.query(text)) -} - -// send parse message -Connection.prototype.parse = function (query) { - this._send(serialize.parse(query)) -} - -// send bind message -// "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config) { - this._send(serialize.bind(config)) -} - -// send execute message -// "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config) { - this._send(serialize.execute(config)) -} - -const flushBuffer = serialize.flush() -Connection.prototype.flush = function () { - if (this.stream.writable) { - this.stream.write(flushBuffer) - } -} - -const syncBuffer = serialize.sync() -Connection.prototype.sync = function () { - this._ending = true - this._send(syncBuffer) - this._send(flushBuffer) -} - -const endBuffer = serialize.end() - -Connection.prototype.end = function () { - // 0x58 = 'X' - this._ending = true - if (!this._connecting || !this.stream.writable) { - this.stream.end() - return - } - return this.stream.write(endBuffer, () => { - this.stream.end() - }) -} - -Connection.prototype.close = function (msg) { - this._send(serialize.close(msg)) -} - -Connection.prototype.describe = function (msg) { - this._send(serialize.describe(msg)) -} - -Connection.prototype.sendCopyFromChunk = function (chunk) { - this._send(serialize.copyData(chunk)) -} - -Connection.prototype.endCopyFrom = function () { - this._send(serialize.copyDone()) -} - -Connection.prototype.sendCopyFail = function (msg) { - this._send(serialize.copyFail(msg)) -} - -module.exports = Connection diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index c3f30aa0f..bce183484 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -11,11 +11,9 @@ var net = require('net') var EventEmitter = require('events').EventEmitter var util = require('util') -var Writer = require('buffer-writer') -var Reader = require('packet-reader') +const { parse, serialize } = require('pg-protocol') -var TEXT_MODE = 0 -var BINARY_MODE = 1 +// TODO(bmc) support binary mode at some point var Connection = function (config) { EventEmitter.call(this) config = config || {} @@ -23,20 +21,10 @@ var Connection = function (config) { this._keepAlive = config.keepAlive this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false - this.lastOffset = 0 - this.buffer = null - this.offset = null - this.encoding = config.encoding || 'utf8' this.parsedStatements = {} - this.writer = new Writer() this.ssl = config.ssl || false this._ending = false - this._mode = TEXT_MODE this._emitMessage = false - this._reader = new Reader({ - headerSize: 1, - lengthPadding: -4, - }) var self = this this.on('newListener', function (eventName) { if (eventName === 'message') { @@ -101,576 +89,124 @@ Connection.prototype.connect = function (port, host) { options.servername = host } self.stream = tls.connect(options) - self.stream.on('error', reportStreamError) self.attachListeners(self.stream) + self.stream.on('error', reportStreamError) + self.emit('sslconnect') }) } Connection.prototype.attachListeners = function (stream) { - var self = this - stream.on('data', function (buff) { - self._reader.addChunk(buff) - var packet = self._reader.read() - while (packet) { - var msg = self.parseMessage(packet) - var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (self._emitMessage) { - self.emit('message', msg) - } - self.emit(eventName, msg) - packet = self._reader.read() - } + stream.on('end', () => { + this.emit('end') }) - stream.on('end', function () { - self.emit('end') + parse(stream, (msg) => { + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + if (this._emitMessage) { + this.emit('message', msg) + } + this.emit(eventName, msg) }) } Connection.prototype.requestSsl = function () { - var bodyBuffer = this.writer.addInt16(0x04d2).addInt16(0x162f).flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer().addInt32(length).add(bodyBuffer).join() - this.stream.write(buffer) + this.stream.write(serialize.requestSsl()) } Connection.prototype.startup = function (config) { - var writer = this.writer.addInt16(3).addInt16(0) - - Object.keys(config).forEach(function (key) { - var val = config[key] - writer.addCString(key).addCString(val) - }) - - writer.addCString('client_encoding').addCString("'utf-8'") - - var bodyBuffer = writer.addCString('').flush() - // this message is sent without a code - - var length = bodyBuffer.length + 4 - - var buffer = new Writer().addInt32(length).add(bodyBuffer).join() - this.stream.write(buffer) + this.stream.write(serialize.startup(config)) } Connection.prototype.cancel = function (processID, secretKey) { - var bodyBuffer = this.writer.addInt16(1234).addInt16(5678).addInt32(processID).addInt32(secretKey).flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer().addInt32(length).add(bodyBuffer).join() - this.stream.write(buffer) + this._send(serialize.cancel(processID, secretKey)) } Connection.prototype.password = function (password) { - // 0x70 = 'p' - this._send(0x70, this.writer.addCString(password)) + this._send(serialize.password(password)) } Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - // 0x70 = 'p' - this.writer.addCString(mechanism).addInt32(Buffer.byteLength(initialResponse)).addString(initialResponse) - - this._send(0x70) + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) } Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - // 0x70 = 'p' - this.writer.addString(additionalData) - - this._send(0x70) + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) } -Connection.prototype._send = function (code, more) { +Connection.prototype._send = function (buffer) { if (!this.stream.writable) { return false } - if (more === true) { - this.writer.addHeader(code) - } else { - return this.stream.write(this.writer.flush(code)) - } + return this.stream.write(buffer) } Connection.prototype.query = function (text) { - // 0x51 = Q - this.stream.write(this.writer.addCString(text).flush(0x51)) + this._send(serialize.query(text)) } // send parse message -// "more" === true to buffer the message until flush() is called -Connection.prototype.parse = function (query, more) { - // expect something like this: - // { name: 'queryName', - // text: 'select * from blah', - // types: ['int8', 'bool'] } - - // normalize missing query names to allow for null - query.name = query.name || '' - if (query.name.length > 63) { - /* eslint-disable no-console */ - console.error('Warning! Postgres only supports 63 characters for query names.') - console.error('You supplied %s (%s)', query.name, query.name.length) - console.error('This can cause conflicts and silent errors executing queries') - /* eslint-enable no-console */ - } - // normalize null type array - query.types = query.types || [] - var len = query.types.length - var buffer = this.writer - .addCString(query.name) // name of query - .addCString(query.text) // actual query text - .addInt16(len) - for (var i = 0; i < len; i++) { - buffer.addInt32(query.types[i]) - } - - var code = 0x50 - this._send(code, more) +Connection.prototype.parse = function (query) { + this._send(serialize.parse(query)) } // send bind message // "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config, more) { - // normalize config - config = config || {} - config.portal = config.portal || '' - config.statement = config.statement || '' - config.binary = config.binary || false - var values = config.values || [] - var len = values.length - var useBinary = false - for (var j = 0; j < len; j++) { - useBinary |= values[j] instanceof Buffer - } - var buffer = this.writer.addCString(config.portal).addCString(config.statement) - if (!useBinary) { - buffer.addInt16(0) - } else { - buffer.addInt16(len) - for (j = 0; j < len; j++) { - buffer.addInt16(values[j] instanceof Buffer) - } - } - buffer.addInt16(len) - for (var i = 0; i < len; i++) { - var val = values[i] - if (val === null || typeof val === 'undefined') { - buffer.addInt32(-1) - } else if (val instanceof Buffer) { - buffer.addInt32(val.length) - buffer.add(val) - } else { - buffer.addInt32(Buffer.byteLength(val)) - buffer.addString(val) - } - } - - if (config.binary) { - buffer.addInt16(1) // format codes to use binary - buffer.addInt16(1) - } else { - buffer.addInt16(0) // format codes to use text - } - // 0x42 = 'B' - this._send(0x42, more) +Connection.prototype.bind = function (config) { + this._send(serialize.bind(config)) } // send execute message // "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config, more) { - config = config || {} - config.portal = config.portal || '' - config.rows = config.rows || '' - this.writer.addCString(config.portal).addInt32(config.rows) - - // 0x45 = 'E' - this._send(0x45, more) +Connection.prototype.execute = function (config) { + this._send(serialize.execute(config)) } -var emptyBuffer = Buffer.alloc(0) - +const flushBuffer = serialize.flush() Connection.prototype.flush = function () { - // 0x48 = 'H' - this.writer.add(emptyBuffer) - this._send(0x48) + if (this.stream.writable) { + this.stream.write(flushBuffer) + } } +const syncBuffer = serialize.sync() Connection.prototype.sync = function () { - // clear out any pending data in the writer - this.writer.flush(0) - - this.writer.add(emptyBuffer) this._ending = true - this._send(0x53) + this._send(syncBuffer) + this._send(flushBuffer) } -const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04]) +const endBuffer = serialize.end() Connection.prototype.end = function () { // 0x58 = 'X' - this.writer.add(emptyBuffer) this._ending = true if (!this._connecting || !this.stream.writable) { this.stream.end() return } - return this.stream.write(END_BUFFER, () => { + return this.stream.write(endBuffer, () => { this.stream.end() }) } -Connection.prototype.close = function (msg, more) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x43, more) +Connection.prototype.close = function (msg) { + this._send(serialize.close(msg)) } -Connection.prototype.describe = function (msg, more) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x44, more) +Connection.prototype.describe = function (msg) { + this._send(serialize.describe(msg)) } Connection.prototype.sendCopyFromChunk = function (chunk) { - this.stream.write(this.writer.add(chunk).flush(0x64)) + this._send(serialize.copyData(chunk)) } Connection.prototype.endCopyFrom = function () { - this.stream.write(this.writer.add(emptyBuffer).flush(0x63)) + this._send(serialize.copyDone()) } Connection.prototype.sendCopyFail = function (msg) { - // this.stream.write(this.writer.add(emptyBuffer).flush(0x66)); - this.writer.addCString(msg) - this._send(0x66) + this._send(serialize.copyFail(msg)) } -var Message = function (name, length) { - this.name = name - this.length = length -} - -Connection.prototype.parseMessage = function (buffer) { - this.offset = 0 - var length = buffer.length + 4 - switch (this._reader.header) { - case 0x52: // R - return this.parseR(buffer, length) - - case 0x53: // S - return this.parseS(buffer, length) - - case 0x4b: // K - return this.parseK(buffer, length) - - case 0x43: // C - return this.parseC(buffer, length) - - case 0x5a: // Z - return this.parseZ(buffer, length) - - case 0x54: // T - return this.parseT(buffer, length) - - case 0x44: // D - return this.parseD(buffer, length) - - case 0x45: // E - return this.parseE(buffer, length) - - case 0x4e: // N - return this.parseN(buffer, length) - - case 0x31: // 1 - return new Message('parseComplete', length) - - case 0x32: // 2 - return new Message('bindComplete', length) - - case 0x33: // 3 - return new Message('closeComplete', length) - - case 0x41: // A - return this.parseA(buffer, length) - - case 0x6e: // n - return new Message('noData', length) - - case 0x49: // I - return new Message('emptyQuery', length) - - case 0x73: // s - return new Message('portalSuspended', length) - - case 0x47: // G - return this.parseG(buffer, length) - - case 0x48: // H - return this.parseH(buffer, length) - - case 0x57: // W - return new Message('replicationStart', length) - - case 0x63: // c - return new Message('copyDone', length) - - case 0x64: // d - return this.parsed(buffer, length) - } -} - -Connection.prototype.parseR = function (buffer, length) { - var code = this.parseInt32(buffer) - - var msg = new Message('authenticationOk', length) - - switch (code) { - case 0: // AuthenticationOk - return msg - case 3: // AuthenticationCleartextPassword - if (msg.length === 8) { - msg.name = 'authenticationCleartextPassword' - return msg - } - break - case 5: // AuthenticationMD5Password - if (msg.length === 12) { - msg.name = 'authenticationMD5Password' - msg.salt = Buffer.alloc(4) - buffer.copy(msg.salt, 0, this.offset, this.offset + 4) - this.offset += 4 - return msg - } - - break - case 10: // AuthenticationSASL - msg.name = 'authenticationSASL' - msg.mechanisms = [] - do { - var mechanism = this.parseCString(buffer) - - if (mechanism) { - msg.mechanisms.push(mechanism) - } - } while (mechanism) - - return msg - case 11: // AuthenticationSASLContinue - msg.name = 'authenticationSASLContinue' - msg.data = this.readString(buffer, length - 4) - - return msg - case 12: // AuthenticationSASLFinal - msg.name = 'authenticationSASLFinal' - msg.data = this.readString(buffer, length - 4) - - return msg - } - - throw new Error('Unknown authenticationOk message type' + util.inspect(msg)) -} - -Connection.prototype.parseS = function (buffer, length) { - var msg = new Message('parameterStatus', length) - msg.parameterName = this.parseCString(buffer) - msg.parameterValue = this.parseCString(buffer) - return msg -} - -Connection.prototype.parseK = function (buffer, length) { - var msg = new Message('backendKeyData', length) - msg.processID = this.parseInt32(buffer) - msg.secretKey = this.parseInt32(buffer) - return msg -} - -Connection.prototype.parseC = function (buffer, length) { - var msg = new Message('commandComplete', length) - msg.text = this.parseCString(buffer) - return msg -} - -Connection.prototype.parseZ = function (buffer, length) { - var msg = new Message('readyForQuery', length) - msg.name = 'readyForQuery' - msg.status = this.readString(buffer, 1) - return msg -} - -var ROW_DESCRIPTION = 'rowDescription' -Connection.prototype.parseT = function (buffer, length) { - var msg = new Message(ROW_DESCRIPTION, length) - msg.fieldCount = this.parseInt16(buffer) - var fields = [] - for (var i = 0; i < msg.fieldCount; i++) { - fields.push(this.parseField(buffer)) - } - msg.fields = fields - return msg -} - -var Field = function () { - this.name = null - this.tableID = null - this.columnID = null - this.dataTypeID = null - this.dataTypeSize = null - this.dataTypeModifier = null - this.format = null -} - -var FORMAT_TEXT = 'text' -var FORMAT_BINARY = 'binary' -Connection.prototype.parseField = function (buffer) { - var field = new Field() - field.name = this.parseCString(buffer) - field.tableID = this.parseInt32(buffer) - field.columnID = this.parseInt16(buffer) - field.dataTypeID = this.parseInt32(buffer) - field.dataTypeSize = this.parseInt16(buffer) - field.dataTypeModifier = this.parseInt32(buffer) - if (this.parseInt16(buffer) === TEXT_MODE) { - this._mode = TEXT_MODE - field.format = FORMAT_TEXT - } else { - this._mode = BINARY_MODE - field.format = FORMAT_BINARY - } - return field -} - -var DATA_ROW = 'dataRow' -var DataRowMessage = function (length, fieldCount) { - this.name = DATA_ROW - this.length = length - this.fieldCount = fieldCount - this.fields = [] -} - -// extremely hot-path code -Connection.prototype.parseD = function (buffer, length) { - var fieldCount = this.parseInt16(buffer) - var msg = new DataRowMessage(length, fieldCount) - for (var i = 0; i < fieldCount; i++) { - msg.fields.push(this._readValue(buffer)) - } - return msg -} - -// extremely hot-path code -Connection.prototype._readValue = function (buffer) { - var length = this.parseInt32(buffer) - if (length === -1) return null - if (this._mode === TEXT_MODE) { - return this.readString(buffer, length) - } - return this.readBytes(buffer, length) -} - -// parses error -Connection.prototype.parseE = function (buffer, length, isNotice) { - var fields = {} - var fieldType = this.readString(buffer, 1) - while (fieldType !== '\0') { - fields[fieldType] = this.parseCString(buffer) - fieldType = this.readString(buffer, 1) - } - - // the msg is an Error instance - var msg = isNotice ? { message: fields.M } : new Error(fields.M) - - // for compatibility with Message - msg.name = isNotice ? 'notice' : 'error' - msg.length = length - - msg.severity = fields.S - msg.code = fields.C - msg.detail = fields.D - msg.hint = fields.H - msg.position = fields.P - msg.internalPosition = fields.p - msg.internalQuery = fields.q - msg.where = fields.W - msg.schema = fields.s - msg.table = fields.t - msg.column = fields.c - msg.dataType = fields.d - msg.constraint = fields.n - msg.file = fields.F - msg.line = fields.L - msg.routine = fields.R - return msg -} - -// same thing, different name -Connection.prototype.parseN = function (buffer, length) { - var msg = this.parseE(buffer, length, true) - msg.name = 'notice' - return msg -} - -Connection.prototype.parseA = function (buffer, length) { - var msg = new Message('notification', length) - msg.processId = this.parseInt32(buffer) - msg.channel = this.parseCString(buffer) - msg.payload = this.parseCString(buffer) - return msg -} - -Connection.prototype.parseG = function (buffer, length) { - var msg = new Message('copyInResponse', length) - return this.parseGH(buffer, msg) -} - -Connection.prototype.parseH = function (buffer, length) { - var msg = new Message('copyOutResponse', length) - return this.parseGH(buffer, msg) -} - -Connection.prototype.parseGH = function (buffer, msg) { - var isBinary = buffer[this.offset] !== 0 - this.offset++ - msg.binary = isBinary - var columnCount = this.parseInt16(buffer) - msg.columnTypes = [] - for (var i = 0; i < columnCount; i++) { - msg.columnTypes.push(this.parseInt16(buffer)) - } - return msg -} - -Connection.prototype.parsed = function (buffer, length) { - var msg = new Message('copyData', length) - msg.chunk = this.readBytes(buffer, msg.length - 4) - return msg -} - -Connection.prototype.parseInt32 = function (buffer) { - var value = buffer.readInt32BE(this.offset) - this.offset += 4 - return value -} - -Connection.prototype.parseInt16 = function (buffer) { - var value = buffer.readInt16BE(this.offset) - this.offset += 2 - return value -} - -Connection.prototype.readString = function (buffer, length) { - return buffer.toString(this.encoding, this.offset, (this.offset += length)) -} - -Connection.prototype.readBytes = function (buffer, length) { - return buffer.slice(this.offset, (this.offset += length)) -} - -Connection.prototype.parseCString = function (buffer) { - var start = this.offset - var end = buffer.indexOf(0, start) - this.offset = end + 1 - return buffer.toString(this.encoding, start, end) -} -// end parsing methods module.exports = Connection diff --git a/packages/pg/test/unit/connection/outbound-sending-tests.js b/packages/pg/test/unit/connection/outbound-sending-tests.js deleted file mode 100644 index 8b21de4ce..000000000 --- a/packages/pg/test/unit/connection/outbound-sending-tests.js +++ /dev/null @@ -1,205 +0,0 @@ -'use strict' -require(__dirname + '/test-helper') -var Connection = require(__dirname + '/../../../lib/connection') -var stream = new MemoryStream() -var con = new Connection({ - stream: stream, -}) -con._connecting = true - -assert.received = function (stream, buffer) { - assert.lengthIs(stream.packets, 1) - var packet = stream.packets.pop() - assert.equalBuffers(packet, buffer) -} - -test('sends startup message', function () { - con.startup({ - user: 'brian', - database: 'bang', - }) - assert.received( - stream, - new BufferList() - .addInt16(3) - .addInt16(0) - .addCString('user') - .addCString('brian') - .addCString('database') - .addCString('bang') - .addCString('client_encoding') - .addCString("'utf-8'") - .addCString('') - .join(true) - ) -}) - -test('sends password message', function () { - con.password('!') - assert.received(stream, new BufferList().addCString('!').join(true, 'p')) -}) - -test('sends SASLInitialResponseMessage message', function () { - con.sendSASLInitialResponseMessage('mech', 'data') - assert.received(stream, new BufferList().addCString('mech').addInt32(4).addString('data').join(true, 'p')) -}) - -test('sends SCRAMClientFinalMessage message', function () { - con.sendSCRAMClientFinalMessage('data') - assert.received(stream, new BufferList().addString('data').join(true, 'p')) -}) - -test('sends query message', function () { - var txt = 'select * from boom' - con.query(txt) - assert.received(stream, new BufferList().addCString(txt).join(true, 'Q')) -}) - -test('sends parse message', function () { - con.parse({ text: '!' }) - var expected = new BufferList().addCString('').addCString('!').addInt16(0).join(true, 'P') - assert.received(stream, expected) -}) - -test('sends parse message with named query', function () { - con.parse({ - name: 'boom', - text: 'select * from boom', - types: [], - }) - var expected = new BufferList().addCString('boom').addCString('select * from boom').addInt16(0).join(true, 'P') - assert.received(stream, expected) - - test('with multiple parameters', function () { - con.parse({ - name: 'force', - text: 'select * from bang where name = $1', - types: [1, 2, 3, 4], - }) - var expected = new BufferList() - .addCString('force') - .addCString('select * from bang where name = $1') - .addInt16(4) - .addInt32(1) - .addInt32(2) - .addInt32(3) - .addInt32(4) - .join(true, 'P') - assert.received(stream, expected) - }) -}) - -test('bind messages', function () { - test('with no values', function () { - con.bind() - - var expectedBuffer = new BufferList() - .addCString('') - .addCString('') - .addInt16(0) - .addInt16(0) - .addInt16(0) - .join(true, 'B') - assert.received(stream, expectedBuffer) - }) - - test('with named statement, portal, and values', function () { - con.bind({ - portal: 'bang', - statement: 'woo', - values: ['1', 'hi', null, 'zing'], - }) - var expectedBuffer = new BufferList() - .addCString('bang') // portal name - .addCString('woo') // statement name - .addInt16(0) - .addInt16(4) - .addInt32(1) - .add(Buffer.from('1')) - .addInt32(2) - .add(Buffer.from('hi')) - .addInt32(-1) - .addInt32(4) - .add(Buffer.from('zing')) - .addInt16(0) - .join(true, 'B') - assert.received(stream, expectedBuffer) - }) -}) - -test('with named statement, portal, and buffer value', function () { - con.bind({ - portal: 'bang', - statement: 'woo', - values: ['1', 'hi', null, Buffer.from('zing', 'utf8')], - }) - var expectedBuffer = new BufferList() - .addCString('bang') // portal name - .addCString('woo') // statement name - .addInt16(4) // value count - .addInt16(0) // string - .addInt16(0) // string - .addInt16(0) // string - .addInt16(1) // binary - .addInt16(4) - .addInt32(1) - .add(Buffer.from('1')) - .addInt32(2) - .add(Buffer.from('hi')) - .addInt32(-1) - .addInt32(4) - .add(Buffer.from('zing', 'UTF-8')) - .addInt16(0) - .join(true, 'B') - assert.received(stream, expectedBuffer) -}) - -test('sends execute message', function () { - test('for unamed portal with no row limit', function () { - con.execute() - var expectedBuffer = new BufferList().addCString('').addInt32(0).join(true, 'E') - assert.received(stream, expectedBuffer) - }) - - test('for named portal with row limit', function () { - con.execute({ - portal: 'my favorite portal', - rows: 100, - }) - var expectedBuffer = new BufferList().addCString('my favorite portal').addInt32(100).join(true, 'E') - assert.received(stream, expectedBuffer) - }) -}) - -test('sends flush command', function () { - con.flush() - var expected = new BufferList().join(true, 'H') - assert.received(stream, expected) -}) - -test('sends sync command', function () { - con.sync() - var expected = new BufferList().join(true, 'S') - assert.received(stream, expected) -}) - -test('sends end command', function () { - con.end() - var expected = Buffer.from([0x58, 0, 0, 0, 4]) - assert.received(stream, expected) - assert.equal(stream.closed, true) -}) - -test('sends describe command', function () { - test('describe statement', function () { - con.describe({ type: 'S', name: 'bang' }) - var expected = new BufferList().addChar('S').addCString('bang').join(true, 'D') - assert.received(stream, expected) - }) - - test('describe unnamed portal', function () { - con.describe({ type: 'P' }) - var expected = new BufferList().addChar('P').addCString('').join(true, 'D') - assert.received(stream, expected) - }) -})