Skip to content

Commit

Permalink
Add support of passing attributes in publish/consume (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Jul 11, 2024
1 parent bc3b3a1 commit 6694ab6
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 143 deletions.
16 changes: 0 additions & 16 deletions docker/docker-compose.yml

This file was deleted.

48 changes: 26 additions & 22 deletions engine/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,33 @@ type jobImpl struct {
// NOTE: there is a trick in this factory, the delay is embedded in the jobID.
// By doing this we can delete the job that's located in hourly AOF, by placing
// a tombstone record in that AOF.
func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint16, jobID string) Job {
func NewJob(namespace, queue string, body []byte, attributes map[string]string, ttl, delay uint32, tries uint16, jobID string) Job {
if jobID == "" {
jobID = uuid.GenJobIDWithVersion(0, delay)
}
return &jobImpl{
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
attributes: attributes,
}
}

func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string) Job {
func NewJobWithID(namespace, queue string, body []byte, attributes map[string]string, ttl uint32, tries uint16, jobID string) Job {
delay, _ := uuid.ExtractDelaySecondFromUniqueID(jobID)
return &jobImpl{
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
attributes: attributes,
}
}

Expand Down Expand Up @@ -108,19 +110,21 @@ func (j *jobImpl) Attributes() map[string]string {

func (j *jobImpl) MarshalText() (text []byte, err error) {
var job struct {
Namespace string `json:"namespace"`
Queue string `json:"queue"`
ID string `json:"id"`
TTL uint32 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
Body []byte `json:"body"`
Namespace string `json:"namespace"`
Queue string `json:"queue"`
ID string `json:"id"`
TTL uint32 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
Body []byte `json:"body"`
Attributes map[string]string `json:"attributes,omitempty"`
}
job.Namespace = j.namespace
job.Queue = j.queue
job.ID = j.id
job.TTL = j.ttl
job.ElapsedMS = j._elapsedMS
job.Body = j.body
job.Attributes = j.attributes
return json.Marshal(job)
}

Expand Down
30 changes: 15 additions & 15 deletions engine/migration/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
func TestEngine_Publish(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q1", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q1", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}

// Publish no-delay job
j = engine.NewJob("ns-engine", "q1", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q1", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func TestEngine_Publish(t *testing.T) {
func TestEngine_Consume(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 2")
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -53,7 +53,7 @@ func TestEngine_Consume(t *testing.T) {
}

// Consume job that's published in no-delay way
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -72,9 +72,9 @@ func TestEngine_Consume(t *testing.T) {
func TestEngine_Consume2(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 3")
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
j1 := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
_, err := e.Publish(j1)
j2 := engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
j2 := engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -91,12 +91,12 @@ func TestEngine_Consume2(t *testing.T) {
func TestEngine_ConsumeMulti(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 4")
j1 := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
j1 := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j1)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
jobID2, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -122,7 +122,7 @@ func TestEngine_ConsumeMulti(t *testing.T) {
func TestEngine_Peek(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 6")
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -136,7 +136,7 @@ func TestEngine_Peek(t *testing.T) {
func TestEngine_DrainOld(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 7")
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
jobID, err := OldRedisEngine.Publish(j)
job, err := e.Consume("ns-engine", []string{"q7"}, 5, 0)
if err != nil {
Expand All @@ -150,7 +150,7 @@ func TestEngine_DrainOld(t *testing.T) {
func TestEngine_BatchConsume(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 8")
j := engine.NewJob("ns-engine", "q8", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -176,7 +176,7 @@ func TestEngine_BatchConsume(t *testing.T) {
// Consume some jobs
jobIDMap := map[string]bool{}
for i := 0; i < 4; i++ {
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestEngine_BatchConsume(t *testing.T) {
func TestEngine_DeadLetter_Size(t *testing.T) {
body := []byte("hello msg 9")
queues := []string{"q9"}
j := engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
jobID, err := OldRedisEngine.Publish(j)
job, err := OldRedisEngine.Consume("ns-engine", queues, 0, 0)
if err != nil {
Expand All @@ -232,7 +232,7 @@ func TestEngine_DeadLetter_Size(t *testing.T) {
if job.ID() != jobID {
t.Fatal("Mismatched job")
}
j = engine.NewJob("ns-engine", "q9", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q9", body, nil, 10, 0, 1, "")
jobID, err = NewRedisEngine.Publish(j)
job, err = NewRedisEngine.Consume("ns-engine", queues, 0, 0)
if job.ID() != jobID {
Expand All @@ -250,7 +250,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
e := NewEngine(OldRedisEngine, NewRedisEngine)
body := []byte("hello msg 1")
// Publish no-delay job
j := engine.NewJob("ns-engine", "q10", body, 10, 0, 1, "jobID1")
j := engine.NewJob("ns-engine", "q10", body, nil, 10, 0, 1, "jobID1")
jobID, err := e.Publish(j)
t.Log(jobID)
assert.Nil(t, err)
Expand Down
8 changes: 4 additions & 4 deletions engine/redis/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestDeadLetter_Delete(t *testing.T) {

func TestDeadLetter_Respawn(t *testing.T) {
p := NewPool(R)
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), 60, 0, 1, "")
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), 60, 0, 1, "")
job1 := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
job2 := engine.NewJob("ns-dead", "q3", []byte("2"), nil, 60, 0, 1, "")
job3 := engine.NewJob("ns-dead", "q3", []byte("3"), nil, 60, 0, 1, "")
p.Add(job1)
p.Add(job2)
p.Add(job3)
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDeadLetter_Size(t *testing.T) {
dl, _ := NewDeadLetter("ns-dead", "q3", R)
cnt := 3
for i := 0; i < cnt; i++ {
job := engine.NewJob("ns-dead", "q3", []byte("1"), 60, 0, 1, "")
job := engine.NewJob("ns-dead", "q3", []byte("1"), nil, 60, 0, 1, "")
p.Add(job)
dl.Add(job.ID())
}
Expand Down
12 changes: 6 additions & 6 deletions engine/redis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
return nil, nil
}
endTime := time.Now().Unix()
body, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
payload, ttl, err := e.pool.Get(namespace, queueName.Queue, jobID)
switch err {
case nil:
// no-op
Expand All @@ -177,7 +177,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
default:
return nil, fmt.Errorf("pool: %s", err)
}
job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID)
job = engine.NewJobWithID(namespace, queueName.Queue, payload.Body, payload.Attributes, ttl, tries, jobID)
metrics.jobElapsedMS.WithLabelValues(e.redis.Name, namespace, queueName.Queue).Observe(float64(job.ElapsedMS()))
return job, nil
}
Expand Down Expand Up @@ -207,19 +207,19 @@ func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, e
return nil, fmt.Errorf("failed to peek queue: %s", err)
}
}
body, ttl, err := e.pool.Get(namespace, queue, jobID)
payload, ttl, err := e.pool.Get(namespace, queue, jobID)
// Tricky: we shouldn't return the not found error when the job was not found,
// since the job may expired(TTL was reached) and it would confuse the user, so
// we return the nil job instead of the not found error here. But if the `optionalJobID`
// was assigned we should return the not fond error.
if optionalJobID == "" && err == engine.ErrNotFound {
if optionalJobID == "" && errors.Is(err, engine.ErrNotFound) {
// return jobID with nil body if the job is expired
return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID), nil
return engine.NewJobWithID(namespace, queue, nil, nil, 0, 0, jobID), nil
}
if err != nil {
return nil, err
}
return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID), err
return engine.NewJobWithID(namespace, queue, payload.Body, payload.Attributes, ttl, tries, jobID), err
}

func (e *Engine) Size(namespace, queue string) (size int64, err error) {
Expand Down
24 changes: 12 additions & 12 deletions engine/redis/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestEngine_Publish(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q0", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q0", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}

// Publish no-delay job
j = engine.NewJob("ns-engine", "q0", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q0", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -40,7 +40,7 @@ func TestEngine_Consume(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 2")
j := engine.NewJob("ns-engine", "q2", body, 10, 2, 1, "")
j := engine.NewJob("ns-engine", "q2", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -58,7 +58,7 @@ func TestEngine_Consume(t *testing.T) {
}

// Consume job that's published in no-delay way
j = engine.NewJob("ns-engine", "q2", body, 10, 0, 1, "")
j = engine.NewJob("ns-engine", "q2", body, nil, 10, 0, 1, "")
jobID, err = e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -81,9 +81,9 @@ func TestEngine_Consume2(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 3")
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), 10, 5, 1, "")
j := engine.NewJob("ns-engine", "q3", []byte("delay msg"), nil, 10, 5, 1, "")
_, err = e.Publish(j)
j = engine.NewJob("ns-engine", "q3", body, 10, 2, 1, "")
j = engine.NewJob("ns-engine", "q3", body, nil, 10, 2, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -107,12 +107,12 @@ func TestEngine_ConsumeMulti(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 4")
j := engine.NewJob("ns-engine", "q4", body, 10, 3, 1, "")
j := engine.NewJob("ns-engine", "q4", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
}
j2 := engine.NewJob("ns-engine", "q5", body, 10, 1, 1, "")
j2 := engine.NewJob("ns-engine", "q5", body, nil, 10, 1, 1, "")
jobID2, err := e.Publish(j2)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestEngine_Peek(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 6")
j := engine.NewJob("ns-engine", "q6", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q6", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
if err != nil {
t.Fatalf("Failed to publish: %s", err)
Expand All @@ -173,7 +173,7 @@ func TestEngine_BatchConsume(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 7")
j := engine.NewJob("ns-engine", "q7", body, 10, 3, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 3, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand All @@ -199,7 +199,7 @@ func TestEngine_BatchConsume(t *testing.T) {
// Consume some jobs
jobIDMap := map[string]bool{}
for i := 0; i < 4; i++ {
j := engine.NewJob("ns-engine", "q7", body, 10, 0, 1, "")
j := engine.NewJob("ns-engine", "q7", body, nil, 10, 0, 1, "")
jobID, err := e.Publish(j)
t.Log(jobID)
if err != nil {
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestEngine_PublishWithJobID(t *testing.T) {
}
defer e.Shutdown()
body := []byte("hello msg 1")
j := engine.NewJob("ns-engine", "q8", body, 10, 0, 1, "jobID1")
j := engine.NewJob("ns-engine", "q8", body, nil, 10, 0, 1, "jobID1")
jobID, err := e.Publish(j)
t.Log(jobID)
assert.Nil(t, err)
Expand Down
Loading

0 comments on commit 6694ab6

Please sign in to comment.