-
Notifications
You must be signed in to change notification settings - Fork 4
/
worker.go
74 lines (65 loc) · 1.71 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package parallel
import "errors"
// Worker is a named unit of work registered on a Parallel instance.
// When Start is called, its execution function is launched in
// Config.Parallelism goroutines.
type Worker struct {
	// p is the Parallel instance this worker was registered on.
	p *Parallel

	// Name is the key under which the worker is registered.
	Name string

	// Config carries the worker's settings, such as its parallelism.
	Config *WorkerConfig

	// execute is the function run by every goroutine; set via SetExecution.
	execute func(wh *WorkerHelper, args interface{})

	// helper is the WorkerHelper created by the most recent Start call.
	helper *WorkerHelper
}
// WorkerConfig holds the tunable settings of a Worker.
type WorkerConfig struct {
	// Parallelism is the number of goroutines launched per Start call.
	// NewWorker rejects values below 1.
	Parallelism int
}
// NewWorker registers a new worker under name on p and returns it.
//
// It returns an error, and registers nothing, when a worker with the
// same name already exists, when cfg is nil, or when cfg.Parallelism
// is below 1. The returned worker stores the cfg pointer itself (no
// copy is made), so later changes to *cfg are visible via Worker.Config.
func (p *Parallel) NewWorker(name string, cfg *WorkerConfig) (*Worker, error) {
	if _, exists := p.workers[name]; exists {
		return nil, errors.New("worker already exists")
	}
	// Reject a nil config explicitly; dereferencing it below would panic.
	if cfg == nil {
		return nil, errors.New("worker config must not be nil")
	}
	if cfg.Parallelism < 1 {
		return nil, errors.New("parallelism must be 1 or higher")
	}

	w := &Worker{
		p:      p,
		Name:   name,
		Config: cfg,
	}
	p.workers[name] = w
	return w, nil
}
// Worker looks up a registered worker by name. It returns nil when
// no worker is registered under that name.
func (p *Parallel) Worker(name string) *Worker {
	// Indexing a map with a missing key yields the zero value, which
	// for a pointer is nil, so one lookup covers both cases.
	return p.workers[name]
}
// SetExecution installs exec as the worker's execution function, i.e.
// the function Start runs in every goroutine it launches. Any function
// set previously is replaced.
func (w *Worker) SetExecution(exec func(wh *WorkerHelper, args interface{})) {
	w.execute = exec
}
// Start launches Config.Parallelism goroutines, each running the
// worker's execution function with the same WorkerHelper and args.
//
// A fresh WorkerHelper is created on every call and replaces the one
// from any previous Start, so Wait only tracks the latest call. The
// helper's wait group is incremented once per goroutine; Start never
// decrements it, so the execution function is presumably expected to
// signal completion through the helper (verify against WorkerHelper).
//
// Start panics if no execution function has been set via SetExecution.
func (w *Worker) Start(args interface{}) {
	// Fail fast, in the caller's goroutine and with a clear message.
	// Without this check the go statement below would be issued on a nil
	// function value, which brings the whole program down with a runtime
	// failure the caller cannot recover from.
	if w.execute == nil {
		panic("parallel: worker " + w.Name + " started without an execution function; call SetExecution first")
	}

	wh := newWorkerHelper(w)
	w.helper = wh
	for i := 0; i < w.Config.Parallelism; i++ {
		wh.wg.Add(1)
		go w.execute(wh, args)
	}
}
// Wait blocks until the wait group of the most recent Start call
// reaches zero. It returns immediately when Start has not been called
// yet, since there is nothing to wait for.
func (w *Worker) Wait() {
	// helper is only assigned by Start; guard against a nil dereference
	// when Wait is called first.
	if w.helper == nil {
		return
	}
	w.helper.wg.Wait()
}
// SetParallelism changes the number of goroutines that later Start
// calls launch for this worker. The value is written through the
// Config pointer as-is; no range validation is performed here.
func (w *Worker) SetParallelism(p int) {
	cfg := w.Config
	cfg.Parallelism = p
}