forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
doc.go
160 lines (119 loc) · 5.35 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/*
Package river is a robust high-performance job processing system for Go and
Postgres.
See [homepage], [docs], and [godoc].
Being built for Postgres, River encourages the use of the same database for
application data and job queue. By enqueueing jobs transactionally along with
other database changes, whole classes of distributed systems problems are
avoided. Jobs are guaranteed to be enqueued if their transaction commits, are
removed if their transaction rolls back, and aren't visible for work _until_
commit. See [transactional enqueueing] for more background on this philosophy.
# Job args and workers
Jobs are defined in struct pairs, with an implementation of [`JobArgs`] and one
of [`Worker`].
Job args contain `json` annotations and define how jobs are serialized to and
from the database, along with a "kind", a stable string that uniquely identifies
the job.
type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}
func (SortArgs) Kind() string { return "sort" }
Workers expose a `Work` function that dictates how jobs run.
type SortWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[SortArgs]
}
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
# Registering workers
Jobs are uniquely identified by their "kind" string. Workers are registered on
start up so that River knows how to assign jobs to workers:
workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})
# Starting a client
A River [`Client`] provides an interface for job insertion and manages job
processing and [maintenance services]. A client's created with a database pool,
[driver], and config struct containing a `Workers` bundle and other settings.
Here's a client `Client` working one queue (`"default"`) with up to 100 worker
goroutines at a time:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
if err != nil {
panic(err)
}
// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
## Stopping
The client should also be stopped on program shutdown:
// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
There are some complexities around ensuring clients stop cleanly, but also in a
timely manner. See [graceful shutdown] for more details on River's stop modes.
# Inserting jobs
[`Client.InsertTx`] is used in conjunction with an instance of job args to
insert a job to work on a transaction:
_, err = riverClient.InsertTx(ctx, tx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)
if err != nil {
panic(err)
}
See the [`InsertAndWork` example] for complete code.
# Other features
- [Batch job insertion] for efficiently inserting many jobs at once using
Postgres `COPY FROM`.
- [Cancelling jobs] from inside a work function.
- [Error and panic handling].
- [Periodic and cron jobs].
- [Scheduled jobs] that run automatically at their scheduled time in the
future.
- [Snoozing jobs] from inside a work function.
- [Subscriptions] to queue activity and statistics, providing easy hooks for
telemetry like logging and metrics.
- [Transactional job completion] to guarantee job completion commits with
other changes in a transaction.
- [Unique jobs] by args, period, queue, and state.
- [Work functions] for simplified worker implementation.
# Development
See [developing River].
[`Client`]: https://pkg.go.dev/weavelab.xyz/river#Client
[`Client.InsertTx`]: https://pkg.go.dev/weavelab.xyz/river#Client.InsertTx
[`InsertAndWork` example]: https://pkg.go.dev/weavelab.xyz/river#example-package-InsertAndWork
[`JobArgs`]: https://pkg.go.dev/weavelab.xyz/river#JobArgs
[`Worker`]: https://pkg.go.dev/weavelab.xyz/river#Worker
[Batch job insertion]: https://riverqueue.com/docs/batch-job-insertion
[Cancelling jobs]: https://riverqueue.com/docs/cancelling-jobs
[Error and panic handling]: https://riverqueue.com/docs/error-handling
[Periodic and cron jobs]: https://riverqueue.com/docs/periodic-jobs
[Scheduled jobs]: https://riverqueue.com/docs/scheduled-jobs
[Snoozing jobs]: https://riverqueue.com/docs/snoozing-jobs
[Subscriptions]: https://riverqueue.com/docs/subscriptions
[Transactional job completion]: https://riverqueue.com/docs/transactional-job-completion
[Unique jobs]: https://riverqueue.com/docs/unique-jobs
[Work functions]: https://riverqueue.com/docs/work-functions
[docs]: https://riverqueue.com/docs
[developing River]: https://weavelab.xyz/river/blob/master/docs/development.md
[driver]: https://riverqueue.com/docs/database-drivers
[godoc]: https://pkg.go.dev/weavelab.xyz/river
[graceful shutdown]: https://riverqueue.com/docs/graceful-shutdown
[homepage]: https://riverqueue.com
[maintenance services]: https://riverqueue.com/docs/maintenance-services
[transactional enqueueing]: https://riverqueue.com/docs/transactional-enqueueing
*/
package river