From 2cc901ad69bfc635646cda399b62b3540f5af1f5 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 11 Jul 2024 15:00:45 +0200 Subject: [PATCH] refactor(blooms): Apply retention in planner (#13484) --- docs/sources/shared/configuration.md | 5 + pkg/bloombuild/planner/config.go | 15 +- pkg/bloombuild/planner/metrics.go | 48 ++ pkg/bloombuild/planner/planner.go | 21 +- pkg/bloombuild/planner/planner_test.go | 1 + pkg/bloombuild/planner/retention.go | 262 ++++++++ pkg/bloombuild/planner/retention_test.go | 762 +++++++++++++++++++++++ 7 files changed, 1109 insertions(+), 5 deletions(-) create mode 100644 pkg/bloombuild/planner/retention.go create mode 100644 pkg/bloombuild/planner/retention_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 1597222310b87..38502dc2caae1 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -645,6 +645,11 @@ bloom_build: # CLI flag: -bloom-build.planner.max-tasks-per-tenant [max_queued_tasks_per_tenant: | default = 30000] + retention: + # Enable bloom retention. + # CLI flag: -bloom-build.planner.retention.enabled + [enabled: | default = false] + builder: # The grpc_client block configures the gRPC client used to communicate # between a client and server component in Loki. diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go index 03ed5d204e2a7..40ec5707ef715 100644 --- a/pkg/bloombuild/planner/config.go +++ b/pkg/bloombuild/planner/config.go @@ -8,10 +8,11 @@ import ( // Config configures the bloom-planner component. 
type Config struct { - PlanningInterval time.Duration `yaml:"planning_interval"` - MinTableOffset int `yaml:"min_table_offset"` - MaxTableOffset int `yaml:"max_table_offset"` - MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` + PlanningInterval time.Duration `yaml:"planning_interval"` + MinTableOffset int `yaml:"min_table_offset"` + MaxTableOffset int `yaml:"max_table_offset"` + MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` + RetentionConfig RetentionConfig `yaml:"retention"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. @@ -26,6 +27,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // I'm doing it the simple way for now. f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.") f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.") + cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f) } func (cfg *Config) Validate() error { @@ -33,10 +35,15 @@ func (cfg *Config) Validate() error { return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset) } + if err := cfg.RetentionConfig.Validate(); err != nil { + return err + } + return nil } type Limits interface { + RetentionLimits BloomCreationEnabled(tenantID string) bool BloomSplitSeriesKeyspaceBy(tenantID string) int BloomBuildMaxBuilders(tenantID string) int diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index 77ae68687b35a..3523135780e55 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -42,6 +42,13 @@ type Metrics struct { 
tenantsDiscovered prometheus.Counter tenantTasksPlanned *prometheus.GaugeVec tenantTasksCompleted *prometheus.GaugeVec + + // Retention metrics + retentionRunning prometheus.Gauge + retentionTime *prometheus.HistogramVec + retentionDaysPerIteration *prometheus.HistogramVec + retentionTenantsPerIteration *prometheus.HistogramVec + retentionTenantsExceedingLookback prometheus.Gauge } func NewMetrics( @@ -161,6 +168,47 @@ func NewMetrics( Name: "tenant_tasks_completed", Help: "Number of tasks completed for a tenant during the current build iteration.", }, []string{"tenant", "status"}), + + // Retention + retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "retention_running", + Help: "1 if retention is running in this compactor.", + }), + + retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "retention_time_seconds", + Help: "Time this retention process took to complete.", + Buckets: prometheus.DefBuckets, + }, []string{"status"}), + + retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "retention_days_processed", + Help: "Number of days iterated over during the retention process.", + // 1day -> 5 years, 10 buckets + Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10), + }, []string{"status"}), + + retentionTenantsPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "retention_tenants_processed", + Help: "Number of tenants on which retention was applied during the retention process.", + // 1 tenant -> 10k tenants, 10 buckets + Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10), + }, []string{"status"}), + + retentionTenantsExceedingLookback: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + 
Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "retention_tenants_exceeding_lookback", + Help: "Number of tenants with a retention exceeding the configured retention lookback.", + }), } } diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 39ccfd2f7709d..08f6bb1c40bb7 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -36,6 +36,7 @@ type Planner struct { // Subservices manager. subservices *services.Manager subservicesWatcher *services.FailureWatcher + retentionManager *RetentionManager cfg Config limits Limits @@ -91,6 +92,14 @@ func New( logger: logger, } + p.retentionManager = NewRetentionManager( + p.cfg.RetentionConfig, + p.limits, + p.bloomStore, + p.metrics, + p.logger, + ) + svcs := []services.Service{p.tasksQueue, p.activeUsers} p.subservices, err = services.NewManager(svcs...) if err != nil { @@ -184,6 +193,7 @@ type tenantTable struct { func (p *Planner) runOne(ctx context.Context) error { var ( + wg sync.WaitGroup start = time.Now() status = statusFailure ) @@ -197,6 +207,16 @@ func (p *Planner) runOne(ctx context.Context) error { }() p.metrics.buildStarted.Inc() + level.Info(p.logger).Log("msg", "running bloom build iteration") + + // Launch retention (will return instantly if retention is disabled) + wg.Add(1) + go func() { + defer wg.Done() + if err := p.retentionManager.Apply(ctx); err != nil { + level.Error(p.logger).Log("msg", "failed apply retention", "err", err) + } + }() tables := p.tables(time.Now()) level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) @@ -265,7 +285,6 @@ func (p *Planner) runOne(ctx context.Context) error { // TODO(salvacorts): This may end up creating too many goroutines. // Create a pool of workers to process table-tenant tuples. 
var tasksSucceed atomic.Int64 - var wg sync.WaitGroup for tt, results := range tasksResultForTenantTable { if results.tasksToWait == 0 { // No tasks enqueued for this tenant-table tuple, skip processing diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 0e119cc1af229..ab34c82c6940d 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -1019,6 +1019,7 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) { } type fakeLimits struct { + Limits timeout time.Duration maxRetries int } diff --git a/pkg/bloombuild/planner/retention.go b/pkg/bloombuild/planner/retention.go new file mode 100644 index 0000000000000..8a937d332a42f --- /dev/null +++ b/pkg/bloombuild/planner/retention.go @@ -0,0 +1,262 @@ +package planner + +import ( + "context" + "flag" + "math" + "slices" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + storageconfig "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/validation" +) + +type RetentionConfig struct { + Enabled bool `yaml:"enabled"` + MaxLookbackDays int `yaml:"max_lookback_days" doc:"hidden"` +} + +func (cfg *RetentionConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Enable bloom retention.") + f.IntVar(&cfg.MaxLookbackDays, prefix+".max-lookback-days", 365, "Max lookback days for retention.") +} + +func (cfg *RetentionConfig) Validate() error { + if !cfg.Enabled { + return nil + } + + if cfg.MaxLookbackDays < 1 { + return errors.New("max lookback days must be a positive number") + } + return nil +} + +type RetentionLimits interface { + RetentionPeriod(userID string) time.Duration + StreamRetention(userID string) 
[]validation.StreamRetention + AllByUserID() map[string]*validation.Limits + DefaultLimits() *validation.Limits +} + +type RetentionManager struct { + cfg RetentionConfig + limits RetentionLimits + bloomStore bloomshipper.StoreBase + metrics *Metrics + logger log.Logger + lastDayRun storageconfig.DayTime + + // For testing + now func() model.Time +} + +func NewRetentionManager( + cfg RetentionConfig, + limits RetentionLimits, + bloomStore bloomshipper.StoreBase, + metrics *Metrics, + logger log.Logger, +) *RetentionManager { + return &RetentionManager{ + cfg: cfg, + limits: limits, + bloomStore: bloomStore, + metrics: metrics, + logger: log.With(logger, "subcomponent", "retention-manager"), + now: model.Now, + lastDayRun: storageconfig.NewDayTime(0), + } +} + +func (r *RetentionManager) Apply(ctx context.Context) error { + if !r.cfg.Enabled { + level.Debug(r.logger).Log("msg", "retention is disabled") + return nil + } + + start := r.now() + today := storageconfig.NewDayTime(start) + if !today.After(r.lastDayRun) { + // We've already run retention for today + return nil + } + + level.Info(r.logger).Log("msg", "Applying retention", "today", today.String(), "lastDayRun", r.lastDayRun.String()) + r.metrics.retentionRunning.Set(1) + defer r.metrics.retentionRunning.Set(0) + + tenantsRetention := retentionByTenant(r.limits) + r.reportTenantsExceedingLookback(tenantsRetention) + + defaultLimits := r.limits.DefaultLimits() + defaultRetention := findLongestRetention(time.Duration(defaultLimits.RetentionPeriod), defaultLimits.StreamRetention) + + smallestRetention := smallestEnabledRetention(defaultRetention, tenantsRetention) + if smallestRetention == 0 { + level.Debug(r.logger).Log("msg", "no retention period set for any tenant, skipping retention") + return nil + } + + // Start day is today minus the smallest retention period. + // Note that the last retention day is exclusive. E.g. 
30 days retention means we keep 30 days of data, + // thus we start deleting data from the 31st day onwards. + startDay := storageconfig.NewDayTime(today.Add(-smallestRetention)).Dec() + // End day is today minus the max lookback days + endDay := storageconfig.NewDayTime(today.Add(-time.Duration(r.cfg.MaxLookbackDays) * 24 * time.Hour)) + + var daysProcessed int + tenantsRetentionApplied := make(map[string]struct{}, 100) + for day := startDay; day.After(endDay); day = day.Dec() { + dayLogger := log.With(r.logger, "day", day.String()) + bloomClient, err := r.bloomStore.Client(day.ModelTime()) + if err != nil { + level.Error(dayLogger).Log("msg", "failed to get bloom store client", "err", err) + break + } + objectClient := bloomClient.ObjectClient() + + tenants, err := r.bloomStore.TenantFilesForInterval( + ctx, bloomshipper.NewInterval(day.Bounds()), + func(tenant string, _ client.StorageObject) bool { + // Filter out tenants whose retention hasn't expired yet + globalRetention := r.limits.RetentionPeriod(tenant) + streamRetention := r.limits.StreamRetention(tenant) + tenantRetention := findLongestRetention(globalRetention, streamRetention) + expirationDay := storageconfig.NewDayTime(today.Add(-tenantRetention)) + return day.Before(expirationDay) + }, + ) + if err != nil { + r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds()) + r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed)) + r.metrics.retentionTenantsPerIteration.WithLabelValues(statusFailure).Observe(float64(len(tenantsRetentionApplied))) + return errors.Wrap(err, "getting users for period") + } + + if len(tenants) == 0 { + // No tenants for this day means we can break here since previous + // retention iterations have already deleted all tenants + break + } + + for tenant, objects := range tenants { + if len(objects) == 0 { + continue + } + + tenantLogger := log.With(dayLogger, "tenant", tenant) + 
level.Info(tenantLogger).Log("msg", "applying retention to tenant", "keys", len(objects)) + + // Note: we cannot delete the tenant directory directly because it is not an + // actual key in the object store. Instead, we need to delete all keys one by one. + for _, object := range objects { + if err := objectClient.DeleteObject(ctx, object.Key); err != nil { + r.metrics.retentionTime.WithLabelValues(statusFailure).Observe(time.Since(start.Time()).Seconds()) + r.metrics.retentionDaysPerIteration.WithLabelValues(statusFailure).Observe(float64(daysProcessed)) + r.metrics.retentionTenantsPerIteration.WithLabelValues(statusFailure).Observe(float64(len(tenantsRetentionApplied))) + return errors.Wrapf(err, "deleting key %s", object.Key) + } + } + + tenantsRetentionApplied[tenant] = struct{}{} + } + + daysProcessed++ + } + + r.lastDayRun = today + r.metrics.retentionTime.WithLabelValues(statusSuccess).Observe(time.Since(start.Time()).Seconds()) + r.metrics.retentionDaysPerIteration.WithLabelValues(statusSuccess).Observe(float64(daysProcessed)) + r.metrics.retentionTenantsPerIteration.WithLabelValues(statusSuccess).Observe(float64(len(tenantsRetentionApplied))) + level.Info(r.logger).Log("msg", "finished applying retention", "daysProcessed", daysProcessed, "tenants", len(tenantsRetentionApplied)) + + return nil +} + +func (r *RetentionManager) reportTenantsExceedingLookback(retentionByTenant map[string]time.Duration) { + if len(retentionByTenant) == 0 { + r.metrics.retentionTenantsExceedingLookback.Set(0) + return + } + + var tenantsExceedingLookback int + for tenant, retention := range retentionByTenant { + if retention > time.Duration(r.cfg.MaxLookbackDays)*24*time.Hour { + level.Warn(r.logger).Log("msg", "tenant retention exceeds max lookback days", "tenant", tenant, "retention", retention.String()) + tenantsExceedingLookback++ + } + } + + r.metrics.retentionTenantsExceedingLookback.Set(float64(tenantsExceedingLookback)) +} + +func findLongestRetention(globalRetention 
time.Duration, streamRetention []validation.StreamRetention) time.Duration { + if len(streamRetention) == 0 { + return globalRetention + } + + maxStreamRetention := slices.MaxFunc(streamRetention, func(a, b validation.StreamRetention) int { + return int(a.Period - b.Period) + }) + + if time.Duration(maxStreamRetention.Period) > globalRetention { + return time.Duration(maxStreamRetention.Period) + } + return globalRetention +} + +func retentionByTenant(limits RetentionLimits) map[string]time.Duration { + all := limits.AllByUserID() + if len(all) == 0 { + return nil + } + + retentions := make(map[string]time.Duration, len(all)) + for tenant, lim := range all { + retention := findLongestRetention(time.Duration(lim.RetentionPeriod), lim.StreamRetention) + if retention == 0 { + continue + } + retentions[tenant] = retention + } + + return retentions +} + +// smallestEnabledRetention returns the smallest retention period across all tenants and the default. +func smallestEnabledRetention(defaultRetention time.Duration, perTenantRetention map[string]time.Duration) time.Duration { + if len(perTenantRetention) == 0 { + return defaultRetention + } + + smallest := time.Duration(math.MaxInt64) + if defaultRetention != 0 { + smallest = defaultRetention + } + + for _, retention := range perTenantRetention { + // Skip unlimited retention + if retention == 0 { + continue + } + + if retention < smallest { + smallest = retention + } + } + + if smallest == time.Duration(math.MaxInt64) { + // No tenant nor defaults configures a retention + return 0 + } + + return smallest +} diff --git a/pkg/bloombuild/planner/retention_test.go b/pkg/bloombuild/planner/retention_test.go new file mode 100644 index 0000000000000..15118aeca70ae --- /dev/null +++ b/pkg/bloombuild/planner/retention_test.go @@ -0,0 +1,762 @@ +package planner + +import ( + "context" + "math" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + 
"github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/storage" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" + storageconfig "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" + "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/util/mempool" + "github.com/grafana/loki/v3/pkg/validation" +) + +var testTime = parseDayTime("2024-12-31").ModelTime() + +func TestRetention(t *testing.T) { + for _, tc := range []struct { + name string + cfg RetentionConfig + lim mockRetentionLimits + prePopulate func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) + expectErr bool + check func(t *testing.T, bloomStore *bloomshipper.BloomStore) + }{ + { + name: "retention disabled", + cfg: RetentionConfig{ + Enabled: false, + MaxLookbackDays: 2 * 365, + }, + lim: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + "2": 200 * 24 * time.Hour, + "3": 500 * 24 * time.Hour, + }, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) { + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 200) + putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50) + putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 200, len(metas[0])) + metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 50, len(metas[0])) + metas = 
getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 500, len(metas[0])) + }, + }, + { + name: "unlimited retention", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 2 * 365, + }, + lim: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 0, + }, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) { + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 200) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 200, len(metas[0])) + }, + }, + { + name: "default retention", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 2 * 365, + }, + lim: mockRetentionLimits{ + defaultRetention: 30 * 24 * time.Hour, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) { + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 200) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 31, len(metas[0])) + }, + }, + { + name: "retention lookback smaller than max retention", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 100, + }, + lim: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + "2": 20 * 24 * time.Hour, + "3": 200 * 24 * time.Hour, + "4": 400 * 24 * time.Hour, + }, + streamRetention: map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(30 * 24 * time.Hour), + }, + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + "2": { + { + Period: model.Duration(10 * 24 * time.Hour), + }, + }, + }, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, 
bloomStore *bloomshipper.BloomStore) { + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 200) + putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50) + putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500) + putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + // Tenant 1 has 40 days of retention, and we wrote 200 days of metas + // We should get two groups: 0th-40th and 101th-200th + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 2, len(metas)) + require.Equal(t, 41, len(metas[0])) // 0-40th day + require.Equal(t, 100, len(metas[1])) // 100th-200th day + + // Tenant 2 has 20 days of retention, and we wrote 50 days of metas + // We should get one group: 0th-20th + metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 21, len(metas[0])) // 0th-20th + + // Tenant 3 has 200 days of retention, and we wrote 500 days of metas + // Since the manager looks up to 100 days, we shouldn't have deleted any metas + metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 500, len(metas[0])) // 0th-500th + + // Tenant 4 has 400 days of retention, and we wrote 500 days of metas + // Since the manager looks up to 100 days, we shouldn't have deleted any metas + metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 500, len(metas[0])) // 0th-500th + }, + }, + { + name: "retention lookback bigger than max retention", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 2 * 365, + }, + lim: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + "2": 20 * 24 * time.Hour, + "3": 200 * 24 * time.Hour, + "4": 400 * 24 * time.Hour, + }, + streamRetention: 
map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(30 * 24 * time.Hour), + }, + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + "2": { + { + Period: model.Duration(10 * 24 * time.Hour), + }, + }, + }, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) { + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 200) + putMetasForLastNDays(t, schemaCfg, bloomStore, "2", testTime, 50) + putMetasForLastNDays(t, schemaCfg, bloomStore, "3", testTime, 500) + putMetasForLastNDays(t, schemaCfg, bloomStore, "4", testTime, 500) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + // Tenant 1 has 40 days of retention, and we wrote 200 days of metas + // We should get one group: 0th-40th + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 41, len(metas[0])) // 0th-40th day + + // Tenant 2 has 20 days of retention, and we wrote 50 days of metas + // We should get one group: 0th-20th + metas = getGroupedMetasForLastNDays(t, bloomStore, "2", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 21, len(metas[0])) // 0th-20th + + // Tenant 3 has 200 days of retention, and we wrote 500 days of metas + // We should get one group: 0th-200th + metas = getGroupedMetasForLastNDays(t, bloomStore, "3", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 201, len(metas[0])) // 0th-200th + + // Tenant 4 has 400 days of retention, and we wrote 500 days of metas + // The lookback (2 years) covers it, so we should get one group: 0th-400th + metas = getGroupedMetasForLastNDays(t, bloomStore, "4", testTime, 500) + require.Equal(t, 1, len(metas)) + require.Equal(t, 401, len(metas[0])) // 0th-400th + }, + }, + { + name: "hit no tenants in table", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 2 * 365, + }, + lim: mockRetentionLimits{ + retention: 
map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + }, + }, + prePopulate: func(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore) { + // Place metas with a gap of 50 days. [0th-100th], [151th-200th] + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime, 100) + putMetasForLastNDays(t, schemaCfg, bloomStore, "1", testTime.Add(-150*24*time.Hour), 50) + }, + check: func(t *testing.T, bloomStore *bloomshipper.BloomStore) { + // We should get two groups: 0th-30th and 151th-200th + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 500) + require.Equal(t, 2, len(metas)) + require.Equal(t, 31, len(metas[0])) // 0th-30th day + require.Equal(t, 50, len(metas[1])) // 151th-200th day + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + bloomStore, schema, _, err := NewMockBloomStore(t, logger) + require.NoError(t, err) + + rm := NewRetentionManager( + tc.cfg, + tc.lim, + bloomStore, + NewMetrics(nil, nil), + logger, + ) + rm.now = func() model.Time { + return testTime + } + + tc.prePopulate(t, schema, bloomStore) + + err = rm.Apply(context.Background()) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + tc.check(t, bloomStore) + }) + } +} + +func TestRetentionRunsOncePerDay(t *testing.T) { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + bloomStore, schema, _, err := NewMockBloomStore(t, logger) + require.NoError(t, err) + + rm := NewRetentionManager( + RetentionConfig{ + Enabled: true, + MaxLookbackDays: 365, + }, + mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + }, + }, + bloomStore, + NewMetrics(nil, nil), + logger, + ) + rm.now = func() model.Time { + return testTime + } + + // Write metas for the last 100 days and run retention + putMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100) + err = 
rm.Apply(context.Background()) + require.NoError(t, err) + + // We should get only the first 30 days of metas + metas := getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100) + require.Equal(t, 1, len(metas)) + require.Equal(t, 31, len(metas[0])) // 0th-30th day + + // We now change the now() time to be a bit later in the day + rm.now = func() model.Time { + return testTime.Add(1 * time.Hour) + } + + // Write metas again and run retention. Since we already ran retention at now()'s day, + // Apply should be a noop, and therefore we should be able to get all the 100 days of metas + putMetasForLastNDays(t, schema, bloomStore, "1", testTime, 100) + err = rm.Apply(context.Background()) + require.NoError(t, err) + + metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100) + require.Equal(t, 1, len(metas)) + require.Equal(t, 100, len(metas[0])) + + // We now change the now() time to be the next day, retention should run again + rm.now = func() model.Time { + return testTime.Add(24 * time.Hour) + } + err = rm.Apply(context.Background()) + require.NoError(t, err) + + // We should only see the first 30 days of metas + metas = getGroupedMetasForLastNDays(t, bloomStore, "1", testTime, 100) + require.Equal(t, 1, len(metas)) + require.Equal(t, 30, len(metas[0])) // 0th-30th day +} + +func TestFindLongestRetention(t *testing.T) { + for _, tc := range []struct { + name string + globalRetention time.Duration + streamRetention []validation.StreamRetention + expectedRetention time.Duration + }{ + { + name: "no retention", + expectedRetention: 0, + }, + { + name: "global retention", + globalRetention: 30 * 24 * time.Hour, + expectedRetention: 30 * 24 * time.Hour, + }, + { + name: "stream retention", + streamRetention: []validation.StreamRetention{ + { + Period: model.Duration(30 * 24 * time.Hour), + }, + }, + expectedRetention: 30 * 24 * time.Hour, + }, + { + name: "two stream retention", + streamRetention: []validation.StreamRetention{ + { + Period: 
model.Duration(30 * 24 * time.Hour), + }, + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + expectedRetention: 40 * 24 * time.Hour, + }, + { + name: "stream retention bigger than global", + globalRetention: 20 * 24 * time.Hour, + streamRetention: []validation.StreamRetention{ + { + Period: model.Duration(30 * 24 * time.Hour), + }, + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + expectedRetention: 40 * 24 * time.Hour, + }, + { + name: "global retention bigger than stream", + globalRetention: 40 * 24 * time.Hour, + streamRetention: []validation.StreamRetention{ + { + Period: model.Duration(20 * 24 * time.Hour), + }, + { + Period: model.Duration(30 * 24 * time.Hour), + }, + }, + expectedRetention: 40 * 24 * time.Hour, + }, + } { + t.Run(tc.name, func(t *testing.T) { + retention := findLongestRetention(tc.globalRetention, tc.streamRetention) + require.Equal(t, tc.expectedRetention, retention) + }) + } +} + +func TestSmallestRetention(t *testing.T) { + for _, tc := range []struct { + name string + limits RetentionLimits + expectedRetention time.Duration + expectedHasRetention bool + }{ + { + name: "no retention", + limits: mockRetentionLimits{}, + expectedRetention: 0, + }, + { + name: "default global retention", + limits: mockRetentionLimits{ + defaultRetention: 30 * 24 * time.Hour, + }, + expectedRetention: 30 * 24 * time.Hour, + }, + { + name: "default stream retention", + limits: mockRetentionLimits{ + defaultStreamRetention: []validation.StreamRetention{ + { + Period: model.Duration(30 * 24 * time.Hour), + }, + }, + }, + expectedRetention: 30 * 24 * time.Hour, + }, + { + name: "tenant configured unlimited", + limits: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 0, + }, + defaultRetention: 30 * 24 * time.Hour, + }, + expectedRetention: 30 * 24 * time.Hour, + }, + { + name: "no default one tenant", + limits: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + }, + streamRetention: 
map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + }, + }, + expectedRetention: 40 * 24 * time.Hour, + }, + { + name: "no default two tenants", + limits: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + "2": 20 * 24 * time.Hour, + }, + streamRetention: map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + "2": { + { + Period: model.Duration(10 * 24 * time.Hour), + }, + }, + }, + }, + expectedRetention: 20 * 24 * time.Hour, + }, + { + name: "default bigger than tenant", + limits: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 10 * 24 * time.Hour, + }, + streamRetention: map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(20 * 24 * time.Hour), + }, + }, + }, + defaultRetention: 40 * 24 * time.Hour, + defaultStreamRetention: []validation.StreamRetention{ + { + Period: model.Duration(30 * 24 * time.Hour), + }, + }, + }, + expectedRetention: 20 * 24 * time.Hour, + }, + { + name: "tenant bigger than default", + limits: mockRetentionLimits{ + retention: map[string]time.Duration{ + "1": 30 * 24 * time.Hour, + }, + streamRetention: map[string][]validation.StreamRetention{ + "1": { + { + Period: model.Duration(40 * 24 * time.Hour), + }, + }, + }, + defaultRetention: 10 * 24 * time.Hour, + defaultStreamRetention: []validation.StreamRetention{ + { + Period: model.Duration(20 * 24 * time.Hour), + }, + }, + }, + expectedRetention: 20 * 24 * time.Hour, + }, + } { + t.Run(tc.name, func(t *testing.T) { + defaultLim := tc.limits.DefaultLimits() + defaultRetention := findLongestRetention(time.Duration(defaultLim.RetentionPeriod), defaultLim.StreamRetention) + tenantsRetention := retentionByTenant(tc.limits) + + retention := smallestEnabledRetention(defaultRetention, tenantsRetention) + require.Equal(t, tc.expectedRetention, retention) + }) + } +} + +func 
TestRetentionConfigValidate(t *testing.T) { + for _, tc := range []struct { + name string + cfg RetentionConfig + expectErr bool + }{ + { + name: "enabled and valid", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 2 * 365, + }, + expectErr: false, + }, + { + name: "invalid max lookback days", + cfg: RetentionConfig{ + Enabled: true, + MaxLookbackDays: 0, + }, + expectErr: true, + }, + { + name: "disabled and invalid", + cfg: RetentionConfig{ + Enabled: false, + MaxLookbackDays: 0, + }, + expectErr: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +func putMetasForLastNDays(t *testing.T, schemaCfg storageconfig.SchemaConfig, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) { + const metasPerDay = 2 + + startDay := storageconfig.NewDayTime(start) + endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour)) + for day := startDay; day.After(endDay); day = day.Dec() { + period, err := schemaCfg.SchemaForTime(day.ModelTime()) + require.NoError(t, err) + + dayTable := storageconfig.NewDayTable(day, period.IndexTables.Prefix) + bloomClient, err := bloomStore.Client(dayTable.ModelTime()) + require.NoErrorf(t, err, "failed to get bloom client for day %d: %s", day, err) + + for i := 0; i < metasPerDay; i++ { + err = bloomClient.PutMeta(context.Background(), bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + TenantID: tenant, + TableName: dayTable.String(), + Bounds: v1.NewBounds(model.Fingerprint(i*100), model.Fingerprint(i*100+100)), + }, + }, + Blocks: []bloomshipper.BlockRef{}, + }) + require.NoError(t, err) + } + } +} + +// getGroupedMetasForLastNDays returns groups of continuous metas for the last N days.
+func getGroupedMetasForLastNDays(t *testing.T, bloomStore *bloomshipper.BloomStore, tenant string, start model.Time, days int) [][][]bloomshipper.Meta { + metasGrouped := make([][][]bloomshipper.Meta, 0) + currentGroup := make([][]bloomshipper.Meta, 0) + + startDay := storageconfig.NewDayTime(start) + endDay := storageconfig.NewDayTime(startDay.Add(-time.Duration(days) * 24 * time.Hour)) + + for day := startDay; day.After(endDay); day = day.Dec() { + metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{ + TenantID: tenant, + Interval: bloomshipper.NewInterval(day.Bounds()), + Keyspace: v1.NewBounds(0, math.MaxUint64), + }) + require.NoError(t, err) + if len(metas) == 0 { + // We have reached the end of the metas group: cut a new group + if len(currentGroup) > 0 { + metasGrouped = append(metasGrouped, currentGroup) + currentGroup = make([][]bloomshipper.Meta, 0) + } + continue + } + currentGroup = append(currentGroup, metas) + } + + // Append the last group if it's not empty + if len(currentGroup) > 0 { + metasGrouped = append(metasGrouped, currentGroup) + } + + return metasGrouped +} + +func NewMockBloomStore(t *testing.T, logger log.Logger) (*bloomshipper.BloomStore, storageconfig.SchemaConfig, string, error) { + workDir := t.TempDir() + return NewMockBloomStoreWithWorkDir(t, workDir, logger) +} + +func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string, logger log.Logger) (*bloomshipper.BloomStore, storageconfig.SchemaConfig, string, error) { + schemaCfg := storageconfig.SchemaConfig{ + Configs: []storageconfig.PeriodConfig{ + { + ObjectType: types.StorageTypeFileSystem, + From: storageconfig.DayTime{ + Time: testTime.Add(-2 * 365 * 24 * time.Hour), // -2 year + }, + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + Prefix: "schema_a_table_", + }}, + }, + { + ObjectType: types.StorageTypeFileSystem, + From: storageconfig.DayTime{ + 
Time: testTime.Add(-365 * 24 * time.Hour), // -1 year + }, + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + Prefix: "schema_b_table_", + }}, + }, + }, + } + + storageConfig := storage.Config{ + FSConfig: local.FSConfig{ + Directory: workDir, + }, + BloomShipperConfig: config.Config{ + WorkingDirectory: []string{workDir}, + DownloadParallelism: 1, + BlocksCache: config.BlocksCacheConfig{ + SoftLimit: 1 << 20, + HardLimit: 2 << 20, + TTL: time.Hour, + PurgeInterval: time.Hour, + }, + }, + } + + reg := prometheus.NewPedanticRegistry() + metrics := storage.NewClientMetrics() + t.Cleanup(metrics.Unregister) + + metasCache := cache.NewMockCache() + blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger) + + store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger) + if err == nil { + t.Cleanup(store.Stop) + } + + return store, schemaCfg, workDir, err +} + +type mockRetentionLimits struct { + retention map[string]time.Duration + streamRetention map[string][]validation.StreamRetention + defaultRetention time.Duration + defaultStreamRetention []validation.StreamRetention +} + +func (m mockRetentionLimits) RetentionPeriod(tenant string) time.Duration { + return m.retention[tenant] +} + +func (m mockRetentionLimits) StreamRetention(tenant string) []validation.StreamRetention { + return m.streamRetention[tenant] +} + +func (m mockRetentionLimits) AllByUserID() map[string]*validation.Limits { + tenants := make(map[string]*validation.Limits, len(m.retention)) + + for tenant, retention := range m.retention { + if _, ok := tenants[tenant]; !ok { + tenants[tenant] = &validation.Limits{} + } + tenants[tenant].RetentionPeriod = model.Duration(retention) + } + + for tenant, streamRetention := range m.streamRetention { + if 
_, ok := tenants[tenant]; !ok { + tenants[tenant] = &validation.Limits{} + } + tenants[tenant].StreamRetention = streamRetention + } + + return tenants +} + +func (m mockRetentionLimits) DefaultLimits() *validation.Limits { + return &validation.Limits{ + RetentionPeriod: model.Duration(m.defaultRetention), + StreamRetention: m.defaultStreamRetention, + } +}