feat(blooms): disk-backed queue for the bloom-planner #14874

Merged · 3 commits · Nov 13, 2024
12 changes: 12 additions & 0 deletions docs/sources/shared/configuration.md
@@ -1225,6 +1225,18 @@ planner:
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

# Whether to store tasks on disk.
# CLI flag: -bloom-build.planner.queue.store-tasks-on-disk
[store_tasks_on_disk: <boolean> | default = false]

# Directory to store tasks on disk.
# CLI flag: -bloom-build.planner.queue.tasks-disk-directory
[tasks_disk_directory: <string> | default = "/tmp/bloom-planner-queue"]

# Whether to clean the tasks directory on startup.
# CLI flag: -bloom-build.planner.queue.clean-tasks-directory
[clean_tasks_directory: <boolean> | default = false]

builder:
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
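For reference, a minimal sketch of how the three new planner queue options documented above could be set in a Loki config file. The nesting under `bloom_build.planner.queue` is inferred from the CLI flag prefix (`-bloom-build.planner.queue.*`), and the directory is an illustrative value (the default remains `/tmp/bloom-planner-queue`):

```yaml
bloom_build:
  planner:
    queue:
      max_queued_tasks_per_tenant: 30000
      # New in this PR: spill queued planner tasks to disk instead of
      # keeping them all in memory.
      store_tasks_on_disk: true
      tasks_disk_directory: /var/loki/bloom-planner-queue  # illustrative path
      clean_tasks_directory: true
```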
41 changes: 23 additions & 18 deletions pkg/bloombuild/planner/planner.go
@@ -80,7 +80,7 @@ func New(
// Queue to manage tasks
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
queueLimits := NewQueueLimits(limits)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics, storageMetrics)
if err != nil {
return nil, fmt.Errorf("error creating tasks queue: %w", err)
}
@@ -280,7 +280,8 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
protoTask := task.ToProtoTask()
queueTask := NewQueueTask(ctx, now, protoTask, resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
@@ -703,7 +704,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
}

func (p *Planner) enqueueTask(task *QueueTask) error {
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
return p.tasksQueue.Enqueue(task.ProtoTask, task.TaskMeta, func() {
task.timesEnqueued.Add(1)
})
}
@@ -738,7 +739,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

lastIndex := queue.StartIndex
for p.isRunningOrStopping() {
item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
protoTask, meta, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
if err != nil {
if errors.Is(err, queue.ErrStopped) {
// Planner is stopping, break the loop and return
@@ -748,36 +749,40 @@ }
}
lastIndex = idx

if item == nil {
if protoTask == nil {
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}

task := item.(*QueueTask)
logger := log.With(logger, "task", task.ID())
task := &QueueTask{
ProtoTask: protoTask,
TaskMeta: meta.(*TaskMeta),
}

logger := log.With(logger, "task", task.Id)

queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())

if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
continue
}

result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued.Load(),
"maxRetries", maxRetries,
"err", err,
)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID(),
TaskID: task.Id,
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
}
continue
@@ -786,10 +791,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID(),
TaskID: task.Id,
Error: fmt.Errorf("error re-enqueuing task: %w", err),
}
continue
@@ -809,7 +814,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
@@ -824,7 +829,7 @@ func (p *Planner) forwardTaskToBuilder(
task *QueueTask,
) (*protos.TaskResult, error) {
msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: task.ProtoTask,
}

if err := builder.Send(msg); err != nil {
@@ -846,7 +851,7 @@ func (p *Planner) forwardTaskToBuilder(
}()

timeout := make(<-chan time.Time)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
if taskTimeout != 0 {
// If the timeout is not 0 (disabled), configure it
timeout = time.After(taskTimeout)
@@ -886,8 +891,8 @@ func (p *Planner) receiveResultFromBuilder(
if err != nil {
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
}
if result.TaskID != task.ID() {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
if result.TaskID != task.Id {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.Id)
}

return result, nil
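The core of the planner.go change is that the queue now only ever holds the serializable `ProtoTask` (the part a disk-backed queue can persist), while the in-memory bookkeeping travels separately as `TaskMeta` and the two halves are recombined after `Dequeue`. A rough sketch of how the wrapper might be laid out, with field names taken from the diff but the exact layout and types assumed rather than copied from the Loki source:

```go
package planner

import (
	"context"
	"time"

	"go.uber.org/atomic"

	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
)

// TaskMeta carries runtime-only state that should not be serialized with
// the task: enqueue time, context, retry counter, and the results channel.
type TaskMeta struct {
	queueTime      time.Time
	ctx            context.Context
	timesEnqueued  atomic.Int64
	resultsChannel chan *protos.TaskResult
}

// QueueTask embeds both halves so call sites can keep reading task.Id,
// task.Tenant, task.queueTime, and so on, while only the proto half is
// what the queue needs to persist when disk-backing is enabled.
type QueueTask struct {
	*protos.ProtoTask
	*TaskMeta
}

// NewQueueTask pairs a proto task with fresh in-memory metadata.
func NewQueueTask(ctx context.Context, queueTime time.Time, task *protos.ProtoTask, resultsCh chan *protos.TaskResult) *QueueTask {
	return &QueueTask{
		ProtoTask: task,
		TaskMeta: &TaskMeta{
			queueTime:      queueTime,
			ctx:            ctx,
			resultsChannel: resultsCh,
		},
	}
}
```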
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -13,6 +13,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
@@ -725,7 +726,7 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)), plannertest.TsdbID(1), nil).ToProtoTask(),
resultsCh,
)
tasks = append(tasks, task)
13 changes: 12 additions & 1 deletion pkg/bloombuild/planner/queue/config.go
@@ -2,20 +2,31 @@ package queue

import (
"flag"
"fmt"

"github.com/grafana/loki/v3/pkg/queue"
)

type Config struct {
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
StoreTasksOnDisk bool `yaml:"store_tasks_on_disk"`
TasksDiskDirectory string `yaml:"tasks_disk_directory"`
CleanTasksDirectory bool `yaml:"clean_tasks_directory"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
f.BoolVar(&cfg.StoreTasksOnDisk, prefix+".store-tasks-on-disk", false, "Whether to store tasks on disk.")
f.StringVar(&cfg.TasksDiskDirectory, prefix+".tasks-disk-directory", "/tmp/bloom-planner-queue", "Directory to store tasks on disk.")
f.BoolVar(&cfg.CleanTasksDirectory, prefix+".clean-tasks-directory", false, "Whether to clean the tasks directory on startup.")
}

func (cfg *Config) Validate() error {
if cfg.StoreTasksOnDisk && cfg.TasksDiskDirectory == "" {
return fmt.Errorf("tasks_disk_directory must be set when store_tasks_on_disk is true")
}

return nil
}
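A small usage sketch (not part of this PR) showing the invariant the new Validate method enforces: enabling disk-backed storage while leaving the directory empty is rejected. The field values are illustrative only.

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
)

func main() {
	cfg := queue.Config{
		MaxQueuedTasksPerTenant: 30000,
		StoreTasksOnDisk:        true,
		// Leaving the directory empty while StoreTasksOnDisk is true is the
		// misconfiguration Validate rejects.
		TasksDiskDirectory:  "",
		CleanTasksDirectory: true,
	}

	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid queue config:", err)
	}
}
```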
