Skip to content

Commit

Permalink
feat(NODE-6329): client bulk write happy path
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 18, 2024
1 parent 643a875 commit 343c27e
Show file tree
Hide file tree
Showing 26 changed files with 2,617 additions and 25 deletions.
8 changes: 4 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ export class OpMsgRequest {
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
Expand All @@ -557,7 +557,7 @@ export class OpMsgRequest {
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
buffer.writeInt32LE(key.length + 1 + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
64 changes: 64 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends AbstractCursorOptions,
ClientBulkWriteOptions {}

/**
* @public
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.response;
throw new Error('no cursor response');
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
15 changes: 15 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,21 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientDeleteResult,
ClientInsertOneModel,
ClientInsertOneResult,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientUpdateResult,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
Expand Down
18 changes: 18 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
Expand Down Expand Up @@ -477,6 +483,18 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
* @param options - The client bulk write options.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
Expand Down
41 changes: 41 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { AbstractOperation, Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

export class ClientBulkWriteOperation extends AbstractOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(options);
this.command = command;
this.options = options;
}

override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await server.command(
this.ns,
this.command,
{
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
},
ClientBulkWriteCursorResponse
);
}
}

defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION]);
3 changes: 2 additions & 1 deletion src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Document } from '../../bson';
import { type Document, ObjectId } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { type CollationOptions } from '../command';
Expand Down Expand Up @@ -112,6 +112,7 @@ export const buildInsertOneOperation = (
insert: index,
document: model.document
};
document.document._id = model.document._id ?? new ObjectId();
return document;
};

Expand Down
74 changes: 74 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,77 @@ export type AnyClientBulkWriteModel =
| ClientUpdateManyModel
| ClientDeleteOneModel
| ClientDeleteManyModel;

/** @public */
export interface ClientBulkWriteResult {
/**
* The total number of documents inserted across all insert operations.
*/
insertedCount: number;
/**
* The total number of documents upserted across all update operations.
*/
upsertedCount: number;
/**
* The total number of documents matched across all update operations.
*/
matchedCount: number;
/**
* The total number of documents modified across all update operations.
*/
modifiedCount: number;
/**
* The total number of documents deleted across all delete operations.
*/
deletedCount: number;
/**
* The results of each individual insert operation that was successfully performed.
*/
insertResults?: Map<number, ClientInsertOneResult>;
/**
* The results of each individual update operation that was successfully performed.
*/
updateResults?: Map<number, ClientUpdateResult>;
/**
* The results of each individual delete operation that was successfully performed.
*/
deleteResults?: Map<number, ClientDeleteResult>;
}

/** @public */
export interface ClientInsertOneResult {
/**
* The _id of the inserted document.
*/
insertedId: any;
}

/** @public */
export interface ClientUpdateResult {
/**
* The number of documents that matched the filter.
*/
matchedCount: number;

/**
* The number of documents that were modified.
*/
modifiedCount: number;

/**
* The _id field of the upserted document if an upsert occurred.
*
* It MUST be possible to discern between a BSON Null upserted ID value and this field being
* unset. If necessary, drivers MAY add a didUpsert boolean field to differentiate between
* these two cases.
*/
upsertedId?: any;
}

/** @public */
export interface ClientDeleteResult {
/**
* The number of documents that were deleted.
*/
deletedCount: number;
}
56 changes: 56 additions & 0 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { type MongoClient } from '../../mongo_client';
import { ClientBulkWriteCommandBuilder } from './command_builder';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './common';
import { ClientBulkWriteResultsMerger } from './results_merger';

/**
* Responsible for executing a client bulk write.
* @internal
*/
export class ClientBulkWriteExecutor {
client: MongoClient;
options: ClientBulkWriteOptions;
operations: AnyClientBulkWriteModel[];

/**
* Instantiate the executor.
* @param client - The mongo client.
* @param operations - The user supplied bulk write models.
* @param options - The bulk write options.
*/
constructor(
client: MongoClient,
operations: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
) {
this.client = client;
this.options = options || {};
this.operations = operations;
}

/**
* Execute the client bulk write. Will split commands into batches and exhaust the cursors
* for each, then merge the results into one.
* @returns The result.
*/
async execute(): Promise<ClientBulkWriteResult> {
// The command builder will take the user provided models and potential split the batch
// into multiple commands due to size.
const commmandBuilder = new ClientBulkWriteCommandBuilder(this.operations, this.options);
const commands = commmandBuilder.buildCommands();
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(this.client, command, this.options);
for (const docs of await cursor.toArray()) {
resultsMerger.merge(command.ops.documents, cursor.response, docs);
}
}
return resultsMerger.result;
}
}
Loading

0 comments on commit 343c27e

Please sign in to comment.