-
Notifications
You must be signed in to change notification settings - Fork 102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: new transaction feature #1239
Changes from 63 commits
f9bb82e
e6c1265
974eee2
9cf180a
a58d4ec
bf110eb
f1cb78d
8a420c3
6ca70ac
46121e0
8b5a311
5a91c4e
38d76ff
6299bf3
e36b158
28a89ed
fa4f200
6c976e1
920f282
7f9c2c9
f762bb1
4375378
599bec1
dc23b45
dac429a
637ec5d
4a5fdae
e0ad5e8
06661c4
30f5b79
a3872bd
3b03bf7
5891af8
5fd45e7
4aedb17
38bd702
d8821d9
5eed19f
ecb9737
c08df99
316c651
974117b
5edf87a
9f3b5ca
602280e
82f3b03
57e7575
ff7d200
d43f02f
1c74b6b
a1a5ba6
52f4199
13dcb58
8091062
4a74f4d
d52c773
66b0286
2b93852
22e7b9d
a54b6c6
418c43b
2edd410
5c1dee5
7bf7a90
2022006
f94c37e
03b7cd3
b16afbf
9b05013
f10d1ef
7e453e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,33 +15,34 @@ | |
*/ | ||
|
||
import {promisifyAll} from '@google-cloud/promisify'; | ||
import arrify = require('arrify'); | ||
import {CallOptions} from 'google-gax'; | ||
|
||
import {google} from '../protos/protos'; | ||
|
||
import {Datastore, TransactionOptions} from '.'; | ||
import {entity, Entity, Entities} from './entity'; | ||
import {Entities, Entity, entity} from './entity'; | ||
import { | ||
Query, | ||
RunQueryCallback, | ||
RunQueryInfo, | ||
RunQueryOptions, | ||
RunQueryResponse, | ||
} from './query'; | ||
import { | ||
CommitCallback, | ||
CommitResponse, | ||
DatastoreRequest, | ||
RequestOptions, | ||
PrepareEntityObjectResponse, | ||
CreateReadStreamOptions, | ||
GetResponse, | ||
DatastoreRequest, | ||
GetCallback, | ||
GetResponse, | ||
getTransactionRequest, | ||
PrepareEntityObjectResponse, | ||
RequestCallback, | ||
transactionExpiredError, | ||
TransactionState, | ||
} from './request'; | ||
import {AggregateQuery} from './aggregate'; | ||
import {Mutex} from 'async-mutex'; | ||
import arrify = require('arrify'); | ||
|
||
/* | ||
* This type matches the value returned by the promise in the | ||
|
@@ -53,11 +54,6 @@ interface BeginAsyncResponse { | |
resp?: google.datastore.v1.IBeginTransactionResponse; | ||
} | ||
|
||
enum TransactionState { | ||
NOT_STARTED, | ||
IN_PROGRESS, // IN_PROGRESS currently tracks the expired state as well | ||
} | ||
|
||
/** | ||
* A transaction is a set of Datastore operations on one or more entities. Each | ||
* transaction is guaranteed to be atomic, which means that transactions are | ||
|
@@ -85,7 +81,6 @@ class Transaction extends DatastoreRequest { | |
modifiedEntities_: ModifiedEntities; | ||
skipCommit?: boolean; | ||
#mutex = new Mutex(); | ||
#state = TransactionState.NOT_STARTED; | ||
constructor(datastore: Datastore, options?: TransactionOptions) { | ||
super(); | ||
/** | ||
|
@@ -115,6 +110,7 @@ class Transaction extends DatastoreRequest { | |
|
||
// Queue the requests to make when we send the transactional commit. | ||
this.requests_ = []; | ||
this.state = TransactionState.NOT_STARTED; | ||
} | ||
|
||
/*! Developer Documentation | ||
|
@@ -177,6 +173,10 @@ class Transaction extends DatastoreRequest { | |
: () => {}; | ||
const gaxOptions = | ||
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; | ||
if (this.state === TransactionState.EXPIRED) { | ||
callback(new Error(transactionExpiredError)); | ||
return; | ||
} | ||
// This ensures that the transaction is started before calling runCommit | ||
this.#withBeginTransaction( | ||
gaxOptions, | ||
|
@@ -355,13 +355,9 @@ class Transaction extends DatastoreRequest { | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
// This ensures that the transaction is started before calling get | ||
this.#withBeginTransaction( | ||
options.gaxOptions, | ||
() => { | ||
super.get(keys, options, callback); | ||
}, | ||
callback | ||
); | ||
this.#blockWithMutex(() => { | ||
super.get(keys, options, callback); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -434,6 +430,14 @@ class Transaction extends DatastoreRequest { | |
const callback = | ||
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; | ||
|
||
if (this.state === TransactionState.EXPIRED) { | ||
callback(new Error(transactionExpiredError)); | ||
return; | ||
} | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wondering what are the allowed states here? NOT_TRANSACTION and IN_PROGRESS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOT_TRANSACTION, NOT_STARTED, IN_PROGRESS, EXPIRED. |
||
callback(new Error('Transaction is not started')); | ||
return; | ||
} | ||
this.request_( | ||
{ | ||
client: 'DatastoreClient', | ||
|
@@ -442,6 +446,7 @@ class Transaction extends DatastoreRequest { | |
}, | ||
(err, resp) => { | ||
this.skipCommit = true; | ||
this.state = TransactionState.EXPIRED; | ||
callback(err || null, resp); | ||
} | ||
); | ||
|
@@ -511,7 +516,7 @@ class Transaction extends DatastoreRequest { | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
this.#mutex.runExclusive(async () => { | ||
if (this.#state === TransactionState.NOT_STARTED) { | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
const runResults = await this.#beginTransactionAsync(options); | ||
this.#processBeginResults(runResults, callback); | ||
} else { | ||
|
@@ -635,6 +640,7 @@ class Transaction extends DatastoreRequest { | |
return; | ||
} | ||
|
||
this.state = TransactionState.EXPIRED; | ||
// The `callbacks` array was built previously. These are the callbacks | ||
// that handle the API response normally when using the | ||
// DatastoreRequest.save and .delete methods. | ||
|
@@ -666,24 +672,11 @@ class Transaction extends DatastoreRequest { | |
if (err) { | ||
callback(err, null, resp); | ||
} else { | ||
this.#parseRunSuccess(runResults); | ||
this.parseTransactionResponse(resp); | ||
callback(null, this, resp); | ||
} | ||
} | ||
|
||
/** | ||
* This function saves results from a successful beginTransaction call. | ||
* | ||
* @param {BeginAsyncResponse} [response] The response from a call to | ||
* begin a transaction that completed successfully. | ||
* | ||
**/ | ||
#parseRunSuccess(runResults: BeginAsyncResponse) { | ||
const resp = runResults.resp; | ||
this.id = resp!.transaction; | ||
this.#state = TransactionState.IN_PROGRESS; | ||
} | ||
|
||
/** | ||
* This async function makes a beginTransaction call and returns a promise with | ||
* the information returned from the call that was made. | ||
|
@@ -696,24 +689,10 @@ class Transaction extends DatastoreRequest { | |
async #beginTransactionAsync( | ||
options: RunOptions | ||
): Promise<BeginAsyncResponse> { | ||
const reqOpts: RequestOptions = { | ||
transactionOptions: {}, | ||
}; | ||
|
||
if (options.readOnly || this.readOnly) { | ||
reqOpts.transactionOptions!.readOnly = {}; | ||
} | ||
|
||
if (options.transactionId || this.id) { | ||
reqOpts.transactionOptions!.readWrite = { | ||
previousTransaction: options.transactionId || this.id, | ||
}; | ||
} | ||
|
||
if (options.transactionOptions) { | ||
reqOpts.transactionOptions = options.transactionOptions; | ||
} | ||
return new Promise((resolve: (value: BeginAsyncResponse) => void) => { | ||
const reqOpts = { | ||
transactionOptions: getTransactionRequest(this, options), | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
this.request_( | ||
{ | ||
client: 'DatastoreClient', | ||
|
@@ -766,13 +745,9 @@ class Transaction extends DatastoreRequest { | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
// This ensures that the transaction is started before calling runAggregationQuery | ||
this.#withBeginTransaction( | ||
options.gaxOptions, | ||
() => { | ||
super.runAggregationQuery(query, options, callback); | ||
}, | ||
callback | ||
); | ||
this.#blockWithMutex(() => { | ||
super.runAggregationQuery(query, options, callback); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -805,13 +780,9 @@ class Transaction extends DatastoreRequest { | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
// This ensures that the transaction is started before calling runQuery | ||
this.#withBeginTransaction( | ||
options.gaxOptions, | ||
() => { | ||
super.runQuery(query, options, callback); | ||
}, | ||
callback | ||
); | ||
this.#blockWithMutex(() => { | ||
super.runQuery(query, options, callback); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -1019,7 +990,7 @@ class Transaction extends DatastoreRequest { | |
* @param {CallOptions | undefined} [gaxOptions] Gax options provided by the | ||
* user that are used for the beginTransaction grpc call. | ||
* @param {function} [fn] A function which is run after ensuring a | ||
* beginTransaction call is made. | ||
* transaction has begun. | ||
* @param {function} [callback] A callback provided by the user that expects | ||
* an error in the first argument and a custom data type for the rest of the | ||
* arguments. | ||
|
@@ -1031,10 +1002,10 @@ class Transaction extends DatastoreRequest { | |
callback: (...args: [Error | null, ...T] | [Error | null]) => void | ||
): void { | ||
(async () => { | ||
if (this.#state === TransactionState.NOT_STARTED) { | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
try { | ||
await this.#mutex.runExclusive(async () => { | ||
if (this.#state === TransactionState.NOT_STARTED) { | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
// This sends an rpc call to get the transaction id | ||
const runResults = await this.#beginTransactionAsync({ | ||
gaxOptions, | ||
|
@@ -1044,7 +1015,7 @@ class Transaction extends DatastoreRequest { | |
// Do not call the wrapped function. | ||
throw runResults.err; | ||
} | ||
this.#parseRunSuccess(runResults); | ||
this.parseTransactionResponse(runResults.resp); | ||
// The rpc saving the transaction id was successful. | ||
// Now the wrapped function fn will be called. | ||
} | ||
|
@@ -1057,6 +1028,31 @@ class Transaction extends DatastoreRequest { | |
return fn(); | ||
})(); | ||
} | ||
|
||
/* | ||
* Some rpc calls require that the transaction has been started (i.e, has a | ||
* valid id) before they can be sent. #withBeginTransaction acts as a wrapper | ||
* over those functions. | ||
* | ||
* If the transaction has not begun yet, `#blockWithMutex` will call the | ||
* wrapped function which will begin the transaction in the rpc call it sends. | ||
* If the transaction has begun, the wrapped function will be called, but it | ||
* will not begin a transaction. | ||
* | ||
* @param {function} [fn] A function which is run after ensuring a | ||
* transaction has begun. | ||
*/ | ||
#blockWithMutex(fn: () => void) { | ||
(async () => { | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
await this.#mutex.runExclusive(async () => { | ||
fn(); | ||
}); | ||
} else { | ||
fn(); | ||
} | ||
})(); | ||
} | ||
} | ||
|
||
export type ModifiedEntities = Array<{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to the super class because it is now needed there.