From 5f75da9b781c0f06038eb806659c0ce3a03f2e2e Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 13 Nov 2024 09:35:46 +0100 Subject: [PATCH] feat(blooms): disk-backed queue for the bloom-planner (#14874) Co-authored-by: Christian Haudum (cherry picked from commit b646861e770e6903538bb22dd551686aaeb7349d) --- docs/sources/shared/configuration.md | 12 ++ pkg/bloombuild/planner/planner.go | 41 ++-- pkg/bloombuild/planner/planner_test.go | 3 +- pkg/bloombuild/planner/queue/config.go | 13 +- pkg/bloombuild/planner/queue/queue.go | 195 ++++++++++++++++--- pkg/bloombuild/planner/queue/queue_test.go | 211 +++++++++++++++++++++ pkg/bloombuild/planner/task.go | 33 ++-- pkg/bloombuild/protos/compat.go | 4 + 8 files changed, 445 insertions(+), 67 deletions(-) create mode 100644 pkg/bloombuild/planner/queue/queue_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 8aef496adafd5..cf9bb386c1e1d 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1287,6 +1287,18 @@ planner: # CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant [max_queued_tasks_per_tenant: | default = 30000] + # Whether to store tasks on disk. + # CLI flag: -bloom-build.planner.queue.store-tasks-on-disk + [store_tasks_on_disk: | default = false] + + # Directory to store tasks on disk. + # CLI flag: -bloom-build.planner.queue.tasks-disk-directory + [tasks_disk_directory: | default = "/tmp/bloom-planner-queue"] + + # Whether to clean the tasks directory on startup. + # CLI flag: -bloom-build.planner.queue.clean-tasks-directory + [clean_tasks_directory: | default = false] + builder: # The grpc_client block configures the gRPC client used to communicate between # a client and server component in Loki. diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 783e32cd2579a..9ef59a9b7f855 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -80,7 +80,7 @@ func New( // Queue to manage tasks queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem) queueLimits := NewQueueLimits(limits) - tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics) + tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics, storageMetrics) if err != nil { return nil, fmt.Errorf("error creating tasks queue: %w", err) } @@ -280,7 +280,8 @@ func (p *Planner) runOne(ctx context.Context) error { now := time.Now() for _, task := range tasks { - queueTask := NewQueueTask(ctx, now, task, resultsCh) + protoTask := task.ToProtoTask() + queueTask := NewQueueTask(ctx, now, protoTask, resultsCh) if err := p.enqueueTask(queueTask); err != nil { level.Error(logger).Log("msg", "error enqueuing task", "err", err) continue @@ -703,7 +704,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli } func (p *Planner) enqueueTask(task *QueueTask) error { - return p.tasksQueue.Enqueue(task.Tenant(), task, func() { + return p.tasksQueue.Enqueue(task.ProtoTask, task.TaskMeta, func() { task.timesEnqueued.Add(1) }) } @@ -738,7 +739,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer lastIndex := queue.StartIndex for p.isRunningOrStopping() { - item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID) + protoTask, meta, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID) if err != nil { if errors.Is(err, queue.ErrStopped) { // Planner is stopping, break the loop and return @@ -748,12 +749,16 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer } lastIndex = idx - if item == nil { + if protoTask == nil { return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID) } - task := item.(*QueueTask) - logger := log.With(logger, "task", task.ID()) + task := &QueueTask{ + ProtoTask: protoTask, + TaskMeta: meta.(*TaskMeta), + } + + logger := log.With(logger, "task", task.Id) queueTime := time.Since(task.queueTime) p.metrics.queueDuration.Observe(queueTime.Seconds()) @@ -761,15 +766,15 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer if task.ctx.Err() != nil { level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err()) lastIndex = lastIndex.ReuseLastIndex() - p.tasksQueue.Release(task) + p.tasksQueue.Release(task.ProtoTask) continue } result, err := p.forwardTaskToBuilder(builder, builderID, task) if err != nil { - maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant()) + maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant) if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries { - p.tasksQueue.Release(task) + p.tasksQueue.Release(task.ProtoTask) level.Error(logger).Log( "msg", "task failed after max retries", "retries", task.timesEnqueued.Load(), @@ -777,7 +782,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer "err", err, ) task.resultsChannel <- &protos.TaskResult{ - TaskID: task.ID(), + TaskID: task.Id, Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err), } continue @@ -786,10 +791,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer // Re-queue the task if the builder is failing to process the tasks if err := p.enqueueTask(task); err != nil { p.metrics.taskLost.Inc() - p.tasksQueue.Release(task) + p.tasksQueue.Release(task.ProtoTask) level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err) task.resultsChannel <- &protos.TaskResult{ - TaskID: task.ID(), + TaskID: task.Id, Error: fmt.Errorf("error re-enqueuing task: %w", err), } continue @@ -809,7 +814,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer "duration", time.Since(task.queueTime).Seconds(), "retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry ) - p.tasksQueue.Release(task) + p.tasksQueue.Release(task.ProtoTask) // Send the result back to the task. The channel is buffered, so this should not block. task.resultsChannel <- result @@ -824,7 +829,7 @@ func (p *Planner) forwardTaskToBuilder( task *QueueTask, ) (*protos.TaskResult, error) { msg := &protos.PlannerToBuilder{ - Task: task.ToProtoTask(), + Task: task.ProtoTask, } if err := builder.Send(msg); err != nil { @@ -846,7 +851,7 @@ func (p *Planner) forwardTaskToBuilder( }() timeout := make(<-chan time.Time) - taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant()) + taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant) if taskTimeout != 0 { // If the timeout is not 0 (disabled), configure it timeout = time.After(taskTimeout) @@ -886,8 +891,8 @@ func (p *Planner) receiveResultFromBuilder( if err != nil { return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err) } - if result.TaskID != task.ID() { - return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID()) + if result.TaskID != task.Id { + return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.Id) } return result, nil diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index e4a382cb55a32..a83e93f28f545 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" @@ -725,7 +726,7 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { for i := 0; i < n; i++ { task := NewQueueTask( context.Background(), time.Now(), - protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil), + protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)), plannertest.TsdbID(1), nil).ToProtoTask(), resultsCh, ) tasks = append(tasks, task) diff --git a/pkg/bloombuild/planner/queue/config.go b/pkg/bloombuild/planner/queue/config.go index e5fa151063e10..b7d6054d31766 100644 --- a/pkg/bloombuild/planner/queue/config.go +++ b/pkg/bloombuild/planner/queue/config.go @@ -2,20 +2,31 @@ package queue import ( "flag" + "fmt" "github.com/grafana/loki/v3/pkg/queue" ) type Config struct { - MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` + MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` + StoreTasksOnDisk bool `yaml:"store_tasks_on_disk"` + TasksDiskDirectory string `yaml:"tasks_disk_directory"` + CleanTasksDirectory bool `yaml:"clean_tasks_directory"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.") + f.BoolVar(&cfg.StoreTasksOnDisk, prefix+".store-tasks-on-disk", false, "Whether to store tasks on disk.") + f.StringVar(&cfg.TasksDiskDirectory, prefix+".tasks-disk-directory", "/tmp/bloom-planner-queue", "Directory to store tasks on disk.") + f.BoolVar(&cfg.CleanTasksDirectory, prefix+".clean-tasks-directory", false, "Whether to clean the tasks directory on startup.") } func (cfg *Config) Validate() error { + if cfg.StoreTasksOnDisk && cfg.TasksDiskDirectory == "" { + return fmt.Errorf("tasks_disk_directory must be set when store_tasks_on_disk is true") + } + return nil } diff --git a/pkg/bloombuild/planner/queue/queue.go b/pkg/bloombuild/planner/queue/queue.go index 0f6542d1c8ddf..b36be5630688a 100644 --- a/pkg/bloombuild/planner/queue/queue.go +++ b/pkg/bloombuild/planner/queue/queue.go @@ -1,46 +1,76 @@ package queue import ( + "bytes" "context" "fmt" + "io" "path/filepath" "sync" "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/queue" + "github.com/grafana/loki/v3/pkg/storage" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" + "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util" ) -type Task interface { - Tenant() string - Table() string - ID() string -} - // Queue is a wrapper of queue.RequestQueue that uses the file system to store the pending tasks. -// When a task is enqueued, it's stored in the file system and recorded ad pending. -// When it's dequeued, it's removed from the queue but kept in FS until removed. +// The queue also allows to store metadata with the task. This metadata can be anything. Metadata is stored in memory. +// When a task is enqueued (Enqueue), it's stored in the file system and recorded as pending. +// When it's dequeued (Dequeue), it's removed from the queue but kept in FS until released (Release). +// +// TODO(salvacorts): In the future we may reuse this queue to store any proto message. We would need to use generics for that. type Queue struct { services.Service + // queue is the in-memory queue where the tasks are stored. + // If cfg.StoreTasksOnDisk is false, we store the whole task here and the metadata. + // Otherwise, we store the task path and the metadata. queue *queue.RequestQueue // pendingTasks is a map of task ID to the file where the task is stored. + // If cfg.StoreTasksOnDisk is false, the value is empty. pendingTasks sync.Map activeUsers *util.ActiveUsersCleanupService cfg Config logger log.Logger + disk client.ObjectClient // Subservices manager. subservices *services.Manager subservicesWatcher *services.FailureWatcher } -func NewQueue(logger log.Logger, cfg Config, limits Limits, metrics *Metrics) (*Queue, error) { +func NewQueue( + logger log.Logger, + cfg Config, + limits Limits, + metrics *Metrics, + storeMetrics storage.ClientMetrics, +) (*Queue, error) { + // Configure the filesystem client if we are storing tasks on disk. + var diskClient client.ObjectClient + if cfg.StoreTasksOnDisk { + storeCfg := storage.Config{ + FSConfig: local.FSConfig{Directory: cfg.TasksDiskDirectory}, + } + var err error + diskClient, err = storage.NewObjectClient(types.StorageTypeFileSystem, "bloom-planner-queue", storeCfg, storeMetrics) + if err != nil { + return nil, fmt.Errorf("failed to create object client: %w", err) + } + } + tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics) // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour @@ -61,6 +91,7 @@ func NewQueue(logger log.Logger, cfg Config, limits Limits, metrics *Metrics) (* activeUsers: activeUsers, cfg: cfg, logger: logger, + disk: diskClient, subservices: subservices, subservicesWatcher: subservicesWatcher, } @@ -71,6 +102,15 @@ func NewQueue(logger log.Logger, cfg Config, limits Limits, metrics *Metrics) (* } func (q *Queue) starting(ctx context.Context) error { + // TODO(salvacorts): We do not recover the queue from the disk yet. + // Until then, we just remove all the files in the directory so the disk + // doesn't grow indefinitely. + if q.cfg.StoreTasksOnDisk && q.cfg.CleanTasksDirectory { + if err := q.cleanTasksOnDisk(ctx); err != nil { + return fmt.Errorf("failed to clean tasks on disk during startup: %w", err) + } + } + if err := services.StartManagerAndAwaitHealthy(ctx, q.subservices); err != nil { return fmt.Errorf("failed to start task queue subservices: %w", err) } @@ -78,6 +118,21 @@ func (q *Queue) starting(ctx context.Context) error { return nil } +func (q *Queue) cleanTasksOnDisk(ctx context.Context) error { + objects, _, err := q.disk.List(ctx, "", "") + if err != nil { + return fmt.Errorf("failed to list tasks: %w", err) + } + + for _, o := range objects { + if err = q.disk.DeleteObject(ctx, o.Key); err != nil { + return fmt.Errorf("failed to delete task (%s)", o.Key) + } + } + + return nil +} + func (q *Queue) stopping(_ error) error { if err := services.StopManagerAndAwaitStopped(context.Background(), q.subservices); err != nil { return fmt.Errorf("failed to stop task queue subservices: %w", err) @@ -101,20 +156,38 @@ func (q *Queue) UnregisterConsumerConnection(consumer string) { q.queue.UnregisterConsumerConnection(consumer) } +type metaWithPath struct { + taskPath string + metadata any +} + +type metaWithTask struct { + task *protos.ProtoTask + metadata any +} + // Enqueue adds a task to the queue. -func (q *Queue) Enqueue(tenant string, task Task, successFn func()) error { +// The task is enqueued only if it doesn't already exist in the queue. +func (q *Queue) Enqueue(task *protos.ProtoTask, metadata any, successFn func()) error { + tenant := task.Tenant + q.activeUsers.UpdateUserTimestamp(tenant, time.Now()) - return q.queue.Enqueue(tenant, nil, task, func() { - taskPath := getTaskPath(task) - _, existed := q.pendingTasks.LoadOrStore(task.ID(), taskPath) - if existed { - // Task already exists, so it's already in the FS - return + + var taskPath string + var enqueuedValue any = metaWithTask{task: task, metadata: metadata} + if q.cfg.StoreTasksOnDisk { + taskPath = getTaskPath(task) + if err := q.writeTask(task, taskPath); err != nil { + return err } - // TODO: Write to FS - _ = taskPath + // If we're storing tasks on disk, we don't want to store the task in the queue but just the path (and the metadata) + enqueuedValue = metaWithPath{taskPath: taskPath, metadata: metadata} + } + return q.queue.Enqueue(tenant, nil, enqueuedValue, func() { + // Note: If we are storing tasks in-mem, taskPath is empty + q.pendingTasks.Store(task.Id, taskPath) if successFn != nil { successFn() } @@ -122,26 +195,92 @@ func (q *Queue) Enqueue(tenant string, task Task, successFn func()) error { } // Dequeue takes a task from the queue. The task is not removed from the filesystem until Release is called. -func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (Task, Index, error) { +func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*protos.ProtoTask, any, Index, error) { item, idx, err := q.queue.Dequeue(ctx, last, consumerID) if err != nil { - return nil, idx, err + return nil, nil, idx, err + } + + if !q.cfg.StoreTasksOnDisk { + val := item.(metaWithTask) + return val.task, val.metadata, idx, nil + } + + // Read task from disk + meta := item.(metaWithPath) + task, err := q.readTask(meta.taskPath) + if err != nil { + return nil, nil, idx, err } - return item.(Task), idx, nil + return task, meta.metadata, idx, nil } // Release removes a task from the filesystem. // Dequeue should be called before Remove. -func (q *Queue) Release(task Task) { - taskPath, existed := q.pendingTasks.LoadAndDelete(task.ID()) +func (q *Queue) Release(task *protos.ProtoTask) { + val, existed := q.pendingTasks.LoadAndDelete(task.Id) if !existed { // Task doesn't exist, so it's not in the FS return } - // TODO: Remove from FS - _ = taskPath + if !q.cfg.StoreTasksOnDisk { + return + } + + // Remove task from FS + taskPath := val.(string) + if err := q.deleteTask(taskPath); err != nil { + level.Error(q.logger).Log("msg", "failed to remove task from disk", "task", task.Id, "err", err) + } +} + +func (q *Queue) writeTask(task *protos.ProtoTask, taskPath string) error { + // Do not write the task if it's already pending. I.e. was already enqueued thus written to disk. + if _, exists := q.pendingTasks.Load(task.Id); exists { + return nil + } + + taskSer, err := proto.Marshal(task) + if err != nil { + q.logger.Log("msg", "failed to serialize task", "task", task.Id, "err", err) + return fmt.Errorf("failed to serialize task: %w", err) + } + + reader := bytes.NewReader(taskSer) + if err = q.disk.PutObject(context.Background(), taskPath, reader); err != nil { + q.logger.Log("msg", "failed to write task to disk", "task", task.Id, "err", err) + return fmt.Errorf("failed to write task to disk: %w", err) + } + + return nil +} + +func (q *Queue) readTask(taskPath string) (*protos.ProtoTask, error) { + reader, _, err := q.disk.GetObject(context.Background(), taskPath) + if err != nil { + return nil, fmt.Errorf("failed to read task from disk: %w", err) + } + + taskSer, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read task from disk: %w", err) + } + + task := &protos.ProtoTask{} + if err = proto.Unmarshal(taskSer, task); err != nil { + return nil, fmt.Errorf("failed to deserialize task: %w", err) + } + + return task, nil +} + +func (q *Queue) deleteTask(taskPath string) error { + if err := q.disk.DeleteObject(context.Background(), taskPath); err != nil { + return fmt.Errorf("failed to delete task from disk: %w", err) + } + return nil } func (q *Queue) TotalPending() (total int) { @@ -152,8 +291,10 @@ func (q *Queue) TotalPending() (total int) { return total } -func getTaskPath(task Task) string { - return filepath.Join("tasks", task.Tenant(), task.Table(), task.ID()) +func getTaskPath(task *protos.ProtoTask) string { + table := protos.FromProtoDayTableToDayTable(task.Table) + taskFile := task.Id + ".protobuf" + return filepath.Join("tasks", task.Tenant, table.String(), taskFile) } // The following are aliases for the queue package types. diff --git a/pkg/bloombuild/planner/queue/queue_test.go b/pkg/bloombuild/planner/queue/queue_test.go new file mode 100644 index 0000000000000..9947ac9381ab1 --- /dev/null +++ b/pkg/bloombuild/planner/queue/queue_test.go @@ -0,0 +1,211 @@ +package queue + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" + "github.com/grafana/loki/v3/pkg/storage" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" +) + +type taskMeta struct { + stat1 int + stat2 string +} + +type taskWithMeta struct { + *protos.ProtoTask + *taskMeta +} + +func createTasks(n int) []*taskWithMeta { + tasks := make([]*taskWithMeta, 0, n) + // Enqueue tasks + for i := 0; i < n; i++ { + task := &taskWithMeta{ + ProtoTask: protos.NewTask( + config.NewDayTable(plannertest.TestDay, "fake"), + "fakeTenant", + v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)), + plannertest.TsdbID(1), + []protos.Gap{ + { + Bounds: v1.NewBounds(0, 10), + Series: plannertest.GenSeries(v1.NewBounds(0, 10)), + Blocks: []bloomshipper.BlockRef{ + plannertest.GenBlockRef(0, 5), + plannertest.GenBlockRef(6, 10), + }, + }, + }, + ).ToProtoTask(), + taskMeta: &taskMeta{stat1: i, stat2: fmt.Sprintf("task-%d", i)}, + } + tasks = append(tasks, task) + } + return tasks +} + +func TestQueue(t *testing.T) { + for _, tc := range []struct { + name string + useDisk bool + }{ + { + name: "in-memory", + useDisk: false, + }, + { + name: "on-disk", + useDisk: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + taskPath := t.TempDir() + count, err := filesInDir(taskPath) + require.NoError(t, err) + require.Equal(t, 0, count) + + // Create 10 random files that should be deleted on startup + if tc.useDisk { + createFiles(taskPath, 10) + } + + clientMetrics := storage.NewClientMetrics() + defer clientMetrics.Unregister() + queueMetrics := NewMetrics(prometheus.NewPedanticRegistry(), "test", "queue") + cfg := Config{ + MaxQueuedTasksPerTenant: 1000, + StoreTasksOnDisk: tc.useDisk, + TasksDiskDirectory: taskPath, + CleanTasksDirectory: true, + } + + queue, err := NewQueue(logger, cfg, fakeLimits{}, queueMetrics, clientMetrics) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), queue) + require.NoError(t, err) + + // Previously written files should be deleted + if tc.useDisk { + count, err = filesInDir(taskPath) + require.NoError(t, err) + require.Equal(t, 0, count) + } + + const consumer = "fakeConsumer" + queue.RegisterConsumerConnection(consumer) + defer queue.UnregisterConsumerConnection(consumer) + + // Write some tasks to the queue + tasks := createTasks(10) + for _, task := range tasks { + err = queue.Enqueue(task.ProtoTask, task.taskMeta, nil) + require.NoError(t, err) + } + + // There should be 10 task pending + require.Equal(t, len(tasks), queue.TotalPending()) + count, err = filesInDir(taskPath) + require.NoError(t, err) + if tc.useDisk { + require.Equal(t, len(tasks), count) + } else { + require.Equal(t, 0, count) + } + + idx := StartIndex + const nDequeue = 5 + var dequeuedTasks []*taskWithMeta + for i := 0; i < nDequeue; i++ { + var task *protos.ProtoTask + var meta any + task, meta, idx, err = queue.Dequeue(context.Background(), idx, consumer) + require.NoError(t, err) + require.NotNil(t, task) + require.NotNil(t, meta) + + require.Equal(t, task, tasks[i].ProtoTask) + require.Equal(t, meta.(*taskMeta), tasks[i].taskMeta) + + dequeuedTasks = append(dequeuedTasks, &taskWithMeta{ProtoTask: task, taskMeta: meta.(*taskMeta)}) + } + + // The task files should still be there + require.Equal(t, len(tasks), queue.TotalPending()) + count, err = filesInDir(taskPath) + require.NoError(t, err) + if tc.useDisk { + require.Equal(t, len(tasks), count) + } else { + require.Equal(t, 0, count) + } + + // Release the tasks that were dequeued + for _, task := range dequeuedTasks { + queue.Release(task.ProtoTask) + } + + // The task files should be gone + require.Equal(t, len(tasks)-nDequeue, queue.TotalPending()) + count, err = filesInDir(taskPath) + require.NoError(t, err) + if tc.useDisk { + require.Equal(t, len(tasks)-nDequeue, count) + } else { + require.Equal(t, 0, count) + } + }) + } +} + +func filesInDir(path string) (int, error) { + var count int + + if err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + count++ + } + return nil + }); err != nil { + return 0, err + } + + return count, nil +} + +func createFiles(path string, n int) { + for i := 0; i < n; i++ { + file, err := os.Create(filepath.Join(path, fmt.Sprintf("file-%d", i))) + if err != nil { + panic(err) + } + _ = file.Close() + } +} + +type fakeLimits struct{} + +func (f fakeLimits) MaxConsumers(_ string, _ int) int { + return 0 // Unlimited +} diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index 05a49d494ab3a..3d0213e4c48ae 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -9,9 +9,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/protos" ) -type QueueTask struct { - *protos.Task - +type TaskMeta struct { resultsChannel chan *protos.TaskResult // Tracking @@ -20,28 +18,23 @@ type QueueTask struct { ctx context.Context } +type QueueTask struct { + *protos.ProtoTask + *TaskMeta +} + func NewQueueTask( ctx context.Context, queueTime time.Time, - task *protos.Task, + task *protos.ProtoTask, resultsChannel chan *protos.TaskResult, ) *QueueTask { return &QueueTask{ - Task: task, - resultsChannel: resultsChannel, - ctx: ctx, - queueTime: queueTime, + ProtoTask: task, + TaskMeta: &TaskMeta{ + resultsChannel: resultsChannel, + ctx: ctx, + queueTime: queueTime, + }, } } - -func (t QueueTask) Tenant() string { - return t.Task.Tenant -} - -func (t QueueTask) Table() string { - return t.Task.Table.String() -} - -func (t QueueTask) ID() string { - return t.Task.ID -} diff --git a/pkg/bloombuild/protos/compat.go b/pkg/bloombuild/protos/compat.go index 7c910d405ad9b..6909967379379 100644 --- a/pkg/bloombuild/protos/compat.go +++ b/pkg/bloombuild/protos/compat.go @@ -260,3 +260,7 @@ func (r *TaskResult) ToProtoTaskResult() *ProtoTaskResult { CreatedMetas: protoMetas, } } + +func FromProtoDayTableToDayTable(proto DayTable) config.DayTable { + return config.NewDayTable(config.NewDayTime(model.Time(proto.DayTimestampMS)), proto.Prefix) +}