-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch_processor.go
107 lines (86 loc) · 2.34 KB
/
batch_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package common
import (
"log"
"math"
"sync"
"time"
)
type ProcessFunction func([]interface{}, *sync.WaitGroup)
type BatchProcessor struct {
ProcessFunc ProcessFunction
ProcessInterval time.Duration
ThreadCount int
Events chan interface{}
Running bool
WaitGroup sync.WaitGroup
}
func NewBatchProcessor(processFunc ProcessFunction, requestQueueSize int, processInterval int, threadCount int) *BatchProcessor {
batchProcessor := new(BatchProcessor)
batchProcessor.Events = make(chan interface{}, requestQueueSize)
batchProcessor.ProcessInterval = time.Duration(processInterval)
batchProcessor.ThreadCount = threadCount
batchProcessor.ProcessFunc = processFunc
return batchProcessor
}
func (batchProcessor *BatchProcessor) AddEvent(event interface{}) {
batchProcessor.Events <- event
}
func (batchProcessor *BatchProcessor) Stop() {
batchProcessor.Running = false
batchProcessor.WaitGroup.Wait()
}
func (batchProcessor *BatchProcessor) Start() {
go batchProcessor.process()
}
func (batchProcessor *BatchProcessor) process() {
log.Print("Started batch writing thread")
batchProcessor.Running = true
batchProcessor.WaitGroup.Add(1)
defer batchProcessor.WaitGroup.Done()
for batchProcessor.Running {
time.Sleep(batchProcessor.ProcessInterval * time.Second)
var elements []interface{}
processing := true
for processing {
select {
case event, ok := <-batchProcessor.Events:
if ok {
elements = append(elements, event)
break
} else {
log.Print("Select channel closed")
processing = false
batchProcessor.Running = false
break
}
default:
processing = false
break
}
}
if len(elements) <= 0 {
continue
}
log.Printf("Retrieved %d values. Processing with %d connections", len(elements), batchProcessor.ThreadCount)
sliceSize := int(math.Floor(float64(len(elements) / batchProcessor.ThreadCount)))
remainder := len(elements) % batchProcessor.ThreadCount
start := 0
end := 0
for iter := 0; iter < batchProcessor.ThreadCount; iter++ {
var leftover int
if remainder > 0 {
leftover = 1
remainder--
} else {
leftover = 0
}
end += sliceSize + leftover
if start == end {
break
}
batchProcessor.WaitGroup.Add(1)
go batchProcessor.ProcessFunc(elements[start:end], &batchProcessor.WaitGroup)
start = end
}
}
}