-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.go
78 lines (72 loc) · 1.81 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package bossworker
import (
"context"
"log"
"time"
)
// Worker is the user-supplied function that processes a single Event and
// produces its Response. The context it receives is cancelled once the
// worker's allotted lifetime has elapsed, so long-running implementations
// should watch ctx.Done() and return promptly when it fires.
type Worker[Event any, Response any] func(context.Context, Event) Response
// NewBoss creates the event and response channels, starts the boss loop in
// its own goroutine, and returns both channels to the caller.
//
// Parameters:
//   - maxWorkersCount: upper bound on workers running at the same time.
//   - workerLifetime: how long a single worker may run before it is abandoned.
//   - eventsChannelBuffer: capacity of the returned events channel; zero or a
//     negative value yields an unbuffered channel.
//   - worker: the function invoked for every event.
//
// The caller sends work on the first returned channel and reads results from
// the second, which is always unbuffered.
func NewBoss[Event any, Response any](
	maxWorkersCount uint,
	workerLifetime time.Duration,
	eventsChannelBuffer int,
	worker Worker[Event, Response]) (chan Event, chan Response) {
	in, out := makeChannels[Event, Response](eventsChannelBuffer)

	// The boss loop owns scheduling from here on; the caller only ever
	// touches the two channels.
	go run[Event, Response](maxWorkersCount, workerLifetime, worker, in, out)

	return in, out
}
// makeChannels builds the channel pair used by a boss: an events channel whose
// capacity is eventsChannelBuffer (treated as 0, i.e. unbuffered, when the
// value is zero or negative) and an unbuffered responses channel.
func makeChannels[Event any, Response any](
	eventsChannelBuffer int) (chan Event, chan Response) {
	// make panics on a negative size, and a size of zero is exactly an
	// unbuffered channel, so clamping covers both non-positive cases.
	size := eventsChannelBuffer
	if size < 0 {
		size = 0
	}
	return make(chan Event, size), make(chan Response)
}
// run is the boss loop. It reads events from input and starts one execute
// goroutine per event, keeping at most maxWorkersCount of them in flight.
//
// activeWorkers is a counting semaphore: run puts a token in before it starts
// a worker, and execute takes one out when that worker is finished. Because
// run is the only goroutine that acquires slots, it can simply block on input
// while holding one; no polling is needed.
//
// A maxWorkersCount of 0 is treated as 1, since a zero-capacity semaphore
// could never be acquired and no event would ever be processed.
//
// Closing input is the shutdown signal: run stops accepting events, waits for
// every in-flight worker to release its slot, closes output so that consumers
// ranging over it terminate, and returns.
func run[Event any, Response any](
	maxWorkersCount uint,
	workerLifetime time.Duration,
	worker Worker[Event, Response],
	input chan Event,
	output chan Response) {
	if maxWorkersCount == 0 {
		maxWorkersCount = 1
	}
	activeWorkers := make(chan struct{}, maxWorkersCount)

	for {
		// Reserve a slot first so an event is only taken off input when a
		// worker can start on it immediately.
		activeWorkers <- struct{}{}

		newEvent, ok := <-input
		if !ok {
			// input was closed: hand back the slot reserved above and stop.
			// Without this check the receive would yield zero values forever.
			<-activeWorkers
			break
		}
		go execute(activeWorkers, workerLifetime, output, worker, newEvent)
	}

	// Filling the semaphore completely is only possible once every worker
	// has released its slot, i.e. once nothing can send on output any more.
	for i := uint(0); i < maxWorkersCount; i++ {
		activeWorkers <- struct{}{}
	}
	close(output)
}
// execute runs worker on a single event and forwards its result to output,
// giving the worker at most workerLifetime to finish.
//
// The worker runs in its own goroutine with a context that expires after
// workerLifetime. If the result arrives in time it is sent on output (this
// send blocks until a consumer receives it) and "worker done" is logged. If
// the deadline passes first, "worker too long" is logged and the result, if
// one ever arrives, is discarded.
//
// In every case exactly one token is taken out of activeWorkers before
// execute returns, freeing the slot the caller reserved for this worker.
func execute[Event any, Response any](
	activeWorkers chan struct{},
	workerLifetime time.Duration,
	output chan Response,
	worker Worker[Event, Response],
	event Event) {
	// Release the slot on every exit path.
	defer func() { <-activeWorkers }()

	ctx, cancel := context.WithTimeout(context.Background(), workerLifetime)
	defer cancel()

	// Capacity 1 lets a worker that finishes after the deadline still hand
	// over its result and exit; with an unbuffered channel that goroutine
	// would block on the send forever because nobody receives any more.
	subResponse := make(chan Response, 1)
	go func() {
		subResponse <- worker(ctx, event)
	}()

	select {
	case newResp := <-subResponse:
		output <- newResp
		log.Println("worker done")
	case <-ctx.Done():
		// The deadline has already cancelled ctx, which is the worker's
		// signal to stop.
		log.Println("worker too long")
	}
}