This repository has been archived by the owner on Dec 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
worker.go
83 lines (73 loc) · 1.87 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package jobqueue
import (
"context"
"fmt"
"time"
)
// worker is a single instance processing jobs. It pairs the Manager it was
// created for with the channel it receives jobs from.
type worker struct {
// m is the Manager this worker belongs to. The worker's methods use its
// logger, store (st), topic map (tm), mutex (mu) and working counters.
m *Manager
// jobc is the receive-only channel of jobs to process. The run loop
// ends once this channel is closed and drained.
jobc <-chan *Job
}
// newWorker builds a worker for manager m that consumes jobs from jobc.
// Before returning, it starts the worker's run loop in its own goroutine.
//
// NOTE(review): run calls m.workersWg.Done when it exits, so presumably the
// caller performs the matching Add before calling newWorker — confirm.
func newWorker(m *Manager, jobc <-chan *Job) *worker {
	w := new(worker)
	w.m = m
	w.jobc = jobc

	go w.run()

	return w
}
// run is the worker's main loop. It receives jobs from jobc until the
// channel is closed, hands each one to process, and logs any error that
// process returns. On exit it signals the manager's workersWg.
func (w *worker) run() {
	defer w.m.workersWg.Done()

	jobs := w.jobc
	for {
		job, open := <-jobs
		if !open {
			// Channel closed and drained: nothing more to do.
			return
		}
		if err := w.process(job); err != nil {
			w.m.logger.Printf("jobqueue: job %v failed: %v", job.ID, err)
		}
	}
}
// process runs a single job and records the outcome in the manager's store.
//
// It looks up the processor registered for job.Topic and invokes it. The
// job is then updated according to the result:
//
//   - success: State becomes Succeeded and Completed is stamped;
//   - error with Retry >= MaxRetry: State becomes Failed and Completed is
//     stamped;
//   - any other error: the job goes back to Waiting with a new Priority
//     derived from the manager's backoff, and Retry is incremented.
//
// The processor's own error is only logged. The returned error is either
// the "no processor found" error or whatever the store's Update returns.
// In every case the manager's working counter for job.Rank is decremented
// when process returns.
func (w *worker) process(job *Job) error {
	m := w.m

	// Give the slot for this job's rank back once we are done with it.
	// NOTE(review): presumably the counter was incremented by whoever
	// dispatched the job — verify against the caller.
	defer func() {
		m.mu.Lock()
		m.working[job.Rank]--
		m.mu.Unlock()
	}()

	// Resolve the processor for the job's topic under the manager lock.
	m.mu.Lock()
	processor, registered := m.tm[job.Topic]
	m.mu.Unlock()
	if !registered {
		return fmt.Errorf("no processor found for topic %s", job.Topic)
	}

	m.testJobStarted() // testing hook

	runErr := processor(job)
	switch {
	case runErr == nil:
		// The job ran cleanly: persist the success before firing the hook.
		job.State = Succeeded
		job.Completed = time.Now().UnixNano()
		if updateErr := m.st.Update(context.Background(), job); updateErr != nil {
			return updateErr
		}
		m.testJobSucceeded() // testing hook
		return nil

	case job.Retry >= job.MaxRetry:
		// Out of retries: mark the job as permanently failed.
		m.logger.Printf("jobqueue: Job %v failed after %d retries: %v", job.ID, job.Retry+1, runErr)
		m.testJobFailed() // testing hook
		job.State = Failed
		job.Completed = time.Now().UnixNano()
		return m.st.Update(context.Background(), job)

	default:
		// Retries remain: put the job back in the queue with a backoff.
		m.logger.Printf("jobqueue: Job %v failed on try %d of %d: %v", job.ID, job.Retry+1, job.MaxRetry, runErr)
		m.testJobRetry() // testing hook
		// Priority is the negated Unix-nanosecond time at which the backoff
		// for the current retry count elapses.
		// NOTE(review): the negation presumably makes later times sort
		// lower — confirm against the store's ordering.
		retryAt := time.Now().Add(m.backoff(job.Retry))
		job.Priority = -retryAt.UnixNano()
		job.State = Waiting
		job.Retry++
		return m.st.Update(context.Background(), job)
	}
}