forked from gocelery/gocelery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker_test.go
90 lines (77 loc) · 2.08 KB
/
worker_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package gocelery
import (
"math/rand"
"testing"
"time"
)
// add is the task function used by the tests in this file; it returns the
// sum of a and b.
func add(a, b int) int {
	sum := a + b
	return sum
}
// newCeleryWorker returns the CeleryWorker built by NewCeleryWorker from a
// Redis broker and a Redis backend, both created for redis://localhost:6379,
// passing numWorkers through unchanged.
func newCeleryWorker(numWorkers int) *CeleryWorker {
	return NewCeleryWorker(
		NewRedisCeleryBroker("redis://localhost:6379"),
		NewRedisCeleryBackend("redis://localhost:6379"),
		numWorkers,
	)
}
// registerTask registers the add function on celeryWorker under the name
// "add" and returns that name.
func registerTask(celeryWorker *CeleryWorker) string {
	const name = "add"
	celeryWorker.Register(name, add)
	return name
}
// TestRegisterTask registers the add task on a fresh worker and checks that
// GetTask returns a non-nil value for the registered name.
func TestRegisterTask(t *testing.T) {
	worker := newCeleryWorker(1)
	name := registerTask(worker)
	if task := worker.GetTask(name); task == nil {
		t.Errorf("failed to retrieve task")
	}
}
// TestRunTask checks that running the registered "add" task through
// CeleryWorker.RunTask yields the same value as calling add directly with
// the same two random int arguments.
func TestRunTask(t *testing.T) {
	celeryWorker := newCeleryWorker(1)
	taskName := registerTask(celeryWorker)

	// prepare args
	args := []interface{}{
		rand.Int(),
		rand.Int(),
	}

	// Run task normally to obtain the value the worker is expected to produce.
	res := add(args[0].(int), args[1].(int))

	// construct task message
	taskMessage := &TaskMessage{
		ID:      generateUUID(),
		Task:    taskName,
		Args:    args,
		Kwargs:  nil,
		Retries: 1,
		ETA:     "",
	}

	resultMsg, err := celeryWorker.RunTask(taskMessage)
	if err != nil {
		// Stop here: the result message is not meaningful once RunTask has
		// reported an error, so it must not be read below.
		t.Fatalf("failed to run celery task %v: %v", taskMessage, err)
	}

	// Checked assertion, so an unexpected result type fails the test with a
	// readable message instead of panicking.
	reflectRes, ok := resultMsg.Result.(int64)
	if !ok {
		t.Fatalf("unexpected result type %T, want int64", resultMsg.Result)
	}

	// check result
	if int64(res) != reflectRes {
		t.Errorf("reflect result %v is different from normal result %v", reflectRes, res)
	}
}
// TestNumWorkers builds a worker with a random worker count in [0, 10) and
// checks that GetNumWorkers reports the same count.
func TestNumWorkers(t *testing.T) {
	want := rand.Intn(10)
	got := newCeleryWorker(want).GetNumWorkers()
	if got != want {
		t.Errorf("number of workers are different: %d vs %d", want, got)
	}
}
// TestStartStop registers the add task on a worker with a random worker
// count in [0, 10), launches StartWorker in a goroutine, sleeps for 100ms,
// and then calls StopWorker.
func TestStartStop(t *testing.T) {
	worker := newCeleryWorker(rand.Intn(10))
	registerTask(worker)

	go worker.StartWorker()
	time.Sleep(100 * time.Millisecond)
	worker.StopWorker()
}