refactor(blooms): Apply retention in planner (#13484)
salvacorts authored Jul 11, 2024
1 parent 15c8b45 commit 2cc901a
Showing 7 changed files with 1,109 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -645,6 +645,11 @@ bloom_build:
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

retention:
# Enable bloom retention.
# CLI flag: -bloom-build.planner.retention.enabled
[enabled: <boolean> | default = false]

builder:
# The grpc_client block configures the gRPC client used to communicate
# between a client and server component in Loki.
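
The `RetentionConfig` type referenced by the planner config below is defined in the retention code, which is among the changed files not rendered in this excerpt. As a minimal sketch only, assuming nothing beyond the surface the rest of the diff relies on (the documented `enabled` setting plus the `RegisterFlagsWithPrefix` and `Validate` methods called from `config.go`), it could look roughly like this; the real type may well carry additional fields:

```go
package planner

import "flag"

// Sketch of RetentionConfig, not the actual implementation; only the surface
// used elsewhere in this commit is shown, and the real type may carry more fields.
type RetentionConfig struct {
	Enabled bool `yaml:"enabled"`
}

// RegisterFlagsWithPrefix receives "<parent>.retention" from
// Config.RegisterFlagsWithPrefix, yielding the documented
// -bloom-build.planner.retention.enabled flag with its false default.
func (cfg *RetentionConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Enable bloom retention.")
}

// Validate is called from Config.Validate; a lone boolean toggle has nothing to reject.
func (cfg *RetentionConfig) Validate() error {
	return nil
}
```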
15 changes: 11 additions & 4 deletions pkg/bloombuild/planner/config.go
@@ -8,10 +8,11 @@ import (

// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
RetentionConfig RetentionConfig `yaml:"retention"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -26,17 +27,23 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f)
}

func (cfg *Config) Validate() error {
if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}

if err := cfg.RetentionConfig.Validate(); err != nil {
return err
}

return nil
}

type Limits interface {
RetentionLimits
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
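
A hedged usage sketch of how the pieces above compose: `Config.RegisterFlagsWithPrefix` forwards `prefix+".retention"` to the sub-config, so registering under `bloom-build.planner` produces the flag documented in `configuration.md`, and `Config.Validate` now also fails when the retention sub-config is invalid. The module path and the standalone `FlagSet` here are assumptions for illustration:

```go
package main

import (
	"flag"
	"log"

	// Assumed v3 module path; the package path matches this commit.
	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
)

func main() {
	var cfg planner.Config

	// Registering under "bloom-build.planner" yields, among others,
	// -bloom-build.planner.retention.enabled (default false).
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("bloom-build.planner", fs)

	if err := fs.Parse([]string{"-bloom-build.planner.retention.enabled=true"}); err != nil {
		log.Fatal(err)
	}

	// Validate now also checks the retention sub-config.
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
}
```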
48 changes: 48 additions & 0 deletions pkg/bloombuild/planner/metrics.go
@@ -42,6 +42,13 @@ type Metrics struct {
tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
tenantTasksCompleted *prometheus.GaugeVec

// Retention metrics
retentionRunning prometheus.Gauge
retentionTime *prometheus.HistogramVec
retentionDaysPerIteration *prometheus.HistogramVec
retentionTenantsPerIteration *prometheus.HistogramVec
retentionTenantsExceedingLookback prometheus.Gauge
}

func NewMetrics(
@@ -161,6 +168,47 @@ func NewMetrics(
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant", "status"}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_running",
Help: "1 if retention is running in this compactor.",
}),

retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_time_seconds",
Help: "Time this retention process took to complete.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),

retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_days_processed",
Help: "Number of days iterated over during the retention process.",
// 1day -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10),
}, []string{"status"}),

retentionTenantsPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_processed",
Help: "Number of tenants on which retention was applied during the retention process.",
// 1 tenant -> 10k tenants, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
}, []string{"status"}),

retentionTenantsExceedingLookback: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_exceeding_lookback",
Help: "Number of tenants with a retention exceeding the configured retention lookback.",
}),
}
}
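
The retention pass that drives these metrics lives in the files not rendered here, but the metric shapes suggest the instrumentation pattern: flip the running gauge for the duration of the pass, then record elapsed time and per-iteration counts under a `status` label. A sketch of such a wrapper, assuming the helper name, the label values, and the callback shape (only `statusFailure` appears in the visible diff):

```go
package planner

import (
	"context"
	"time"
)

// applyRetentionWithMetrics is illustrative only, not the actual retention code.
func applyRetentionWithMetrics(ctx context.Context, m *Metrics, apply func(context.Context) (days, tenants int, err error)) error {
	m.retentionRunning.Set(1)
	defer m.retentionRunning.Set(0)

	start := time.Now()
	status := "success" // assumed label values
	days, tenants, err := apply(ctx)
	if err != nil {
		status = "failure"
	}

	m.retentionTime.WithLabelValues(status).Observe(time.Since(start).Seconds())
	m.retentionDaysPerIteration.WithLabelValues(status).Observe(float64(days))
	m.retentionTenantsPerIteration.WithLabelValues(status).Observe(float64(tenants))
	return err
}
```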

21 changes: 20 additions & 1 deletion pkg/bloombuild/planner/planner.go
@@ -36,6 +36,7 @@ type Planner struct {
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
retentionManager *RetentionManager

cfg Config
limits Limits
@@ -91,6 +92,14 @@ func New(
logger: logger,
}

p.retentionManager = NewRetentionManager(
p.cfg.RetentionConfig,
p.limits,
p.bloomStore,
p.metrics,
p.logger,
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}
p.subservices, err = services.NewManager(svcs...)
if err != nil {
@@ -184,6 +193,7 @@ type tenantTable struct {

func (p *Planner) runOne(ctx context.Context) error {
var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
)
@@ -197,6 +207,16 @@ func (p *Planner) runOne(ctx context.Context) error {
}()

p.metrics.buildStarted.Inc()
level.Info(p.logger).Log("msg", "running bloom build iteration")

// Launch retention (will return instantly if retention is disabled)
wg.Add(1)
go func() {
defer wg.Done()
if err := p.retentionManager.Apply(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed apply retention", "err", err)
}
}()

tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())
@@ -265,7 +285,6 @@ func (p *Planner) runOne(ctx context.Context) error {
// TODO(salvacorts): This may end up creating too many goroutines.
// Create a pool of workers to process table-tenant tuples.
var tasksSucceed atomic.Int64
var wg sync.WaitGroup
for tt, results := range tasksResultForTenantTable {
if results.tasksToWait == 0 {
// No tasks enqueued for this tenant-table tuple, skip processing
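
Note the `var wg sync.WaitGroup` removed in the last hunk: the WaitGroup moves to the top of `runOne` so the retention goroutine and the per-table-tenant workers started later share it and are awaited together. A self-contained sketch of that pattern, assuming a single `wg.Wait()` at the end of the iteration as the pre-existing worker loop implies:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Illustrative only: one WaitGroup tracks both the best-effort retention pass
// and the main planning work, mirroring the shape of runOne above.
func runIteration(ctx context.Context, applyRetention func(context.Context) error) {
	var wg sync.WaitGroup

	// Retention runs concurrently; a failure is logged but does not abort planning.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := applyRetention(ctx); err != nil {
			fmt.Println("failed to apply retention:", err)
		}
	}()

	// ... enqueue build tasks and start workers on the same wg here ...

	wg.Wait()
}

func main() {
	runIteration(context.Background(), func(context.Context) error { return nil })
}
```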
1 change: 1 addition & 0 deletions pkg/bloombuild/planner/planner_test.go
@@ -1019,6 +1019,7 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

type fakeLimits struct {
Limits
timeout time.Duration
maxRetries int
}
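
Embedding the `Limits` interface in `fakeLimits` is what keeps the test compiling after `Limits` grew the `RetentionLimits` methods: the embedded interface satisfies the full method set, and any method the fake never overrides only panics if it is actually called. A small self-contained illustration of the pattern; the interface and method names here are invented for the example:

```go
package main

import "fmt"

// Stand-in for the planner's Limits interface; names are invented.
type Limits interface {
	BloomCreationEnabled(tenant string) bool
	RetentionPeriod(tenant string) int
}

// fakeLimits embeds the interface, so it satisfies Limits without stubbing
// every method; only what the test needs is overridden.
type fakeLimits struct {
	Limits
}

func (f fakeLimits) BloomCreationEnabled(string) bool { return true }

func main() {
	var l Limits = fakeLimits{}
	fmt.Println(l.BloomCreationEnabled("tenant-a")) // true
	// l.RetentionPeriod("tenant-a") would panic: the embedded Limits is nil.
}
```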