diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go
index 1a540d98b0f..8ecc286e83e 100644
--- a/pkg/compactor/bucket_compactor.go
+++ b/pkg/compactor/bucket_compactor.go
@@ -50,7 +50,6 @@ type Syncer struct {
 	fetcher                 block.MetadataFetcher
 	mtx                     sync.Mutex
 	blocks                  map[ulid.ULID]*metadata.Meta
-	partial                 map[ulid.ULID]error
 	metrics                 *syncerMetrics
 	deduplicateBlocksFilter DeduplicateFilter
 }
@@ -105,23 +104,14 @@ func (s *Syncer) SyncMetas(ctx context.Context) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	metas, partial, err := s.fetcher.Fetch(ctx)
+	metas, _, err := s.fetcher.Fetch(ctx)
 	if err != nil {
 		return err
 	}
 	s.blocks = metas
-	s.partial = partial
 	return nil
 }
 
-// Partial returns partial blocks since last sync.
-func (s *Syncer) Partial() map[ulid.ULID]error {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
-	return s.partial
-}
-
 // Metas returns loaded metadata blocks since last sync.
 func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
 	s.mtx.Lock()
@@ -943,7 +933,7 @@ func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{}
 
 // Filter finds blocks that should not be compacted, and fills f.noCompactMarkedMap. If f.removeNoCompactBlocks is true,
 // blocks are also removed from metas. (Thanos version of the filter doesn't do removal).
-func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
 	noCompactMarkedMap := make(map[ulid.ULID]struct{})
 
 	// Find all no-compact markers in the storage.
@@ -1007,7 +997,7 @@ func (f *ExcludeMarkedForDeletionFilter) DeletionMarkBlocks() map[ulid.ULID]stru
 
 // Filter filters out blocks that are marked for deletion.
 // It also builds the map returned by DeletionMarkBlocks() method.
-func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+func (f *ExcludeMarkedForDeletionFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
 	deletionMarkMap := make(map[ulid.ULID]struct{})
 
 	// Find all markers in the storage.
diff --git a/pkg/compactor/bucket_compactor_test.go b/pkg/compactor/bucket_compactor_test.go
index 5c2fb57ddb5..1b335c13ca7 100644
--- a/pkg/compactor/bucket_compactor_test.go
+++ b/pkg/compactor/bucket_compactor_test.go
@@ -185,7 +185,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
 		}
 
 		f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), false)
-		require.NoError(t, f.Filter(ctx, metas, synced, nil))
+		require.NoError(t, f.Filter(ctx, metas, synced))
 
 		require.Contains(t, metas, block1)
 		require.Contains(t, metas, block2)
@@ -206,7 +206,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
 		}
 
 		f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
-		require.NoError(t, f.Filter(ctx, metas, synced, nil))
+		require.NoError(t, f.Filter(ctx, metas, synced))
 
 		require.Contains(t, metas, block1)
 		require.NotContains(t, metas, block2) // block2 was removed from metas.
@@ -232,7 +232,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
 		cancel()
 
 		f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
-		require.Error(t, f.Filter(canceledCtx, metas, synced, nil))
+		require.Error(t, f.Filter(canceledCtx, metas, synced))
 
 		require.Contains(t, metas, block1)
 		require.Contains(t, metas, block2)
@@ -249,7 +249,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
 		}
 
 		f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
-		err := f.Filter(ctx, metas, synced, nil)
+		err := f.Filter(ctx, metas, synced)
 		require.NoError(t, err)
 
 		require.Empty(t, metas)
diff --git a/pkg/compactor/label_remover_filter.go b/pkg/compactor/label_remover_filter.go
index 067e5db435b..2ad16aad58d 100644
--- a/pkg/compactor/label_remover_filter.go
+++ b/pkg/compactor/label_remover_filter.go
@@ -24,7 +24,7 @@ func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter {
 }
 
 // Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.
-func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec, _ block.GaugeVec) error {
+func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec) error {
 	for _, meta := range metas {
 		for _, l := range f.labels {
 			delete(meta.Thanos.Labels, l)
diff --git a/pkg/compactor/label_remover_filter_test.go b/pkg/compactor/label_remover_filter_test.go
index 7b98d684814..63b7888b090 100644
--- a/pkg/compactor/label_remover_filter_test.go
+++ b/pkg/compactor/label_remover_filter_test.go
@@ -64,7 +64,7 @@ func TestLabelRemoverFilter(t *testing.T) {
 			}
 
 			f := NewLabelRemoverFilter(testData.labels)
-			err := f.Filter(context.Background(), metas, nil, nil)
+			err := f.Filter(context.Background(), metas, nil)
 			require.NoError(t, err)
 
 			assert.Len(t, metas, len(testData.expected))
diff --git a/pkg/compactor/shard_aware_deduplicate_filter.go b/pkg/compactor/shard_aware_deduplicate_filter.go
index 4727377be1a..379b7702bf5 100644
--- a/pkg/compactor/shard_aware_deduplicate_filter.go
+++ b/pkg/compactor/shard_aware_deduplicate_filter.go
@@ -33,7 +33,7 @@ func NewShardAwareDeduplicateFilter() *ShardAwareDeduplicateFilter {
 
 // Filter filters out from metas, the initial map of blocks, all the blocks that are contained in other, compacted, blocks.
 // The removed blocks are source blocks of the blocks that remain in metas after the filtering is executed.
-func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+func (f *ShardAwareDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
 	f.duplicateIDs = f.duplicateIDs[:0]
 
 	metasByResolution := make(map[int64][]*metadata.Meta)
diff --git a/pkg/compactor/shard_aware_deduplicate_filter_test.go b/pkg/compactor/shard_aware_deduplicate_filter_test.go
index 1f5f0c91077..1ae137340d5 100644
--- a/pkg/compactor/shard_aware_deduplicate_filter_test.go
+++ b/pkg/compactor/shard_aware_deduplicate_filter_test.go
@@ -373,7 +373,7 @@ func TestShardAwareDeduplicateFilter_Filter(t *testing.T) {
 				expected[id] = m
 			}
 
-			require.NoError(t, f.Filter(context.Background(), metas, m.Synced, m.Modified))
+			require.NoError(t, f.Filter(context.Background(), metas, m.Synced))
 			require.Equal(t, expected, metas)
 
 			require.Equal(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(duplicateMeta)))
@@ -387,8 +387,7 @@ func TestShardAwareDeduplicateFilter_Filter(t *testing.T) {
 
 func newTestFetcherMetrics() *block.FetcherMetrics {
 	return &block.FetcherMetrics{
-		Synced:   extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
-		Modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}),
+		Synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
 	}
 }
 
@@ -453,7 +452,7 @@ func BenchmarkDeduplicateFilter_Filter(b *testing.B) {
 			b.ResetTimer()
 			b.Run("", func(b *testing.B) {
 				for n := 0; n <= b.N; n++ {
-					_ = dedupFilter.Filter(context.Background(), tcase, synced, nil)
+					_ = dedupFilter.Filter(context.Background(), tcase, synced)
 					require.Equal(b, 0, len(dedupFilter.DuplicateIDs()))
 				}
 			})
diff --git a/pkg/storage/tsdb/block/fetcher.go b/pkg/storage/tsdb/block/fetcher.go
index e6e8f819741..cedb51784cd 100644
--- a/pkg/storage/tsdb/block/fetcher.go
+++ b/pkg/storage/tsdb/block/fetcher.go
@@ -40,20 +40,17 @@ type FetcherMetrics struct {
 	SyncFailures prometheus.Counter
 	SyncDuration prometheus.Histogram
 
-	Synced   *extprom.TxGaugeVec
-	Modified *extprom.TxGaugeVec
+	Synced *extprom.TxGaugeVec
 }
 
 // Submit applies new values for metrics tracked by transaction GaugeVec.
 func (s *FetcherMetrics) Submit() {
 	s.Synced.Submit()
-	s.Modified.Submit()
 }
 
 // ResetTx starts new transaction for metrics tracked by transaction GaugeVec.
 func (s *FetcherMetrics) ResetTx() {
 	s.Synced.ResetTx()
-	s.Modified.ResetTx()
 }
 
 const (
@@ -74,9 +71,6 @@ const (
 
 	// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
 	MarkedForNoCompactionMeta = "marked-for-no-compact"
-
-	// Modified label values.
-	replicaRemovedMeta = "replica-label-removed"
 )
 
 func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExtraLabels [][]string) *FetcherMetrics {
@@ -118,18 +112,6 @@ func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExt
 			{MarkedForNoCompactionMeta},
 		}, syncedExtraLabels...)...,
 	)
-	m.Modified = extprom.NewTxGaugeVec(
-		reg,
-		prometheus.GaugeOpts{
-			Subsystem: fetcherSubSys,
-			Name:      "modified",
-			Help:      "Number of blocks whose metadata changed",
-		},
-		[]string{"modified"},
-		append([][]string{
-			{replicaRemovedMeta},
-		}, modifiedExtraLabels...)...,
-	)
 	return &m
 }
 
@@ -144,7 +126,7 @@ type GaugeVec interface {
 
 // Filter allows filtering or modifying metas from the provided map or returns error.
 type MetadataFilter interface {
-	Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error
+	Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) error
 }
 
 // MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
@@ -158,7 +140,6 @@ type MetaFetcher struct {
 
 	// Optional local directory to cache meta.json files.
 	cacheDir string
-	syncs    prometheus.Counter
 	g        singleflight.Group
 
 	mtx sync.Mutex
@@ -187,11 +168,6 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
 		cached:  map[ulid.ULID]*metadata.Meta{},
 		metrics: NewFetcherMetrics(reg, nil, nil),
 		filters: filters,
-		syncs: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Subsystem: fetcherSubSys,
-			Name:      "base_syncs_total",
-			Help:      "Total blocks metadata synchronization attempts by meta fetcher",
-		}),
 	}, nil
 }
 
@@ -286,8 +262,6 @@ type response struct {
 }
 
 func (f *MetaFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
-	f.syncs.Inc()
-
 	var (
 		resp = response{
 			metas: make(map[ulid.ULID]*metadata.Meta),
@@ -438,7 +412,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (_ map[ulid.ULID]*metadata.Meta
 
 	for _, filter := range f.filters {
 		// NOTE: filter can update synced metric accordingly to the reason of the exclude.
-		if err := filter.Filter(ctx, metas, f.metrics.Synced, f.metrics.Modified); err != nil {
+		if err := filter.Filter(ctx, metas, f.metrics.Synced); err != nil {
 			return nil, nil, errors.Wrap(err, "filter metas")
 		}
 	}
@@ -503,7 +477,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.
 
 // Filter filters out blocks that are marked for deletion after a given delay.
 // It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
-func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
+func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) error {
 	deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark)
 
 	// Make a copy of block IDs to check, in order to avoid concurrency issues
diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go
index df69f206383..4fa9ffb6068 100644
--- a/pkg/storegateway/bucket_index_metadata_fetcher.go
+++ b/pkg/storegateway/bucket_index_metadata_fetcher.go
@@ -108,7 +108,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.
 		if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok {
 			err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced)
 		} else {
-			err = filter.Filter(ctx, metas, f.metrics.Synced, f.metrics.Modified)
+			err = filter.Filter(ctx, metas, f.metrics.Synced)
 		}
 
 		if err != nil {
diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go
index a45591bcce2..c9eabc07ad6 100644
--- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go
+++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go
@@ -72,10 +72,6 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
 	assert.Empty(t, logs)
 
 	assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
-		# HELP blocks_meta_modified Number of blocks whose metadata changed
-		# TYPE blocks_meta_modified gauge
-		blocks_meta_modified{modified="replica-label-removed"} 0
-
 		# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
 		# TYPE blocks_meta_sync_failures_total counter
 		blocks_meta_sync_failures_total 0
@@ -99,7 +95,6 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
 		# TYPE blocks_meta_syncs_total counter
 		blocks_meta_syncs_total 1
 	`),
-		"blocks_meta_modified",
 		"blocks_meta_sync_failures_total",
 		"blocks_meta_synced",
 		"blocks_meta_syncs_total",
@@ -123,10 +118,6 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
 	assert.Empty(t, logs)
 
 	assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
-		# HELP blocks_meta_modified Number of blocks whose metadata changed
-		# TYPE blocks_meta_modified gauge
-		blocks_meta_modified{modified="replica-label-removed"} 0
-
 		# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
 		# TYPE blocks_meta_sync_failures_total counter
 		blocks_meta_sync_failures_total 0
@@ -150,7 +141,6 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
 		# TYPE blocks_meta_syncs_total counter
 		blocks_meta_syncs_total 1
 	`),
-		"blocks_meta_modified",
 		"blocks_meta_sync_failures_total",
 		"blocks_meta_synced",
 		"blocks_meta_syncs_total",
@@ -177,10 +167,6 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
 	assert.Regexp(t, "corrupted bucket index found", logs)
 
 	assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
-		# HELP blocks_meta_modified Number of blocks whose metadata changed
-		# TYPE blocks_meta_modified gauge
-		blocks_meta_modified{modified="replica-label-removed"} 0
-
 		# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
 		# TYPE blocks_meta_sync_failures_total counter
 		blocks_meta_sync_failures_total 0
@@ -204,7 +190,6 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
 		# TYPE blocks_meta_syncs_total counter
 		blocks_meta_syncs_total 1
 	`),
-		"blocks_meta_modified",
 		"blocks_meta_sync_failures_total",
 		"blocks_meta_synced",
 		"blocks_meta_syncs_total",
diff --git a/pkg/storegateway/metadata_fetcher_filters.go b/pkg/storegateway/metadata_fetcher_filters.go
index 643bb1da138..d4a5d038bde 100644
--- a/pkg/storegateway/metadata_fetcher_filters.go
+++ b/pkg/storegateway/metadata_fetcher_filters.go
@@ -53,8 +53,8 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.
 }
 
 // Filter implements block.MetadataFilter.
-func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
-	return f.upstream.Filter(ctx, metas, synced, modified)
+func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
+	return f.upstream.Filter(ctx, metas, synced)
 }
 
 // FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
@@ -93,7 +93,7 @@ func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter {
 	return &minTimeMetaFilter{limit: limit}
 }
 
-func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
 	if f.limit <= 0 {
 		return nil
 	}
diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go
index 9e9f2103686..f7f5b2277ae 100644
--- a/pkg/storegateway/metadata_fetcher_filters_test.go
+++ b/pkg/storegateway/metadata_fetcher_filters_test.go
@@ -105,7 +105,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
 	if bucketIndexEnabled {
 		require.NoError(t, f.FilterWithBucketIndex(ctx, inputMetas, idx, synced))
 	} else {
-		require.NoError(t, f.Filter(ctx, inputMetas, synced, nil))
+		require.NoError(t, f.Filter(ctx, inputMetas, synced))
 	}
 
 	assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(block.MarkedForDeletionMeta)))
@@ -138,12 +138,12 @@ func TestTimeMetaFilter(t *testing.T) {
 
 	// Test negative limit.
 	f := newMinTimeMetaFilter(-10 * time.Minute)
-	require.NoError(t, f.Filter(context.Background(), inputMetas, synced, nil))
+	require.NoError(t, f.Filter(context.Background(), inputMetas, synced))
 	assert.Equal(t, inputMetas, inputMetas)
 	assert.Equal(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))
 
 	f = newMinTimeMetaFilter(limit)
-	require.NoError(t, f.Filter(context.Background(), inputMetas, synced, nil))
+	require.NoError(t, f.Filter(context.Background(), inputMetas, synced))
 	assert.Equal(t, expectedMetas, inputMetas)
 	assert.Equal(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))
 
diff --git a/pkg/storegateway/metadata_fetcher_metrics.go b/pkg/storegateway/metadata_fetcher_metrics.go
index c9cc02434ab..6670383b645 100644
--- a/pkg/storegateway/metadata_fetcher_metrics.go
+++ b/pkg/storegateway/metadata_fetcher_metrics.go
@@ -22,10 +22,6 @@ type MetadataFetcherMetrics struct {
 	syncFailures *prometheus.Desc
 	syncDuration *prometheus.Desc
 	synced       *prometheus.Desc
-
-	// Ignored:
-	// blocks_meta_modified
-	// blocks_meta_base_syncs_total
 }
 
 func NewMetadataFetcherMetrics() *MetadataFetcherMetrics {
diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go
index da5c99a1223..7de5280e5ae 100644
--- a/pkg/storegateway/sharding_strategy.go
+++ b/pkg/storegateway/sharding_strategy.go
@@ -193,7 +193,7 @@ func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy)
 
 // Filter implements block.MetadataFilter.
 // This function is NOT safe for use by multiple goroutines concurrently.
-func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec) error {
 	if err := a.strategy.FilterBlocks(ctx, a.userID, metas, a.lastBlocks, synced); err != nil {
 		return err
 	}
diff --git a/tools/compaction-planner/main.go b/tools/compaction-planner/main.go
index d4e13e5a3af..114aa1c417b 100644
--- a/tools/compaction-planner/main.go
+++ b/tools/compaction-planner/main.go
@@ -96,7 +96,7 @@ func main() {
 		compactor.NewNoCompactionMarkFilter(bucket.NewUserBucketClient(cfg.userID, bkt, nil), true),
 	} {
 		log.Printf("Filtering using %T\n", f)
-		err = f.Filter(ctx, metas, synced, nil)
+		err = f.Filter(ctx, metas, synced)
 		if err != nil {
 			log.Fatalln("filter failed:", err)
 		}
diff --git a/tools/list-deduplicated-blocks/main.go b/tools/list-deduplicated-blocks/main.go
index 96f9c6d62df..4121a38d083 100644
--- a/tools/list-deduplicated-blocks/main.go
+++ b/tools/list-deduplicated-blocks/main.go
@@ -80,7 +80,7 @@ func main() {
 		[]string{"state"}, []string{"duplicate"})
 
 	log.Println("Running filter")
-	err = df.Filter(ctx, metasMap, s, nil)
+	err = df.Filter(ctx, metasMap, s)
 	if err != nil {
 		log.Fatalln("deduplication failed:", err)
 	}
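
For reference, the MetadataFilter interface in pkg/storage/tsdb/block/fetcher.go now reads Filter(ctx, metas, synced) with no trailing "modified" gauge, so a filter maintained outside this diff only needs to drop that last parameter. Below is a minimal sketch of a custom filter written against the narrowed signature; the import paths, the dropUnlabelledFilter name, and the example label handling are assumptions for illustration, not code taken from this change.

package example

import (
	"context"

	"github.com/oklog/ulid" // assumed import path for the ULID type used as the map key

	// Assumed module paths; adjust to the repository's actual module name.
	"github.com/grafana/mimir/pkg/storage/tsdb/block"
	"github.com/grafana/mimir/pkg/storage/tsdb/block/metadata"
)

// dropUnlabelledFilter is a hypothetical filter that removes every block whose
// meta.json lacks the given external label. Like LabelRemoverFilter in this diff,
// it ignores the synced gauge entirely.
type dropUnlabelledFilter struct {
	label string
}

// Filter matches the narrowed signature: ctx, metas, synced — no "modified" gauge.
func (f *dropUnlabelledFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec) error {
	for id, meta := range metas {
		// Deleting from the map while ranging over it is safe in Go.
		if _, ok := meta.Thanos.Labels[f.label]; !ok {
			delete(metas, id)
		}
	}
	return nil
}

// Compile-time check that the sketch satisfies block.MetadataFilter.
var _ block.MetadataFilter = &dropUnlabelledFilter{}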