-
Notifications
You must be signed in to change notification settings - Fork 2
/
batcher.go
164 lines (133 loc) · 3.76 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package batcher
import (
"sync"
"sync/atomic"
"time"
)
// Batcher allows for the grouping and batching of items to be processed.
// Items are accumulated per group via Add and flushed to the configured
// Processor either when a size threshold is reached or on a periodic tick.
// A Batcher must be created with NewBatcher and must not be copied.
type Batcher[T any] struct {
	// config stores configuration for the Batcher
	config Config[T]
	// queue stores the pending job for each group, keyed by group name
	queue map[string]*job[T]
	// itemCount tracks the total amount of items across all groups;
	// guarded by addMutex (reset under processingMutex in processQueue)
	itemCount int
	// jobs sends the jobs to the workers to be processed
	jobs chan *job[T]
	// results allows the workers to tell the Batcher that a job was complete
	results chan struct{}
	// ticker triggers the processing of the queue periodically
	ticker *time.Ticker
	// addMutex prevents concurrent additions to the queue
	addMutex sync.Mutex
	// processingMutex prevents processing the queue when it is already being processed
	processingMutex sync.Mutex
	// processing indicates if the queue is currently being processed;
	// read by tickerListener to skip ticks that arrive mid-flush
	processing atomic.Bool
	// shutdown indicates if the Batcher has been shutdown
	shutdown chan struct{}
}
// Add adds an item to a given group and blocks until it has been accepted
// but not processed. If accepting the item fills the queue (per the
// configured thresholds), Add synchronously processes the queue before
// returning. Add must not be called after Shutdown.
func (b *Batcher[T]) Add(group string, item T) {
	b.addMutex.Lock()
	defer b.addMutex.Unlock()
	// Single map lookup: fetch (or lazily create) the group's job once,
	// instead of re-indexing b.queue[group] for every access.
	j, exists := b.queue[group]
	if !exists {
		j = &job[T]{
			group: group,
			items: make([]T, 0, 1),
		}
		b.queue[group] = j
	}
	j.items = append(j.items, item)
	b.itemCount++
	if b.isQueueFull() {
		b.processQueue()
	}
}
// isQueueFull indicates if the queue is full either based on group or item count.
// A threshold of zero (or less) disables that particular limit.
func (b *Batcher[T]) isQueueFull() bool {
	tooManyGroups := b.config.GroupCountThreshold > 0 && len(b.queue) >= b.config.GroupCountThreshold
	tooManyItems := b.config.ItemCountThreshold > 0 && b.itemCount >= b.config.ItemCountThreshold
	return tooManyGroups || tooManyItems
}
// processQueue processes the jobs in the queue concurrently and blocks until complete
//
// Callers must hold addMutex (Add and tickerListener both do) so the queue
// is not mutated mid-flush. The ticker is stopped for the duration of the
// flush and reset on exit so the periodic timer restarts from a full
// DelayThreshold rather than firing immediately after a size-triggered flush.
// NOTE(review): the deferred Reset also fires when called from Shutdown,
// restarting a ticker Shutdown just stopped — Shutdown must account for that.
func (b *Batcher[T]) processQueue() {
	b.processingMutex.Lock()
	b.ticker.Stop()
	b.processing.Store(true)
	defer func() {
		b.processing.Store(false)
		b.itemCount = 0
		b.processingMutex.Unlock()
		b.ticker.Reset(b.config.DelayThreshold)
	}()
	// Snapshot the job count before draining; it is how many completion
	// signals we must wait for below.
	count := len(b.queue)
	if count == 0 {
		return
	}
	// Pass the jobs to the worker pool
	for k, j := range b.queue {
		b.jobs <- j
		delete(b.queue, k)
	}
	// Wait until all workers are done
	for i := 0; i < count; i++ {
		<-b.results
	}
}
// jobWorker consumes jobs from the channel and passes them to the processor.
// It runs until the jobs channel is closed (by Shutdown), signalling each
// completed job on the results channel.
func (b *Batcher[T]) jobWorker() {
	for current := range b.jobs {
		// Hand the batch to the configured processor callback.
		b.config.Processor(current.group, current.items)
		// Report completion so processQueue can account for this job.
		b.results <- struct{}{}
	}
}
// tickerListener listens to ticks from the ticker in order to process the
// queue periodically. It exits when the shutdown channel is signalled.
func (b *Batcher[T]) tickerListener() {
	for {
		select {
		case <-b.shutdown:
			return
		case <-b.ticker.C:
			// Skip this tick if a flush is already in flight.
			if b.processing.Load() {
				continue
			}
			// Take addMutex so no Add can mutate the queue mid-flush.
			b.addMutex.Lock()
			b.processQueue()
			b.addMutex.Unlock()
		}
	}
}
// Shutdown processes any remaining queued items then shuts down the Batcher
// and prevents further processing. It must be called at most once; the
// Batcher cannot be reused afterwards.
func (b *Batcher[T]) Shutdown() {
	b.ticker.Stop()
	// Flush whatever is still queued.
	b.processQueue()
	// processQueue's deferred ticker.Reset restarted the ticker; stop it
	// again so we don't leak a running ticker after shutdown.
	b.ticker.Stop()
	// Hold the processing mutex forever so no further flush can start.
	b.processingMutex.Lock()
	close(b.jobs)
	close(b.results)
	// Close (rather than send on) the shutdown channel: a receive on a
	// closed channel always succeeds, so the ticker listener is released
	// without Shutdown risking a block if the listener is not at its select
	// (e.g. it already took a tick and is waiting on processingMutex).
	close(b.shutdown)
}
// NewBatcher creates a new Batcher from given configuration. It validates
// the configuration, starts the periodic ticker listener, and spawns the
// configured number of worker goroutines. Returns an error if the
// configuration is invalid.
func NewBatcher[T any](cfg Config[T]) (*Batcher[T], error) {
	if err := cfg.validate(); err != nil {
		return nil, err
	}
	batcher := &Batcher[T]{
		config:   cfg,
		queue:    make(map[string]*job[T], cfg.GroupCountThreshold),
		jobs:     make(chan *job[T], cfg.NumGoroutines),
		results:  make(chan struct{}, cfg.GroupCountThreshold),
		ticker:   time.NewTicker(cfg.DelayThreshold),
		shutdown: make(chan struct{}),
	}
	// Background goroutine that flushes the queue on each tick.
	go batcher.tickerListener()
	// Worker pool that executes the batched jobs.
	for n := 0; n < cfg.NumGoroutines; n++ {
		go batcher.jobWorker()
	}
	return batcher, nil
}