-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathproducer.go
207 lines (168 loc) · 5.45 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package curlyq
import (
"context"
"time"
"github.com/go-redis/redis/v7"
)
// Producer pushes jobs onto a single named queue, either for immediate
// execution or for execution at a scheduled point in time.
type Producer struct {
	// opts holds the options the producer was created with, after defaults
	// have been applied.
	opts *ProducerOpts

	// client is the go-redis client used for every Redis call, and queue
	// supplies the Redis key names handed to the Lua scripts.
	client *redis.Client
	queue  *queue

	// Lua scripts run against Redis to enqueue and to schedule a job.
	pushJobScript, scheduleJobScript *redis.Script
}
// ProducerOpts exposes options used when creating a new Producer.
// At least one of Address or Client must be set and Queue must be non-empty;
// NewProducer panics otherwise.
type ProducerOpts struct {
// Address specifies the address of the Redis backing your queue.
// CurlyQ will generate a go-redis instance based on this address.
// It is only consulted when Client is nil.
Address string
// Client is a custom go-redis instance used to communicate with Redis.
// If provided, this option overrides the value set in Address.
Client *redis.Client
// Logger provides a concrete implementation of the Logger interface.
// If not provided, a DefaultLogger is used.
Logger Logger
// Queue specifies the name of the queue that this producer will push to.
// It is required.
Queue string
}
// withDefaults returns a copy of the options in which an unset Logger has
// been replaced by a DefaultLogger. The receiver itself is not modified.
func (o *ProducerOpts) withDefaults() *ProducerOpts {
	filled := new(ProducerOpts)
	*filled = *o

	// A caller-supplied logger is kept as-is.
	if filled.Logger != nil {
		return filled
	}

	filled.Logger = &DefaultLogger{}
	return filled
}
// NewProducer builds a Producer from the supplied options.
//
// It panics when neither a Redis address nor a Redis client is supplied, or
// when no queue name is given. A caller-supplied Client takes precedence over
// Address; a new go-redis client is only constructed when Client is nil.
func NewProducer(opts *ProducerOpts) *Producer {
	// Validate the required options before doing any other work.
	switch {
	case opts.Client == nil && opts.Address == "":
		panic("A redis client must be provided.")
	case opts.Queue == "":
		panic("A queue name must be provided.")
	}

	// Prefer the caller's client; otherwise construct one from the address.
	rdb := opts.Client
	if rdb == nil {
		rdb = redis.NewClient(&redis.Options{Addr: opts.Address})
	}

	q := newQueue(&queueOpts{Name: opts.Queue})

	// Set up the Lua scripts: prepScripts runs first, then each script is
	// loaded by its path.
	prepScripts()
	producer := &Producer{
		client:            rdb,
		queue:             q,
		pushJobScript:     loadLua("/lua/push_job.lua"),
		scheduleJobScript: loadLua("/lua/schedule_job.lua"),
	}

	// Store a defaulted copy so the caller's options are never mutated.
	producer.opts = opts.withDefaults()
	return producer
}
// Public API
// PerformAfter schedules a job to be performed once the given duration has
// elapsed. It is shorthand for PerformAfterCtx with a background context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) PerformAfter(duration time.Duration, job Job) (string, error) {
	ctx := context.Background()
	return p.PerformAfterCtx(ctx, duration, job)
}
// PerformAfterCtx schedules a job to be performed once the given duration has
// elapsed, measured from the moment of the call. Redis is called with the
// user-supplied context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) PerformAfterCtx(ctx context.Context, duration time.Duration, job Job) (string, error) {
	// Convert the relative delay into an absolute point in time.
	runAt := time.Now().Add(duration)
	return p.PerformAtCtx(ctx, runAt, job)
}
// PerformAt schedules a job to be performed at a particular point in time.
// It is shorthand for PerformAtCtx with a background context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) PerformAt(at time.Time, job Job) (string, error) {
	ctx := context.Background()
	return p.PerformAtCtx(ctx, at, job)
}
// PerformAtCtx schedules a job to be performed at a particular point in time.
// Redis is called with the user-supplied context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) PerformAtCtx(ctx context.Context, at time.Time, job Job) (string, error) {
	id, err := p.scheduleJob(ctx, at, job)
	return id, err
}
// Perform enqueues a job to be performed as soon as possible.
// It is shorthand for PerformCtx with a background context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) Perform(job Job) (string, error) {
	ctx := context.Background()
	return p.PerformCtx(ctx, job)
}
// PerformCtx enqueues a job to be performed as soon as possible.
// Redis is called with the user-supplied context.
// On success it returns the ID of the job; otherwise it returns an error.
func (p *Producer) PerformCtx(ctx context.Context, job Job) (string, error) {
	id, err := p.pushJob(ctx, job)
	return id, err
}
// Redis operations
// pushJob runs the push-job script to place a job on the active queue.
//
// It returns the job's ID on success. It returns an error when the job cannot
// be turned into a message, when the Redis script fails, or — as
// ErrJobAlreadyExists — when the script reports that nothing was enqueued.
func (p *Producer) pushJob(ctx context.Context, job Job) (string, error) {
	payload, err := job.message()
	if err != nil {
		return "", err
	}

	// Keys: job data hash, active jobs list, signal list.
	// Args: job ID, serialized job.
	result := p.pushJobScript.Run(
		p.client.WithContext(ctx),
		[]string{
			p.queue.jobDataHash,
			p.queue.activeJobsList,
			p.queue.signalList,
		},
		job.ID,
		payload,
	)

	count, err := result.Int()
	switch {
	case err != nil:
		return "", err
	case count == 0:
		// A zero reply means the script did not enqueue the job.
		return "", ErrJobAlreadyExists{Job: job}
	}

	p.opts.Logger.Info("Enqueued job", "job_id", job.ID)
	return job.ID, nil
}
// scheduleJob runs the schedule-job script to place a job in the scheduled set.
//
// It returns the job's ID on success. It returns an error when the job cannot
// be turned into a message, when the Redis script fails, or — as
// ErrJobAlreadyExists — when the script reports that nothing was scheduled.
func (p *Producer) scheduleJob(ctx context.Context, at time.Time, job Job) (string, error) {
	payload, err := job.message()
	if err != nil {
		return "", err
	}

	// Keys: job data hash, scheduled jobs set.
	// Args: job ID, serialized job, run time as whole Unix seconds.
	result := p.scheduleJobScript.Run(
		p.client.WithContext(ctx),
		[]string{
			p.queue.jobDataHash,
			p.queue.scheduledJobsSet,
		},
		job.ID,
		payload,
		float64(at.Unix()),
	)

	count, err := result.Int()
	switch {
	case err != nil:
		return "", err
	case count == 0:
		// A zero reply means the script did not schedule the job.
		return "", ErrJobAlreadyExists{Job: job}
	}

	p.opts.Logger.Info("Scheduled job", "job_id", job.ID, "scheduled_at", at.Format(time.RFC1123Z))
	return job.ID, nil
}