-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransaction.go
227 lines (179 loc) · 6.99 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package dal
import (
"context"
"fmt"
)
var (
	// transactionContextKey is used by address: &transactionContextKey is the
	// context key under which NewContextWithTransaction stores the transaction
	// and GetTransaction looks it up.
	transactionContextKey = "transactionContextKey"

	// nonTransactionalContextKey is used by address: &nonTransactionalContextKey
	// is the context key under which NewContextWithTransaction stores the
	// original (non-transactional) context.
	nonTransactionalContextKey = "nonTransactionalContextKey"
)
// TxIsolationLevel defines an isolation level for a transaction.
// It is an int-based enumeration whose zero value is TxUnspecified.
type TxIsolationLevel int

const (
	// TxUnspecified indicates transaction level is not specified.
	// It is the zero value of TxIsolationLevel (the enumeration starts at iota == 0).
	TxUnspecified TxIsolationLevel = iota

	// TxChaos - The pending changes from more highly isolated transactions cannot be overwritten.
	TxChaos

	// TxReadCommitted - Shared locks are held while the data is being read to avoid dirty reads,
	// but the data can be changed before the end of the transaction,
	// resulting in non-repeatable reads or phantom data.
	TxReadCommitted

	// TxReadUncommitted - A dirty read is possible, meaning that no shared locks are issued
	// and no exclusive locks are honored.
	TxReadUncommitted

	// TxRepeatableRead - Locks are placed on all data that is used in a query,
	// preventing other users from updating the data.
	// Prevents non-repeatable reads but phantom rows are still possible.
	TxRepeatableRead

	// TxSerializable - A range lock is placed on the DataSet, preventing other users
	// from updating or inserting rows into the dataset until the transaction is complete.
	TxSerializable

	// TxSnapshot - Reduces blocking by storing a version of data that one application can read
	// while another is modifying the same data.
	// Indicates that from one transaction you cannot see changes made in other transactions,
	// even if you requery.
	TxSnapshot
)
// ROTxWorker defines a callback to be called to do work within a readonly transaction.
// It is a type alias for a function that receives a context and a ReadTransaction
// and returns an error.
type ROTxWorker = func(ctx context.Context, tx ReadTransaction) error
// RWTxWorker defines a callback to be called to do work within a readwrite transaction.
// It is a type alias for a function that receives a context and a ReadwriteTransaction
// and returns an error.
type RWTxWorker = func(ctx context.Context, tx ReadwriteTransaction) error
// TransactionCoordinator provides methods to work with transactions.
// It is the union of ReadTransactionCoordinator and ReadwriteTransactionCoordinator.
type TransactionCoordinator interface {

	// ReadTransactionCoordinator can start a readonly transaction
	ReadTransactionCoordinator

	// ReadwriteTransactionCoordinator can start a readwrite transaction
	ReadwriteTransactionCoordinator
}
// ReadTransactionCoordinator creates a readonly transaction.
type ReadTransactionCoordinator interface {

	// RunReadonlyTransaction starts readonly transaction.
	// The worker f receives a context and a ReadTransaction; options are
	// optional TransactionOption values.
	// NOTE(review): how the returned error relates to the error returned by f
	// is decided by the implementation and is not visible in this file.
	RunReadonlyTransaction(ctx context.Context, f ROTxWorker, options ...TransactionOption) error
}
// ReadwriteTransactionCoordinator creates a read-write transaction.
type ReadwriteTransactionCoordinator interface {

	// RunReadwriteTransaction starts read-write transaction.
	// The worker f receives a context and a ReadwriteTransaction; options are
	// optional TransactionOption values.
	// NOTE(review): commit/rollback behaviour and how the returned error relates
	// to the error returned by f are decided by the implementation and are not
	// visible in this file.
	RunReadwriteTransaction(ctx context.Context, f RWTxWorker, options ...TransactionOption) error
}
// Transaction defines an instance of DALgo transaction.
// It is the common base embedded by ReadTransaction and ReadwriteTransaction.
type Transaction interface {

	// Options indicates parameters that were requested at time of transaction creation.
	Options() TransactionOptions
}
// ReadTransaction defines an interface for a readonly transaction.
// It combines Transaction with ReadSession (declared elsewhere in the package).
type ReadTransaction interface {
	Transaction
	ReadSession
}
// ReadwriteTransaction defines an interface for a readwrite transaction.
// It combines Transaction with ReadwriteSession (declared elsewhere in the
// package) and adds an ID method.
type ReadwriteTransaction interface {

	// ID returns a unique ID of a transaction if it is supported by the underlying DB client
	ID() string

	Transaction
	ReadwriteSession
}
// NewContextWithTransaction builds a transactional context on top of
// nonTransactionalContext.
//
// The returned context carries two values: the original
// nonTransactionalContext (retrievable with GetNonTransactionalContext) and
// the transaction tx (retrievable with GetTransaction).
func NewContextWithTransaction(nonTransactionalContext context.Context, tx Transaction) context.Context {
	// First layer: remember the caller's original context.
	withOriginal := context.WithValue(nonTransactionalContext, &nonTransactionalContextKey, nonTransactionalContext)
	// Second layer: attach the transaction itself.
	return context.WithValue(withOriginal, &transactionContextKey, tx)
}
// GetTransaction returns the transaction stored in ctx by
// NewContextWithTransaction, or nil when ctx carries no transaction.
func GetTransaction(ctx context.Context) Transaction {
	if stored := ctx.Value(&transactionContextKey); stored != nil {
		return stored.(Transaction)
	}
	return nil
}
// GetNonTransactionalContext returns the non-transactional context (i.e. the
// parent of the transactional context) that NewContextWithTransaction stored
// in ctx.
//
// If ctx carries no stored parent — it was not derived from a context created
// by NewContextWithTransaction — ctx is already non-transactional and is
// returned unchanged instead of panicking on a failed type assertion.
//
// TODO: This can be dangerous: the returned context is the original one, so it
// does not include anything (a deadline, for example) that a child context
// added after the transactional context was created.
func GetNonTransactionalContext(ctx context.Context) context.Context {
	// Comma-ok assertion: a missing value yields ok == false rather than a panic.
	if original, ok := ctx.Value(&nonTransactionalContextKey).(context.Context); ok {
		return original
	}
	return ctx
}
// TransactionOptions holds transaction settings.
// It is a read-only view; values are produced by NewTransactionOptions from
// TransactionOption functions.
type TransactionOptions interface {

	// IsolationLevel indicates requested isolation level
	IsolationLevel() TxIsolationLevel

	// IsReadonly indicates a readonly transaction
	IsReadonly() bool

	// IsCrossGroup indicates a cross-group transaction. Makes sense for Google App Engine.
	IsCrossGroup() bool

	// Attempts returns number of attempts to execute a transaction. This is used in Google Datastore for example.
	Attempts() int

	// Password() string - TODO: document why it was added
}
// TransactionOption defines the contract for a transaction option: a function
// that mutates the txOptions it is given. Options are applied by
// NewTransactionOptions.
type TransactionOption func(options *txOptions)
// txOptions is this package's implementation of TransactionOptions.
// Its fields are filled in by TransactionOption functions.
type txOptions struct {
	isolationLevel TxIsolationLevel // set by TxWithIsolationLevel; zero value is TxUnspecified
	isReadonly     bool             // set to true by TxWithReadonly
	isCrossGroup   bool             // set to true by TxWithCrossGroup
	attempts       int              // set by TxWithAttempts
	password       string           // returned by Password(); the only setter visible here (WithPassword) is commented out
}

// Compile-time check that *txOptions satisfies TransactionOptions.
var _ TransactionOptions = (*txOptions)(nil)
// IsReadonly indicates a readonly transaction was requested.
// It returns the isReadonly field, which TxWithReadonly sets to true.
func (v txOptions) IsReadonly() bool {
	return v.isReadonly
}
// IsolationLevel indicates what isolation level was requested for a transaction.
// It returns the isolationLevel field; the zero value is TxUnspecified.
func (v txOptions) IsolationLevel() TxIsolationLevel {
	return v.isolationLevel
}
// IsCrossGroup indicates a cross-group transaction was requested.
// It returns the isCrossGroup field, which TxWithCrossGroup sets to true.
func (v txOptions) IsCrossGroup() bool {
	return v.isCrossGroup
}
// Attempts returns the requested number of attempts to execute a transaction.
// It returns the attempts field, which is 0 unless set (e.g. by TxWithAttempts).
func (v txOptions) Attempts() int {
	return v.attempts
}
// Password returns the password field.
// It is not part of the TransactionOptions interface (the method is commented
// out there).
// TODO: why do we need it?
func (v txOptions) Password() string {
	return v.password
}
// NewTransactionOptions creates an instance of TransactionOptions by applying
// opts, in the order given, to a zero-valued txOptions. With no options the
// result reports the zero value for every setting.
func NewTransactionOptions(opts ...TransactionOption) TransactionOptions {
	var settings txOptions
	for i := range opts {
		opts[i](&settings)
	}
	return settings
}
// TxWithIsolationLevel requests a transaction with the given isolation level.
//
// It panics immediately if isolationLevel is TxUnspecified. The returned
// option panics when applied to options that already have an isolation level
// set, with a distinct message when the level being set equals the existing one.
func TxWithIsolationLevel(isolationLevel TxIsolationLevel) TransactionOption {
	if isolationLevel == TxUnspecified {
		panic("isolationLevel == TxUnspecified")
	}
	return func(o *txOptions) {
		switch o.isolationLevel {
		case TxUnspecified:
			// Nothing requested so far: record the level.
			o.isolationLevel = isolationLevel
		case isolationLevel:
			panic(fmt.Sprintf("an attempt to set same isolation level twice: %v", isolationLevel))
		default:
			panic("an attempt to request more then 1 isolation level")
		}
	}
}
// TxWithAttempts specifies the number of attempts to execute a transaction.
// The value is stored as given, without validation; applying the option again
// overwrites the previous value.
func TxWithAttempts(attempts int) TransactionOption {
	setAttempts := func(o *txOptions) {
		o.attempts = attempts
	}
	return setAttempts
}
// TxWithReadonly requests a readonly transaction: the returned option sets the
// isReadonly flag to true.
func TxWithReadonly() TransactionOption {
	return func(o *txOptions) { o.isReadonly = true }
}
// TxWithCrossGroup requires a transaction that spans multiple entity groups:
// the returned option sets the isCrossGroup flag to true.
func TxWithCrossGroup() TransactionOption {
	var markCrossGroup TransactionOption = func(o *txOptions) {
		o.isCrossGroup = true
	}
	return markCrossGroup
}
//func WithPassword(password string) TransactionOption {
// return func(options *txOptions) {
// options.password = password
// }
//}