forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
400 lines (351 loc) · 12.7 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
package river
import (
"context"
"encoding/json"
"errors"
"sync/atomic"
"time"
"weavelab.xyz/monorail/shared/wlib/werror"
"weavelab.xyz/monorail/shared/wlib/wlog/tag"
"weavelab.xyz/river/internal/baseservice"
"weavelab.xyz/river/internal/componentstatus"
"weavelab.xyz/river/internal/dbadapter"
"weavelab.xyz/river/internal/dbsqlc"
"weavelab.xyz/river/internal/jobcompleter"
"weavelab.xyz/river/internal/notifier"
"weavelab.xyz/river/internal/util/chanutil"
"weavelab.xyz/river/internal/util/sliceutil"
"weavelab.xyz/river/internal/workunit"
"weavelab.xyz/river/rivertype"
)
// producerConfig holds the settings a producer is built from. newProducer
// validates it and returns an error for missing or out-of-range values.
type producerConfig struct {
	// ErrorHandler is passed through to every job executor the producer
	// starts. newProducer does not require it to be set.
	ErrorHandler ErrorHandler

	// FetchCooldown is the minimum amount of time to wait between fetches of new
	// jobs. Jobs will only be fetched *at most* this often, but if no new jobs
	// are coming in via LISTEN/NOTIFY then fetches may be delayed as long as
	// FetchPollInterval. Must be greater than zero.
	FetchCooldown time.Duration

	// FetchPollInterval is the amount of time between periodic fetches for new
	// jobs. Typically new jobs will be picked up ~immediately after insert via
	// LISTEN/NOTIFY, but this provides a fallback. Must be greater than zero.
	FetchPollInterval time.Duration

	// JobTimeout is the client-level job timeout handed to each executor.
	// newProducer rejects values below -1.
	JobTimeout time.Duration

	// MaxWorkerCount is the maximum number of jobs worked concurrently; it
	// bounds how many jobs a single fetch may request. Must be non-zero.
	MaxWorkerCount uint16

	// Notifier delivers the insert and job control notifications the
	// producer listens for. Required.
	Notifier *notifier.Notifier

	// QueueName is the queue this producer fetches jobs from. Required.
	QueueName string

	// RetryPolicy is passed through to every job executor. Required.
	RetryPolicy ClientRetryPolicy

	// SchedulerInterval is passed through to every job executor. Must be
	// non-zero.
	SchedulerInterval time.Duration

	// WorkerName must be non-empty. NOTE(review): it is not read anywhere in
	// this file — confirm where it is consumed.
	WorkerName string

	// Workers holds the registered workers, looked up by job kind to build a
	// work unit for each fetched job. Required.
	Workers *Workers
}
// producer manages a fleet of Workers up to a maximum size. It periodically
// fetches jobs from the adapter and dispatches them to Workers. It receives
// completed job results from Workers.
//
// The producer never fetches more jobs than the number of free Worker slots it
// has available. This is not optimal for throughput compared to pre-fetching
// extra jobs, but it is better for smaller job counts or slower jobs where even
// distribution and minimizing execution latency is more important.
type producer struct {
	baseservice.BaseService

	// Jobs which are currently being worked, keyed by job ID. Only used by the
	// main goroutine.
	activeJobs   map[int64]*jobExecutor
	adapter      dbadapter.Adapter
	completer    jobcompleter.JobCompleter
	config       *producerConfig
	errorHandler ErrorHandler
	workers      *Workers

	// Receives job IDs to cancel. Written by notifier goroutine, only read from
	// main goroutine.
	cancelCh chan int64

	// Receives completed jobs from workers. Written by completed workers, only
	// read from main goroutine.
	jobResultCh chan *rivertype.JobRow
	jobTimeout  time.Duration

	// An atomic count of the number of jobs actively being worked on. It is
	// only modified by the main goroutine (when jobs are added to or removed
	// from activeJobs), but it is also read by the heartbeat log goroutine,
	// which is why it is atomic.
	numJobsActive atomic.Int32

	// Total number of jobs that have finished. Modified by the main goroutine
	// and read by the heartbeat log goroutine and the shutdown log line.
	numJobsRan  atomic.Uint64
	retryPolicy ClientRetryPolicy
}
// newProducer validates config and returns a producer ready to be Run.
//
// It returns an error when a required dependency or setting is missing or out
// of range. config is retained by pointer (not copied), so callers should not
// mutate it afterwards.
func newProducer(archetype *baseservice.Archetype, adapter dbadapter.Adapter, completer jobcompleter.JobCompleter, config *producerConfig) (*producer, error) {
	if adapter == nil {
		return nil, errors.New("Adapter is required") //nolint:stylecheck
	}
	if completer == nil {
		return nil, errors.New("Completer is required") //nolint:stylecheck
	}
	// Guard against a nil config, which would otherwise panic on the field
	// checks below.
	if config == nil {
		return nil, errors.New("config is required")
	}

	if config.FetchCooldown <= 0 {
		return nil, errors.New("FetchCooldown must be greater than zero")
	}
	if config.FetchPollInterval <= 0 {
		return nil, errors.New("FetchPollInterval must be greater than zero")
	}
	// -1 is deliberately accepted alongside non-negative values (presumably a
	// "no timeout" sentinel understood by jobExecutor — TODO confirm), so the
	// message states the real lower bound.
	if config.JobTimeout < -1 {
		return nil, errors.New("JobTimeout must be greater than or equal to -1")
	}
	if config.MaxWorkerCount == 0 {
		return nil, errors.New("MaxWorkerCount is required")
	}
	if config.Notifier == nil {
		return nil, errors.New("Notifier is required") //nolint:stylecheck
	}
	if config.QueueName == "" {
		return nil, errors.New("QueueName is required")
	}
	if config.RetryPolicy == nil {
		return nil, errors.New("RetryPolicy is required")
	}
	if config.SchedulerInterval == 0 {
		return nil, errors.New("SchedulerInterval is required")
	}
	if config.WorkerName == "" {
		return nil, errors.New("WorkerName is required")
	}
	if config.Workers == nil {
		return nil, errors.New("Workers is required")
	}

	return baseservice.Init(archetype, &producer{
		activeJobs: make(map[int64]*jobExecutor),
		adapter:    adapter,
		// Buffered so the job control notification handler can enqueue
		// cancellations without blocking; when full, the handler drops them.
		cancelCh:     make(chan int64, 1000),
		completer:    completer,
		config:       config,
		errorHandler: config.ErrorHandler,
		// Sized to MaxWorkerCount, the maximum number of concurrently active
		// jobs, so finishing workers rarely block on reporting back.
		jobResultCh: make(chan *rivertype.JobRow, config.MaxWorkerCount),
		jobTimeout:  config.JobTimeout,
		retryPolicy: config.RetryPolicy,
		workers:     config.Workers,
	}), nil
}
// producerStatusUpdateFunc receives the status transitions a producer reports
// for the queue it serves.
type producerStatusUpdateFunc func(queue string, status componentstatus.Status)

// Run starts the producer. It blocks until the producer has completed
// graceful shutdown.
//
// When fetchCtx is cancelled, no more jobs will be fetched; however, if a fetch
// is already in progress, it will be allowed to complete and run any fetched
// jobs. When workCtx is cancelled, any in-progress jobs will have their
// contexts cancelled too.
//
// statusFunc is called with Initializing before the main loop starts, and with
// ShuttingDown and then Stopped once fetching has ended.
func (p *producer) Run(fetchCtx, workCtx context.Context, statusFunc producerStatusUpdateFunc) {
	p.Logger.InfoC(workCtx, p.Name+": Producer started", tag.String("queue", p.config.QueueName))
	defer func() {
		p.Logger.InfoC(workCtx, p.Name+": Producer stopped", tag.String("queue", p.config.QueueName), tag.Int64("num_completed_jobs", int64(p.numJobsRan.Load())))
	}()

	// The heartbeat logger lives only as long as fetching does.
	go p.heartbeatLogLoop(fetchCtx)

	statusFunc(p.config.QueueName, componentstatus.Initializing)

	// TODO: fetcher should have some jitter in it to avoid stampeding issues.
	fetchLimiter := chanutil.NewDebouncedChan(fetchCtx, p.config.FetchCooldown)

	// Invoked by the notifier for job control messages. It must not touch
	// activeJobs (owned by the main goroutine); cancellations are forwarded
	// through cancelCh instead.
	handleJobControlNotification := func(topic notifier.NotificationTopic, payload string) {
		var decoded jobControlPayload
		if err := json.Unmarshal([]byte(payload), &decoded); err != nil {
			p.Logger.WErrorC(workCtx, werror.Wrap(err, p.Name+": Failed to unmarshal job control notification payload"))
			return
		}

		// Both operands are jobControlAction, so they compare directly.
		if decoded.Action == jobControlActionCancel && decoded.Queue == p.config.QueueName && decoded.JobID > 0 {
			// Never block the notifier: enqueue if there is room, otherwise
			// drop the request and warn.
			select {
			case p.cancelCh <- decoded.JobID:
			default:
				p.Logger.WarnC(workCtx, p.Name+": Job cancel notification dropped due to full buffer", tag.Int64("job_id", decoded.JobID))
			}
			return
		}

		p.Logger.DebugC(workCtx, p.Name+": Received job control notification with unknown action or other queue",
			tag.String("action", string(decoded.Action)),
			tag.Int64("job_id", decoded.JobID),
			tag.String("queue", decoded.Queue),
		)
	}
	// The subscription stays open until Run returns, i.e. also during the
	// executor shutdown phase below.
	sub := p.config.Notifier.Listen(notifier.NotificationTopicJobControl, handleJobControlNotification)
	defer sub.Unlisten()

	p.fetchAndRunLoop(fetchCtx, workCtx, fetchLimiter, statusFunc)

	statusFunc(p.config.QueueName, componentstatus.ShuttingDown)
	p.executorShutdownLoop()
	statusFunc(p.config.QueueName, componentstatus.Stopped)
}
// jobControlAction names the operation requested by a job control
// notification.
type jobControlAction string

// jobControlActionCancel asks the producer working a job to cancel it.
const jobControlActionCancel jobControlAction = "cancel"

// jobControlPayload is the JSON body of a job control notification.
type jobControlPayload struct {
	Action jobControlAction `json:"action"`
	JobID  int64            `json:"job_id"`
	Queue  string           `json:"queue"`
}

// insertPayload is the JSON body of an insert notification; only the target
// queue name is decoded.
type insertPayload struct {
	Queue string `json:"queue"`
}
// fetchAndRunLoop is the producer's main loop. Fetches are triggered through
// fetchLimiter: once on startup, on insert notifications for this queue, and
// every FetchPollInterval as a fallback. Between fetches it keeps processing
// job completions and cancellation requests. It reports Healthy through
// statusFunc once set up and returns when fetchCtx is done.
func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimiter *chanutil.DebouncedChan, statusFunc producerStatusUpdateFunc) {
	p.Logger.InfoC(workCtx, p.Name+": Run loop started")
	defer p.Logger.InfoC(workCtx, p.Name+": Run loop stopped")

	// Prime the fetchLimiter so we can make an initial fetch without waiting for
	// an insert notification or a fetch poll.
	fetchLimiter.Call()

	handleInsertNotification := func(topic notifier.NotificationTopic, payload string) {
		var decoded insertPayload
		if err := json.Unmarshal([]byte(payload), &decoded); err != nil {
			p.Logger.WErrorC(workCtx, werror.Wrap(err, p.Name+": Failed to unmarshal insert notification payload"))
			return
		}
		if decoded.Queue != p.config.QueueName {
			return
		}

		p.Logger.DebugC(workCtx, p.Name+": Received insert notification", tag.String("queue", decoded.Queue))
		fetchLimiter.Call()
	}
	sub := p.config.Notifier.Listen(notifier.NotificationTopicInsert, handleInsertNotification)
	defer sub.Unlisten()

	// Fallback poll. The timer is only ever used by the goroutine below and is
	// discarded when fetchCtx is done, so stopping it is enough; there is no
	// need to drain its channel.
	fetchPollTimer := time.NewTimer(p.config.FetchPollInterval)
	go func() {
		defer fetchPollTimer.Stop()
		for {
			select {
			case <-fetchCtx.Done():
				return
			case <-fetchPollTimer.C:
				fetchLimiter.Call()
				fetchPollTimer.Reset(p.config.FetchPollInterval)
			}
		}
	}()

	statusFunc(p.config.QueueName, componentstatus.Healthy)

	fetchResultCh := make(chan producerFetchResult)
	for {
		select {
		case <-fetchCtx.Done():
			return
		case <-fetchLimiter.C():
			p.innerFetchLoop(workCtx, fetchResultCh)
			// Ensure we can't start another fetch when fetchCtx is done, even if
			// the fetchLimiter is also ready to fire:
			select {
			case <-fetchCtx.Done():
				return
			default:
			}
		case result := <-p.jobResultCh:
			p.removeActiveJob(result.ID)
		case jobID := <-p.cancelCh:
			// Cancellations must also be serviced while idle between fetches;
			// otherwise they would sit in cancelCh until the next fetch, which
			// may be as far away as FetchPollInterval.
			p.maybeCancelJob(jobID)
		}
	}
}
// innerFetchLoop performs one fetch sized to the number of free worker slots
// and starts executors for the jobs it returns. The fetch itself runs on its
// own goroutine; meanwhile this loop keeps servicing job completions and
// cancellation requests so the main goroutine never stalls on them. It
// returns once the fetch result has been handled, or immediately when no
// worker slot is free.
func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
	count := p.maxJobsToFetch()
	if count <= 0 {
		// Every worker slot is busy, so a fetch limited to count jobs cannot
		// yield anything useful; skip the database round trip.
		return
	}

	go p.dispatchWork(count, fetchResultCh) //nolint:contextcheck

	for {
		select {
		case result := <-fetchResultCh:
			if result.err != nil {
				p.Logger.WErrorC(workCtx, werror.Wrap(result.err, p.Name+": Error fetching jobs"))
			} else if len(result.jobs) > 0 {
				p.startNewExecutors(workCtx, result.jobs)
			}
			return
		case result := <-p.jobResultCh:
			p.removeActiveJob(result.ID)
		case jobID := <-p.cancelCh:
			p.maybeCancelJob(jobID)
		}
	}
}
// executorShutdownLoop blocks until every active job has reported completion.
// No more jobs will be fetched or started at this point. The job control
// subscription is still open while this runs, so cancellation requests are
// still honored instead of being left unread in cancelCh.
func (p *producer) executorShutdownLoop() {
	for len(p.activeJobs) > 0 {
		select {
		case result := <-p.jobResultCh:
			p.removeActiveJob(result.ID)
		case jobID := <-p.cancelCh:
			p.maybeCancelJob(jobID)
		}
	}
}
// addActiveJob registers executor as working the job with the given ID and
// increments the active job counter. Called from the main goroutine only.
func (p *producer) addActiveJob(id int64, executor *jobExecutor) {
	p.activeJobs[id] = executor
	p.numJobsActive.Add(1)
}
// removeActiveJob forgets the executor for the given job ID, decrements the
// active job counter, and counts the job as run. Called from the main
// goroutine only.
func (p *producer) removeActiveJob(id int64) {
	delete(p.activeJobs, id)

	p.numJobsRan.Add(1)
	p.numJobsActive.Add(-1)
}
// maybeCancelJob cancels the executor working the given job, if this producer
// is currently working it; unknown IDs are ignored.
func (p *producer) maybeCancelJob(id int64) {
	if executor, active := p.activeJobs[id]; active {
		executor.Cancel()
	}
}
// dispatchWork fetches up to count available jobs from the producer's queue
// and sends exactly one producerFetchResult on jobsFetchedCh, carrying either
// the fetched jobs or the adapter's error. It is run on its own goroutine.
func (p *producer) dispatchWork(count int32, jobsFetchedCh chan<- producerFetchResult) {
	// A background context is used on purpose: the fetch must not be
	// cancelled when the producer is asked to shut down. Once a fetch starts
	// we want to finish it and work the jobs it returns before stopping;
	// aborting midway risks leaving jobs already claimed in the database
	// stranded, which would then have to be released back to the queue.
	var result producerFetchResult

	internalJobs, err := p.adapter.JobGetAvailable(context.Background(), p.config.QueueName, count)
	if err != nil {
		result.err = err
	} else {
		result.jobs = sliceutil.Map(internalJobs, dbsqlc.JobRowFromInternal)
	}

	jobsFetchedCh <- result
}
// heartbeatLogLoop writes an informational "Heartbeat" log line every five
// seconds with the number of completed and running jobs for this producer's
// queue. It returns when ctx is done.
func (p *producer) heartbeatLogLoop(ctx context.Context) {
	const heartbeatInterval = 5 * time.Second

	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
			p.Logger.InfoC(ctx, p.Name+": Heartbeat",
				tag.Int64("num_completed_jobs", int64(p.numJobsRan.Load())),
				tag.Int("num_jobs_running", int(p.numJobsActive.Load())),
				tag.String("queue", p.config.QueueName),
			)
		case <-ctx.Done():
			return
		}
	}
}
// startNewExecutors builds a jobExecutor for each fetched job, registers it as
// active, and launches it on its own goroutine. Each job gets a context
// derived from workCtx whose cancel function is handed to the executor.
//
// When no worker is registered for a job's kind, the executor is built with a
// nil WorkUnit; handling that case is left to jobExecutor.
func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.JobRow) {
	for i := range jobs {
		job := jobs[i]

		var workUnit workunit.WorkUnit
		if info, registered := p.workers.workersMap[job.Kind]; registered {
			workUnit = info.workUnitFactory.MakeUnit(job)
		}

		jobCtx, cancelJob := context.WithCancelCause(workCtx)
		executor := baseservice.Init(&p.Archetype, &jobExecutor{
			Adapter:                p.adapter,
			CancelFunc:             cancelJob,
			ClientJobTimeout:       p.jobTimeout,
			ClientRetryPolicy:      p.retryPolicy,
			Completer:              p.completer,
			ErrorHandler:           p.errorHandler,
			InformProducerDoneFunc: p.handleWorkerDone,
			JobRow:                 job,
			SchedulerInterval:      p.config.SchedulerInterval,
			WorkUnit:               workUnit,
		})

		p.addActiveJob(job.ID, executor)
		go executor.Execute(jobCtx)

		// TODO:
		// Errors can be recorded synchronously before the Executor slot is
		// considered available.
		//
		// Successful jobs can be sent to the completer for async acking, IF they
		// aren't already completed by the user. Do we need an internal field +
		// convenience method to make that part work?
	}
}
// maxJobsToFetch returns the number of free worker slots: the configured
// MaxWorkerCount minus the number of jobs currently active.
func (p *producer) maxJobsToFetch() int32 {
	capacity := int32(p.config.MaxWorkerCount)
	active := p.numJobsActive.Load()
	return capacity - active
}
// handleWorkerDone is installed as each executor's InformProducerDoneFunc. It
// hands the finished job back to the main goroutine through jobResultCh,
// blocking if that channel's buffer is full.
func (p *producer) handleWorkerDone(job *rivertype.JobRow) {
	results := p.jobResultCh
	results <- job
}
// producerFetchResult carries the outcome of one dispatchWork fetch back to
// innerFetchLoop.
type producerFetchResult struct {
	// jobs holds the fetched rows when err is nil (possibly empty).
	jobs []*rivertype.JobRow
	// err is the error returned by the adapter's fetch; jobs is unset then.
	err error
}