Skip to content

Commit

Permalink
refactor(NODE-6325): implement document sequence support (#4201)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Aug 20, 2024
1 parent 8622545 commit 55bdeaa
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 115 deletions.
69 changes: 65 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const QUERY_FAILURE = 2;
const SHARD_CONFIG_STALE = 4;
const AWAIT_CAPABLE = 8;

const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;

/** @internal */
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;

Expand Down Expand Up @@ -411,6 +413,15 @@ export interface OpMsgOptions {
readPreference: ReadPreference;
}

/** @internal */
export class DocumentSequence {
documents: Document[];

constructor(documents: Document[]) {
this.documents = documents;
}
}

/** @internal */
export class OpMsgRequest {
requestId: number;
Expand Down Expand Up @@ -480,7 +491,7 @@ export class OpMsgRequest {

let totalLength = header.length;
const command = this.command;
totalLength += this.makeDocumentSegment(buffers, command);
totalLength += this.makeSections(buffers, command);

header.writeInt32LE(totalLength, 0); // messageLength
header.writeInt32LE(this.requestId, 4); // requestID
Expand All @@ -490,15 +501,65 @@ export class OpMsgRequest {
return buffers;
}

makeDocumentSegment(buffers: Uint8Array[], document: Document): number {
const payloadTypeBuffer = Buffer.alloc(1);
/**
* Add the sections to the OP_MSG request's buffers and returns the length.
*/
makeSections(buffers: Uint8Array[], document: Document): number {
const sequencesBuffer = this.extractDocumentSequences(document);
const payloadTypeBuffer = Buffer.allocUnsafe(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);
// First section, type 0
buffers.push(payloadTypeBuffer);
buffers.push(documentBuffer);
// Subsequent sections, type 1
buffers.push(sequencesBuffer);

return payloadTypeBuffer.length + documentBuffer.length;
return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
}

/**
* Extracts the document sequences from the command document and returns
* a buffer to be added as multiple sections after the initial type 0
* section in the message.
*/
extractDocumentSequences(document: Document): Uint8Array {
// Pull out any field in the command document that's value is a document sequence.
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
// our BSON serializer can do about this. Since DocumentSequence is not exposed
// in the public API and only used internally, we are never mutating an original
// command provided by the user, just our own, and it's cheaper to delete from
// our own command than copying it.
delete document[key];
}
}
if (chunks.length > 0) {
return Buffer.concat(chunks);
}
// If we have no document sequences we return an empty buffer for nothing to add
// to the payload.
return Buffer.alloc(0);
}

serializeBson(document: Document): Uint8Array {
Expand Down
111 changes: 0 additions & 111 deletions test/unit/cmap/commands.test.js

This file was deleted.

Loading

0 comments on commit 55bdeaa

Please sign in to comment.