Skip to content

Commit

Permalink
feat(NODE-6350): add typescript support to client bulkWrite API
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored and durran committed Oct 14, 2024
1 parent 7fde8dd commit 8b309e2
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ export type {
export type {
AnyClientBulkWriteModel,
ClientBulkWriteError,
ClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
Expand Down
10 changes: 5 additions & 5 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
Expand Down Expand Up @@ -331,7 +331,6 @@ export type MongoClientEvents = Pick<TopologyEvents, (typeof MONGO_CLIENT_EVENTS
};

/** @internal */

const kOptions = Symbol('options');

/**
Expand Down Expand Up @@ -489,16 +488,17 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
* @param options - The client bulk write options.
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
async bulkWrite<SchemaMap extends Record<string, Document> = Record<string, Document>>(
models: ReadonlyArray<ClientBulkWriteModel<SchemaMap>>,
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
if (this.autoEncrypter) {
throw new MongoInvalidArgumentError(
'MongoClient bulkWrite does not currently support automatic encryption.'
);
}
return await new ClientBulkWriteExecutor(this, models, options).execute();
// We do not need schema type information past this point ("as any" is fine)
return await new ClientBulkWriteExecutor(this, models as any, options).execute();
}

/**
Expand Down
28 changes: 17 additions & 11 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const MESSAGE_OVERHEAD_BYTES = 1000;

/** @internal */
export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
models: ReadonlyArray<AnyClientBulkWriteModel<Document>>;
options: ClientBulkWriteOptions;
pkFactory: PkFactory;
/** The current index in the models array that is being processed. */
Expand All @@ -53,7 +53,7 @@ export class ClientBulkWriteCommandBuilder {
* @param models - The client write models.
*/
constructor(
models: AnyClientBulkWriteModel[],
models: ReadonlyArray<AnyClientBulkWriteModel<Document>>,
options: ClientBulkWriteOptions,
pkFactory?: PkFactory
) {
Expand Down Expand Up @@ -248,7 +248,7 @@ interface ClientInsertOperation {
* @returns the operation.
*/
export const buildInsertOneOperation = (
model: ClientInsertOneModel,
model: ClientInsertOneModel<Document>,
index: number,
pkFactory: PkFactory
): ClientInsertOperation => {
Expand All @@ -275,7 +275,10 @@ export interface ClientDeleteOperation {
* @param index - The namespace index.
* @returns the operation.
*/
export const buildDeleteOneOperation = (model: ClientDeleteOneModel, index: number): Document => {
export const buildDeleteOneOperation = (
model: ClientDeleteOneModel<Document>,
index: number
): Document => {
return createDeleteOperation(model, index, false);
};

Expand All @@ -285,15 +288,18 @@ export const buildDeleteOneOperation = (model: ClientDeleteOneModel, index: numb
* @param index - The namespace index.
* @returns the operation.
*/
export const buildDeleteManyOperation = (model: ClientDeleteManyModel, index: number): Document => {
export const buildDeleteManyOperation = (
model: ClientDeleteManyModel<Document>,
index: number
): Document => {
return createDeleteOperation(model, index, true);
};

/**
* Creates a delete operation based on the parameters.
*/
function createDeleteOperation(
model: ClientDeleteOneModel | ClientDeleteManyModel,
model: ClientDeleteOneModel<Document> | ClientDeleteManyModel<Document>,
index: number,
multi: boolean
): ClientDeleteOperation {
Expand Down Expand Up @@ -330,7 +336,7 @@ export interface ClientUpdateOperation {
* @returns the operation.
*/
export const buildUpdateOneOperation = (
model: ClientUpdateOneModel,
model: ClientUpdateOneModel<Document>,
index: number
): ClientUpdateOperation => {
return createUpdateOperation(model, index, false);
Expand All @@ -343,7 +349,7 @@ export const buildUpdateOneOperation = (
* @returns the operation.
*/
export const buildUpdateManyOperation = (
model: ClientUpdateManyModel,
model: ClientUpdateManyModel<Document>,
index: number
): ClientUpdateOperation => {
return createUpdateOperation(model, index, true);
Expand All @@ -365,7 +371,7 @@ function validateUpdate(update: Document) {
* Creates a delete operation based on the parameters.
*/
function createUpdateOperation(
model: ClientUpdateOneModel | ClientUpdateManyModel,
model: ClientUpdateOneModel<Document> | ClientUpdateManyModel<Document>,
index: number,
multi: boolean
): ClientUpdateOperation {
Expand Down Expand Up @@ -413,7 +419,7 @@ export interface ClientReplaceOneOperation {
* @returns the operation.
*/
export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
model: ClientReplaceOneModel<Document>,
index: number
): ClientReplaceOneOperation => {
if (hasAtomicOperators(model.replacement)) {
Expand Down Expand Up @@ -442,7 +448,7 @@ export const buildReplaceOneOperation = (

/** @internal */
export function buildOperation(
model: AnyClientBulkWriteModel,
model: AnyClientBulkWriteModel<Document>,
index: number,
pkFactory: PkFactory
): Document {
Expand Down
73 changes: 51 additions & 22 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,50 +32,50 @@ export interface ClientWriteModel {
}

/** @public */
export interface ClientInsertOneModel extends ClientWriteModel {
export interface ClientInsertOneModel<TSchema> extends ClientWriteModel {
name: 'insertOne';
/** The document to insert. */
document: OptionalId<Document>;
document: OptionalId<TSchema>;
}

/** @public */
export interface ClientDeleteOneModel extends ClientWriteModel {
export interface ClientDeleteOneModel<TSchema> extends ClientWriteModel {
name: 'deleteOne';
/**
* The filter used to determine if a document should be deleted.
* For a deleteOne operation, the first match is removed.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
hint?: Hint;
}

/** @public */
export interface ClientDeleteManyModel extends ClientWriteModel {
export interface ClientDeleteManyModel<TSchema> extends ClientWriteModel {
name: 'deleteMany';
/**
* The filter used to determine if a document should be deleted.
* For a deleteMany operation, all matches are removed.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
hint?: Hint;
}

/** @public */
export interface ClientReplaceOneModel extends ClientWriteModel {
export interface ClientReplaceOneModel<TSchema> extends ClientWriteModel {
name: 'replaceOne';
/**
* The filter used to determine if a document should be replaced.
* For a replaceOne operation, the first match is replaced.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/** The document with which to replace the matched document. */
replacement: WithoutId<Document>;
replacement: WithoutId<TSchema>;
/** Specifies a collation. */
collation?: CollationOptions;
/** The index to use. If specified, then the query system will only consider plans using the hinted index. */
Expand All @@ -85,19 +85,19 @@ export interface ClientReplaceOneModel extends ClientWriteModel {
}

/** @public */
export interface ClientUpdateOneModel extends ClientWriteModel {
export interface ClientUpdateOneModel<TSchema> extends ClientWriteModel {
name: 'updateOne';
/**
* The filter used to determine if a document should be updated.
* For an updateOne operation, the first match is updated.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/**
* The modifications to apply. The value can be either:
* UpdateFilter<Document> - A document that contains update operator expressions,
* Document[] - an aggregation pipeline.
*/
update: UpdateFilter<Document> | Document[];
update: UpdateFilter<TSchema> | Document[];
/** A set of filters specifying to which array elements an update should apply. */
arrayFilters?: Document[];
/** Specifies a collation. */
Expand All @@ -109,19 +109,19 @@ export interface ClientUpdateOneModel extends ClientWriteModel {
}

/** @public */
export interface ClientUpdateManyModel extends ClientWriteModel {
export interface ClientUpdateManyModel<TSchema> extends ClientWriteModel {
name: 'updateMany';
/**
* The filter used to determine if a document should be updated.
* For an updateMany operation, all matches are updated.
*/
filter: Filter<Document>;
filter: Filter<TSchema>;
/**
* The modifications to apply. The value can be either:
* UpdateFilter<Document> - A document that contains update operator expressions,
* Document[] - an aggregation pipeline.
*/
update: UpdateFilter<Document> | Document[];
update: UpdateFilter<TSchema> | Document[];
/** A set of filters specifying to which array elements an update should apply. */
arrayFilters?: Document[];
/** Specifies a collation. */
Expand All @@ -137,13 +137,42 @@ export interface ClientUpdateManyModel extends ClientWriteModel {
* to MongoClient#bulkWrite.
* @public
*/
export type AnyClientBulkWriteModel =
| ClientInsertOneModel
| ClientReplaceOneModel
| ClientUpdateOneModel
| ClientUpdateManyModel
| ClientDeleteOneModel
| ClientDeleteManyModel;
export type AnyClientBulkWriteModel<TSchema extends Document> =
| ClientInsertOneModel<TSchema>
| ClientReplaceOneModel<TSchema>
| ClientUpdateOneModel<TSchema>
| ClientUpdateManyModel<TSchema>
| ClientDeleteOneModel<TSchema>
| ClientDeleteManyModel<TSchema>;

/**
* Take a Typescript type that maps namespaces to schema types.
* @public
*
* @example
* ```ts
* type MongoDBSchemas = {
* 'db.books': Book;
* 'db.authors': Author;
* }
*
* const model: ClientBulkWriteModel<MongoDBSchemas> = {
* namespace: 'db.books'
* name: 'insertOne',
* document: { title: 'Practical MongoDB Aggregations', authorName: 3 } // error `authorName` cannot be number
* };
* ```
*
* The type of the `namespace` field narrows other parts of the BulkWriteModel to use the correct schema for type assertions.
*
*/
export type ClientBulkWriteModel<
SchemaMap extends Record<string, Document> = Record<string, Document>
> = {
[Namespace in keyof SchemaMap]: AnyClientBulkWriteModel<SchemaMap[Namespace]> & {
namespace: Namespace;
};
}[keyof SchemaMap];

/** @public */
export interface ClientBulkWriteResult {
Expand Down
10 changes: 6 additions & 4 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import {
MongoClientBulkWriteError,
Expand All @@ -22,9 +24,9 @@ import { ClientBulkWriteResultsMerger } from './results_merger';
* @internal
*/
export class ClientBulkWriteExecutor {
client: MongoClient;
options: ClientBulkWriteOptions;
operations: AnyClientBulkWriteModel[];
private readonly client: MongoClient;
private readonly options: ClientBulkWriteOptions;
private readonly operations: ReadonlyArray<AnyClientBulkWriteModel<Document>>;

/**
* Instantiate the executor.
Expand All @@ -34,7 +36,7 @@ export class ClientBulkWriteExecutor {
*/
constructor(
client: MongoClient,
operations: AnyClientBulkWriteModel[],
operations: ReadonlyArray<AnyClientBulkWriteModel<Document>>,
options?: ClientBulkWriteOptions
) {
if (operations.length === 0) {
Expand Down
Loading

0 comments on commit 8b309e2

Please sign in to comment.