Skip to content

Commit

Permalink
feat(NODE-6325): implement document sequence support
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Aug 19, 2024
1 parent b70c885 commit bd6f1d9
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 5 deletions.
68 changes: 65 additions & 3 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ export interface OpMsgOptions {
readPreference: ReadPreference;
}

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];

constructor(field: string, documents: Document[]) {
this.field = field;
this.documents = documents;
}
}

/** @internal */
export class OpMsgRequest {
requestId: number;
Expand Down Expand Up @@ -480,7 +491,7 @@ export class OpMsgRequest {

let totalLength = header.length;
const command = this.command;
totalLength += this.makeDocumentSegment(buffers, command);
totalLength += this.makeSections(buffers, command);

header.writeInt32LE(totalLength, 0); // messageLength
header.writeInt32LE(this.requestId, 4); // requestID
Expand All @@ -490,15 +501,66 @@ export class OpMsgRequest {
return buffers;
}

makeDocumentSegment(buffers: Uint8Array[], document: Document): number {
/**
* Add the sections to the OP_MSG request's buffers and returns the length.
*/
makeSections(buffers: Uint8Array[], document: Document): number {
const sequencesBuffer = this.extractDocumentSequences(document);
const payloadTypeBuffer = Buffer.alloc(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);
// First section, type 0
buffers.push(payloadTypeBuffer);
buffers.push(documentBuffer);
// Subsequent sections, type 1
buffers.push(sequencesBuffer);

return payloadTypeBuffer.length + documentBuffer.length;
return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
}

/**
* Extracts the document sequences from the command document and returns
* a buffer to be added as multiple sections after the initial type 0
* section in the message.
*/
extractDocumentSequences(document: Document): Uint8Array {
// Pull out any field in the command document that's value is a document sequence.
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const payloadTypeBuffer = Buffer.alloc(1);
payloadTypeBuffer[0] = 1;
chunks.push(payloadTypeBuffer);
// Second part of the sequence is the length;
const lengthBuffer = Buffer.alloc(4);
chunks.push(lengthBuffer);
// Third part is the field name.
const fieldBuffer = Buffer.from(key);
chunks.push(fieldBuffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
lengthBuffer.writeInt32LE(fieldBuffer.length + docsLength);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
// our BSON serializer can do about this. Since DocumentSequence is not exposed
// in the public API and only used internally, we are never mutating an original
// command provided by the user, just our own, and it's cheaper to delete from
// our own command than copying it.
delete document[key];
}
}
if (chunks.length > 0) {
return Buffer.concat(chunks);
}
return Buffer.alloc(0);
}

serializeBson(document: Document): Uint8Array {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,92 @@
const { expect } = require('chai');
const { OpReply } = require('../../mongodb');
import { expect } from 'chai';

import { DocumentSequence, OpMsgRequest, OpReply } from '../../mongodb';

describe('commands', function () {
describe('OpMsgRequest', function () {
describe('#toBin', function () {
/**
* Note that #toBin returns an array of buffers, in this case we are interested in
* the buffer at index 3 of the array, which is a single buffer of all the
* document sequence sections.
*/
context('when the command has document sequences', function () {
context('when there is one document sequence', function () {
const command = {
test: 1,
field: new DocumentSequence('test', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();

it('removes the document sequence fields from the command', function () {
expect(command).to.not.haveOwnProperty('field');
});

it('sets the document sequence section type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][0]).to.equal(1);
});

it('sets the length of the document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(1)).to.equal(20);
});

it('sets the name of the first field to be replaced', function () {
// Bytes starting at index 5 is the field name.
expect(buffers[3].toString('utf8', 5, 10)).to.equal('field');
});
});

context('when there are multiple document sequences', function () {
const command = {
test: 1,
fieldOne: new DocumentSequence('test', [{ test: 1 }]),
fieldTwo: new DocumentSequence('test', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();

it('removes the document sequence fields from the command', function () {
expect(command).to.not.haveOwnProperty('fieldOne');
expect(command).to.not.haveOwnProperty('fieldTwo');
});

it('sets the document sequence sections first type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][0]).to.equal(1);
});

it('sets the length of the first document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(1)).to.equal(23);
});

it('sets the name of the first field to be replaced', function () {
// Bytes starting at index 5 is the field name.
expect(buffers[3].toString('utf8', 5, 13)).to.equal('fieldOne');
});

it('sets the document sequence sections second type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][28]).to.equal(1);
});

it('sets the length of the second document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(29)).to.equal(23);
});

it('sets the name of the second field to be replaced', function () {
// Bytes starting at index 33 is the field name.
expect(buffers[3].toString('utf8', 33, 41)).to.equal('fieldTwo');
});
});
});
});
});

describe('Response', function () {
describe('#parse', function () {
context('when the message body is invalid', function () {
Expand Down

0 comments on commit bd6f1d9

Please sign in to comment.