Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6329): client bulk write happy path #4206

Merged
merged 8 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
LEGACY_HELLO_COMMAND_CAMEL_CASE
} from '../constants';
import { calculateDurationInMs, deepCopy } from '../utils';
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
import {
DocumentSequence,
OpMsgRequest,
type OpQueryRequest,
type WriteProtocolMessageType
} from './commands';
import type { Connection } from './connection';

/**
Expand Down Expand Up @@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof OpMsgRequest) {
return deepCopy(command.command);
const cmd = deepCopy(command.command);
// For OP_MSG with payload type 1 we need to pull the documents
// array out of the document sequence for monitoring.
if (cmd.ops instanceof DocumentSequence) {
cmd.ops = cmd.ops.documents;
}
if (cmd.nsInfo instanceof DocumentSequence) {
cmd.nsInfo = cmd.nsInfo.documents;
}
return cmd;
}

if (command.query?.$query) {
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ export class OpMsgRequest {
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
Expand All @@ -557,7 +557,7 @@ export class OpMsgRequest {
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
73 changes: 73 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends Omit<AbstractCursorOptions, 'maxAwaitTimeMS' | 'tailable' | 'awaitData'>,
ClientBulkWriteOptions {}

/**
* This is the cursor that handles client bulk write operations. Note this is never
* exposed directly to the user and is always immediately exhausted.
* @internal
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

/**
* We need a way to get the top level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
27 changes: 27 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,33 @@ export class MongoGCPError extends MongoOIDCError {
}
}

/**
* An error indicating that an error occurred when processing bulk write results.
*
* @public
* @category Error
*/
export class MongoBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoBulkWriteCursorError';
}
}

/**
* An error generated when a ChangeStream operation fails to execute.
*
Expand Down
16 changes: 16 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export {
MongoAWSError,
MongoAzureError,
MongoBatchReExecutionError,
MongoBulkWriteCursorError,
MongoChangeStreamError,
MongoCompatibilityError,
MongoCursorExhaustedError,
Expand Down Expand Up @@ -473,6 +474,21 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientDeleteResult,
ClientInsertOneModel,
ClientInsertOneResult,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientUpdateResult,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
Expand Down
19 changes: 19 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
Expand Down Expand Up @@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
* @param models - The client bulk write models.
* @param options - The client bulk write options.
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
*/
async bulkWrite(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
Expand Down
45 changes: 45 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(undefined, options);
this.command = command;
this.options = options;
this.ns = new MongoDBNamespace('admin', '$cmd');
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
* @param session - The session.
* @returns The response.
*/
override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
}
}

// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
Loading