Skip to content

Commit

Permalink
Let store-gateways ignore blocks that are too young. (#502)
Browse files Browse the repository at this point in the history
* Let store-gateways ignore blocks that are too young.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Renamed TimeMetaFilter to minTimeMetaFilter, added it to TestBucketIndexMetadataFetcher_Fetch test.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Unexport NewMinTimeMetaFilter.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Enhanced field description.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix wording as suggested by Arve.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Fix tests after changing label value.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
pstibrany authored Nov 18, 2021
1 parent 63ec943 commit 72d6e0c
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
* [ENHANCEMENT] Querier&Ruler: reduce cpu usage, latency and peak memory consumption. #459 #463
* [ENHANCEMENT] Overrides Exporter: Add `max_fetched_chunks_per_query` limit to the default and per-tenant limits exported as metrics. #471
* [ENHANCEMENT] Compactor (blocks cleaner): Delete blocks marked for deletion faster. #490
* [ENHANCEMENT] Store-gateway: store-gateway can now ignore blocks with minimum time within `-blocks-storage.bucket-store.ignore-blocks-within` duration. Useful when used together with `-querier.query-store-after`. #502
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #206
* [BUGFIX] Fixes a panic in the query-tee when comparing result. #207
* [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16
Expand Down
8 changes: 8 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]
# Blocks with minimum time within this duration are ignored, and not loaded
# by store-gateway. Useful when used together with
# -querier.query-store-after to prevent loading young blocks, because there
# are usually many of them (depending on number of ingesters) and they are
# not yet compacted. Negative values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]
# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
8 changes: 8 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]
# Blocks with minimum time within this duration are ignored, and not loaded
# by store-gateway. Useful when used together with
# -querier.query-store-after to prevent loading young blocks, because there
# are usually many of them (depending on number of ingesters) and they are
# not yet compacted. Negative values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]
# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4985,6 +4985,14 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]
# Blocks with minimum time within this duration are ignored, and not loaded by
# store-gateway. Useful when used together with -querier.query-store-after to
# prevent loading young blocks, because there are usually many of them
# (depending on number of ingesters) and they are not yet compacted. Negative
# values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]
# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type BucketStoreConfig struct {
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -305,6 +306,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.IgnoreDeletionMarksDelay, "blocks-storage.bucket-store.ignore-deletion-marks-delay", time.Hour*6, "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "Blocks with minimum time within this duration are ignored, and not loaded by store-gateway. Useful when used together with -querier.query-store-after to prevent loading young blocks, because there are usually many of them (depending on number of ingesters) and they are not yet compacted. Negative values or 0 disable the filter.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewBucketIndexMetadataFetcher(
logger: logger,
filters: filters,
modifiers: modifiers,
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil),
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}, {minTimeExcludedMeta}}, nil),
}
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -43,19 +44,22 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil)}
block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil)}
block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil)}
block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: timestamp.FromTime(now.Add(-30 * time.Minute))} // Has most-recent data, to be ignored by minTimeMetaFilter.

mark1 := &bucketindex.BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
mark2 := &bucketindex.BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.

require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{
Version: bucketindex.IndexVersion1,
Blocks: bucketindex.Blocks{block1, block2, block3},
Blocks: bucketindex.Blocks{block1, block2, block3, block4},
BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark1, mark2},
UpdatedAt: now.Unix(),
}))

// Create a metadata fetcher with filters.
filters := []block.MetadataFilter{
NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1),
newMinTimeMetaFilter(1 * time.Hour),
}

fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters, nil)
Expand Down Expand Up @@ -90,6 +94,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 1
blocks_meta_synced{state="too-fresh"} 0
# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -141,6 +146,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 1
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -195,6 +201,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -241,6 +248,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand All @@ -265,6 +273,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 1
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand Down Expand Up @@ -297,6 +306,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand All @@ -323,6 +333,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))
}
6 changes: 4 additions & 2 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,15 +440,17 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
fetcherReg := prometheus.NewRegistry()

// The sharding strategy filter MUST be before the ones we create here (order matters).
filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{
filters := []block.MetadataFilter{
NewShardingMetadataFilterAdapter(userID, u.shardingStrategy),
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin),
// Use our own custom implementation.
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency),
// The duplicate filter has been intentionally omitted because it could cause troubles with
// the consistency check done on the querier. The duplicate filter removes redundant blocks
// but if the store-gateway removes redundant blocks before the querier discovers them, the
// consistency check on the querier will fail.
}...)
}

// Instantiate a different blocks metadata fetcher based on whether bucket index is enabled or not.
var fetcher block.MetadataFetcher
Expand Down
30 changes: 30 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -81,3 +82,32 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta

return nil
}

const minTimeExcludedMeta = "min-time-excluded"

// minTimeMetaFilter filters out blocks that contain the most recent data (based on block MinTime).
type minTimeMetaFilter struct {
limit time.Duration
}

func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter {
return &minTimeMetaFilter{limit: limit}
}

// Filter implements block.MetadataFilter. It removes from metas every block
// whose MinTime falls within the configured window from now, incrementing the
// synced gauge with the min-time-excluded state for each removed block.
// When the window is zero or negative the filter is disabled and metas is
// left untouched.
func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
	if f.limit <= 0 {
		// Filter disabled.
		return nil
	}

	// Blocks with MinTime at or after this cutoff are considered too young.
	cutoff := timestamp.FromTime(time.Now().Add(-f.limit))

	for id, meta := range metas {
		if meta.MinTime >= cutoff {
			delete(metas, id) // Deleting during range is safe in Go.
			synced.WithLabelValues(minTimeExcludedMeta).Inc()
		}
	}
	return nil
}
38 changes: 38 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -110,3 +112,39 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks())
}

// TestTimeMetaFilter verifies that minTimeMetaFilter removes blocks with
// MinTime inside the configured window, keeps older blocks, and is a no-op
// for non-positive limits.
func TestTimeMetaFilter(t *testing.T) {
	now := time.Now()
	limit := 10 * time.Minute
	limitTime := now.Add(-limit)

	ulid1 := ulid.MustNew(1, nil)
	ulid2 := ulid.MustNew(2, nil)
	ulid3 := ulid.MustNew(3, nil)
	ulid4 := ulid.MustNew(4, nil)

	inputMetas := map[ulid.ULID]*metadata.Meta{
		ulid1: {BlockMeta: tsdb.BlockMeta{MinTime: 100}},                                            // Very old, keep it.
		ulid2: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(now)}},                        // Fresh block, remove.
		ulid3: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(time.Minute))}}, // Inside limit time, remove.
		ulid4: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(-time.Minute))}}, // Before limit time, keep.
	}

	expectedMetas := map[ulid.ULID]*metadata.Meta{}
	expectedMetas[ulid1] = inputMetas[ulid1]
	expectedMetas[ulid4] = inputMetas[ulid4]

	synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"})

	// Test negative limit: the filter must be a no-op and keep all blocks.
	// (The filter mutates the map in place, so we assert on its size rather
	// than comparing the map to itself, which would be vacuously true.)
	f := newMinTimeMetaFilter(-10 * time.Minute)
	require.NoError(t, f.Filter(context.Background(), inputMetas, synced))
	assert.Len(t, inputMetas, 4)
	assert.Equal(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))

	f = newMinTimeMetaFilter(limit)
	require.NoError(t, f.Filter(context.Background(), inputMetas, synced))

	assert.Equal(t, expectedMetas, inputMetas)
	assert.Equal(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))
}

0 comments on commit 72d6e0c

Please sign in to comment.