Skip to content

Commit

Permalink
Improve performance of enqueueing tasks (#946)
Browse files Browse the repository at this point in the history
* Improve performance of enqueueing tasks

Add an in-memory cache to keep track of all the queues. Use this cache
to avoid sending an SADD since after the first call, that extra network
call isn't necessary.

The cache will expire every 10 secs so for cases where the queue is
deleted from asynq:queues set, it can be added again next time a task is
enqueued to it.

* Use sync.Map to simplify the conditional SADD

* Cleanup queuePublished in RemoveQueue

---------

Co-authored-by: Yousif <753751+yousifh@users.noreply.github.com>
  • Loading branch information
pior and yousifh authored Oct 30, 2024
1 parent 02c6dae commit 3dbda60
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 15 deletions.
3 changes: 2 additions & 1 deletion internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"
)

Expand Down Expand Up @@ -1832,6 +1832,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
return errors.E(op, errors.Unknown, err)
}
r.queuesPublished.Delete(qname)
return nil
case -1:
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
Expand Down
48 changes: 34 additions & 14 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -26,8 +27,9 @@ const LeaseDuration = 30 * time.Second

// RDB is a client interface to query and mutate task queues.
type RDB struct {
client redis.UniversalClient
clock timeutil.Clock
client redis.UniversalClient
clock timeutil.Clock
queuesPublished sync.Map
}

// NewRDB returns a new instance of RDB.
Expand Down Expand Up @@ -112,8 +114,11 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
Expand Down Expand Up @@ -174,8 +179,11 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
if err != nil {
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
msg.UniqueKey,
Expand Down Expand Up @@ -529,8 +537,11 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
Expand Down Expand Up @@ -591,8 +602,11 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
Expand Down Expand Up @@ -648,8 +662,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
Expand Down Expand Up @@ -707,8 +724,11 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
if err != nil {
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
}
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
if _, found := r.queuesPublished.Load(msg.Queue); !found {
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
r.queuesPublished.Store(msg.Queue, true)
}
keys := []string{
msg.UniqueKey,
Expand Down
53 changes: 53 additions & 0 deletions internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,59 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
}
}

func TestEnqueueQueueCache(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessageWithQueue("sync1", nil, "low")

enqueueTime := time.Now()
clock := timeutil.NewSimulatedClock(enqueueTime)
r.SetClock(clock)

err := r.Enqueue(context.Background(), t1)
if err != nil {
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
}

// Check queue is in the AllQueues set.
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
}

if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
}

t.Run("remove-queue", func(t *testing.T) {
err := r.RemoveQueue(t1.Queue, true)
if err != nil {
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", t1.Queue, true, err)
}

if _, ok := r.queuesPublished.Load(t1.Queue); ok {
t.Fatalf("%q is still cached in queuesPublished", t1.Queue)
}

if r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
t.Fatalf("%q is a member of SET %q", t1.Queue, base.AllQueues)
}

err = r.Enqueue(context.Background(), t1)
if err != nil {
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
}

// Check queue is in the AllQueues set.
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
}

if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
}
})
}

func TestEnqueueUnique(t *testing.T) {
r := setup(t)
defer r.Close()
Expand Down

0 comments on commit 3dbda60

Please sign in to comment.