Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6325): implement document sequence support #4201

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ export interface OpMsgOptions {
readPreference: ReadPreference;
}

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];

constructor(field: string, documents: Document[]) {
this.field = field;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
this.documents = documents;
}
}

/** @internal */
export class OpMsgRequest {
requestId: number;
Expand Down Expand Up @@ -480,7 +491,7 @@ export class OpMsgRequest {

let totalLength = header.length;
const command = this.command;
totalLength += this.makeDocumentSegment(buffers, command);
totalLength += this.makeSections(buffers, command);

header.writeInt32LE(totalLength, 0); // messageLength
header.writeInt32LE(this.requestId, 4); // requestID
Expand All @@ -490,15 +501,66 @@ export class OpMsgRequest {
return buffers;
}

makeDocumentSegment(buffers: Uint8Array[], document: Document): number {
/**
* Add the sections to the OP_MSG request's buffers and returns the length.
*/
makeSections(buffers: Uint8Array[], document: Document): number {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const sequencesBuffer = this.extractDocumentSequences(document);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
const payloadTypeBuffer = Buffer.alloc(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);
// First section, type 0
buffers.push(payloadTypeBuffer);
buffers.push(documentBuffer);
// Subsequent sections, type 1
buffers.push(sequencesBuffer);

return payloadTypeBuffer.length + documentBuffer.length;
return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
}

/**
* Extracts the document sequences from the command document and returns
* a buffer to be added as multiple sections after the initial type 0
* section in the message.
*/
extractDocumentSequences(document: Document): Uint8Array {
// Pull out any field in the command document that's value is a document sequence.
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const payloadTypeBuffer = Buffer.alloc(1);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
payloadTypeBuffer[0] = 1;
chunks.push(payloadTypeBuffer);
// Second part of the sequence is the length;
const lengthBuffer = Buffer.alloc(4);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
chunks.push(lengthBuffer);
// Third part is the field name.
const fieldBuffer = Buffer.from(key);
chunks.push(fieldBuffer);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
lengthBuffer.writeInt32LE(fieldBuffer.length + docsLength);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
// our BSON serializer can do about this. Since DocumentSequence is not exposed
// in the public API and only used internally, we are never mutating an original
// command provided by the user, just our own, and it's cheaper to delete from
// our own command than copying it.
delete document[key];
}
}
if (chunks.length > 0) {
return Buffer.concat(chunks);
}
return Buffer.alloc(0);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}

serializeBson(document: Document): Uint8Array {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,92 @@
const { expect } = require('chai');
const { OpReply } = require('../../mongodb');
import { expect } from 'chai';

import { DocumentSequence, OpMsgRequest, OpReply } from '../../mongodb';

describe('commands', function () {
describe('OpMsgRequest', function () {
describe('#toBin', function () {
/**
* Note that #toBin returns an array of buffers, in this case we are interested in
* the buffer at index 3 of the array, which is a single buffer of all the
* document sequence sections.
*/
context('when the command has document sequences', function () {
context('when there is one document sequence', function () {
const command = {
test: 1,
field: new DocumentSequence('test', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();

it('removes the document sequence fields from the command', function () {
expect(command).to.not.haveOwnProperty('field');
});

it('sets the document sequence section type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][0]).to.equal(1);
});

it('sets the length of the document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(1)).to.equal(20);
});

it('sets the name of the first field to be replaced', function () {
// Bytes starting at index 5 is the field name.
expect(buffers[3].toString('utf8', 5, 10)).to.equal('field');
});
});

context('when there are multiple document sequences', function () {
const command = {
test: 1,
fieldOne: new DocumentSequence('test', [{ test: 1 }]),
fieldTwo: new DocumentSequence('test', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();

it('removes the document sequence fields from the command', function () {
expect(command).to.not.haveOwnProperty('fieldOne');
expect(command).to.not.haveOwnProperty('fieldTwo');
});

it('sets the document sequence sections first type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][0]).to.equal(1);
});

it('sets the length of the first document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(1)).to.equal(23);
});

it('sets the name of the first field to be replaced', function () {
// Bytes starting at index 5 is the field name.
expect(buffers[3].toString('utf8', 5, 13)).to.equal('fieldOne');
});

it('sets the document sequence sections second type to 1', function () {
// First byte is a one byte type.
expect(buffers[3][28]).to.equal(1);
});

it('sets the length of the second document sequence', function () {
// Bytes starting at index 1 is a 4 byte length.
expect(buffers[3].readInt32LE(29)).to.equal(23);
});

it('sets the name of the second field to be replaced', function () {
// Bytes starting at index 33 is the field name.
expect(buffers[3].toString('utf8', 33, 41)).to.equal('fieldTwo');
});
});
});
});
});

describe('Response', function () {
describe('#parse', function () {
context('when the message body is invalid', function () {
Expand Down