diff --git a/src/request.ts b/src/request.ts index b3483c1f..dc9e0b1d 100644 --- a/src/request.ts +++ b/src/request.ts @@ -27,11 +27,17 @@ import {Duplex, PassThrough, Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires const streamEvents = require('stream-events'); +export const transactionExpiredError = 'This transaction has already expired.'; export interface AbortableDuplex extends Duplex { abort(): void; } +interface TransactionRequestOptions { + readOnly?: {}; + readWrite?: {previousTransaction?: string | Uint8Array | null}; +} + // Import the clients for each version supported by this package. const gapic = Object.freeze({ v1: require('./v1'), @@ -55,9 +61,10 @@ import { RunQueryResponse, RunQueryCallback, } from './query'; -import {Datastore} from '.'; +import {Datastore, Transaction} from '.'; import ITimestamp = google.protobuf.ITimestamp; import {AggregateQuery} from './aggregate'; +import {RunOptions} from './transaction'; import * as protos from '../protos/protos'; import {serializer} from 'google-gax'; import * as gax from 'google-gax'; @@ -153,6 +160,16 @@ function getInfoFromStats( return {}; } +const readTimeAndConsistencyError = + 'Read time and read consistency cannot both be specified.'; + +// Write function to check for readTime and readConsistency. +function throwOnReadTimeAndConsistency(options: RunQueryStreamOptions) { + if (options.readTime && options.consistency) { + throw new Error(readTimeAndConsistencyError); + } +} + /** * A map of read consistency values to proto codes. * @@ -164,6 +181,19 @@ const CONSISTENCY_PROTO_CODE: ConsistencyProtoCode = { strong: 1, }; +/** + * By default a DatastoreRequest is in the NOT_TRANSACTION state. If the + * DatastoreRequest is a Transaction object, then initially it will be in + * the NOT_STARTED state, but then the state will become IN_PROGRESS after the + * transaction has started. + */ +export enum TransactionState { + NOT_TRANSACTION, + NOT_STARTED, + IN_PROGRESS, + EXPIRED, +} + /** * Handle logic for Datastore API operations. Handles request logic for * Datastore. @@ -184,6 +214,7 @@ class DatastoreRequest { | Array<(err: Error | null, resp: Entity | null) => void> | Entity; datastore!: Datastore; + protected state: TransactionState = TransactionState.NOT_TRANSACTION; [key: string]: Entity; /** @@ -335,10 +366,20 @@ class DatastoreRequest { ); } + /* This throws an error if the transaction has already expired. + * + */ + protected checkExpired() { + if (this.state === TransactionState.EXPIRED) { + throw Error(transactionExpiredError); + } + } + /** * Retrieve the entities as a readable object stream. * * @throws {Error} If at least one Key object is not provided. + * @throws {Error} If read time and read consistency cannot both be specified. * * @param {Key|Key[]} keys Datastore key object(s). * @param {object} [options] Optional configuration. See {@link Datastore#get} @@ -369,9 +410,11 @@ class DatastoreRequest { if (keys.length === 0) { throw new Error('At least one Key object is required.'); } - + this.checkExpired(); + throwOnReadTimeAndConsistency(options); + const reqOpts = this.getRequestOptions(options); + throwOnTransactionErrors(this, reqOpts); const makeRequest = (keys: entity.Key[] | KeyProto[]) => { - const reqOpts = this.getRequestOptions(options); Object.assign(reqOpts, {keys}); this.request_( { @@ -381,6 +424,7 @@ class DatastoreRequest { gaxOpts: options.gaxOptions, }, (err, resp) => { + this.parseTransactionResponse(resp); if (err) { stream.destroy(err); return; @@ -631,14 +675,34 @@ class DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - this.createReadStream(keys, options) - .on('error', callback) - .pipe( - concat((results: Entity[]) => { - const isSingleLookup = !Array.isArray(keys); - callback(null, isSingleLookup ? results[0] : results); - }) - ); + try { + this.createReadStream(keys, options) + .on('error', callback) + .pipe( + concat((results: Entity[]) => { + const isSingleLookup = !Array.isArray(keys); + callback(null, isSingleLookup ? results[0] : results); + }) + ); + } catch (err: any) { + callback(err); + } + } + + /** + * This function saves results from a successful beginTransaction call. + * + * @param {BeginAsyncResponse} [response] The response from a call to + * begin a transaction that completed successfully. + * + **/ + protected parseTransactionResponse(resp?: { + transaction?: Uint8Array | string | undefined | null; + }): void { + if (resp && resp.transaction && Buffer.byteLength(resp.transaction) > 0) { + this.id = resp!.transaction; + this.state = TransactionState.IN_PROGRESS; + } } /** @@ -653,6 +717,8 @@ class DatastoreRequest { * @param {function} [callback] The callback function. If omitted, a promise is * returned. * + * @throws {Error} If read time and read consistency cannot both be specified. + * **/ runAggregationQuery( query: AggregateQuery, @@ -677,6 +743,14 @@ class DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + if (this.state === TransactionState.EXPIRED) { + callback(new Error(transactionExpiredError)); + return; + } + if (options.readTime && options.consistency) { + callback(new Error(readTimeAndConsistencyError)); + return; + } query.query = extend(true, new Query(), query.query); let queryProto: QueryProto; try { @@ -687,7 +761,14 @@ class DatastoreRequest { setImmediate(callback, e as Error); return; } - const sharedQueryOpts = this.getQueryOptions(query.query, options); + let sharedQueryOpts; + try { + sharedQueryOpts = this.getQueryOptions(query.query, options); + throwOnTransactionErrors(this, sharedQueryOpts); + } catch (error: any) { + callback(error); + return; + } const aggregationQueryOptions: AggregationQueryOptions = { nestedQuery: queryProto, aggregations: query.toProto(), @@ -704,6 +785,7 @@ class DatastoreRequest { }, (err, res) => { const info = getInfoFromStats(res); + this.parseTransactionResponse(res); if (res && res.batch) { const results = res.batch.aggregationResults; const finalResults = results @@ -846,16 +928,20 @@ class DatastoreRequest { let info: RunQueryInfo; - this.runQueryStream(query, options) - .on('error', callback) - .on('info', info_ => { - info = info_; - }) - .pipe( - concat((results: Entity[]) => { - callback(null, results, info); + try { + this.runQueryStream(query, options) + .on('error', callback) + .on('info', info_ => { + info = info_; }) - ); + .pipe( + concat((results: Entity[]) => { + callback(null, results, info); + }) + ); + } catch (err: any) { + callback(err); + } } /** @@ -868,6 +954,8 @@ class DatastoreRequest { * @param {object} [options.gaxOptions] Request configuration options, outlined * here: https://googleapis.github.io/gax-nodejs/global.html#CallOptions. * + * @throws {Error} If read time and read consistency cannot both be specified. + * * @example * ``` * datastore.runQueryStream(query) @@ -892,7 +980,11 @@ class DatastoreRequest { * ``` */ runQueryStream(query: Query, options: RunQueryStreamOptions = {}): Transform { + this.checkExpired(); + throwOnReadTimeAndConsistency(options); query = extend(true, new Query(), query); + const sharedQueryOpts = this.getQueryOptions(query, options); + throwOnTransactionErrors(this, sharedQueryOpts); const makeRequest = (query: Query) => { let queryProto: QueryProto; try { @@ -903,7 +995,6 @@ class DatastoreRequest { setImmediate(onResultSet, e as Error); return; } - const sharedQueryOpts = this.getQueryOptions(query, options); const reqOpts: RequestOptions = sharedQueryOpts; reqOpts.query = queryProto; @@ -918,7 +1009,8 @@ class DatastoreRequest { ); }; - function onResultSet(err?: Error | null, resp?: Entity) { + const onResultSet = (err?: Error | null, resp?: Entity) => { + this.parseTransactionResponse(resp); if (err) { stream.destroy(err); return; @@ -977,7 +1069,7 @@ class DatastoreRequest { makeRequest(query); }); - } + }; const stream = streamEvents(new Transform({objectMode: true})); stream.once('reading', () => { @@ -990,11 +1082,24 @@ class DatastoreRequest { options: RunQueryStreamOptions ): SharedQueryOptions { const sharedQueryOpts = {} as SharedQueryOptions; + if (isTransaction(this)) { + if (this.state === TransactionState.NOT_STARTED) { + if (sharedQueryOpts.readOptions === undefined) { + sharedQueryOpts.readOptions = {}; + } + sharedQueryOpts.readOptions.newTransaction = getTransactionRequest( + this, + {} + ); + sharedQueryOpts.readOptions.consistencyType = 'newTransaction'; + } + } if (options.consistency) { const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()]; - sharedQueryOpts.readOptions = { - readConsistency: code, - }; + if (sharedQueryOpts.readOptions === undefined) { + sharedQueryOpts.readOptions = {}; + } + sharedQueryOpts.readOptions.readConsistency = code; } if (options.readTime) { if (sharedQueryOpts.readOptions === undefined) { @@ -1121,19 +1226,13 @@ class DatastoreRequest { if (method === 'rollback') { reqOpts.transaction = this.id; } - + throwOnTransactionErrors(this, reqOpts); if ( isTransaction && (method === 'lookup' || method === 'runQuery' || method === 'runAggregationQuery') ) { - if (reqOpts.readOptions && reqOpts.readOptions.readConsistency) { - throw new Error( - 'Read consistency cannot be specified in a transaction.' - ); - } - if (reqOpts.readOptions) { Object.assign(reqOpts.readOptions, {transaction: this.id}); } else { @@ -1229,6 +1328,60 @@ class DatastoreRequest { } } +function isTransaction(request: DatastoreRequest): request is Transaction { + return request instanceof Transaction; +} + +function throwOnTransactionErrors( + request: DatastoreRequest, + options: SharedQueryOptions +) { + const isTransaction = request.id ? true : false; + if ( + isTransaction || + (options.readOptions && options.readOptions.newTransaction) + ) { + if (options.readOptions && options.readOptions.readConsistency) { + throw new Error('Read consistency cannot be specified in a transaction.'); + } + if (options.readOptions && options.readOptions.readTime) { + throw new Error('Read time cannot be specified in a transaction.'); + } + } +} + +/** + * This function gets transaction request options used for defining a + * request to create a new transaction on the server. + * + * @param transaction The transaction for which the request will be made. + * @param options Custom options that will be used to create the request. + */ +export function getTransactionRequest( + transaction: Transaction, + options: RunOptions +): TransactionRequestOptions { + // If transactionOptions are provide then they will be used. + // Otherwise, options passed into this function are used and when absent + // options that exist on Transaction are used. + return options.transactionOptions // If transactionOptions is specified: + ? options.transactionOptions.readOnly // Use readOnly on transactionOptions + ? {readOnly: {}} + : options.transactionOptions.id // Use retry transaction if specified: + ? {readWrite: {previousTransaction: options.transactionOptions.id}} + : {} + : options.readOnly || transaction.readOnly // If transactionOptions not set: + ? {readOnly: {}} // Create a readOnly transaction if readOnly option set + : options.transactionId || transaction.id + ? { + // Create readWrite transaction with a retry transaction set + readWrite: { + previousTransaction: options.transactionId || transaction.id, + }, + } + : {}; // Request will be readWrite with no retry transaction set; +} + export interface ConsistencyProtoCode { [key: string]: number; } @@ -1294,15 +1447,18 @@ export interface SharedQueryOptions { readConsistency?: number; transaction?: string | Uint8Array | null; readTime?: ITimestamp; + newTransaction?: TransactionRequestOptions; + consistencyType?: + | 'readConsistency' + | 'transaction' + | 'newTransaction' + | 'readTime'; }; } export interface RequestOptions extends SharedQueryOptions { mutations?: google.datastore.v1.IMutation[]; keys?: Entity; - transactionOptions?: { - readOnly?: {}; - readWrite?: {previousTransaction?: string | Uint8Array | null}; - } | null; + transactionOptions?: TransactionRequestOptions | null; transaction?: string | null | Uint8Array; mode?: string; query?: QueryProto; @@ -1333,7 +1489,7 @@ export type DeleteResponse = CommitResponse; * that a callback is omitted. */ promisifyAll(DatastoreRequest, { - exclude: ['getQueryOptions', 'getRequestOptions'], + exclude: ['checkExpired', 'getQueryOptions', 'getRequestOptions'], }); /** diff --git a/src/transaction.ts b/src/transaction.ts index e0378c73..1f583b4a 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -15,33 +15,34 @@ */ import {promisifyAll} from '@google-cloud/promisify'; -import arrify = require('arrify'); import {CallOptions} from 'google-gax'; import {google} from '../protos/protos'; import {Datastore, TransactionOptions} from '.'; -import {entity, Entity, Entities} from './entity'; +import {Entities, Entity, entity} from './entity'; import { Query, RunQueryCallback, - RunQueryInfo, RunQueryOptions, RunQueryResponse, } from './query'; import { CommitCallback, CommitResponse, - DatastoreRequest, - RequestOptions, - PrepareEntityObjectResponse, CreateReadStreamOptions, - GetResponse, + DatastoreRequest, GetCallback, + GetResponse, + getTransactionRequest, + PrepareEntityObjectResponse, RequestCallback, + transactionExpiredError, + TransactionState, } from './request'; import {AggregateQuery} from './aggregate'; import {Mutex} from 'async-mutex'; +import arrify = require('arrify'); /* * This type matches the value returned by the promise in the @@ -53,11 +54,6 @@ interface BeginAsyncResponse { resp?: google.datastore.v1.IBeginTransactionResponse; } -enum TransactionState { - NOT_STARTED, - IN_PROGRESS, // IN_PROGRESS currently tracks the expired state as well -} - /** * A transaction is a set of Datastore operations on one or more entities. Each * transaction is guaranteed to be atomic, which means that transactions are @@ -85,7 +81,6 @@ class Transaction extends DatastoreRequest { modifiedEntities_: ModifiedEntities; skipCommit?: boolean; #mutex = new Mutex(); - #state = TransactionState.NOT_STARTED; constructor(datastore: Datastore, options?: TransactionOptions) { super(); /** @@ -115,6 +110,7 @@ class Transaction extends DatastoreRequest { // Queue the requests to make when we send the transactional commit. this.requests_ = []; + this.state = TransactionState.NOT_STARTED; } /*! Developer Documentation @@ -177,6 +173,10 @@ class Transaction extends DatastoreRequest { : () => {}; const gaxOptions = typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + if (this.state === TransactionState.EXPIRED) { + callback(new Error(transactionExpiredError)); + return; + } // This ensures that the transaction is started before calling runCommit this.#withBeginTransaction( gaxOptions, @@ -355,13 +355,9 @@ class Transaction extends DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; // This ensures that the transaction is started before calling get - this.#withBeginTransaction( - options.gaxOptions, - () => { - super.get(keys, options, callback); - }, - callback - ); + this.#blockWithMutex(() => { + super.get(keys, options, callback); + }); } /** @@ -434,6 +430,14 @@ class Transaction extends DatastoreRequest { const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + if (this.state === TransactionState.EXPIRED) { + callback(new Error(transactionExpiredError)); + return; + } + if (this.state === TransactionState.NOT_STARTED) { + callback(new Error('Transaction is not started')); + return; + } this.request_( { client: 'DatastoreClient', @@ -442,6 +446,7 @@ class Transaction extends DatastoreRequest { }, (err, resp) => { this.skipCommit = true; + this.state = TransactionState.EXPIRED; callback(err || null, resp); } ); @@ -511,7 +516,7 @@ class Transaction extends DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; this.#mutex.runExclusive(async () => { - if (this.#state === TransactionState.NOT_STARTED) { + if (this.state === TransactionState.NOT_STARTED) { const runResults = await this.#beginTransactionAsync(options); this.#processBeginResults(runResults, callback); } else { @@ -635,6 +640,7 @@ class Transaction extends DatastoreRequest { return; } + this.state = TransactionState.EXPIRED; // The `callbacks` array was built previously. These are the callbacks // that handle the API response normally when using the // DatastoreRequest.save and .delete methods. @@ -666,24 +672,11 @@ class Transaction extends DatastoreRequest { if (err) { callback(err, null, resp); } else { - this.#parseRunSuccess(runResults); + this.parseTransactionResponse(resp); callback(null, this, resp); } } - /** - * This function saves results from a successful beginTransaction call. - * - * @param {BeginAsyncResponse} [response] The response from a call to - * begin a transaction that completed successfully. - * - **/ - #parseRunSuccess(runResults: BeginAsyncResponse) { - const resp = runResults.resp; - this.id = resp!.transaction; - this.#state = TransactionState.IN_PROGRESS; - } - /** * This async function makes a beginTransaction call and returns a promise with * the information returned from the call that was made. @@ -696,24 +689,10 @@ class Transaction extends DatastoreRequest { async #beginTransactionAsync( options: RunOptions ): Promise { - const reqOpts: RequestOptions = { - transactionOptions: {}, - }; - - if (options.readOnly || this.readOnly) { - reqOpts.transactionOptions!.readOnly = {}; - } - - if (options.transactionId || this.id) { - reqOpts.transactionOptions!.readWrite = { - previousTransaction: options.transactionId || this.id, - }; - } - - if (options.transactionOptions) { - reqOpts.transactionOptions = options.transactionOptions; - } return new Promise((resolve: (value: BeginAsyncResponse) => void) => { + const reqOpts = { + transactionOptions: getTransactionRequest(this, options), + }; this.request_( { client: 'DatastoreClient', @@ -766,13 +745,9 @@ class Transaction extends DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; // This ensures that the transaction is started before calling runAggregationQuery - this.#withBeginTransaction( - options.gaxOptions, - () => { - super.runAggregationQuery(query, options, callback); - }, - callback - ); + this.#blockWithMutex(() => { + super.runAggregationQuery(query, options, callback); + }); } /** @@ -805,13 +780,9 @@ class Transaction extends DatastoreRequest { const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; // This ensures that the transaction is started before calling runQuery - this.#withBeginTransaction( - options.gaxOptions, - () => { - super.runQuery(query, options, callback); - }, - callback - ); + this.#blockWithMutex(() => { + super.runQuery(query, options, callback); + }); } /** @@ -1019,7 +990,7 @@ class Transaction extends DatastoreRequest { * @param {CallOptions | undefined} [gaxOptions] Gax options provided by the * user that are used for the beginTransaction grpc call. * @param {function} [fn] A function which is run after ensuring a - * beginTransaction call is made. + * transaction has begun. * @param {function} [callback] A callback provided by the user that expects * an error in the first argument and a custom data type for the rest of the * arguments. @@ -1031,10 +1002,10 @@ class Transaction extends DatastoreRequest { callback: (...args: [Error | null, ...T] | [Error | null]) => void ): void { (async () => { - if (this.#state === TransactionState.NOT_STARTED) { + if (this.state === TransactionState.NOT_STARTED) { try { await this.#mutex.runExclusive(async () => { - if (this.#state === TransactionState.NOT_STARTED) { + if (this.state === TransactionState.NOT_STARTED) { // This sends an rpc call to get the transaction id const runResults = await this.#beginTransactionAsync({ gaxOptions, @@ -1044,7 +1015,7 @@ class Transaction extends DatastoreRequest { // Do not call the wrapped function. throw runResults.err; } - this.#parseRunSuccess(runResults); + this.parseTransactionResponse(runResults.resp); // The rpc saving the transaction id was successful. // Now the wrapped function fn will be called. } @@ -1057,6 +1028,31 @@ class Transaction extends DatastoreRequest { return fn(); })(); } + + /* + * Some rpc calls require that the transaction has been started (i.e, has a + * valid id) before they can be sent. #withBeginTransaction acts as a wrapper + * over those functions. + * + * If the transaction has not begun yet, `#blockWithMutex` will call the + * wrapped function which will begin the transaction in the rpc call it sends. + * If the transaction has begun, the wrapped function will be called, but it + * will not begin a transaction. + * + * @param {function} [fn] A function which is run after ensuring a + * transaction has begun. + */ + #blockWithMutex(fn: () => void) { + (async () => { + if (this.state === TransactionState.NOT_STARTED) { + await this.#mutex.runExclusive(async () => { + fn(); + }); + } else { + fn(); + } + })(); + } } export type ModifiedEntities = Array<{ diff --git a/system-test/datastore.ts b/system-test/datastore.ts index 9990611b..74b96e78 100644 --- a/system-test/datastore.ts +++ b/system-test/datastore.ts @@ -25,6 +25,7 @@ import {and, or, PropertyFilter} from '../src/filter'; import {Entities, entity, Entity} from '../src/entity'; import {Query, RunQueryInfo, ExecutionStats} from '../src/query'; import KEY_SYMBOL = entity.KEY_SYMBOL; +import {transactionExpiredError} from '../src/request'; const async = require('async'); @@ -921,7 +922,7 @@ async.each( }); it('should run a transaction query as a stream via query#runStream', done => { - const transaction = datastore.transaction(); + const transaction = datastore.transaction({readOnly: true}); const q = transaction.createQuery('Character').hasAncestor(ancestor); let resultsReturned = 0; q.runStream() @@ -2368,6 +2369,238 @@ async.each( }); }); describe('transactions with and without run', () => { + describe('test to make sure various endpoints report an expired state', () => { + async.each( + [ + { + name: 'with commit', + setupExpiredFn: async (tx: Transaction) => { + await tx.commit(); + }, + }, + { + name: 'with rollback', + setupExpiredFn: async (tx: Transaction) => { + // We expect rollback to fail with an aborted error when + // tx.run hasn't been called yet so these tests are only + // valid when tx.run() has already been called. + await tx.run(); + await tx.rollback(); + }, + }, + ], + (test: { + name: string; + setupExpiredFn: (tx: Transaction) => Promise; + }) => { + describe(test.name, () => { + describe('get', () => { + it('with transaction.run', async () => { + try { + const key = datastore.key(['Company', 'Google']); + const transaction: Transaction = datastore.transaction(); + await transaction.run(); + await test.setupExpiredFn(transaction); + await transaction.get(key); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const key = datastore.key(['Company', 'Google']); + const transaction = datastore.transaction(); + await test.setupExpiredFn(transaction); + await transaction.get(key); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('runQuery', () => { + it('with transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + await transaction.run(); + await test.setupExpiredFn(transaction); + await transaction.runQuery(query); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + await test.setupExpiredFn(transaction); + await transaction.runQuery(query); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('runAggregationQuery', () => { + it('with transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + const aggregationQuery = transaction + .createAggregationQuery(query) + .count('total'); + await transaction.run(); + await test.setupExpiredFn(transaction); + await transaction.runAggregationQuery(aggregationQuery); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + const aggregationQuery = transaction + .createAggregationQuery(query) + .count('total'); + await test.setupExpiredFn(transaction); + await transaction.runAggregationQuery(aggregationQuery); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('runQueryStream', () => { + it('with transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + await transaction.run(); + await test.setupExpiredFn(transaction); + transaction.runQueryStream(query); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const transaction = datastore.transaction(); + const query = transaction.createQuery('Company'); + await test.setupExpiredFn(transaction); + transaction.runQueryStream(query); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('createReadStream', () => { + it('with transaction.run', async () => { + try { + const datastore = new Datastore(clientOptions); + const key = datastore.key(['Company', 'Google']); + const transaction = datastore.transaction(); + await transaction.run(); + await test.setupExpiredFn(transaction); + transaction.createReadStream(key); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const datastore = new Datastore(clientOptions); + const key = datastore.key(['Company', 'Google']); + const transaction = datastore.transaction(); + await test.setupExpiredFn(transaction); + transaction.createReadStream(key); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('commit', () => { + it('with transaction.run', async () => { + try { + const transaction = datastore.transaction(); + await transaction.run(); + await test.setupExpiredFn(transaction); + await transaction.commit(); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const transaction = datastore.transaction(); + await test.setupExpiredFn(transaction); + await transaction.commit(); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + describe('rollback', () => { + it('with transaction.run', async () => { + try { + const transaction = datastore.transaction(); + await transaction.run(); + await test.setupExpiredFn(transaction); + await transaction.rollback(); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + it('without transaction.run', async () => { + try { + const transaction = datastore.transaction(); + await test.setupExpiredFn(transaction); + await transaction.rollback(); + assert.fail('The expire error should have appeared'); + } catch (err: any) { + assert.strictEqual(err.message, transactionExpiredError); + } + }); + }); + }); + } + ); + }); + describe('comparing times with and without transaction.run', async () => { + const key = datastore.key(['Company', 'Google']); + // Record time for transaction with run + const startTimeWithRun = new Date().getTime(); + const transactionWithRun = datastore.transaction({readOnly: true}); + await transactionWithRun.run(); + await Promise.all([ + transactionWithRun.get(key), + transactionWithRun.get(key), + ]); + await transactionWithRun.commit(); + const timeElapsedWithRun = new Date().getTime() - startTimeWithRun; + // Record time for transaction without run + const startTimeWithoutRun = new Date().getTime(); + const transactionWithoutRun = datastore.transaction({readOnly: true}); + await Promise.all([ + transactionWithoutRun.get(key), + transactionWithoutRun.get(key), + ]); + await transactionWithoutRun.commit(); + const timeElapsedWithoutRun = + new Date().getTime() - startTimeWithoutRun; + assert(timeElapsedWithoutRun < timeElapsedWithRun); + }); describe('lookup, put, commit', () => { const key = datastore.key(['Company', 'Google']); const obj = { @@ -2450,6 +2683,33 @@ async.each( await doRunQueryPutCommit(transaction); }); }); + describe('readOnly for runQuery looks at snapshot from first read', () => { + const key = datastore.key(['Company', 'Google']); + const obj = { + url: 'www.google.com', + }; + afterEach(async () => { + await datastore.delete(key); + }); + async function doPutRunQueryCommit(transaction: Transaction) { + const query = transaction.createQuery('Company'); + const [results] = await transaction.runQuery(query); + assert.deepStrictEqual(results, []); + await datastore.save({key, data: obj}); + const [results2] = await transaction.runQuery(query); + assert.deepStrictEqual(results2, []); + await transaction.commit(); + } + it('should run in a transaction', async () => { + const transaction = datastore.transaction({readOnly: true}); + await transaction.run(); + await doPutRunQueryCommit(transaction); + }); + it('should run in a transaction without run', async () => { + const transaction = datastore.transaction({readOnly: true}); + await doPutRunQueryCommit(transaction); + }); + }); describe('put, runQuery, commit', () => { const key = datastore.key(['Company', 'Google']); const obj = { @@ -2785,6 +3045,37 @@ async.each( assert.deepStrictEqual(resultsAgain, [{total: 2}]); await transaction.commit(); }); + it('readOnly transaction should see consistent snapshot of database without transaction.run', async () => { + async function getResults(transaction: Transaction) { + const query = transaction.createQuery('Company'); + const aggregateQuery = transaction + .createAggregationQuery(query) + .count('total'); + let result; + try { + [result] = await aggregateQuery.run(); + } catch (e) { + await transaction.rollback(); + throw e; + } + return result; + } + const key = datastore.key(['Company', 'Google']); + const transaction = datastore.transaction({readOnly: true}); + const results = await getResults(transaction); + assert.deepStrictEqual(results, [{total: 3}]); + await datastore.save([ + { + key, + data: { + rating: 100, + }, + }, + ]); + const resultsAgain = await getResults(transaction); + assert.deepStrictEqual(resultsAgain, [{total: 3}]); + await transaction.commit(); + }); }); it('should read in a readOnly transaction', async () => { @@ -2794,6 +3085,12 @@ async.each( await transaction.get(key); }); + it('should read in a readOnly transaction without transaction.run', async () => { + const transaction = datastore.transaction({readOnly: true}); + const key = datastore.key(['Company', 'Google']); + await transaction.get(key); + }); + it('should not write in a readOnly transaction', async () => { const transaction = datastore.transaction({readOnly: true}); const key = datastore.key(['Company', 'Google']); diff --git a/test/gapic-mocks/error-mocks.ts b/test/gapic-mocks/error-mocks.ts new file mode 100644 index 00000000..226859e6 --- /dev/null +++ b/test/gapic-mocks/error-mocks.ts @@ -0,0 +1,73 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as mocha from 'mocha'; +import * as assert from 'assert'; +import {Datastore} from '../../src'; + +/** + * Returns a callback function that expects an error with a particular + * message. This is used for testing all client library functions that accept + * a callback in order to ensure the callback receives a particular error. + * + * @param {mocha.Done} done The mocha done function which is called when the + * test finishes. + * @param {string} message The expected error message in the test. + * + */ +export function getCallbackExpectingError(done: mocha.Done, message: string) { + return (error?: Error | null) => { + try { + if (error) { + assert.strictEqual(error.message, message); + done(); + return; + } + done(new Error('The callback should have received an error')); + } catch (err: unknown) { + done(err); + } + }; +} + +/** + * This function ends the test with an error if a call reaches the gapic + * layer. Using this function in a test makes the test fail if any outgoing + * grpc calls are made in that test. This allows the test to ensure that no + * grpc calls happen, which is typically desired behaviour when an error is + * sent back to the user from the handwritten layer. + * + * @param {Datastore} datastore The datastore client. + * @param {string} clientName The datastore client. + * @param {mocha.Done} done The mocha done function which is called when the + * test finishes. + */ +export function errorOnGapicCall( + datastore: Datastore, + clientName: string, + done: mocha.Done +) { + const dataClient = datastore.clients_.get(clientName); + if (dataClient) { + dataClient.runQuery = () => { + done(new Error('The gapic layer should not have received a call')); + }; + dataClient.runAggregationQuery = () => { + done(new Error('The gapic layer should not have received a call')); + }; + dataClient.lookup = () => { + done(new Error('The gapic layer should not have received a call')); + }; + } +} diff --git a/test/gapic-mocks/get-initialized-datastore-client.ts b/test/gapic-mocks/get-initialized-datastore-client.ts new file mode 100644 index 00000000..bf3e4b65 --- /dev/null +++ b/test/gapic-mocks/get-initialized-datastore-client.ts @@ -0,0 +1,44 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Datastore} from '../../src'; + +/** + * This function gets a datastore client that has already been initialized + * meaning that its gapic data client has been created and is ready to be mocked + * out with whatever behavior is needed in a test. Mocking out the gapic client + * is common for testing handwritten layer behaviour because it is a way to + * evaluate data that reaches the handwritten layer thereby testing the + * handwritten layer in isolation. + */ +export function getInitializedDatastoreClient(): Datastore { + const clientName = 'DatastoreClient'; + const PROJECT_ID = 'project-id'; + const NAMESPACE = 'namespace'; + const options = { + projectId: PROJECT_ID, + namespace: NAMESPACE, + }; + const datastore = new Datastore(options); + // By default, datastore.clients_ is an empty map. + // To mock out commit we need the map to contain the Gapic data client. + // Normally a call to the data client through the datastore object would initialize it. + // We don't want to make this call because it would make a grpc request. + // So we just add the data client to the map. + const gapic = Object.freeze({ + v1: require('../../src/v1'), + }); + datastore.clients_.set(clientName, new gapic.v1[clientName](options)); + return datastore; +} diff --git a/test/gapic-mocks/handwritten-errors.ts b/test/gapic-mocks/handwritten-errors.ts new file mode 100644 index 00000000..59c743ee --- /dev/null +++ b/test/gapic-mocks/handwritten-errors.ts @@ -0,0 +1,117 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe} from 'mocha'; +import {AggregateField} from '../../src'; +import {getInitializedDatastoreClient} from './get-initialized-datastore-client'; +import {RunQueryOptions} from '../../src/query'; +import {errorOnGapicCall, getCallbackExpectingError} from './error-mocks'; +import * as assert from 'assert'; +const async = require('async'); + +describe('HandwrittenLayerErrors', () => { + describe('Can only specify one of transaction, consistency, readTime', () => { + const clientName = 'DatastoreClient'; + const datastore = getInitializedDatastoreClient(); + async.each( + [ + { + options: {consistency: 'eventual', readTime: 77000}, + expectedError: + 'Read time and read consistency cannot both be specified.', + description: + 'should error when read time and eventual consistency are specified', + }, + { + options: {consistency: 'eventual'}, + expectedError: + 'Read consistency cannot be specified in a transaction.', + description: + 'should error when new transaction and eventual consistency are specified', + }, + { + options: {readTime: 77000}, + expectedError: 'Read time cannot be specified in a transaction.', + description: + 'should error when new transaction and read time are specified', + }, + ], + (testParameters: { + options: RunQueryOptions; + expectedError: string; + description: string; + }) => { + describe(testParameters.description, () => { + it('should error when runQuery is used', done => { + const transaction = datastore.transaction(); + const query = datastore.createQuery('Task'); + errorOnGapicCall(datastore, clientName, done); // Test fails if Gapic layer receives a call. + transaction.runQuery( + query, + testParameters.options, + getCallbackExpectingError(done, testParameters.expectedError) + ); + }); + it('should error when runQueryStream is used', done => { + const transaction = datastore.transaction(); + const query = datastore.createQuery('Task'); + errorOnGapicCall(datastore, clientName, done); // Test fails if Gapic layer receives a call. + try { + transaction.runQueryStream(query, testParameters.options); + done(new Error('runQueryStream should have thrown an error')); + } catch (err: any) { + assert.strictEqual(err.message, testParameters.expectedError); + done(); + } + }); + it('should error when runAggregationQuery is used', done => { + const transaction = datastore.transaction(); + const query = datastore.createQuery('Task'); + const aggregate = datastore + .createAggregationQuery(query) + .addAggregation(AggregateField.sum('appearances')); + errorOnGapicCall(datastore, clientName, done); // Test fails if Gapic layer receives a call. + transaction.runAggregationQuery( + aggregate, + testParameters.options, + getCallbackExpectingError(done, testParameters.expectedError) + ); + }); + it('should error when get is used', done => { + const transaction = datastore.transaction(); + const keys = datastore.key(['Company', 'Google']); + errorOnGapicCall(datastore, clientName, done); // Test fails if Gapic layer receives a call. + transaction.get( + keys, + testParameters.options, + getCallbackExpectingError(done, testParameters.expectedError) + ); + }); + it('should error when createReadStream is used', done => { + const transaction = datastore.transaction(); + const keys = datastore.key(['Company', 'Google']); + errorOnGapicCall(datastore, clientName, done); // Test fails if Gapic layer receives a call. + try { + transaction.createReadStream(keys, testParameters.options); + done(new Error('createReadStream should have thrown an error')); + } catch (err: any) { + assert.strictEqual(err.message, testParameters.expectedError); + done(); + } + }); + }); + } + ); + }); +}); diff --git a/test/gapic-mocks/runQuery.ts b/test/gapic-mocks/runQuery.ts index e19964ec..d54d8649 100644 --- a/test/gapic-mocks/runQuery.ts +++ b/test/gapic-mocks/runQuery.ts @@ -14,28 +14,13 @@ import * as assert from 'assert'; import {describe} from 'mocha'; -import {DatastoreClient, Datastore} from '../../src'; +import {DatastoreClient} from '../../src'; import * as protos from '../../protos/protos'; -import {Callback} from 'google-gax'; +import {getInitializedDatastoreClient} from './get-initialized-datastore-client'; describe('Run Query', () => { - const PROJECT_ID = 'project-id'; - const NAMESPACE = 'namespace'; const clientName = 'DatastoreClient'; - const options = { - projectId: PROJECT_ID, - namespace: NAMESPACE, - }; - const datastore = new Datastore(options); - // By default, datastore.clients_ is an empty map. - // To mock out commit we need the map to contain the Gapic data client. - // Normally a call to the data client through the datastore object would initialize it. - // We don't want to make this call because it would make a grpc request. - // So we just add the data client to the map. - const gapic = Object.freeze({ - v1: require('../../src/v1'), - }); - datastore.clients_.set(clientName, new gapic.v1[clientName](options)); + const datastore = getInitializedDatastoreClient(); // This function is used for doing assertion checks. // The idea is to check that the right request gets passed to the commit function in the Gapic layer. @@ -68,35 +53,13 @@ describe('Run Query', () => { } } - it('should pass read time into runQuery for transactions', async () => { - // First mock out beginTransaction - const dataClient = datastore.clients_.get(clientName); - const testId = Buffer.from(Array.from(Array(100).keys())); - if (dataClient) { - dataClient.beginTransaction = ( - request: protos.google.datastore.v1.IBeginTransactionRequest, - options: protos.google.datastore.v1.IBeginTransactionResponse, - callback: Callback< - protos.google.datastore.v1.IBeginTransactionResponse, - | protos.google.datastore.v1.IBeginTransactionRequest - | null - | undefined, - {} | null | undefined - > - ) => { - callback(null, { - transaction: testId, - }); - }; - } + it('should pass new transaction into runQuery for transactions', async () => { setRunQueryComparison( (request: protos.google.datastore.v1.IRunQueryRequest) => { assert.deepStrictEqual(request, { readOptions: { - transaction: testId, - readTime: { - seconds: 77, - }, + consistencyType: 'newTransaction', + newTransaction: {}, }, partitionId: { namespaceId: 'namespace', @@ -113,6 +76,6 @@ describe('Run Query', () => { ); const transaction = datastore.transaction(); const query = datastore.createQuery('Task'); - await transaction.runQuery(query, {readTime: 77000}); + await transaction.runQuery(query); }); }); diff --git a/test/gapic_datastore_admin_v1.ts b/test/gapic_datastore_admin_v1.ts index 7d347a60..eb73aadb 100644 --- a/test/gapic_datastore_admin_v1.ts +++ b/test/gapic_datastore_admin_v1.ts @@ -1434,9 +1434,9 @@ describe('v1.DatastoreAdminClient', () => { assert( (client.descriptors.page.listIndexes.createStream as SinonStub) .getCall(0) - .args[2].otherArgs.headers['x-goog-request-params'].includes( - expectedHeaderRequestParams - ) + .args[2].otherArgs.headers[ + 'x-goog-request-params' + ].includes(expectedHeaderRequestParams) ); }); @@ -1485,9 +1485,9 @@ describe('v1.DatastoreAdminClient', () => { assert( (client.descriptors.page.listIndexes.createStream as SinonStub) .getCall(0) - .args[2].otherArgs.headers['x-goog-request-params'].includes( - expectedHeaderRequestParams - ) + .args[2].otherArgs.headers[ + 'x-goog-request-params' + ].includes(expectedHeaderRequestParams) ); }); @@ -1528,9 +1528,9 @@ describe('v1.DatastoreAdminClient', () => { assert( (client.descriptors.page.listIndexes.asyncIterate as SinonStub) .getCall(0) - .args[2].otherArgs.headers['x-goog-request-params'].includes( - expectedHeaderRequestParams - ) + .args[2].otherArgs.headers[ + 'x-goog-request-params' + ].includes(expectedHeaderRequestParams) ); }); @@ -1570,9 +1570,9 @@ describe('v1.DatastoreAdminClient', () => { assert( (client.descriptors.page.listIndexes.asyncIterate as SinonStub) .getCall(0) - .args[2].otherArgs.headers['x-goog-request-params'].includes( - expectedHeaderRequestParams - ) + .args[2].otherArgs.headers[ + 'x-goog-request-params' + ].includes(expectedHeaderRequestParams) ); }); }); diff --git a/test/transaction.ts b/test/transaction.ts index c1346f3d..4d4a1ebd 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -17,6 +17,7 @@ import arrify = require('arrify'); import * as assert from 'assert'; import {afterEach, beforeEach, before, describe, it} from 'mocha'; import * as proxyquire from 'proxyquire'; +import {getTransactionRequest} from '../src/request'; import { Datastore, @@ -902,25 +903,25 @@ async.each( * be passed into the Gapic layer. * @private */ - readonly #expectedRequests?: GapicRequestData[]; + readonly expectedRequests?: GapicRequestData[]; /** * requests are the actual order of the requests that are passed into the * gapic layer * @private */ - readonly #requests: GapicRequestData[] = []; + readonly requests: GapicRequestData[] = []; /** * expectedEventOrder is the order the test expects different events to occur * such as a callback being called, Gapic functions being called or user * code being run. */ - readonly #expectedEventOrder: TransactionEvent[] = []; + readonly expectedEventOrder: TransactionEvent[] = []; /** * eventOrder is the order events actually occur in the test and will be compared with * expectedEventOrder. * @private */ - #eventOrder: TransactionEvent[] = []; + eventOrder: TransactionEvent[] = []; // A transaction wrapper object is used to contain the transaction and mocked Gapic functions. #transactionWrapper: MockedTransactionWrapper; // Stores the mocha done function so that it can be called from this object. @@ -932,17 +933,17 @@ async.each( * happened then this function passes tests if the events happened in the * right order. */ - #checkForCompletion() { - if (this.#eventOrder.length >= this.#expectedEventOrder.length) { + checkForCompletion() { + if (this.eventOrder.length >= this.expectedEventOrder.length) { try { assert.deepStrictEqual( - this.#eventOrder, - this.#expectedEventOrder + this.eventOrder, + this.expectedEventOrder ); - if (this.#expectedRequests) { + if (this.expectedRequests) { assert.deepStrictEqual( - this.#requests, - this.#expectedRequests + this.requests, + this.expectedRequests ); } this.#done(); @@ -961,17 +962,17 @@ async.each( request?: RequestType; }[] ) { - this.#expectedEventOrder = expectedOrder; - this.#expectedRequests = expectedRequests; + this.expectedEventOrder = expectedOrder; + this.expectedRequests = expectedRequests; this.#done = done; transactionWrapper.callBackSignaler = ( call: GapicFunctionName, request?: RequestType ) => { try { - this.#requests.push({call, request}); - this.#eventOrder.push(call); - this.#checkForCompletion(); + this.requests.push({call, request}); + this.eventOrder.push(call); + this.checkForCompletion(); } catch (e) { done(e); } @@ -988,8 +989,8 @@ async.each( push(event: UserCodeEvent) { return () => { try { - this.#eventOrder.push(event); - this.#checkForCompletion(); + this.eventOrder.push(event); + this.checkForCompletion(); } catch (e) { this.#done(e); } @@ -1139,6 +1140,27 @@ async.each( transaction: testRunResp.transaction, }, }; + const lookupTransactionRequestWithNewTransaction = { + keys: [ + { + partitionId: { + namespaceId: 'run-without-mock', + }, + path: [ + { + kind: 'Company', + name: 'Google', + }, + ], + }, + ], + projectId: 'project-id', + readOptions: { + newTransaction: {}, + consistencyType: 'newTransaction', + transaction: testRunResp.transaction, + }, + }; describe('put, commit', () => { const expectedRequests = [ { @@ -1188,25 +1210,25 @@ async.each( }); }); describe('lookup, lookup, put, commit', () => { - const expectedRequests = [ - { - call: GapicFunctionName.BEGIN_TRANSACTION, - request: beginTransactionRequest, - }, - { - call: GapicFunctionName.COMMIT, - request: commitRequest, - }, - { - call: GapicFunctionName.LOOKUP, - request: lookupTransactionRequest, - }, - { - call: GapicFunctionName.LOOKUP, - request: lookupTransactionRequest, - }, - ]; - it('should verify that there is a BeginTransaction call while beginning later', done => { + it('should verify that there is no BeginTransaction call while beginning later', done => { + const expectedRequests = [ + { + call: GapicFunctionName.BEGIN_TRANSACTION, + request: beginTransactionRequest, + }, + { + call: GapicFunctionName.COMMIT, + request: commitRequest, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequestWithNewTransaction, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequestWithNewTransaction, + }, + ]; const tester = new TransactionOrderTester( transactionWrapper, done, @@ -1230,6 +1252,24 @@ async.each( transaction.commit(tester.push(UserCodeEvent.COMMIT_CALLBACK)); }); it('should verify that there is a BeginTransaction call while beginning early', done => { + const expectedRequests = [ + { + call: GapicFunctionName.BEGIN_TRANSACTION, + request: beginTransactionRequest, + }, + { + call: GapicFunctionName.COMMIT, + request: commitRequest, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequest, + }, + { + call: GapicFunctionName.LOOKUP, + request: lookupTransactionRequest, + }, + ]; const tester = new TransactionOrderTester( transactionWrapper, done, @@ -1257,6 +1297,775 @@ async.each( }); }); }); + describe('Testing requests passed into the gapic layer', () => { + let key: entity.Key; + const testCommitResp = { + mutationResults: [ + { + key: { + path: [ + { + kind: 'some-kind', + }, + ], + }, + }, + ], + }; + const runQueryResp = { + batch: { + entityResults: [], + endCursor: { + type: 'Buffer', + data: Buffer.from(Array.from(Array(100).keys())), + }, + }, + transaction: testRunResp.transaction, + }; + const runAggregationQueryResp = { + batch: { + aggregationResults: [ + { + aggregateProperties: { + 'average rating': { + meaning: 0, + excludeFromIndexes: false, + doubleValue: 100, + valueType: 'doubleValue', + }, + }, + }, + ], + moreResults: + google.datastore.v1.QueryResultBatch.MoreResultsType + .NO_MORE_RESULTS, + readTime: {seconds: '1699390681', nanos: 961667000}, + }, + query: null, + transaction: testRunResp.transaction, + }; + const getResp = { + found: [ + { + entity: { + key: { + path: [ + { + kind: 'Post', + name: 'post1', + idType: 'name', + }, + ], + partitionId: { + projectId: 'projectId', + databaseId: 'databaseId', + namespaceId: 'namespaceId', + }, + }, + excludeFromIndexes: false, + properties: {}, + }, + }, + ], + missing: [], + deferred: [], + transaction: testRunResp.transaction, + readTime: { + seconds: '1699470605', + nanos: 201398000, + }, + }; + afterEach(() => { + transactionWrapper.resetBeginTransaction(); + transactionWrapper.resetGapicFunctions(); + }); + beforeEach(async () => { + transactionWrapper = new MockedTransactionWrapper(); + key = transactionWrapper.datastore.key(['Company', 'Google']); + transactionWrapper.mockGapicFunction( + GapicFunctionName.RUN_AGGREGATION_QUERY, + runAggregationQueryResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.LOOKUP, + getResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.RUN_QUERY, + runQueryResp, + null + ); + transactionWrapper.mockGapicFunction( + GapicFunctionName.COMMIT, + testCommitResp, + null + ); + }); + describe('lookup, lookup, put, commit', () => { + it('without using transaction.run', done => { + let lookupCallCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + throw Error( + 'BeginTransaction should not have been called' + ); + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + switch (lookupCallCount) { + case 0: + assert.deepStrictEqual(lookupRequest.readOptions, { + newTransaction: {}, + consistencyType: 'newTransaction', + }); + break; + case 1: + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + default: + throw Error('Lookup was called too many times'); + } + lookupCallCount++; + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.get(key); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + it('with using transaction.run', done => { + let beginCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + beginCount++; + break; + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.run(); + await transaction.get(key); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + }); + describe('runQuery, lookup, put, commit', () => { + it('without using transaction.run', done => { + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + throw Error( + 'BeginTransaction should not have been called' + ); + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.RUN_QUERY: { + const runQueryRequest = + request as protos.google.datastore.v1.IRunQueryRequest; + assert.deepStrictEqual(runQueryRequest.readOptions, { + newTransaction: {}, + consistencyType: 'newTransaction', + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + const query = + transactionWrapper.datastore.createQuery('Task'); + await transaction.runQuery(query); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + it('with using transaction.run', done => { + let beginCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + beginCount++; + break; + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.RUN_QUERY: { + const runQueryRequest = + request as protos.google.datastore.v1.IRunQueryRequest; + assert.deepStrictEqual(runQueryRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.run(); + const query = + transactionWrapper.datastore.createQuery('Task'); + await transaction.runQuery(query); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + }); + describe('runAggregationQuery, lookup, put, commit', () => { + it('without using transaction.run', done => { + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + throw Error( + 'BeginTransaction should not have been called' + ); + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.RUN_AGGREGATION_QUERY: { + const runAggregationQueryRequest = + request as protos.google.datastore.v1.IRunAggregationQueryRequest; + assert.deepStrictEqual( + runAggregationQueryRequest.readOptions, + { + newTransaction: {}, + consistencyType: 'newTransaction', + } + ); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + const query = + transactionWrapper.datastore.createQuery('Task'); + const aggregate = transactionWrapper.datastore + .createAggregationQuery(query) + .addAggregation(AggregateField.average('appearances')); + await transaction.runAggregationQuery(aggregate); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + it('with using transaction.run', done => { + let beginCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + beginCount++; + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + break; + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.RUN_AGGREGATION_QUERY: { + const runAggregationQueryRequest = + request as protos.google.datastore.v1.IRunAggregationQueryRequest; + assert.deepStrictEqual( + runAggregationQueryRequest.readOptions, + { + transaction: testRunResp.transaction, + } + ); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.run(); + const query = + transactionWrapper.datastore.createQuery('Task'); + const aggregate = transactionWrapper.datastore + .createAggregationQuery(query) + .addAggregation(AggregateField.average('appearances')); + await transaction.runAggregationQuery(aggregate); + await transaction.get(key); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + }); + describe('put, put, lookup, commit', () => { + it('without using transaction.run', done => { + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + throw Error( + 'BeginTransaction should not have been called' + ); + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + newTransaction: {}, + consistencyType: 'newTransaction', + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + transaction.save({key, data: ''}); + transaction.save({key, data: 'more-data'}); + await transaction.get(key); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + it('with using transaction.run', done => { + let beginCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + beginCount++; + break; + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.run(); + transaction.save({key, data: ''}); + transaction.save({key, data: 'more-data'}); + await transaction.get(key); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + }); + describe('put, commit', () => { + it('without using transaction.run', done => { + let beginCount = 0; + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + beginCount++; + break; + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + it('with using transaction.run', done => { + // This gets called when the program reaches the gapic layer. + // It ensures the data that reaches the gapic layer is correct. + let beginCount = 0; + transactionWrapper.callBackSignaler = ( + callbackReached: GapicFunctionName, + request?: RequestType + ) => { + try { + switch (callbackReached) { + case GapicFunctionName.BEGIN_TRANSACTION: + assert.deepStrictEqual(request, { + projectId: 'project-id', + transactionOptions: {}, + }); + beginCount++; + break; + case GapicFunctionName.LOOKUP: { + const lookupRequest = + request as protos.google.datastore.v1.ILookupRequest; + assert.deepStrictEqual(lookupRequest.readOptions, { + transaction: testRunResp.transaction, + }); + break; + } + case GapicFunctionName.COMMIT: { + const commitRequest = + request as protos.google.datastore.v1.ICommitRequest; + assert.deepStrictEqual( + commitRequest.mode, + 'TRANSACTIONAL' + ); + assert.deepStrictEqual( + commitRequest.transaction, + testRunResp.transaction + ); + assert.strictEqual(beginCount, 1); + done(); + break; + } + default: + throw Error( + 'A gapic function was called that should not have been called' + ); + } + } catch (err: any) { + done(err); + } + }; + (async () => { + try { + transaction = transactionWrapper.transaction; + await transaction.run(); + transaction.save({key, data: ''}); + await transaction.commit(); + } catch (err: any) { + done(err); + } + })(); + }); + }); + }); }); describe('run without setting up transaction id', () => { @@ -1682,8 +2491,18 @@ async.each( }); describe('rollback', () => { - beforeEach(() => { - transaction.id = TRANSACTION_ID; + beforeEach(done => { + // The transaction state needs to be set to IN_PROGRESS in order for + // the rollback function to reach request_. + transaction.request_ = ( + config: RequestConfig, + callback: RequestCallback + ) => { + callback(null, {transaction: Buffer.from(TRANSACTION_ID)}); + }; + transaction.run(() => { + done(); + }); }); it('should rollback', done => { @@ -1854,13 +2673,18 @@ async.each( }, }, } as {} as TransactionOptions; + const inputOptions = { + transactionOptions: { + id: 'transaction-id', + }, + }; transaction.request_ = (config: RequestConfig) => { assert.deepStrictEqual(config.reqOpts, options); done(); }; - transaction.run(options, assert.ifError); + transaction.run(inputOptions, assert.ifError); }); }); @@ -2033,3 +2857,87 @@ async.each( }); } ); + +describe('getTransactionRequest', () => { + const datastore = new Datastore(); + + it('should return an empty object if no options are provided', () => { + const transaction = new Transaction(datastore); + const options = {}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, {}); + }); + + it('should return a readOnly object if readOnly is true', () => { + const transaction = new Transaction(datastore); + const options = {readOnly: true}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, {readOnly: {}}); + }); + + it('should return a readWrite object with previousTransaction if transactionId is provided', () => { + const transaction = new Transaction(datastore); + const options = {transactionId: 'transaction-id'}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, { + readWrite: {previousTransaction: 'transaction-id'}, + }); + }); + + it('should return a readWrite object with previousTransaction if transaction.id is provided', () => { + const transaction = new Transaction(datastore, {id: 'transaction-id'}); + const options = {}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, { + readWrite: {previousTransaction: 'transaction-id'}, + }); + }); + + it('should return a readOnly object if transactionOptions.readOnly is true', () => { + const transaction = new Transaction(datastore); + const options = {transactionOptions: {readOnly: true}}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, {readOnly: {}}); + }); + + it('should return a readWrite object with previousTransaction if transactionOptions.id is provided', () => { + const transaction = new Transaction(datastore); + const options = {transactionOptions: {id: 'transaction-id'}}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, { + readWrite: {previousTransaction: 'transaction-id'}, + }); + }); + + it('should prioritize transactionOptions over other options', () => { + const transaction = new Transaction(datastore); + const options = { + readOnly: false, + transactionId: 'transaction-id-1', + transactionOptions: { + readOnly: true, + id: 'transaction-id-2', + }, + }; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, { + readOnly: {}, + }); + }); + + it('should return a readOnly object if transaction is constructed with readOnly: true', () => { + const transaction = new Transaction(datastore, {readOnly: true}); + const options = {}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, {readOnly: {}}); + }); + + it('should return a readWrite object with previousTransaction if transaction is constructed with id', () => { + const transaction = new Transaction(datastore, {id: 'transaction-id'}); + const options = {}; + const result = getTransactionRequest(transaction, options); + assert.deepStrictEqual(result, { + readWrite: {previousTransaction: 'transaction-id'}, + }); + }); +});