This repository has been archived by the owner on Sep 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 47
/
transaction.go
310 lines (278 loc) · 10 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// Copyright 2015 Alex Browne. All rights reserved.
// Use of this source code is governed by the MIT
// license, which can be found in the LICENSE file.
package jobs
import (
"fmt"
"github.com/garyburd/redigo/redis"
"time"
)
// transaction is an abstraction layer around a redis transaction.
// transactions feature delayed execution, so nothing touches the database
// until exec is called.
type transaction struct {
conn redis.Conn
actions []*action
}
// action is a single step in a transaction and must be either a command
// or a script with optional arguments.
type action struct {
kind actionKind
name string
script *redis.Script
args redis.Args
handler replyHandler
}
// actionKind is either a command or a script
type actionKind int
const (
actionCommand = iota
actionScript
)
// replyHandler is a function which does something with the reply from a redis
// command or script.
type replyHandler func(interface{}) error
// newTransaction instantiates and returns a new transaction.
func newTransaction() *transaction {
t := &transaction{
conn: redisPool.Get(),
}
return t
}
// command adds a command action to the transaction with the given args.
// handler will be called with the reply from this specific command when
// the transaction is executed.
func (t *transaction) command(name string, args redis.Args, handler replyHandler) {
t.actions = append(t.actions, &action{
kind: actionCommand,
name: name,
args: args,
handler: handler,
})
}
// command adds a script action to the transaction with the given args.
// handler will be called with the reply from this specific script when
// the transaction is executed.
func (t *transaction) script(script *redis.Script, args redis.Args, handler replyHandler) {
t.actions = append(t.actions, &action{
kind: actionScript,
script: script,
args: args,
handler: handler,
})
}
// sendAction writes a to a connection buffer using conn.Send()
func (t *transaction) sendAction(a *action) error {
switch a.kind {
case actionCommand:
return t.conn.Send(a.name, a.args...)
case actionScript:
return a.script.Send(t.conn, a.args...)
}
return nil
}
// doAction writes a to the connection buffer and then immediately
// flushes the buffer and reads the reply via conn.Do()
func (t *transaction) doAction(a *action) (interface{}, error) {
switch a.kind {
case actionCommand:
return t.conn.Do(a.name, a.args...)
case actionScript:
return a.script.Do(t.conn, a.args...)
}
return nil, nil
}
// exec executes the transaction, sequentially sending each action and
// calling all the action handlers with the corresponding replies.
func (t *transaction) exec() error {
// Return the connection to the pool when we are done
defer t.conn.Close()
if len(t.actions) == 1 {
// If there is only one command, no need to use MULTI/EXEC
a := t.actions[0]
reply, err := t.doAction(a)
if err != nil {
return err
}
if a.handler != nil {
if err := a.handler(reply); err != nil {
return err
}
}
} else {
// Send all the commands and scripts at once using MULTI/EXEC
t.conn.Send("MULTI")
for _, a := range t.actions {
if err := t.sendAction(a); err != nil {
return err
}
}
// Invoke redis driver to execute the transaction
replies, err := redis.Values(t.conn.Do("EXEC"))
if err != nil {
return err
}
// Iterate through the replies, calling the corresponding handler functions
for i, reply := range replies {
a := t.actions[i]
if a.handler != nil {
if err := a.handler(reply); err != nil {
return err
}
}
}
}
return nil
}
// newScanJobHandler returns a replyHandler which, when run, will scan the values
// of reply into job.
func newScanJobHandler(job *Job) replyHandler {
return func(reply interface{}) error {
return scanJob(reply, job)
}
}
// newScanJobsHandler returns a replyHandler which, when run, will scan the values
// of reply into jobs.
func newScanJobsHandler(jobs *[]*Job) replyHandler {
return func(reply interface{}) error {
values, err := redis.Values(reply, nil)
if err != nil {
return nil
}
for _, fields := range values {
job := &Job{}
if err := scanJob(fields, job); err != nil {
return err
}
(*jobs) = append((*jobs), job)
}
return nil
}
}
// debugSet simply prints out the value of the given set
func (t *transaction) debugSet(setName string) {
t.command("ZRANGE", redis.Args{setName, 0, -1, "WITHSCORES"}, func(reply interface{}) error {
vals, err := redis.Strings(reply, nil)
if err != nil {
return err
}
fmt.Printf("%s: %v\n", setName, vals)
return nil
})
}
// newScanStringsHandler returns a replyHandler which, when run, will scan the values
// of reply into strings.
func newScanStringsHandler(strings *[]string) replyHandler {
return func(reply interface{}) error {
if strings == nil {
return fmt.Errorf("jobs: Error in newScanStringsHandler: expected strings arg to be a pointer to a slice of strings but it was nil")
}
var err error
(*strings), err = redis.Strings(reply, nil)
if err != nil {
return fmt.Errorf("jobs: Error in newScanStringsHandler: %s", err.Error())
}
return nil
}
}
// newScanStringHandler returns a replyHandler which, when run, will convert reply to a
// string and scan it into s.
func newScanStringHandler(s *string) replyHandler {
return func(reply interface{}) error {
if s == nil {
return fmt.Errorf("jobs: Error in newScanStringHandler: expected arg s to be a pointer to a string but it was nil")
}
var err error
(*s), err = redis.String(reply, nil)
if err != nil {
return fmt.Errorf("jobs: Error in newScanStringHandler: %s", err.Error())
}
return nil
}
}
// newScanIntHandler returns a replyHandler which, when run, will convert reply to a
// int and scan it into i.
func newScanIntHandler(i *int) replyHandler {
return func(reply interface{}) error {
if i == nil {
return fmt.Errorf("jobs: Error in newScanIntHandler: expected arg s to be a pointer to a string but it was nil")
}
var err error
(*i), err = redis.Int(reply, nil)
if err != nil {
return fmt.Errorf("jobs: Error in newScanIntHandler: %s", err.Error())
}
return nil
}
}
// newScanBoolHandler returns a replyHandler which, when run, will convert reply to a
// bool and scan it into b.
func newScanBoolHandler(b *bool) replyHandler {
return func(reply interface{}) error {
if b == nil {
return fmt.Errorf("jobs: Error in newScanBoolHandler: expected arg v to be a pointer to a bool but it was nil")
}
var err error
(*b), err = redis.Bool(reply, nil)
if err != nil {
return fmt.Errorf("jobs: Error in newScanBoolHandler: %s", err.Error())
}
return nil
}
}
//go:generate go run scripts/generate.go
// popNextJobs is a small function wrapper around getAndMovesJobToExecutingScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will get the next n jobs from the queue that are ready based on their time parameter.
func (t *transaction) popNextJobs(n int, poolId string, handler replyHandler) {
currentTime := time.Now().UTC().UnixNano()
t.script(popNextJobsScript, redis.Args{n, currentTime, poolId}, handler)
}
// retryOrFailJob is a small function wrapper around retryOrFailJobScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will either mark the job as failed or queue it for retry depending on the number of
// retries left.
func (t *transaction) retryOrFailJob(job *Job, handler replyHandler) {
t.script(retryOrFailJobScript, redis.Args{job.id}, handler)
}
// setStatus is a small function wrapper around setStatusScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will atomically update the status of the job, removing it from its old status set and
// adding it to the new one.
func (t *transaction) setStatus(job *Job, status Status) {
t.script(setJobStatusScript, redis.Args{job.id, string(status)}, nil)
}
// destroyJob is a small function wrapper around destroyJobScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will remove all records associated with job from the database.
func (t *transaction) destroyJob(job *Job) {
t.script(destroyJobScript, redis.Args{job.id}, nil)
}
// purgeStalePool is a small function wrapper around purgeStalePoolScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will remove the stale pool from the active pools set, and then requeue any jobs associated
// with the stale pool that are stuck in the executing set.
func (t *transaction) purgeStalePool(poolId string) {
t.script(purgeStalePoolScript, redis.Args{poolId}, nil)
}
// getJobsByIds is a small function wrapper around getJobsByIdsScript.
// It offers some type safety and helps make sure the arguments you pass through to the are correct.
// The script will return all the fields for jobs which are identified by ids in the given sorted set.
// You can use the handler to scan the jobs into a slice of jobs.
func (t *transaction) getJobsByIds(setKey string, handler replyHandler) {
t.script(getJobsByIdsScript, redis.Args{setKey}, handler)
}
// setJobField is a small function wrapper around setJobFieldScript.
// It offers some type safety and helps make sure the arguments you pass through are correct.
// The script will set the given field to the given value iff the job exists and has not been
// destroyed.
func (t *transaction) setJobField(job *Job, fieldName string, fieldValue interface{}) {
t.script(setJobFieldScript, redis.Args{job.id, fieldName, fieldValue}, nil)
}
// addJobToSet is a small function wrapper around addJobToSetScript.
// It offers some type safety and helps make sure the arguments you pass through are correct.
// The script will add the job to the given set with the given score iff the job exists
// and has not been destroyed.
func (t *transaction) addJobToSet(job *Job, setName string, score float64) {
t.script(addJobToSetScript, redis.Args{job.id, setName, score}, nil)
}