diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 19dd8e1a65..d39aecbf2b 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -30,6 +30,8 @@ const QUERY_FAILURE = 2; const SHARD_CONFIG_STALE = 4; const AWAIT_CAPABLE = 8; +const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into; + /** @internal */ export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest; @@ -411,6 +413,15 @@ export interface OpMsgOptions { readPreference: ReadPreference; } +/** @internal */ +export class DocumentSequence { + documents: Document[]; + + constructor(documents: Document[]) { + this.documents = documents; + } +} + /** @internal */ export class OpMsgRequest { requestId: number; @@ -480,7 +491,7 @@ export class OpMsgRequest { let totalLength = header.length; const command = this.command; - totalLength += this.makeDocumentSegment(buffers, command); + totalLength += this.makeSections(buffers, command); header.writeInt32LE(totalLength, 0); // messageLength header.writeInt32LE(this.requestId, 4); // requestID @@ -490,15 +501,65 @@ export class OpMsgRequest { return buffers; } - makeDocumentSegment(buffers: Uint8Array[], document: Document): number { - const payloadTypeBuffer = Buffer.alloc(1); + /** + * Add the sections to the OP_MSG request's buffers and returns the length. + */ + makeSections(buffers: Uint8Array[], document: Document): number { + const sequencesBuffer = this.extractDocumentSequences(document); + const payloadTypeBuffer = Buffer.allocUnsafe(1); payloadTypeBuffer[0] = 0; const documentBuffer = this.serializeBson(document); + // First section, type 0 buffers.push(payloadTypeBuffer); buffers.push(documentBuffer); + // Subsequent sections, type 1 + buffers.push(sequencesBuffer); - return payloadTypeBuffer.length + documentBuffer.length; + return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length; + } + + /** + * Extracts the document sequences from the command document and returns + * a buffer to be added as multiple sections after the initial type 0 + * section in the message. + */ + extractDocumentSequences(document: Document): Uint8Array { + // Pull out any field in the command document that's value is a document sequence. + const chunks = []; + for (const [key, value] of Object.entries(document)) { + if (value instanceof DocumentSequence) { + // Document sequences starts with type 1 at the first byte. + const buffer = Buffer.allocUnsafe(1 + 4 + key.length); + buffer[0] = 1; + // Third part is the field name at offset 5. + encodeUTF8Into(buffer, key, 5); + chunks.push(buffer); + // Fourth part are the documents' bytes. + let docsLength = 0; + for (const doc of value.documents) { + const docBson = this.serializeBson(doc); + docsLength += docBson.length; + chunks.push(docBson); + } + // Second part of the sequence is the length at offset 1; + buffer.writeInt32LE(key.length + docsLength, 1); + // Why are we removing the field from the command? This is because it needs to be + // removed in the OP_MSG request first section, and DocumentSequence is not a + // BSON type and is specific to the MongoDB wire protocol so there's nothing + // our BSON serializer can do about this. Since DocumentSequence is not exposed + // in the public API and only used internally, we are never mutating an original + // command provided by the user, just our own, and it's cheaper to delete from + // our own command than copying it. + delete document[key]; + } + } + if (chunks.length > 0) { + return Buffer.concat(chunks); + } + // If we have no document sequences we return an empty buffer for nothing to add + // to the payload. + return Buffer.alloc(0); } serializeBson(document: Document): Uint8Array { diff --git a/test/unit/cmap/commands.test.js b/test/unit/cmap/commands.test.js deleted file mode 100644 index 6a0ced9fde..0000000000 --- a/test/unit/cmap/commands.test.js +++ /dev/null @@ -1,111 +0,0 @@ -const { expect } = require('chai'); -const { OpReply } = require('../../mongodb'); - -describe('commands', function () { - describe('Response', function () { - describe('#parse', function () { - context('when the message body is invalid', function () { - context('when the buffer is empty', function () { - const message = Buffer.from([]); - const header = { - length: 0, - requestId: 0, - responseTo: 0, - opCode: 0 - }; - const body = Buffer.from([]); - - it('throws an exception', function () { - const response = new OpReply(message, header, body); - expect(() => response.parse()).to.throw(RangeError, /outside buffer bounds/); - }); - }); - - context('when numReturned is invalid', function () { - const message = Buffer.from([]); - const header = { - length: 0, - requestId: 0, - responseTo: 0, - opCode: 0 - }; - const body = Buffer.alloc(5 * 4); - body.writeInt32LE(-1, 16); - - it('throws an exception', function () { - const response = new OpReply(message, header, body); - expect(() => response.parse()).to.throw(RangeError, /Invalid array length/i); - }); - }); - }); - }); - - describe('#constructor', function () { - context('when the message body is invalid', function () { - const message = Buffer.from([]); - const header = { - length: 0, - requestId: 0, - responseTo: 0, - opCode: 0 - }; - const body = Buffer.from([]); - - it('does not throw an exception', function () { - let error; - try { - new OpReply(message, header, body); - } catch (err) { - error = err; - } - expect(error).to.be.undefined; - }); - - it('initializes the sections to an empty array', function () { - const response = new OpReply(message, header, body); - expect(response.sections).to.be.empty; - }); - - it('does not set the responseFlags', function () { - const response = new OpReply(message, header, body); - expect(response.responseFlags).to.be.undefined; - }); - - it('does not set the cursorNotFound flag', function () { - const response = new OpReply(message, header, body); - expect(response.cursorNotFound).to.be.undefined; - }); - - it('does not set the cursorId', function () { - const response = new OpReply(message, header, body); - expect(response.cursorId).to.be.undefined; - }); - - it('does not set startingFrom', function () { - const response = new OpReply(message, header, body); - expect(response.startingFrom).to.be.undefined; - }); - - it('does not set numberReturned', function () { - const response = new OpReply(message, header, body); - expect(response.numberReturned).to.be.undefined; - }); - - it('does not set queryFailure', function () { - const response = new OpReply(message, header, body); - expect(response.queryFailure).to.be.undefined; - }); - - it('does not set shardConfigStale', function () { - const response = new OpReply(message, header, body); - expect(response.shardConfigStale).to.be.undefined; - }); - - it('does not set awaitCapable', function () { - const response = new OpReply(message, header, body); - expect(response.awaitCapable).to.be.undefined; - }); - }); - }); - }); -}); diff --git a/test/unit/cmap/commands.test.ts b/test/unit/cmap/commands.test.ts new file mode 100644 index 0000000000..07e3fccbb6 --- /dev/null +++ b/test/unit/cmap/commands.test.ts @@ -0,0 +1,217 @@ +import * as BSON from 'bson'; +import { expect } from 'chai'; + +import { DocumentSequence, OpMsgRequest, OpReply } from '../../mongodb'; + +describe('commands', function () { + describe('OpMsgRequest', function () { + describe('#toBin', function () { + /** + * Note that #toBin returns an array of buffers, in this case we are interested in + * the buffer at index 3 of the array, which is a single buffer of all the + * document sequence sections. + */ + context('when the command has document sequences', function () { + context('when there is one document sequence', function () { + const command = { + test: 1, + field: new DocumentSequence([{ test: 1 }]) + }; + const msg = new OpMsgRequest('admin', command, {}); + const buffers = msg.toBin(); + + it('keeps the first section as type 0', function () { + // The type byte for the first section is at index 1. + expect(buffers[1][0]).to.equal(0); + }); + + it('does not serialize the document sequences in the first section', function () { + // Buffer at index 2 is the type 0 section - one document. + expect(BSON.deserialize(buffers[2])).to.deep.equal({ test: 1, $db: 'admin' }); + }); + + it('removes the document sequence fields from the command', function () { + expect(command).to.not.haveOwnProperty('field'); + }); + + it('sets the document sequence section type to 1', function () { + // First byte is a one byte type. + expect(buffers[3][0]).to.equal(1); + }); + + it('sets the length of the document sequence', function () { + // Bytes starting at index 1 is a 4 byte length. + expect(buffers[3].readInt32LE(1)).to.equal(20); + }); + + it('sets the name of the first field to be replaced', function () { + // Bytes starting at index 5 is the field name. + expect(buffers[3].toString('utf8', 5, 10)).to.equal('field'); + }); + }); + + context('when there are multiple document sequences', function () { + const command = { + test: 1, + fieldOne: new DocumentSequence([{ test: 1 }]), + fieldTwo: new DocumentSequence([{ test: 1 }]) + }; + const msg = new OpMsgRequest('admin', command, {}); + const buffers = msg.toBin(); + + it('keeps the first section as type 0', function () { + // The type byte for the first section is at index 1. + expect(buffers[1][0]).to.equal(0); + }); + + it('does not serialize the document sequences in the first section', function () { + // Buffer at index 2 is the type 0 section - one document. + expect(BSON.deserialize(buffers[2])).to.deep.equal({ test: 1, $db: 'admin' }); + }); + + it('removes the document sequence fields from the command', function () { + expect(command).to.not.haveOwnProperty('fieldOne'); + expect(command).to.not.haveOwnProperty('fieldTwo'); + }); + + it('sets the document sequence sections first type to 1', function () { + // First byte is a one byte type. + expect(buffers[3][0]).to.equal(1); + }); + + it('sets the length of the first document sequence', function () { + // Bytes starting at index 1 is a 4 byte length. + expect(buffers[3].readInt32LE(1)).to.equal(23); + }); + + it('sets the name of the first field to be replaced', function () { + // Bytes starting at index 5 is the field name. + expect(buffers[3].toString('utf8', 5, 13)).to.equal('fieldOne'); + }); + + it('sets the document sequence sections second type to 1', function () { + // First byte is a one byte type. + expect(buffers[3][28]).to.equal(1); + }); + + it('sets the length of the second document sequence', function () { + // Bytes starting at index 1 is a 4 byte length. + expect(buffers[3].readInt32LE(29)).to.equal(23); + }); + + it('sets the name of the second field to be replaced', function () { + // Bytes starting at index 33 is the field name. + expect(buffers[3].toString('utf8', 33, 41)).to.equal('fieldTwo'); + }); + }); + }); + }); + }); + + describe('Response', function () { + describe('#parse', function () { + context('when the message body is invalid', function () { + context('when the buffer is empty', function () { + const message = Buffer.from([]); + const header = { + length: 0, + requestId: 0, + responseTo: 0, + opCode: 0 + }; + const body = Buffer.from([]); + + it('throws an exception', function () { + const response = new OpReply(message, header, body); + expect(() => response.parse()).to.throw(RangeError, /outside buffer bounds/); + }); + }); + + context('when numReturned is invalid', function () { + const message = Buffer.from([]); + const header = { + length: 0, + requestId: 0, + responseTo: 0, + opCode: 0 + }; + const body = Buffer.alloc(5 * 4); + body.writeInt32LE(-1, 16); + + it('throws an exception', function () { + const response = new OpReply(message, header, body); + expect(() => response.parse()).to.throw(RangeError, /Invalid array length/i); + }); + }); + }); + }); + + describe('#constructor', function () { + context('when the message body is invalid', function () { + const message = Buffer.from([]); + const header = { + length: 0, + requestId: 0, + responseTo: 0, + opCode: 0 + }; + const body = Buffer.from([]); + + it('does not throw an exception', function () { + let error; + try { + new OpReply(message, header, body); + } catch (err) { + error = err; + } + expect(error).to.be.undefined; + }); + + it('initializes the sections to an empty array', function () { + const response = new OpReply(message, header, body); + expect(response.sections).to.be.empty; + }); + + it('does not set the responseFlags', function () { + const response = new OpReply(message, header, body); + expect(response.responseFlags).to.be.undefined; + }); + + it('does not set the cursorNotFound flag', function () { + const response = new OpReply(message, header, body); + expect(response.cursorNotFound).to.be.undefined; + }); + + it('does not set the cursorId', function () { + const response = new OpReply(message, header, body); + expect(response.cursorId).to.be.undefined; + }); + + it('does not set startingFrom', function () { + const response = new OpReply(message, header, body); + expect(response.startingFrom).to.be.undefined; + }); + + it('does not set numberReturned', function () { + const response = new OpReply(message, header, body); + expect(response.numberReturned).to.be.undefined; + }); + + it('does not set queryFailure', function () { + const response = new OpReply(message, header, body); + expect(response.queryFailure).to.be.undefined; + }); + + it('does not set shardConfigStale', function () { + const response = new OpReply(message, header, body); + expect(response.shardConfigStale).to.be.undefined; + }); + + it('does not set awaitCapable', function () { + const response = new OpReply(message, header, body); + expect(response.awaitCapable).to.be.undefined; + }); + }); + }); + }); +});