diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index f5539511ca..1cfc53ec5c 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -285,6 +285,12 @@ compactor:
     # CLI flag: -compactor.ring.wait-active-instance-timeout
     [wait_active_instance_timeout: <duration> | default = 10m]
 
+  # How long the shuffle sharding planner should wait before running planning
+  # code. This delay prevents double compaction when two compactors claim the
+  # same partition in the grouper at the same time.
+  # CLI flag: -compactor.sharding-planner-delay
+  [sharding_planner_delay: <duration> | default = 10s]
+
   # The compaction strategy to use. Supported values are: default, partitioning.
   # CLI flag: -compactor.compaction-strategy
   [compaction_strategy: <string> | default = "default"]
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 03d0bcf594..38fb74e05b 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2330,6 +2330,12 @@ sharding_ring:
   # CLI flag: -compactor.ring.wait-active-instance-timeout
   [wait_active_instance_timeout: <duration> | default = 10m]
 
+# How long the shuffle sharding planner should wait before running planning
+# code. This delay prevents double compaction when two compactors claim the
+# same partition in the grouper at the same time.
+# CLI flag: -compactor.sharding-planner-delay
+[sharding_planner_delay: <duration> | default = 10s]
+
 # The compaction strategy to use. Supported values are: default, partitioning.
 # CLI flag: -compactor.compaction-strategy
 [compaction_strategy: <string> | default = "default"]
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index c50a98ca48..a1a9a8f8a2 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -153,7 +153,7 @@ var (
 	plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {
 
 		if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
-			return NewPartitionCompactionPlanner(ctx, bkt, logger)
+			return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
 		} else {
 			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
 		}
@@ -234,9 +234,10 @@ type Config struct {
 	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
 
 	// Compactors sharding.
-	ShardingEnabled  bool       `yaml:"sharding_enabled"`
-	ShardingStrategy string     `yaml:"sharding_strategy"`
-	ShardingRing     RingConfig `yaml:"sharding_ring"`
+	ShardingEnabled      bool          `yaml:"sharding_enabled"`
+	ShardingStrategy     string        `yaml:"sharding_strategy"`
+	ShardingRing         RingConfig    `yaml:"sharding_ring"`
+	ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"`
 
 	// Compaction strategy.
 	CompactionStrategy string `yaml:"compaction_strategy"`
@@ -304,6 +305,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
 	f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
+
+	f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long the shuffle sharding planner should wait before running planning code. This delay prevents double compaction when two compactors claim the same partition in the grouper at the same time.")
 }
 
 func (cfg *Config) Validate(limits validation.Limits) error {
diff --git a/pkg/compactor/compactor_metrics.go b/pkg/compactor/compactor_metrics.go
index e14fb9a0dc..23e7bca6c0 100644
--- a/pkg/compactor/compactor_metrics.go
+++ b/pkg/compactor/compactor_metrics.go
@@ -39,6 +39,7 @@ type compactorMetrics struct {
 	remainingPlannedCompactions *prometheus.GaugeVec
 	compactionErrorsCount       *prometheus.CounterVec
 	partitionCount              *prometheus.GaugeVec
+	compactionsNotPlanned       *prometheus.CounterVec
 }
 
 const (
@@ -174,6 +175,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
 		Name: "cortex_compactor_group_partition_count",
 		Help: "Number of partitions for each compaction group.",
 	}, compactionLabels)
+	m.compactionsNotPlanned = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_compactor_group_compactions_not_planned_total",
+		Help: "Total number of group compactions not planned due to error.",
+	}, compactionLabels)
 
 	return &m
 }
@@ -225,6 +230,7 @@ func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...str
 	m.compactionFailures.WithLabelValues(labelValue...)
 	m.verticalCompactions.WithLabelValues(labelValue...)
 	m.partitionCount.WithLabelValues(labelValue...)
+	m.compactionsNotPlanned.WithLabelValues(labelValue...)
 }
 
 func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
@@ -236,4 +242,5 @@
 	m.compactionFailures.DeleteLabelValues(userID)
 	m.verticalCompactions.DeleteLabelValues(userID)
 	m.partitionCount.DeleteLabelValues(userID)
+	m.compactionsNotPlanned.DeleteLabelValues(userID)
 }
diff --git a/pkg/compactor/compactor_metrics_test.go b/pkg/compactor/compactor_metrics_test.go
index f2a13276cd..947fd7f396 100644
--- a/pkg/compactor/compactor_metrics_test.go
+++ b/pkg/compactor/compactor_metrics_test.go
@@ -135,6 +135,11 @@ func TestSyncerMetrics(t *testing.T) {
 		cortex_compactor_group_partition_count{user="aaa"} 511060
 		cortex_compactor_group_partition_count{user="bbb"} 522170
 		cortex_compactor_group_partition_count{user="ccc"} 533280
+		# HELP cortex_compactor_group_compactions_not_planned_total Total number of group compactions not planned due to error.
+		# TYPE cortex_compactor_group_compactions_not_planned_total counter
+		cortex_compactor_group_compactions_not_planned_total{user="aaa"} 544390
+		cortex_compactor_group_compactions_not_planned_total{user="bbb"} 555500
+		cortex_compactor_group_compactions_not_planned_total{user="ccc"} 566610
 	`))
 	require.NoError(t, err)
 
@@ -191,4 +196,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
 	cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
 	cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
 	cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
+	cm.compactionsNotPlanned.WithLabelValues("aaa").Add(49 * base)
+	cm.compactionsNotPlanned.WithLabelValues("bbb").Add(50 * base)
+	cm.compactionsNotPlanned.WithLabelValues("ccc").Add(51 * base)
 }
diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go
index 963771aa6d..436bba426b 100644
--- a/pkg/compactor/partition_compaction_planner.go
+++ b/pkg/compactor/partition_compaction_planner.go
@@ -2,30 +2,148 @@ package compactor
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+)
+
+var (
+	plannerCompletedPartitionError = errors.New("got completed partition")
+	plannerVisitedPartitionError   = errors.New("got partition visited by other compactor")
 )
 
 type PartitionCompactionPlanner struct {
-	ctx    context.Context
-	bkt    objstore.InstrumentedBucket
-	logger log.Logger
+	ctx                                    context.Context
+	bkt                                    objstore.InstrumentedBucket
+	logger                                 log.Logger
+	ranges                                 []int64
+	noCompBlocksFunc                       func() map[ulid.ULID]*metadata.NoCompactMark
+	ringLifecyclerID                       string
+	userID                                 string
+	plannerDelay                           time.Duration
+	partitionVisitMarkerTimeout            time.Duration
+	partitionVisitMarkerFileUpdateInterval time.Duration
+	compactorMetrics                       *compactorMetrics
 }
 
 func NewPartitionCompactionPlanner(
 	ctx context.Context,
 	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
+	ranges []int64,
+	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
+	ringLifecyclerID string,
+	userID string,
+	plannerDelay time.Duration,
+	partitionVisitMarkerTimeout time.Duration,
+	partitionVisitMarkerFileUpdateInterval time.Duration,
+	compactorMetrics *compactorMetrics,
 ) *PartitionCompactionPlanner {
 	return &PartitionCompactionPlanner{
-		ctx:    ctx,
-		bkt:    bkt,
-		logger: logger,
+		ctx:                                    ctx,
+		bkt:                                    bkt,
+		logger:                                 logger,
+		ranges:                                 ranges,
+		noCompBlocksFunc:                       noCompBlocksFunc,
+		ringLifecyclerID:                       ringLifecyclerID,
+		userID:                                 userID,
+		plannerDelay:                           plannerDelay,
+		partitionVisitMarkerTimeout:            partitionVisitMarkerTimeout,
+		partitionVisitMarkerFileUpdateInterval: partitionVisitMarkerFileUpdateInterval,
+		compactorMetrics:                       compactorMetrics,
 	}
 }
 
 func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
-	panic("PartitionCompactionPlanner not implemented")
+	cortexMetaExtensions, err := tsdb.ConvertToCortexMetaExtensions(extensions)
+	if err != nil {
+		return nil, err
+	}
+	if cortexMetaExtensions == nil {
+		return nil, fmt.Errorf("cortexMetaExtensions cannot be nil")
+	}
+	return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan)
+}
+
+func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
+	partitionInfo := cortexMetaExtensions.PartitionInfo
+	if partitionInfo == nil {
+		return nil, fmt.Errorf("partitionInfo cannot be nil")
+	}
+	partitionID := partitionInfo.PartitionID
+	partitionedGroupID := partitionInfo.PartitionedGroupID
+
+	// This delay prevents double compaction when two compactors
+	// claim the same partition in the grouper at the same time.
+	time.Sleep(p.plannerDelay)
+
+	visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
+	visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
+	existingPartitionVisitMarker := &partitionVisitMarker{}
+	err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
+	visitMarkerExists := true
+	if err != nil {
+		if errors.Is(err, errorVisitMarkerNotFound) {
+			visitMarkerExists = false
+		} else {
+			p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
+			return nil, fmt.Errorf("unable to get visit marker file for partition with partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
+		}
+	}
+	if visitMarkerExists {
+		if existingPartitionVisitMarker.GetStatus() == Completed {
+			level.Warn(p.logger).Log("msg", "partition is in completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
+			return nil, plannerCompletedPartitionError
+		}
+		if !existingPartitionVisitMarker.IsPendingByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
+			level.Warn(p.logger).Log("msg", "partition is not visited by current compactor", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
+			return nil, plannerVisitedPartitionError
+		}
+	}
+
+	// Ensure all blocks fit within the largest range. This is a double check
+	// to ensure there's no bug in the previous block grouping, given this Plan()
+	// is just a pass-through.
+ // Modified from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63 + largestRange := p.ranges[len(p.ranges)-1] + rangeStart := getRangeStart(metasByMinTime[0], largestRange) + rangeEnd := rangeStart + largestRange + noCompactMarked := p.noCompBlocksFunc() + resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime)) + + for _, b := range metasByMinTime { + if b.ULID == DUMMY_BLOCK_ID { + continue + } + blockID := b.ULID.String() + if _, excluded := noCompactMarked[b.ULID]; excluded { + continue + } + + if b.MinTime < rangeStart || b.MaxTime > rangeEnd { + p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc() + level.Warn(p.logger).Log("msg", "block is outside the largest expected range", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "block_id", blockID, "block_min_time", b.MinTime, "block_max_time", b.MaxTime, "range_start", rangeStart, "range_end", rangeEnd) + return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd) + } + + resultMetas = append(resultMetas, b) + } + + if len(resultMetas) < 1 { + p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc() + level.Warn(p.logger).Log("msg", "result meta size is empty", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "group_size", len(metasByMinTime)) + return nil, nil + } + + go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false) + + return resultMetas, nil } diff --git a/pkg/compactor/partition_compaction_planner_test.go b/pkg/compactor/partition_compaction_planner_test.go new file mode 100644 index 0000000000..67d5ba60f5 --- /dev/null +++ b/pkg/compactor/partition_compaction_planner_test.go @@ -0,0 +1,338 @@ +package compactor + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortextsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/concurrency" +) + +func TestPartitionCompactionPlanner_Plan(t *testing.T) { + type VisitedPartition struct { + isExpired bool + compactorID string + } + + currentCompactor := "test-compactor" + otherCompactor := "other-compactor" + + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + + tests := map[string]struct { + ranges []int64 + noCompactBlocks map[ulid.ULID]*metadata.NoCompactMark + blocks []*metadata.Meta + expected []*metadata.Meta + expectedErr error + visitedPartition VisitedPartition + }{ + "test basic plan": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: 
VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expected: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + }, + "test blocks outside largest range smaller min time after": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 2 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 1": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block1ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 2": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + "test should skip blocks marked for no compact": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block1ulid: {}}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block3ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expected: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + 
MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block3ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + }, + "test should not compact if there is no compactable block": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block1ulid: {}}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: currentCompactor, + }, + expected: []*metadata.Meta{}, + }, + "test should not compact if visit marker file is not expired and visited by other compactor": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: false, + compactorID: otherCompactor, + }, + expectedErr: plannerVisitedPartitionError, + }, + "test should not compact if visit marker file is expired": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedPartition: VisitedPartition{ + isExpired: true, + compactorID: currentCompactor, + }, + expectedErr: plannerVisitedPartitionError, + }, + } + + visitMarkerTimeout := 5 * time.Minute + partitionedGroupID := uint32(1) + partitionID := 0 + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + bkt := &bucket.ClientMock{} + visitMarkerFile := GetPartitionVisitMarkerFilePath(partitionedGroupID, partitionID) + expireTime := time.Now() + if testData.visitedPartition.isExpired { + expireTime = expireTime.Add(-1 * visitMarkerTimeout) + } + visitMarker := partitionVisitMarker{ + CompactorID: testData.visitedPartition.compactorID, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + VisitTime: expireTime.Unix(), + Status: Pending, + Version: PartitionVisitMarkerVersion1, + } + visitMarkerFileContent, _ := json.Marshal(visitMarker) + bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) + bkt.MockUpload(mock.Anything, nil) + bkt.MockGet(mock.Anything, "", nil) + + registerer := prometheus.NewPedanticRegistry() + + metrics := newCompactorMetrics(registerer) + + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + p := NewPartitionCompactionPlanner( + context.Background(), + objstore.WithNoopInstr(bkt), + logger, + testData.ranges, + func() map[ulid.ULID]*metadata.NoCompactMark { + return testData.noCompactBlocks + }, + currentCompactor, + "test-user", + 10*time.Millisecond, + visitMarkerTimeout, + time.Minute, + metrics, + ) + actual, err := p.Plan(context.Background(), testData.blocks, nil, &cortextsdb.CortexMetaExtensions{ + PartitionInfo: &cortextsdb.PartitionInfo{ + PartitionCount: 1, + PartitionID: partitionID, + PartitionedGroupID: partitionedGroupID, + }, + }) + + if testData.expectedErr != 
nil { + assert.Equal(t, err, testData.expectedErr) + } else { + require.NoError(t, err) + } + + require.Len(t, actual, len(testData.expected)) + + for idx, expectedMeta := range testData.expected { + assert.Equal(t, expectedMeta.ULID, actual[idx].ULID) + } + }) + } +}
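
For context on how the new planner is driven: `Plan` refuses to run unless the grouper attaches a `CortexMetaExtensions` payload whose `PartitionInfo` identifies the assigned partition, and every other dependency now comes from the compactor `Config`, exactly as in the `plannerFactory` change above. The sketch below is a minimal, hypothetical caller in the same package; the helper name `planOnePartition` and parameters such as `noCompactFilter` and `compactorID` are illustrative and not part of this change, while the constructor arguments and config fields are the ones shown in the diff.

```go
package compactor

import (
	"context"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	cortextsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// planOnePartition is a hypothetical helper (not part of this change) showing the
// inputs PartitionCompactionPlanner.Plan expects: block metas sorted by min time plus
// CortexMetaExtensions carrying the partition identity assigned by the grouper.
func planOnePartition(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	cfg Config,
	noCompactFilter func() map[ulid.ULID]*metadata.NoCompactMark,
	compactorID string,
	userID string,
	metrics *compactorMetrics,
	metasByMinTime []*metadata.Meta,
	partitionInfo *cortextsdb.PartitionInfo,
	errChan chan error,
) ([]*metadata.Meta, error) {
	// Construction mirrors the plannerFactory branch for the partitioning strategy.
	planner := NewPartitionCompactionPlanner(
		ctx,
		bkt,
		logger,
		cfg.BlockRanges.ToMilliseconds(),
		noCompactFilter,
		compactorID,
		userID,
		cfg.ShardingPlannerDelay,
		cfg.CompactionVisitMarkerTimeout,
		cfg.CompactionVisitMarkerFileUpdateInterval,
		metrics,
	)

	// Plan fails fast if the extensions payload or its PartitionInfo is nil.
	return planner.Plan(ctx, metasByMinTime, errChan, &cortextsdb.CortexMetaExtensions{
		PartitionInfo: partitionInfo,
	})
}
```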
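The two new sentinel errors mark benign outcomes (another compactor already finished, or currently owns, the partition) rather than hard planning failures; how the compaction loop reacts to them is outside this diff. A purely illustrative, caller-side classification could look like the following sketch, which only assumes the sentinel errors defined in `partition_compaction_planner.go`:

```go
package compactor

import (
	"github.com/pkg/errors"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// classifyPlanResult is a hypothetical helper (not part of this change). It returns
// true only when there is something to compact, and treats the "someone else owns or
// already finished this partition" cases as non-errors.
func classifyPlanResult(metas []*metadata.Meta, err error) (bool, error) {
	switch {
	case errors.Is(err, plannerCompletedPartitionError):
		// Another compactor already completed this partition.
		return false, nil
	case errors.Is(err, plannerVisitedPartitionError):
		// The partition visit marker is currently held by another compactor.
		return false, nil
	case err != nil:
		// Real failure: bad extensions, unreadable visit marker, or a block
		// outside the largest expected range.
		return false, err
	case len(metas) == 0:
		// Nothing compactable, e.g. all blocks carry a no-compact mark.
		return false, nil
	default:
		return true, nil
	}
}
```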