-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
128 lines (112 loc) · 3.18 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package dispatcher
import (
"runtime/debug"
"time"
log "github.com/sirupsen/logrus"
)
// Worker is a single job executor. Its run loop publishes JobChannel on the
// shared WorkerPool, executes whatever job arrives on that channel, and repeats
// until a shutdown is requested.
type Worker struct {
	// WorkerID identifies the worker and is included in its log messages.
	WorkerID string
	// WorkerPool is the shared channel on which the worker publishes its
	// JobChannel each time it is ready to take another job.
	WorkerPool chan chan Job
	// JobChannel is the channel on which this worker receives jobs.
	JobChannel chan Job

	// shutdown carries the stop request from Stop to the run loop;
	// confirmShutdown carries the run loop's acknowledgement back to Stop.
	shutdown, confirmShutdown chan bool
}
// NewWorker builds a Worker identified by workerID that will publish its job
// channel on workerPool. The worker gets a fresh unbuffered JobChannel and
// single-slot shutdown / confirmation channels.
//
// It logs an error and returns nil when workerPool is nil or workerID is empty,
// so callers must check the result before use. The returned worker is idle
// until Start is called.
func NewWorker(workerID string, workerPool chan chan Job) *Worker {
	// Validate the arguments; the pool is checked before the ID.
	switch {
	case workerPool == nil:
		log.WithFields(module).Errorf("WorkerPool channel is nil/not initialized")
		return nil
	case workerID == "":
		log.WithFields(module).Errorf("WorkerID is not set")
		return nil
	}

	worker := &Worker{
		WorkerID:        workerID,
		WorkerPool:      workerPool,
		JobChannel:      make(chan Job),
		shutdown:        make(chan bool, 1),
		confirmShutdown: make(chan bool, 1),
	}
	return worker
}
// Start launches the worker's run loop in a new goroutine and returns
// immediately. On every iteration the loop publishes w.JobChannel on
// w.WorkerPool and then waits for either a job or a shutdown request. A
// received job is run through executeJob followed by executeFinally. A shutdown
// request is acknowledged on w.confirmShutdown, after which the goroutine
// exits.
//
// The shutdown channel is also watched while the worker is waiting to publish
// itself, so Stop cannot hang when the pool currently has no room (or no
// reader) for the worker's channel.
func (w *Worker) Start() {
	go func() {
		// quit acknowledges a shutdown request; callers return right after it.
		quit := func() {
			log.WithFields(module).Debugf("%v: Quitting the worker", w.WorkerID)
			w.confirmShutdown <- true
		}

		for {
			// Register the worker as available. A bare send here would block
			// with no way out if the pool cannot accept the channel, leaving
			// Stop waiting forever for a confirmation.
			select {
			case w.WorkerPool <- w.JobChannel:
			case <-w.shutdown:
				quit()
				return
			}

			select {
			case job := <-w.JobChannel:
				// We have received a work request.
				log.WithFields(module).Debugf("%v: Received a job", w.WorkerID)
				w.executeJob(job)
				w.executeFinally(job)
			case <-w.shutdown:
				// We have received a signal to stop.
				quit()
				return
			}
		}
	}()
}
// Stop asks the worker's run loop to exit and blocks until the loop has
// acknowledged the request.
//
// The acknowledgement is only ever sent by the run loop started in Start, so
// Stop does not return if no run loop is active to answer it.
func (w *Worker) Stop() {
	const stopSignal = true

	// Hand the stop request to the run loop.
	w.shutdown <- stopSignal

	// Block until the run loop confirms it is exiting.
	<-w.confirmShutdown
}
// executeJob runs job.Execute in its own goroutine and waits for it to finish.
//
// When job.GetExecutionTimeout() is positive the wait is bounded by that
// duration; on expiry a timeout error is logged and executeJob returns while
// the job's goroutine keeps running in the background (it is not cancelled).
// With a zero or negative timeout the wait is unbounded.
//
// A panic raised by Execute is recovered and logged together with a stack
// trace. Every non-nil error returned by Execute is logged individually.
func (w *Worker) executeJob(job Job) {
	// Buffered so the job goroutine can always signal completion and exit,
	// even if executeJob has already stopped waiting because of a timeout.
	done := make(chan bool, 1)
	go func() {
		defer func() {
			if e := recover(); e != nil {
				log.WithFields(module).Errorf("%v: Job execution failure: %v.\nstack_trace: %s", w.WorkerID, e, debug.Stack())
			}
			done <- true
		}()
		if errs := job.Execute(); len(errs) > 0 {
			// The format string must be the first argument; passing the worker
			// ID there turned the real message into "%!(EXTRA ...)" noise.
			log.WithFields(module).Debugf("Worker %v: Error/s in doing the job. Job Object: %v", w.WorkerID, job)
			for i, err := range errs {
				if err != nil {
					log.WithFields(module).Errorf("%v: Error %v: %v", w.WorkerID, i, err.Error())
				}
			}
		}
	}()

	// Read the timeout once so the value waited on is the value reported.
	timeout := job.GetExecutionTimeout()
	if timeout <= 0 {
		<-done
		return
	}

	// An explicit timer (rather than time.After) can be released as soon as
	// the job finishes instead of lingering until the timeout elapses.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		log.WithFields(module).Errorf("%v: Job execution timed out after %f seconds", w.WorkerID, timeout.Seconds())
	}
}
// executeFinally runs job.Finally in its own goroutine and waits for it to
// finish.
//
// When job.GetFinallyTimeout() is positive the wait is bounded by that
// duration; on expiry a timeout error is logged and executeFinally returns
// while the Finally goroutine keeps running in the background (it is not
// cancelled). With a zero or negative timeout the wait is unbounded.
//
// A panic raised by Finally is recovered and logged together with a stack
// trace.
func (w *Worker) executeFinally(job Job) {
	// Buffered so the Finally goroutine can always signal completion and exit,
	// even if executeFinally has already stopped waiting because of a timeout.
	done := make(chan bool, 1)
	go func() {
		defer func() {
			if e := recover(); e != nil {
				log.WithFields(module).Errorf("%v: Job's finally execution failure: %v.\nstack_trace: %s", w.WorkerID, e, debug.Stack())
			}
			done <- true
		}()
		job.Finally()
	}()

	// Read the timeout once so the value waited on is the value reported.
	timeout := job.GetFinallyTimeout()
	if timeout <= 0 {
		<-done
		return
	}

	// An explicit timer (rather than time.After) can be released as soon as
	// Finally completes instead of lingering until the timeout elapses.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		log.WithFields(module).Errorf("%v: Job's finally execution timed out after %f seconds", w.WorkerID, timeout.Seconds())
	}
}