This repository has been archived by the owner on Sep 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 47
/
job_test.go
343 lines (311 loc) · 8.58 KB
/
job_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
// Copyright 2015 Alex Browne. All rights reserved.
// Use of this source code is governed by the MIT
// license, which can be found in the LICENSE file.
package jobs
import (
"errors"
"reflect"
"strings"
"testing"
"time"
)
// TestJobSave saves a freshly created test job and then checks three things:
// that each field of the job's main hash matches the in-memory job, that the
// job's status is StatusSaved, and that the job is present in the time index.
func TestJobSave(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Build a test job and give the bookkeeping fields non-zero values so
	// that the field checks below are meaningful.
	job, err := createTestJob()
	if err != nil {
		t.Fatal(err)
	}
	job.started, job.finished = 1, 5
	job.freq = 10
	job.retries = 3
	job.poolId = "testPool"

	saveErr := job.save()
	if saveErr != nil {
		t.Errorf("Unexpected error saving job: %s", saveErr.Error())
	}

	// Every field of the main hash should match the in-memory job.
	expectJobFieldEquals(t, job, "data", job.data, nil)
	expectJobFieldEquals(t, job, "type", job.typ.name, stringConverter)
	expectJobFieldEquals(t, job, "time", job.time, int64Converter)
	expectJobFieldEquals(t, job, "freq", job.freq, int64Converter)
	expectJobFieldEquals(t, job, "priority", job.priority, intConverter)
	expectJobFieldEquals(t, job, "started", job.started, int64Converter)
	expectJobFieldEquals(t, job, "finished", job.finished, int64Converter)
	expectJobFieldEquals(t, job, "retries", job.retries, uintConverter)
	expectJobFieldEquals(t, job, "poolId", job.poolId, stringConverter)

	// The status should now be "saved" ...
	expectStatusEquals(t, job, StatusSaved)

	// ... and the job should be indexed by its time.
	expectJobInTimeIndex(t, job)
}
// TestJobFindById saves a job, looks it up again with FindById, and expects
// the result to be deeply equal to the original. It then calls FindById with
// an id that was never saved and expects an ErrorJobNotFound whose message
// contains that id.
func TestJobFindById(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Create and save a test job
	job, err := createTestJob()
	if err != nil {
		t.Fatal(err)
	}
	job.started = 1
	job.finished = 5
	job.freq = 10
	job.retries = 3
	job.poolId = "testPool"
	if err := job.save(); err != nil {
		// The lookup below cannot succeed if the job was never saved, so
		// stop here instead of reporting a misleading secondary failure.
		t.Fatalf("Unexpected error saving job: %s", err.Error())
	}

	// Find the job in the database. Only compare the result when the lookup
	// itself succeeded; otherwise the comparison just repeats the failure.
	jobCopy, err := FindById(job.id)
	if err != nil {
		t.Errorf("Unexpected error in FindById: %s", err)
	} else if !reflect.DeepEqual(jobCopy, job) {
		t.Errorf("Found job was not correct.\n\tExpected: %+v\n\tBut got: %+v", job, jobCopy)
	}

	// Attempting to find a job that doesn't exist should return an error
	fakeId := "foobar"
	if _, err := FindById(fakeId); err == nil {
		t.Error("Expected error when FindById was called with a fake id but got none.")
	} else if _, ok := err.(ErrorJobNotFound); !ok {
		t.Errorf("Expected error to have type ErrorJobNotFound, but got %T", err)
	} else if !strings.Contains(err.Error(), fakeId) {
		t.Error("Expected error message to contain the fake id but it did not.")
	}
}
// TestJobRefresh checks that Refresh brings an in-memory job back in line
// with the database. It saves a job, loads a second copy of it, changes and
// saves the copy, refreshes the original, and expects the two to be deeply
// equal afterwards.
func TestJobRefresh(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Create and save a job
	job, err := createAndSaveTestJob()
	if err != nil {
		t.Fatalf("Unexpected error: %s", err)
	}

	// Get a copy of that job directly from database
	jobCopy := &Job{}
	tx := newTransaction()
	tx.scanJobById(job.id, jobCopy)
	if err := tx.exec(); err != nil {
		// Without a populated copy the rest of the test would modify and
		// save an empty job, so stop immediately.
		t.Fatalf("Unexpected error in tx.exec(): %s", err.Error())
	}

	// Modify and save the copy
	newPriority := jobCopy.priority + 100
	jobCopy.priority = newPriority
	if err := jobCopy.save(); err != nil {
		t.Fatalf("Unexpected error in jobCopy.save(): %s", err.Error())
	}

	// Refresh the original job
	if err := job.Refresh(); err != nil {
		// A failed refresh already explains why the comparison would fail.
		t.Fatalf("Unexpected error in job.Refresh(): %s", err.Error())
	}

	// Now the original and the copy should be equal
	if !reflect.DeepEqual(job, jobCopy) {
		t.Errorf("Expected job to equal jobCopy but it did not.\n\tExpected %+v\n\tBut got %+v", jobCopy, job)
	}
}
// TestJobenqueue runs a job through sequences of operations that end with
// enqueue and expects the final status to be StatusQueued each time.
func TestJobenqueue(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// enqueue immediately after the job has been created.
	enqueueOnly := statePath{
		steps:    []func(*Job) error{enqueueJob},
		expected: StatusQueued,
	}
	// enqueue, then Cancel, then enqueue once more.
	enqueueCancelEnqueue := statePath{
		steps:    []func(*Job) error{enqueueJob, cancelJob, enqueueJob},
		expected: StatusQueued,
	}

	testJobStatePaths(t, []statePath{enqueueOnly, enqueueCancelEnqueue})
}
// TestJobCancel runs a job through sequences of operations that end with
// Cancel and expects the final status to be StatusCancelled each time.
func TestJobCancel(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Cancel immediately after the job has been created.
	cancelOnly := statePath{
		steps:    []func(*Job) error{cancelJob},
		expected: StatusCancelled,
	}
	// Cancel, then enqueue, then Cancel once more.
	cancelEnqueueCancel := statePath{
		steps:    []func(*Job) error{cancelJob, enqueueJob, cancelJob},
		expected: StatusCancelled,
	}

	testJobStatePaths(t, []statePath{cancelOnly, cancelEnqueueCancel})
}
// TestJobReschedule has two parts. First it reschedules a saved job to the
// current time and checks that the stored "time" field holds that instant as
// UTC Unix nanoseconds and that the job is in the time index. Then it runs
// jobs through sequences of operations ending with Reschedule and expects the
// final status to be StatusQueued.
func TestJobReschedule(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Part one: the time parameter must be stored correctly.
	job, err := createAndSaveTestJob()
	if err != nil {
		t.Fatalf("Unexpected error in createAndSaveTestJob(): %s", err.Error())
	}
	now := time.Now()
	wantNanos := now.UTC().UnixNano()
	if rescheduleErr := job.Reschedule(now); rescheduleErr != nil {
		t.Errorf("Unexpected error in job.Reschedule: %s", rescheduleErr.Error())
	}
	expectJobFieldEquals(t, job, "time", wantNanos, int64Converter)
	expectJobInTimeIndex(t, job)

	// Part two: the status after rescheduling.
	// Reschedule immediately after the job has been created.
	rescheduleOnly := statePath{
		steps:    []func(*Job) error{rescheduleJob},
		expected: StatusQueued,
	}
	// Cancel first, then Reschedule.
	cancelThenReschedule := statePath{
		steps:    []func(*Job) error{cancelJob, rescheduleJob},
		expected: StatusQueued,
	}
	testJobStatePaths(t, []statePath{rescheduleOnly, cancelThenReschedule})
}
// TestJobDestroy runs a job through sequences of operations that end with
// Destroy and expects the final status to be StatusDestroyed each time.
func TestJobDestroy(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	// Each entry is one sequence of steps; all of them must end up destroyed.
	stepLists := [][]func(*Job) error{
		// Destroy immediately after the job has been created.
		{destroyJob},
		// Destroy after Cancel.
		{cancelJob, destroyJob},
		// Destroy after enqueue.
		{enqueueJob, destroyJob},
		// Destroy after enqueue followed by Cancel.
		{enqueueJob, cancelJob, destroyJob},
	}

	statePaths := make([]statePath, 0, len(stepLists))
	for _, steps := range stepLists {
		statePaths = append(statePaths, statePath{
			steps:    steps,
			expected: StatusDestroyed,
		})
	}
	testJobStatePaths(t, statePaths)
}
// TestJobSetError calls setError on a saved job and expects the job's stored
// "error" field to equal the message of the error that was passed in.
func TestJobSetError(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	job, err := createAndSaveTestJob()
	if err != nil {
		t.Fatalf("Unexpected error in createAndSaveTestJob(): %s", err.Error())
	}

	wantErr := errors.New("Test Error")
	err = job.setError(wantErr)
	if err != nil {
		t.Errorf("Unexpected error in job.setError(): %s", err.Error())
	}
	expectJobFieldEquals(t, job, "error", wantErr.Error(), stringConverter)
}
// statePath describes a sequence of operations applied to a job, each of
// which may change the job's status, together with the status the job is
// expected to have once the final operation has run.
type statePath struct {
	// steps are the operations to apply to the job, in order.
	steps []func(*Job) error
	// expected is the status the job should have after the last step.
	expected Status
}
// Ready-made step functions for building statePaths. Each one calls a single
// method on the job and returns that method's error.
var (
	// enqueueJob calls the job's enqueue method.
	enqueueJob = func(job *Job) error { return job.enqueue() }

	// cancelJob calls the job's Cancel method.
	cancelJob = func(job *Job) error { return job.Cancel() }

	// destroyJob calls the job's Destroy method.
	destroyJob = func(job *Job) error { return job.Destroy() }

	// rescheduleJob calls the job's Reschedule method with the current time.
	rescheduleJob = func(job *Job) error { return job.Reschedule(time.Now()) }
)
// testJobStatePaths runs every statePath in statePaths against its own
// freshly created and saved test job. For each path it applies the steps in
// order, reports any step that returns an error, and then checks that the
// job's status equals the path's expected status.
//
// Set-up and teardown are performed once per path. The per-path work runs
// inside a function literal so that the deferred teardown fires at the end of
// that path rather than piling up until testJobStatePaths returns.
func testJobStatePaths(t *testing.T, statePaths []statePath) {
	for pathIdx, path := range statePaths {
		func() {
			testingSetUp()
			defer testingTeardown()

			// Create a new test job
			job, err := createAndSaveTestJob()
			if err != nil {
				t.Fatal(err)
			}

			// Run the job through each step. Steps are identified by index
			// because a func value has no useful printed form.
			for stepIdx, step := range path.steps {
				if err := step(job); err != nil {
					t.Errorf("Unexpected error in step %d of state path %d: %s", stepIdx, pathIdx, err)
				}
			}
			expectStatusEquals(t, job, path.expected)
		}()
	}
}
// TestScanJob saves a job, reads its hash back with a raw HGETALL, passes the
// reply to scanJob, and expects the scanned job to be deeply equal to the
// original.
func TestScanJob(t *testing.T) {
	testingSetUp()
	defer testingTeardown()

	job, err := createAndSaveTestJob()
	if err != nil {
		t.Fatalf("Unexpected error: %s", err)
	}

	conn := redisPool.Get()
	defer conn.Close()
	replies, err := conn.Do("HGETALL", job.Key())
	if err != nil {
		// There is no reply to scan, so the remaining checks are meaningless.
		t.Fatalf("Unexpected error in HGETALL: %s", err.Error())
	}

	// The id is set up front; the remaining fields are filled in by scanJob.
	jobCopy := &Job{id: job.id}
	if err := scanJob(replies, jobCopy); err != nil {
		// A failed scan already explains why the comparison would fail.
		t.Fatalf("Unexpected error: %s", err)
	}
	if !reflect.DeepEqual(job, jobCopy) {
		t.Errorf("Result of scanJob was incorrect.\n\tExpected %+v\n\tbut got %+v", job, jobCopy)
	}
}