From 72d6e0cc91a4e9856d0fcb04228fd643c88a61ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 18 Nov 2021 15:25:45 +0100 Subject: [PATCH] Let store-gateways ignore blocks that are too young. (#502) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Let store-gateways ignore blocks that are too young. Signed-off-by: Peter Štibraný * CHANGELOG.md Signed-off-by: Peter Štibraný * Renamed TimeMetaFilter to minTimeMetaFilter, added it to TestBucketIndexMetadataFetcher_Fetch test. Signed-off-by: Peter Štibraný * Unexport NewMinTimeMetaFilter. Signed-off-by: Peter Štibraný * Enhanced field description. Signed-off-by: Peter Štibraný * Fix wording as suggested by Arve. Signed-off-by: Peter Štibraný * Address review feedback. Signed-off-by: Peter Štibraný * Address review feedback. Signed-off-by: Peter Štibraný * Fix tests after changing label value. Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 8 ++++ docs/blocks-storage/store-gateway.md | 8 ++++ docs/configuration/config-file-reference.md | 8 ++++ pkg/storage/tsdb/config.go | 2 + .../bucket_index_metadata_fetcher.go | 2 +- .../bucket_index_metadata_fetcher_test.go | 13 ++++++- pkg/storegateway/bucket_stores.go | 6 ++- pkg/storegateway/metadata_fetcher_filters.go | 30 +++++++++++++++ .../metadata_fetcher_filters_test.go | 38 +++++++++++++++++++ 10 files changed, 112 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 754e7a56faf..1076585c34b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,7 @@ * [ENHANCEMENT] Querier&Ruler: reduce cpu usage, latency and peak memory consumption. #459 #463 * [ENHANCEMENT] Overrides Exporter: Add `max_fetched_chunks_per_query` limit to the default and per-tenant limits exported as metrics. #471 * [ENHANCEMENT] Compactor (blocks cleaner): Delete blocks marked for deletion faster. #490 +* [ENHANCEMENT] Store-gateway: store-gateway can now ignore blocks with minimum time within `-blocks-storage.bucket-store.ignore-blocks-within` duration. Useful when used together with `-querier.query-store-after`. #502 * [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #206 * [BUGFIX] Fixes a panic in the query-tee when comparing result. #207 * [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 9a3ffdbc04a..06d8ed33d4b 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -731,6 +731,14 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # Blocks with minimum time within this duration are ignored, and not loaded + # by store-gateway. Useful when used together with + # -querier.query-store-after to prevent loading young blocks, because there + # are usually many of them (depending on number of ingesters) and they are + # not yet compacted. Negative values or 0 disable the filter. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 1d7580d6e55..abbedacdaa3 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -784,6 +784,14 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # Blocks with minimum time within this duration are ignored, and not loaded + # by store-gateway. Useful when used together with + # -querier.query-store-after to prevent loading young blocks, because there + # are usually many of them (depending on number of ingesters) and they are + # not yet compacted. Negative values or 0 disable the filter. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 35352684bf1..37ce735c388 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4985,6 +4985,14 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # Blocks with minimum time within this duration are ignored, and not loaded by + # store-gateway. Useful when used together with -querier.query-store-after to + # prevent loading young blocks, because there are usually many of them + # (depending on number of ingesters) and they are not yet compacted. Negative + # values or 0 disable the filter. + # CLI flag: -blocks-storage.bucket-store.ignore-blocks-within + [ignore_blocks_within: | default = 0s] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index ba580e27626..b8f78610a2c 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -259,6 +259,7 @@ type BucketStoreConfig struct { MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` BucketIndex BucketIndexConfig `yaml:"bucket_index"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -305,6 +306,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.IgnoreDeletionMarksDelay, "blocks-storage.bucket-store.ignore-deletion-marks-delay", time.Hour*6, "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+ "The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+ "Default is 6h, half of the default value for -compactor.deletion-delay.") + f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "Blocks with minimum time within this duration are ignored, and not loaded by store-gateway. Useful when used together with -querier.query-store-after to prevent loading young blocks, because there are usually many of them (depending on number of ingesters) and they are not yet compacted. Negative values or 0 disable the filter.") f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.") f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazy load an index-header only once required by a query.") f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.") diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 63947c51727..5b57b5d1897 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -57,7 +57,7 @@ func NewBucketIndexMetadataFetcher( logger: logger, filters: filters, modifiers: modifiers, - metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil), + metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}, {minTimeExcludedMeta}}, nil), } } diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index 1944c175016..7562ddc6e91 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -18,6 +18,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -43,12 +44,14 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil)} block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil)} block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil)} + block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: timestamp.FromTime(now.Add(-30 * time.Minute))} // Has most-recent data, to be ignored by minTimeMetaFilter. + mark1 := &bucketindex.BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold. mark2 := &bucketindex.BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold. require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{ Version: bucketindex.IndexVersion1, - Blocks: bucketindex.Blocks{block1, block2, block3}, + Blocks: bucketindex.Blocks{block1, block2, block3, block4}, BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark1, mark2}, UpdatedAt: now.Unix(), })) @@ -56,6 +59,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { // Create a metadata fetcher with filters. filters := []block.MetadataFilter{ NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1), + newMinTimeMetaFilter(1 * time.Hour), } fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters, nil) @@ -90,6 +94,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { blocks_meta_synced{state="no-bucket-index"} 0 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 1 blocks_meta_synced{state="too-fresh"} 0 # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts @@ -141,6 +146,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { blocks_meta_synced{state="no-bucket-index"} 1 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts @@ -195,6 +201,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) { blocks_meta_synced{state="no-bucket-index"} 0 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts @@ -241,6 +248,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) blocks_meta_synced{state="no-bucket-index"} 0 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 `), "blocks_meta_synced")) @@ -265,6 +273,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) blocks_meta_synced{state="no-bucket-index"} 1 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 `), "blocks_meta_synced")) @@ -297,6 +306,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) blocks_meta_synced{state="no-bucket-index"} 0 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 `), "blocks_meta_synced")) @@ -323,6 +333,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) blocks_meta_synced{state="no-bucket-index"} 0 blocks_meta_synced{state="no-meta-json"} 0 blocks_meta_synced{state="time-excluded"} 0 + blocks_meta_synced{state="min-time-excluded"} 0 blocks_meta_synced{state="too-fresh"} 0 `), "blocks_meta_synced")) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 7f0ff6383f1..0d729838321 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -440,15 +440,17 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { fetcherReg := prometheus.NewRegistry() // The sharding strategy filter MUST be before the ones we create here (order matters). - filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{ + filters := []block.MetadataFilter{ + NewShardingMetadataFilterAdapter(userID, u.shardingStrategy), block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg), + newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin), // Use our own custom implementation. NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency), // The duplicate filter has been intentionally omitted because it could cause troubles with // the consistency check done on the querier. The duplicate filter removes redundant blocks // but if the store-gateway removes redundant blocks before the querier discovers them, the // consistency check on the querier will fail. - }...) + } // Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not. var fetcher block.MetadataFetcher diff --git a/pkg/storegateway/metadata_fetcher_filters.go b/pkg/storegateway/metadata_fetcher_filters.go index 7a103ebc888..9a1e16837c8 100644 --- a/pkg/storegateway/metadata_fetcher_filters.go +++ b/pkg/storegateway/metadata_fetcher_filters.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" @@ -81,3 +82,32 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta return nil } + +const minTimeExcludedMeta = "min-time-excluded" + +// minTimeMetaFilter filters out blocks that contain the most recent data (based on block MinTime). +type minTimeMetaFilter struct { + limit time.Duration +} + +func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter { + return &minTimeMetaFilter{limit: limit} +} + +func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + if f.limit <= 0 { + return nil + } + + limitTime := timestamp.FromTime(time.Now().Add(-f.limit)) + + for id, m := range metas { + if m.MinTime < limitTime { + continue + } + + synced.WithLabelValues(minTimeExcludedMeta).Inc() + delete(metas, id) + } + return nil +} diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 9cc2a4619f5..e54bb49e0e3 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -17,6 +17,8 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block" @@ -110,3 +112,39 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { assert.Equal(t, expectedMetas, inputMetas) assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks()) } + +func TestTimeMetaFilter(t *testing.T) { + now := time.Now() + limit := 10 * time.Minute + limitTime := now.Add(-limit) + + ulid1 := ulid.MustNew(1, nil) + ulid2 := ulid.MustNew(2, nil) + ulid3 := ulid.MustNew(3, nil) + ulid4 := ulid.MustNew(4, nil) + + inputMetas := map[ulid.ULID]*metadata.Meta{ + ulid1: {BlockMeta: tsdb.BlockMeta{MinTime: 100}}, // Very old, keep it + ulid2: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(now)}}, // Fresh block, remove. + ulid3: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(time.Minute))}}, // Inside limit time, remove. + ulid4: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(-time.Minute))}}, // Before limit time, keep. + } + + expectedMetas := map[ulid.ULID]*metadata.Meta{} + expectedMetas[ulid1] = inputMetas[ulid1] + expectedMetas[ulid4] = inputMetas[ulid4] + + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"}) + + // Test negative limit. + f := newMinTimeMetaFilter(-10 * time.Minute) + require.NoError(t, f.Filter(context.Background(), inputMetas, synced)) + assert.Equal(t, inputMetas, inputMetas) + assert.Equal(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta))) + + f = newMinTimeMetaFilter(limit) + require.NoError(t, f.Filter(context.Background(), inputMetas, synced)) + + assert.Equal(t, expectedMetas, inputMetas) + assert.Equal(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta))) +}