-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: new transaction feature #1239
Changes from 58 commits
f9bb82e
e6c1265
974eee2
9cf180a
a58d4ec
bf110eb
f1cb78d
8a420c3
6ca70ac
46121e0
8b5a311
5a91c4e
38d76ff
6299bf3
e36b158
28a89ed
fa4f200
6c976e1
920f282
7f9c2c9
f762bb1
4375378
599bec1
dc23b45
dac429a
637ec5d
4a5fdae
e0ad5e8
06661c4
30f5b79
a3872bd
3b03bf7
5891af8
5fd45e7
4aedb17
38bd702
d8821d9
5eed19f
ecb9737
c08df99
316c651
974117b
5edf87a
9f3b5ca
602280e
82f3b03
57e7575
ff7d200
d43f02f
1c74b6b
a1a5ba6
52f4199
13dcb58
8091062
4a74f4d
d52c773
66b0286
2b93852
22e7b9d
a54b6c6
418c43b
2edd410
5c1dee5
7bf7a90
2022006
f94c37e
03b7cd3
b16afbf
9b05013
f10d1ef
7e453e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,11 +27,17 @@ | |
|
||
// eslint-disable-next-line @typescript-eslint/no-var-requires | ||
const streamEvents = require('stream-events'); | ||
export const transactionExpiredError = 'This transaction has already expired.'; | ||
|
||
export interface AbortableDuplex extends Duplex { | ||
abort(): void; | ||
} | ||
|
||
interface TransactionRequestOptions { | ||
readOnly?: {}; | ||
readWrite?: {previousTransaction?: string | Uint8Array | null}; | ||
} | ||
|
||
// Import the clients for each version supported by this package. | ||
const gapic = Object.freeze({ | ||
v1: require('./v1'), | ||
|
@@ -55,9 +61,10 @@ | |
RunQueryResponse, | ||
RunQueryCallback, | ||
} from './query'; | ||
import {Datastore} from '.'; | ||
import {Datastore, Transaction} from '.'; | ||
import ITimestamp = google.protobuf.ITimestamp; | ||
import {AggregateQuery} from './aggregate'; | ||
import {RunOptions} from './transaction'; | ||
import * as protos from '../protos/protos'; | ||
import {serializer} from 'google-gax'; | ||
import * as gax from 'google-gax'; | ||
|
@@ -153,6 +160,16 @@ | |
return {}; | ||
} | ||
|
||
const readTimeAndConsistencyError = | ||
'Read time and read consistency cannot both be specified.'; | ||
|
||
// Write function to check for readTime and readConsistency. | ||
function throwOnReadTimeAndConsistency(options: RunQueryStreamOptions) { | ||
if (options.readTime && options.consistency) { | ||
throw new Error(readTimeAndConsistencyError); | ||
} | ||
} | ||
|
||
/** | ||
* A map of read consistency values to proto codes. | ||
* | ||
|
@@ -164,6 +181,19 @@ | |
strong: 1, | ||
}; | ||
|
||
/** | ||
* By default a DatastoreRequest is in the NOT_TRANSACTION state. If the | ||
* DatastoreRequest is a Transaction object, then initially it will be in | ||
* the NOT_STARTED state, but then the state will become IN_PROGRESS after the | ||
* transaction has started. | ||
*/ | ||
export enum TransactionState { | ||
NOT_TRANSACTION, | ||
NOT_STARTED, | ||
IN_PROGRESS, | ||
EXPIRED, | ||
} | ||
|
||
/** | ||
* Handle logic for Datastore API operations. Handles request logic for | ||
* Datastore. | ||
|
@@ -184,6 +214,7 @@ | |
| Array<(err: Error | null, resp: Entity | null) => void> | ||
| Entity; | ||
datastore!: Datastore; | ||
protected state: TransactionState = TransactionState.NOT_TRANSACTION; | ||
[key: string]: Entity; | ||
|
||
/** | ||
|
@@ -335,6 +366,15 @@ | |
); | ||
} | ||
|
||
/* This throws an error if the transaction has already expired. | ||
* | ||
*/ | ||
protected checkExpired() { | ||
if (this.state === TransactionState.EXPIRED) { | ||
throw Error(transactionExpiredError); | ||
} | ||
} | ||
|
||
/** | ||
* Retrieve the entities as a readable object stream. | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: update L381 with new added errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Done. |
||
|
@@ -365,12 +405,19 @@ | |
keys: Entities, | ||
options: CreateReadStreamOptions = {} | ||
): Transform { | ||
this.checkExpired(); | ||
keys = arrify(keys).map(entity.keyToKeyProto); | ||
if (keys.length === 0) { | ||
throw new Error('At least one Key object is required.'); | ||
} | ||
|
||
const makeRequest = (keys: entity.Key[] | KeyProto[]) => { | ||
try { | ||
throwOnReadTimeAndConsistency(options); | ||
} catch (error: any) { | ||
stream.destroy(error); | ||
return; | ||
} | ||
const reqOpts = this.getRequestOptions(options); | ||
Object.assign(reqOpts, {keys}); | ||
this.request_( | ||
|
@@ -381,6 +428,7 @@ | |
gaxOpts: options.gaxOptions, | ||
}, | ||
(err, resp) => { | ||
this.parseTransactionResponse(resp); | ||
if (err) { | ||
stream.destroy(err); | ||
return; | ||
|
@@ -631,6 +679,10 @@ | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
|
||
if (this.state === TransactionState.EXPIRED) { | ||
callback(new Error(transactionExpiredError)); | ||
return; | ||
} | ||
this.createReadStream(keys, options) | ||
.on('error', callback) | ||
.pipe( | ||
|
@@ -641,6 +693,22 @@ | |
); | ||
} | ||
|
||
/** | ||
* This function saves results from a successful beginTransaction call. | ||
* | ||
* @param {BeginAsyncResponse} [response] The response from a call to | ||
* begin a transaction that completed successfully. | ||
* | ||
**/ | ||
protected parseTransactionResponse(resp?: { | ||
transaction?: Uint8Array | string | undefined | null; | ||
}): void { | ||
if (resp && resp.transaction && Buffer.byteLength(resp.transaction) > 0) { | ||
this.id = resp!.transaction; | ||
this.state = TransactionState.IN_PROGRESS; | ||
} | ||
} | ||
|
||
/** | ||
* Datastore allows you to run aggregate queries by supplying aggregate fields | ||
* which will determine the type of aggregation that is performed. | ||
|
@@ -677,6 +745,14 @@ | |
const callback = | ||
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; | ||
|
||
if (this.state === TransactionState.EXPIRED) { | ||
callback(new Error(transactionExpiredError)); | ||
return; | ||
} | ||
if (options.readTime && options.consistency) { | ||
callback(new Error(readTimeAndConsistencyError)); | ||
return; | ||
} | ||
query.query = extend(true, new Query(), query.query); | ||
let queryProto: QueryProto; | ||
try { | ||
|
@@ -687,7 +763,13 @@ | |
setImmediate(callback, e as Error); | ||
return; | ||
} | ||
const sharedQueryOpts = this.getQueryOptions(query.query, options); | ||
let sharedQueryOpts; | ||
try { | ||
sharedQueryOpts = this.getQueryOptions(query.query, options); | ||
} catch (error: any) { | ||
callback(error); | ||
return; | ||
} | ||
const aggregationQueryOptions: AggregationQueryOptions = { | ||
nestedQuery: queryProto, | ||
aggregations: query.toProto(), | ||
|
@@ -704,6 +786,7 @@ | |
}, | ||
(err, res) => { | ||
const info = getInfoFromStats(res); | ||
this.parseTransactionResponse(res); | ||
if (res && res.batch) { | ||
const results = res.batch.aggregationResults; | ||
const finalResults = results | ||
|
@@ -846,6 +929,10 @@ | |
|
||
let info: RunQueryInfo; | ||
|
||
if (this.state === TransactionState.EXPIRED) { | ||
callback(new Error(transactionExpiredError)); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. That check is done further downstream in the call to |
||
this.runQueryStream(query, options) | ||
.on('error', callback) | ||
.on('info', info_ => { | ||
|
@@ -892,11 +979,13 @@ | |
* ``` | ||
*/ | ||
runQueryStream(query: Query, options: RunQueryStreamOptions = {}): Transform { | ||
this.checkExpired(); | ||
query = extend(true, new Query(), query); | ||
const makeRequest = (query: Query) => { | ||
let queryProto: QueryProto; | ||
try { | ||
queryProto = entity.queryToQueryProto(query); | ||
throwOnReadTimeAndConsistency(options); | ||
} catch (e) { | ||
// using setImmediate here to make sure this doesn't throw a | ||
// synchronous error | ||
|
@@ -918,7 +1007,8 @@ | |
); | ||
}; | ||
|
||
function onResultSet(err?: Error | null, resp?: Entity) { | ||
const onResultSet = (err?: Error | null, resp?: Entity) => { | ||
this.parseTransactionResponse(resp); | ||
if (err) { | ||
stream.destroy(err); | ||
return; | ||
|
@@ -977,7 +1067,7 @@ | |
|
||
makeRequest(query); | ||
}); | ||
} | ||
}; | ||
|
||
const stream = streamEvents(new Transform({objectMode: true})); | ||
stream.once('reading', () => { | ||
|
@@ -990,11 +1080,24 @@ | |
options: RunQueryStreamOptions | ||
): SharedQueryOptions { | ||
const sharedQueryOpts = {} as SharedQueryOptions; | ||
if (isTransaction(this)) { | ||
if (this.state === TransactionState.NOT_STARTED) { | ||
if (sharedQueryOpts.readOptions === undefined) { | ||
sharedQueryOpts.readOptions = {}; | ||
} | ||
sharedQueryOpts.readOptions.newTransaction = getTransactionRequest( | ||
this, | ||
{} | ||
); | ||
sharedQueryOpts.readOptions.consistencyType = 'newTransaction'; | ||
} | ||
} | ||
if (options.consistency) { | ||
const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()]; | ||
sharedQueryOpts.readOptions = { | ||
readConsistency: code, | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a bug because it would overwrite |
||
if (sharedQueryOpts.readOptions === undefined) { | ||
sharedQueryOpts.readOptions = {}; | ||
} | ||
sharedQueryOpts.readOptions.readConsistency = code; | ||
} | ||
if (options.readTime) { | ||
if (sharedQueryOpts.readOptions === undefined) { | ||
|
@@ -1122,18 +1225,27 @@ | |
reqOpts.transaction = this.id; | ||
} | ||
|
||
if ( | ||
isTransaction || | ||
(reqOpts.readOptions && reqOpts.readOptions.newTransaction) | ||
) { | ||
if (reqOpts.readOptions && reqOpts.readOptions.readConsistency) { | ||
callback( | ||
new Error('Read consistency cannot be specified in a transaction.') | ||
); | ||
return; | ||
} | ||
if (reqOpts.readOptions && reqOpts.readOptions.readTime) { | ||
callback(new Error('Read time cannot be specified in a transaction.')); | ||
return; | ||
} | ||
} | ||
if ( | ||
isTransaction && | ||
(method === 'lookup' || | ||
method === 'runQuery' || | ||
method === 'runAggregationQuery') | ||
) { | ||
if (reqOpts.readOptions && reqOpts.readOptions.readConsistency) { | ||
throw new Error( | ||
'Read consistency cannot be specified in a transaction.' | ||
); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just moved up to also be included for new transaction calls. Also, the error was not propogating up and needs to be wrapped in a callback. |
||
if (reqOpts.readOptions) { | ||
Object.assign(reqOpts.readOptions, {transaction: this.id}); | ||
} else { | ||
|
@@ -1229,6 +1341,36 @@ | |
} | ||
} | ||
|
||
function isTransaction(request: DatastoreRequest): request is Transaction { | ||
return request instanceof Transaction; | ||
} | ||
|
||
export function getTransactionRequest( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is used by |
||
transaction: Transaction, | ||
options: RunOptions | ||
): TransactionRequestOptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add some comments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
let reqOpts: TransactionRequestOptions = {}; | ||
if (options.readOnly || transaction.readOnly) { | ||
reqOpts.readOnly = {}; | ||
} | ||
if (options.transactionId || transaction.id) { | ||
reqOpts.readWrite = { | ||
previousTransaction: options.transactionId || transaction.id, | ||
}; | ||
} | ||
if (options.transactionOptions) { | ||
reqOpts = {}; | ||
if (options.transactionOptions.readOnly) { | ||
reqOpts.readOnly = {}; | ||
} | ||
const id = options.transactionOptions.id; | ||
if (id) { | ||
reqOpts.readWrite = {previousTransaction: id}; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a way to trim down this logic a bit? Seems a bit redundant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I generated unit tests, corrected them and then reduced this function down to one ternary operation. Previously this was just copy pasted code from before. Search |
||
return reqOpts; | ||
} | ||
|
||
export interface ConsistencyProtoCode { | ||
[key: string]: number; | ||
} | ||
|
@@ -1294,15 +1436,18 @@ | |
readConsistency?: number; | ||
transaction?: string | Uint8Array | null; | ||
readTime?: ITimestamp; | ||
newTransaction?: TransactionRequestOptions; | ||
consistencyType?: | ||
| 'readConsistency' | ||
| 'transaction' | ||
| 'newTransaction' | ||
| 'readTime'; | ||
}; | ||
} | ||
export interface RequestOptions extends SharedQueryOptions { | ||
mutations?: google.datastore.v1.IMutation[]; | ||
keys?: Entity; | ||
transactionOptions?: { | ||
readOnly?: {}; | ||
readWrite?: {previousTransaction?: string | Uint8Array | null}; | ||
} | null; | ||
transactionOptions?: TransactionRequestOptions | null; | ||
transaction?: string | null | Uint8Array; | ||
mode?: string; | ||
query?: QueryProto; | ||
|
@@ -1333,7 +1478,7 @@ | |
* that a callback is omitted. | ||
*/ | ||
promisifyAll(DatastoreRequest, { | ||
exclude: ['getQueryOptions', 'getRequestOptions'], | ||
exclude: ['checkExpired', 'getQueryOptions', 'getRequestOptions'], | ||
}); | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like using
protected
here because typescript doesn't technically hideprotected
properties completely, but it seems that there is no simple way around it because bothTransaction
andDatastoreRequest
need access to thestate
variable. There is more information about some complex ways to avoid this in the design doc in Alternative 3, but they require major architecture changes.