From 3dbda603333da7c47449e3c1fc14f3c681ac58a3 Mon Sep 17 00:00:00 2001 From: Pior Bastida Date: Wed, 30 Oct 2024 06:25:35 +0100 Subject: [PATCH] Improve performance of enqueueing tasks (#946) * Improve performance of enqueueing tasks Add an in-memory cache to keep track of all the queues. Use this cache to avoid sending an SADD since after the first call, that extra network call isn't necessary. The cache will expire every 10 secs so for cases where the queue is deleted from asynq:queues set, it can be added again next time a task is enqueued to it. * Use sync.Map to simplify the conditional SADD * Cleanup queuePublished in RemoveQueue --------- Co-authored-by: Yousif <753751+yousifh@users.noreply.github.com> --- internal/rdb/inspect.go | 3 ++- internal/rdb/rdb.go | 48 +++++++++++++++++++++++++----------- internal/rdb/rdb_test.go | 53 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index cbaf4bfd1..a18c4e298 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -10,9 +10,9 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" + "github.com/redis/go-redis/v9" "github.com/spf13/cast" ) @@ -1832,6 +1832,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil { return errors.E(op, errors.Unknown, err) } + r.queuesPublished.Delete(qname) return nil case -1: return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname}) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 74c140ed4..e358b015a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/google/uuid" @@ -26,8 +27,9 @@ const LeaseDuration = 30 * time.Second // RDB is a client interface to query and mutate task queues. type RDB struct { - client redis.UniversalClient - clock timeutil.Clock + client redis.UniversalClient + clock timeutil.Clock + queuesPublished sync.Map } // NewRDB returns a new instance of RDB. @@ -112,8 +114,11 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -174,8 +179,11 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time if err != nil { return errors.E(op, errors.Internal, "cannot encode task message: %v", err) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ msg.UniqueKey, @@ -529,8 +537,11 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -591,8 +602,11 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -648,8 +662,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim if err != nil { return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -707,8 +724,11 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err)) } - if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { - return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + if _, found := r.queuesPublished.Load(msg.Queue); !found { + if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) + } + r.queuesPublished.Store(msg.Queue, true) } keys := []string{ msg.UniqueKey, diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 433d6fbcd..5249a29a6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -160,6 +160,59 @@ func TestEnqueueTaskIdConflictError(t *testing.T) { } } +func TestEnqueueQueueCache(t *testing.T) { + r := setup(t) + defer r.Close() + t1 := h.NewTaskMessageWithQueue("sync1", nil, "low") + + enqueueTime := time.Now() + clock := timeutil.NewSimulatedClock(enqueueTime) + r.SetClock(clock) + + err := r.Enqueue(context.Background(), t1) + if err != nil { + t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err) + } + + // Check queue is in the AllQueues set. + if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() { + t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues) + } + + if _, ok := r.queuesPublished.Load(t1.Queue); !ok { + t.Fatalf("%q is not cached in queuesPublished", t1.Queue) + } + + t.Run("remove-queue", func(t *testing.T) { + err := r.RemoveQueue(t1.Queue, true) + if err != nil { + t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", t1.Queue, true, err) + } + + if _, ok := r.queuesPublished.Load(t1.Queue); ok { + t.Fatalf("%q is still cached in queuesPublished", t1.Queue) + } + + if r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() { + t.Fatalf("%q is a member of SET %q", t1.Queue, base.AllQueues) + } + + err = r.Enqueue(context.Background(), t1) + if err != nil { + t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err) + } + + // Check queue is in the AllQueues set. + if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() { + t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues) + } + + if _, ok := r.queuesPublished.Load(t1.Queue); !ok { + t.Fatalf("%q is not cached in queuesPublished", t1.Queue) + } + }) +} + func TestEnqueueUnique(t *testing.T) { r := setup(t) defer r.Close()