Skip to content

Commit

Permalink
feat: add table.createInsertStream for native streaming inserts (#997)
Browse files Browse the repository at this point in the history
* feat: add createInsertStream draft and tests

* refactor to remove InsertQueue

* restore default maxDelayMillis value

* restore default maxDelayMillis value

* moved row encoding to add, updated headers

* header

* add tests

* adding tests

* refactor stream and add tests

* update header dates

* add system tests

* remove comment

* update pending type

* add getOptionsDefaults to rowQueue
  • Loading branch information
steffnay authored Jan 17, 2022
1 parent ba86889 commit 0ffe544
Show file tree
Hide file tree
Showing 9 changed files with 971 additions and 0 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"extend": "^3.0.2",
"is": "^3.3.0",
"p-event": "^4.1.0",
"readable-stream": "^3.6.0",
"stream-events": "^1.0.5",
"uuid": "^8.0.0"
},
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export {

export {Routine} from './routine';

export {RowBatch} from './rowBatch';

export {InsertRowsStreamResponse, RowQueue} from './rowQueue';

export {
CopyTableMetadata,
CreateCopyJobMetadata,
Expand All @@ -110,6 +114,7 @@ export {
InsertRowsCallback,
InsertRowsOptions,
InsertRowsResponse,
InsertStreamOptions,
JobLoadMetadata,
PartialInsertFailure,
PermissionsCallback,
Expand Down
97 changes: 97 additions & 0 deletions src/rowBatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*!
* Copyright 2022 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {InsertRowsCallback} from './rowQueue';
import {RowBatchOptions, RowMetadata} from './table';
export interface BatchLimits {
maxBytes: number;
maxRows: number;
}

export const BATCH_LIMITS: BatchLimits = {
maxBytes: 9 * 1024 * 1024,
maxRows: 50000,
};
export interface InsertOptions {
maxBytes?: number;
maxRows?: number;
maxMilliseconds?: number;
}

/**
* Call used to help batch rows.
*
* @private
*
* @param {BatchInsertOptions} options The batching options.
*/
export class RowBatch {
batchOptions: RowBatchOptions;
rows: RowMetadata[];
callbacks: InsertRowsCallback[];
created: number;
bytes: number;
constructor(options: RowBatchOptions) {
this.batchOptions = options;
this.rows = [];
this.callbacks = [];
this.created = Date.now();
this.bytes = 0;
}
/**
* Adds a row to the current batch.
*
* @param {object} row The row to insert.
* @param {InsertRowsCallback} callback The callback function.
*/
add(row: RowMetadata, callback?: InsertRowsCallback): void {
this.rows.push(row);
this.callbacks.push(callback!);
this.bytes += Buffer.byteLength(JSON.stringify(row));
}
/**
* Indicates if a given row can fit in the batch.
*
* @param {object} row The row in question.
* @returns {boolean}
*/
canFit(row: RowMetadata): boolean {
const {maxRows, maxBytes} = this.batchOptions;

return (
this.rows.length < maxRows! &&
this.bytes + Buffer.byteLength(JSON.stringify(row)) <= maxBytes
);
}
/**
* Checks to see if this batch is at the maximum allowed payload size.
*
* @returns {boolean}
*/
isAtMax(): boolean {
const {maxRows, maxBytes} = BATCH_LIMITS;
return this.rows.length >= maxRows! || this.bytes >= maxBytes;
}
/**
* Indicates if the batch is at capacity.
*
* @returns {boolean}
*/
isFull(): boolean {
const {maxRows, maxBytes} = this.batchOptions;
return this.rows.length >= maxRows! || this.bytes >= maxBytes;
}
}
235 changes: 235 additions & 0 deletions src/rowQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*!
* Copyright 2022 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as common from '@google-cloud/common';
import * as extend from 'extend';
import * as uuid from 'uuid';
import {RequestCallback, Table, InsertStreamOptions} from '.';
import {GoogleErrorBody} from '@google-cloud/common/build/src/util';
import bigquery from './types';
import {BATCH_LIMITS, RowBatch} from './rowBatch';
import {Stream} from 'stream';
import {RowBatchOptions, InsertRowsOptions, RowMetadata} from './table';

export interface MaxInsertOptions {
maxOutstandingRows: number;
maxOutstandingBytes: number;
maxDelayMillis: number;
}

export const defaultOptions: MaxInsertOptions = {
// The maximum number of rows we'll batch up for insert().
maxOutstandingRows: 300,

// The maximum size of the total batched up rows for insert().
maxOutstandingBytes: 9 * 1024 * 1024,

// The maximum time we'll wait to send batched rows, in milliseconds.
maxDelayMillis: 10000,
};

export type InsertRowsStreamResponse = bigquery.ITableDataInsertAllResponse;

export type InsertRowsCallback = RequestCallback<
bigquery.ITableDataInsertAllResponse | bigquery.ITable
>;
export interface InsertRow {
insertId?: string;
json?: bigquery.IJsonObject;
}

export type TableRow = bigquery.ITableRow;
export interface PartialInsertFailure {
message: string;
reason: string;
row: RowMetadata;
}

/**
* Standard row queue used for inserting rows.
*
*
* @param {Table} table The table.
* @param {Duplex} dup Row stream.
* @param {InsertStreamOptions} options Insert and batch options.
*/
export class RowQueue {
table: Table;
stream: Stream;
insertRowsOptions: InsertRowsOptions = {};
batch: RowBatch;
batchOptions?: RowBatchOptions;
inFlight: boolean;
pending?: ReturnType<typeof setTimeout>;
constructor(table: Table, dup: Stream, options?: InsertStreamOptions) {
this.table = table;
this.stream = dup;
this.inFlight = false;

const opts = typeof options === 'object' ? options : {};

if (opts.insertRowsOptions) {
this.insertRowsOptions = opts.insertRowsOptions;
} else {
this.insertRowsOptions = {};
}
if (opts.batchOptions) {
this.setOptions(opts.batchOptions);
} else {
this.setOptions();
}

this.batch = new RowBatch(this.batchOptions!);
}

/**
* Adds a row to the queue.
*
* @param {RowMetadata} row The row to insert.
* @param {InsertRowsCallback} callback The insert callback.
*/
add(row: RowMetadata, callback: InsertRowsCallback): void {
if (!this.insertRowsOptions.raw) {
row = {
json: Table.encodeValue_(row)!,
};

if (this.insertRowsOptions.createInsertId !== false) {
row.insertId = uuid.v4();
}
}

if (!this.batch.canFit(row)) {
this.insert();
}
this.batch.add(row, callback);

if (this.batch.isFull()) {
this.insert();
} else if (!this.pending) {
const {maxMilliseconds} = this.batchOptions!;
this.pending = setTimeout(() => {
this.insert();
}, maxMilliseconds);
}
}
/**
* Cancels any pending inserts and calls _insert immediately.
*/
insert(callback?: InsertRowsCallback): void {
const {rows, callbacks} = this.batch;

this.batch = new RowBatch(this.batchOptions!);

if (this.pending) {
clearTimeout(this.pending);
delete this.pending;
}
if (rows.length > 0) {
this._insert(rows, callbacks, callback);
}
}

/**
* Accepts a batch of rows and inserts them into table.
*
* @param {object[]} rows The rows to insert.
* @param {InsertCallback[]} callbacks The corresponding callback functions.
* @param {function} [callback] Callback to be fired when insert is done.
*/
_insert(
rows: RowMetadata | RowMetadata[],
callbacks: InsertRowsCallback[],
cb?: InsertRowsCallback
): void {
const json = extend(true, {}, this.insertRowsOptions, {rows});

delete json.createInsertId;
delete json.partialRetries;
delete json.raw;

this.table.request(
{
method: 'POST',
uri: '/insertAll',
json,
},
(err, resp) => {
const partialFailures = (resp.insertErrors || []).map(
(insertError: GoogleErrorBody) => {
return {
errors: insertError.errors!.map(error => {
return {
message: error.message,
reason: error.reason,
};
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
row: rows[(insertError as any).index],
};
}
);

if (partialFailures.length > 0) {
err = new common.util.PartialFailureError({
errors: partialFailures,
response: resp,
} as GoogleErrorBody);

callbacks.forEach(callback => callback!(err, resp));
this.stream.emit('error', err);
} else {
callbacks.forEach(callback => callback!(err, resp));
this.stream.emit('response', resp);
cb!(err, resp);
}
cb!(err, resp);
}
);
}

/**
* Sets the batching options.
*
*
* @param {RowBatchOptions} [options] The batching options.
*/
setOptions(options = {} as RowBatchOptions): void {
const defaults = this.getOptionDefaults();

const {maxBytes, maxRows, maxMilliseconds} = extend(
true,
defaults,
options
);

this.batchOptions = {
maxBytes: Math.min(maxBytes, BATCH_LIMITS.maxBytes),
maxRows: Math.min(maxRows!, BATCH_LIMITS.maxRows),
maxMilliseconds: maxMilliseconds,
};
}

getOptionDefaults(): RowBatchOptions {
// Return a unique copy to avoid shenanigans.
const defaults: RowBatchOptions = {
maxBytes: defaultOptions.maxOutstandingBytes,
maxRows: defaultOptions.maxOutstandingRows,
maxMilliseconds: defaultOptions.maxDelayMillis,
};
return defaults;
}
}
Loading

0 comments on commit 0ffe544

Please sign in to comment.