From bdeba405458c26164df4ddb29d6af590f2dba758 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Feb 2021 14:51:13 +0100 Subject: [PATCH 1/2] Reuse FetcherMetrics from Thanos Signed-off-by: Marco Pracucci --- go.mod | 2 +- go.sum | 4 +- .../bucket_index_metadata_fetcher.go | 143 +++--------------- pkg/storegateway/bucket_stores.go | 2 +- pkg/storegateway/chunk_bytes_pool.go | 9 +- pkg/storegateway/chunk_bytes_pool_test.go | 7 +- pkg/storegateway/metadata_fetcher_filters.go | 2 +- .../metadata_fetcher_filters_test.go | 3 +- .../thanos-io/thanos/pkg/block/fetcher.go | 115 ++++++++------ .../thanos-io/thanos/pkg/pool/pool.go | 29 +++- .../thanos-io/thanos/pkg/store/bucket.go | 50 ++++-- vendor/modules.txt | 2 +- 12 files changed, 164 insertions(+), 204 deletions(-) diff --git a/go.mod b/go.mod index e47f3d9fde..2279ecb29e 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/sony/gobreaker v0.4.1 github.com/spf13/afero v1.2.2 github.com/stretchr/testify v1.7.0 - github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5 + github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1 github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120 go.etcd.io/bbolt v1.3.5 diff --git a/go.sum b/go.sum index 33d7553f83..6024c97ed4 100644 --- a/go.sum +++ b/go.sum @@ -1287,8 +1287,8 @@ github.com/thanos-io/thanos v0.13.1-0.20210108102609-f85e4003ba51/go.mod h1:kPvI github.com/thanos-io/thanos v0.13.1-0.20210204123931-82545cdd16fe/go.mod h1:ZLDGYRNkgM+FCwYNOD+6tOV+DE2fpjzfV6iqXyOgFIw= github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117 h1:+rTDtekRPNMsLgCpReU13zJfBEScxGB+w0N4rHGWnRA= github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117/go.mod h1:kdqFpzdkveIKpNNECVJd75RPvgsAifQgJymwCdfev1w= -github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5 h1:5/zDK+wXZfFNOkYCKUk33AZNEmU0ev15dcG5G73/3FE= -github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU= +github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1 h1:ebr5jjRA6al28bNWhouwHC7hQqC1wexo2uac1+utOus= +github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index cd4bd513e5..0cb544403a 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -9,16 +9,19 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" ) +const ( + corruptedBucketIndex = "corrupted-bucket-index" + noBucketIndex = "no-bucket-index" +) + // BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Cortex bucket index. type BucketIndexMetadataFetcher struct { userID string @@ -28,7 +31,7 @@ type BucketIndexMetadataFetcher struct { logger log.Logger filters []block.MetadataFilter modifiers []block.MetadataModifier - metrics *fetcherMetrics + metrics *block.FetcherMetrics } func NewBucketIndexMetadataFetcher( @@ -49,37 +52,37 @@ func NewBucketIndexMetadataFetcher( logger: logger, filters: filters, modifiers: modifiers, - metrics: newFetcherMetrics(reg), + metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil), } } // Fetch implements metadata.MetadataFetcher. func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { - f.metrics.resetTx() + f.metrics.ResetTx() // Check whether the user belongs to the shard. if len(f.strategy.FilterUsers(ctx, []string{f.userID})) != 1 { - f.metrics.submit() + f.metrics.Submit() return nil, nil, nil } // Track duration and sync counters only if wasn't filtered out by the sharding strategy. start := time.Now() defer func() { - f.metrics.syncDuration.Observe(time.Since(start).Seconds()) + f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) if err != nil { - f.metrics.syncFailures.Inc() + f.metrics.SyncFailures.Inc() } }() - f.metrics.syncs.Inc() + f.metrics.Syncs.Inc() // Fetch the bucket index. idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger) if errors.Is(err, bucketindex.ErrIndexNotFound) { // This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters // and their bucket index has not been created yet. - f.metrics.synced.WithLabelValues(noBucketIndex).Set(1) - f.metrics.submit() + f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1) + f.metrics.Submit() return nil, nil, nil } @@ -88,14 +91,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. // because unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query // will fail anyway in the querier (the querier fails in the querier if bucket index is corrupted). level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err) - f.metrics.synced.WithLabelValues(corruptedBucketIndex).Set(1) - f.metrics.submit() + f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1) + f.metrics.Submit() return nil, nil, nil } if err != nil { - f.metrics.synced.WithLabelValues(failedMeta).Set(1) - f.metrics.submit() + f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1) + f.metrics.Submit() return nil, nil, errors.Wrapf(err, "read bucket index") } @@ -111,9 +114,9 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. // NOTE: filter can update synced metric accordingly to the reason of the exclude. if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok { - err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.synced) + err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced) } else { - err = filter.Filter(ctx, metas, f.metrics.synced) + err = filter.Filter(ctx, metas, f.metrics.Synced) } if err != nil { @@ -123,13 +126,13 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. for _, m := range f.modifiers { // NOTE: modifier can update modified metric accordingly to the reason of the modification. - if err := m.Modify(ctx, metas, f.metrics.modified); err != nil { + if err := m.Modify(ctx, metas, f.metrics.Modified); err != nil { return nil, nil, errors.Wrap(err, "modify metas") } } - f.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) - f.metrics.submit() + f.metrics.Synced.WithLabelValues(block.LoadedMeta).Set(float64(len(metas))) + f.metrics.Submit() return metas, nil, nil } @@ -138,103 +141,3 @@ func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Met // Unused by the store-gateway. callback(nil, errors.New("UpdateOnChange is unsupported")) } - -const ( - fetcherSubSys = "blocks_meta" - - corruptedMeta = "corrupted-meta-json" - noMeta = "no-meta-json" - loadedMeta = "loaded" - failedMeta = "failed" - corruptedBucketIndex = "corrupted-bucket-index" - noBucketIndex = "no-bucket-index" - - // Synced label values. - labelExcludedMeta = "label-excluded" - timeExcludedMeta = "time-excluded" - tooFreshMeta = "too-fresh" - duplicateMeta = "duplicate" - // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, - // but don't have a replacement block yet. - markedForDeletionMeta = "marked-for-deletion" - - // MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric. - MarkedForNoCompactionMeta = "marked-for-no-compact" - - // Modified label values. - replicaRemovedMeta = "replica-label-removed" -) - -// fetcherMetrics is a copy of Thanos internal fetcherMetrics. These metrics have been copied from -// Thanos in order to track the same exact metrics in our own custom metadata fetcher implementation. -type fetcherMetrics struct { - syncs prometheus.Counter - syncFailures prometheus.Counter - syncDuration prometheus.Histogram - - synced *extprom.TxGaugeVec - modified *extprom.TxGaugeVec -} - -func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { - var m fetcherMetrics - - m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: fetcherSubSys, - Name: "syncs_total", - Help: "Total blocks metadata synchronization attempts", - }) - m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: fetcherSubSys, - Name: "sync_failures_total", - Help: "Total blocks metadata synchronization failures", - }) - m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Subsystem: fetcherSubSys, - Name: "sync_duration_seconds", - Help: "Duration of the blocks metadata synchronization in seconds", - Buckets: []float64{0.01, 1, 10, 100, 1000}, - }) - m.synced = extprom.NewTxGaugeVec( - reg, - prometheus.GaugeOpts{ - Subsystem: fetcherSubSys, - Name: "synced", - Help: "Number of block metadata synced", - }, - []string{"state"}, - []string{corruptedMeta}, - []string{corruptedBucketIndex}, - []string{noMeta}, - []string{noBucketIndex}, - []string{loadedMeta}, - []string{tooFreshMeta}, - []string{failedMeta}, - []string{labelExcludedMeta}, - []string{timeExcludedMeta}, - []string{duplicateMeta}, - []string{markedForDeletionMeta}, - []string{MarkedForNoCompactionMeta}, - ) - m.modified = extprom.NewTxGaugeVec( - reg, - prometheus.GaugeOpts{ - Subsystem: fetcherSubSys, - Name: "modified", - Help: "Number of blocks whose metadata changed", - }, - []string{"modified"}, - []string{replicaRemovedMeta}, - ) - return &m -} - -func (s *fetcherMetrics) submit() { - s.synced.Submit() - s.modified.Submit() -} - -func (s *fetcherMetrics) resetTx() { - s.synced.ResetTx() - s.modified.ResetTx() -} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 2abcd5d70e..01d55b9ea3 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -52,7 +52,7 @@ type BucketStores struct { indexCache storecache.IndexCache // Chunks bytes pool shared across all tenants. - chunksPool pool.BytesPool + chunksPool pool.Bytes // Partitioner shared across all tenants. partitioner store.Partitioner diff --git a/pkg/storegateway/chunk_bytes_pool.go b/pkg/storegateway/chunk_bytes_pool.go index bd5badd3b6..095d5c4836 100644 --- a/pkg/storegateway/chunk_bytes_pool.go +++ b/pkg/storegateway/chunk_bytes_pool.go @@ -4,14 +4,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/pool" -) - -const ( - maxChunkSize = 16000 + "github.com/thanos-io/thanos/pkg/store" ) type chunkBytesPool struct { - pool *pool.BucketedBytesPool + pool *pool.BucketedBytes // Metrics. requestedBytes prometheus.Counter @@ -19,7 +16,7 @@ type chunkBytesPool struct { } func newChunkBytesPool(maxChunkPoolBytes uint64, reg prometheus.Registerer) (*chunkBytesPool, error) { - upstream, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) + upstream, err := pool.NewBucketedBytes(store.EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, err } diff --git a/pkg/storegateway/chunk_bytes_pool_test.go b/pkg/storegateway/chunk_bytes_pool_test.go index 1ebebbf5cc..586add6ffc 100644 --- a/pkg/storegateway/chunk_bytes_pool_test.go +++ b/pkg/storegateway/chunk_bytes_pool_test.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store" ) func TestChunkBytesPool_Get(t *testing.T) { @@ -16,10 +17,10 @@ func TestChunkBytesPool_Get(t *testing.T) { p, err := newChunkBytesPool(0, reg) require.NoError(t, err) - _, err = p.Get(maxChunkSize - 1) + _, err = p.Get(store.EstimatedMaxChunkSize - 1) require.NoError(t, err) - _, err = p.Get(maxChunkSize + 1) + _, err = p.Get(store.EstimatedMaxChunkSize + 1) require.NoError(t, err) assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` @@ -30,5 +31,5 @@ func TestChunkBytesPool_Get(t *testing.T) { # HELP cortex_bucket_store_chunk_pool_returned_bytes_total Total bytes returned by the chunk bytes pool. # TYPE cortex_bucket_store_chunk_pool_returned_bytes_total counter cortex_bucket_store_chunk_pool_returned_bytes_total %d - `, maxChunkSize*2, maxChunkSize*3)))) + `, store.EstimatedMaxChunkSize*2, store.EstimatedMaxChunkSize*3)))) } diff --git a/pkg/storegateway/metadata_fetcher_filters.go b/pkg/storegateway/metadata_fetcher_filters.go index 7bd8693dd4..76c643ef0c 100644 --- a/pkg/storegateway/metadata_fetcher_filters.go +++ b/pkg/storegateway/metadata_fetcher_filters.go @@ -69,7 +69,7 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta } if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { - synced.WithLabelValues(markedForDeletionMeta).Inc() + synced.WithLabelValues(block.MarkedForDeletionMeta).Inc() delete(metas, mark.ID) } } diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 33aec75d5f..84e6ec0cfe 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -14,6 +14,7 @@ import ( promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" @@ -100,7 +101,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { require.NoError(t, f.Filter(ctx, inputMetas, synced)) } - assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta))) + assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(block.MarkedForDeletionMeta))) assert.Equal(t, expectedMetas, inputMetas) assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks()) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go index c0e04dbe8e..0d067b20df 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go @@ -37,32 +37,37 @@ import ( const FetcherConcurrency = 32 -type fetcherMetrics struct { - syncs prometheus.Counter - syncFailures prometheus.Counter - syncDuration prometheus.Histogram +// FetcherMetrics holds metrics tracked by the metadata fetcher. This struct and its fields are exported +// to allow depending projects (eg. Cortex) to implement their own custom metadata fetcher while tracking +// compatible metrics. +type FetcherMetrics struct { + Syncs prometheus.Counter + SyncFailures prometheus.Counter + SyncDuration prometheus.Histogram - synced *extprom.TxGaugeVec - modified *extprom.TxGaugeVec + Synced *extprom.TxGaugeVec + Modified *extprom.TxGaugeVec } -func (s *fetcherMetrics) submit() { - s.synced.Submit() - s.modified.Submit() +// Submit applies new values for metrics tracked by transaction GaugeVec. +func (s *FetcherMetrics) Submit() { + s.Synced.Submit() + s.Modified.Submit() } -func (s *fetcherMetrics) resetTx() { - s.synced.ResetTx() - s.modified.ResetTx() +// ResetTx starts new transaction for metrics tracked by transaction GaugeVec. +func (s *FetcherMetrics) ResetTx() { + s.Synced.ResetTx() + s.Modified.ResetTx() } const ( fetcherSubSys = "blocks_meta" - corruptedMeta = "corrupted-meta-json" - noMeta = "no-meta-json" - loadedMeta = "loaded" - failedMeta = "failed" + CorruptedMeta = "corrupted-meta-json" + NoMeta = "no-meta-json" + LoadedMeta = "loaded" + FailedMeta = "failed" // Synced label values. labelExcludedMeta = "label-excluded" @@ -71,7 +76,7 @@ const ( duplicateMeta = "duplicate" // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, // but don't have a replacement block yet. - markedForDeletionMeta = "marked-for-deletion" + MarkedForDeletionMeta = "marked-for-deletion" // MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric. MarkedForNoCompactionMeta = "marked-for-no-compact" @@ -80,26 +85,26 @@ const ( replicaRemovedMeta = "replica-label-removed" ) -func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { - var m fetcherMetrics +func NewFetcherMetrics(reg prometheus.Registerer, syncedExtraLabels, modifiedExtraLabels [][]string) *FetcherMetrics { + var m FetcherMetrics - m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.Syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Subsystem: fetcherSubSys, Name: "syncs_total", Help: "Total blocks metadata synchronization attempts", }) - m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.SyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Subsystem: fetcherSubSys, Name: "sync_failures_total", Help: "Total blocks metadata synchronization failures", }) - m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.SyncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Subsystem: fetcherSubSys, Name: "sync_duration_seconds", Help: "Duration of the blocks metadata synchronization in seconds", Buckets: []float64{0.01, 1, 10, 100, 1000}, }) - m.synced = extprom.NewTxGaugeVec( + m.Synced = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ Subsystem: fetcherSubSys, @@ -107,18 +112,20 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { Help: "Number of block metadata synced", }, []string{"state"}, - []string{corruptedMeta}, - []string{noMeta}, - []string{loadedMeta}, - []string{tooFreshMeta}, - []string{failedMeta}, - []string{labelExcludedMeta}, - []string{timeExcludedMeta}, - []string{duplicateMeta}, - []string{markedForDeletionMeta}, - []string{MarkedForNoCompactionMeta}, + append([][]string{ + {CorruptedMeta}, + {NoMeta}, + {LoadedMeta}, + {tooFreshMeta}, + {FailedMeta}, + {labelExcludedMeta}, + {timeExcludedMeta}, + {duplicateMeta}, + {MarkedForDeletionMeta}, + {MarkedForNoCompactionMeta}, + }, syncedExtraLabels...)..., ) - m.modified = extprom.NewTxGaugeVec( + m.Modified = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ Subsystem: fetcherSubSys, @@ -126,7 +133,9 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { Help: "Number of blocks whose metadata changed", }, []string{"modified"}, - []string{replicaRemovedMeta}, + append([][]string{ + {replicaRemovedMeta}, + }, modifiedExtraLabels...)..., ) return &m } @@ -186,6 +195,12 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente }, nil } +// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads. +// NOTE: Not suitable to use in production. +func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) (*MetaFetcher, error) { + return NewMetaFetcher(logger, 1, bkt, "", nil, nil, nil) +} + // NewMetaFetcher returns meta fetcher. func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) { b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg) @@ -197,7 +212,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente // NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher. func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier, logTags ...interface{}) *MetaFetcher { - return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)} + return &MetaFetcher{metrics: NewFetcherMetrics(reg, nil, nil), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)} } var ( @@ -405,16 +420,16 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { return resp, nil } -func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { +func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { start := time.Now() defer func() { - metrics.syncDuration.Observe(time.Since(start).Seconds()) + metrics.SyncDuration.Observe(time.Since(start).Seconds()) if err != nil { - metrics.syncFailures.Inc() + metrics.SyncFailures.Inc() } }() - metrics.syncs.Inc() - metrics.resetTx() + metrics.Syncs.Inc() + metrics.ResetTx() // Run this in thread safe run group. // TODO(bwplotka): Consider custom singleflight with ttl. @@ -433,26 +448,26 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter metas[id] = m } - metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs))) - metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas) - metrics.synced.WithLabelValues(corruptedMeta).Set(resp.corruptedMetas) + metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs))) + metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas) + metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas) for _, filter := range filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := filter.Filter(ctx, metas, metrics.synced); err != nil { + if err := filter.Filter(ctx, metas, metrics.Synced); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } for _, m := range modifiers { // NOTE: modifier can update modified metric accordingly to the reason of the modification. - if err := m.Modify(ctx, metas, metrics.modified); err != nil { + if err := m.Modify(ctx, metas, metrics.Modified); err != nil { return nil, nil, errors.Wrap(err, "modify metas") } } - metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) - metrics.submit() + metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas))) + metrics.Submit() if len(resp.metaErrs) > 0 { return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view") @@ -464,7 +479,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter type MetaFetcher struct { wrapped *BaseFetcher - metrics *fetcherMetrics + metrics *FetcherMetrics filters []MetadataFilter modifiers []MetadataModifier @@ -823,7 +838,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL mtx.Lock() f.deletionMarkMap[id] = m if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() { - synced.WithLabelValues(markedForDeletionMeta).Inc() + synced.WithLabelValues(MarkedForDeletionMeta).Inc() delete(metas, id) } mtx.Unlock() diff --git a/vendor/github.com/thanos-io/thanos/pkg/pool/pool.go b/vendor/github.com/thanos-io/thanos/pkg/pool/pool.go index 25eec7c736..cbd034e9e7 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/pool/pool.go +++ b/vendor/github.com/thanos-io/thanos/pkg/pool/pool.go @@ -9,15 +9,28 @@ import ( "github.com/pkg/errors" ) -type BytesPool interface { +// Bytes is a pool of bytes that can be reused. +type Bytes interface { + // Get returns a new byte slices that fits the given size. Get(sz int) (*[]byte, error) + // Put returns a byte slice to the right bucket in the pool. Put(b *[]byte) } -// BucketedBytesPool is a bucketed pool for variably sized byte slices. It can be configured to not allow +// NoopBytes is pool that always allocated required slice on heap and ignore puts. +type NoopBytes struct{} + +func (p NoopBytes) Get(sz int) (*[]byte, error) { + b := make([]byte, 0, sz) + return &b, nil +} + +func (p NoopBytes) Put(*[]byte) {} + +// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow // more than a maximum number of bytes being used at a given time. // Every byte slice obtained from the pool must be returned. -type BucketedBytesPool struct { +type BucketedBytes struct { buckets []sync.Pool sizes []int maxTotal uint64 @@ -27,10 +40,10 @@ type BucketedBytesPool struct { new func(s int) *[]byte } -// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize +// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize // increasing by the given factor and maximum number of used bytes. // No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0. -func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytesPool, error) { +func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) { if minSize < 1 { return nil, errors.New("invalid minimum pool size") } @@ -46,7 +59,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) for s := minSize; s <= maxSize; s = int(float64(s) * factor) { sizes = append(sizes, s) } - p := &BucketedBytesPool{ + p := &BucketedBytes{ buckets: make([]sync.Pool, len(sizes)), sizes: sizes, maxTotal: maxTotal, @@ -62,7 +75,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) var ErrPoolExhausted = errors.New("pool exhausted") // Get returns a new byte slice that fits the given size. -func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) { +func (p *BucketedBytes) Get(sz int) (*[]byte, error) { p.mtx.Lock() defer p.mtx.Unlock() @@ -89,7 +102,7 @@ func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) { } // Put returns a byte slice to the right bucket in the pool. -func (p *BucketedBytesPool) Put(b *[]byte) { +func (p *BucketedBytes) Put(b *[]byte) { if b == nil { return } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 9ef8b8dc7d..f0301f3052 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -61,8 +61,9 @@ const ( // because you barely get any improvements in compression when the number of samples is beyond this. // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. MaxSamplesPerChunk = 120 - maxChunkSize = 16000 - maxSeriesSize = 64 * 1024 + // EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases. + EstimatedMaxChunkSize = 16000 + maxSeriesSize = 64 * 1024 // CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility // with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels. @@ -258,7 +259,7 @@ type BucketStore struct { dir string indexCache storecache.IndexCache indexReaderPool *indexheader.ReaderPool - chunkPool pool.BytesPool + chunkPool pool.Bytes // Sets of blocks that have the same labels. They are indexed by a hash over their label set. mtx sync.RWMutex @@ -283,14 +284,33 @@ type BucketStore struct { advLabelSets []labelpb.ZLabelSet enableCompatibilityLabel bool + // Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32. postingOffsetsInMemSampling int // Enables hints in the Series() response. enableSeriesResponseHints bool } +type noopCache struct{} + +func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {} +func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) { + return map[labels.Label][]byte{}, keys +} + +func (noopCache) StoreSeries(context.Context, ulid.ULID, uint64, []byte) {} +func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) { + return map[uint64][]byte{}, ids +} + +type noopGate struct{} + +func (noopGate) Start(context.Context) error { return nil } +func (noopGate) Done() {} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. +// TODO(bwplotka): Move to config at this point. func NewBucketStore( logger log.Logger, reg prometheus.Registerer, @@ -299,7 +319,7 @@ func NewBucketStore( dir string, indexCache storecache.IndexCache, queryGate gate.Gate, - chunkPool pool.BytesPool, + chunkPool pool.Bytes, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, partitioner Partitioner, @@ -316,6 +336,16 @@ func NewBucketStore( logger = log.NewNopLogger() } + if chunkPool == nil { + chunkPool = pool.NoopBytes{} + } + if indexCache == nil { + indexCache = noopCache{} + } + if queryGate == nil { + queryGate = noopGate{} + } + s := &BucketStore{ logger: logger, bkt: bkt, @@ -1369,7 +1399,7 @@ type bucketBlock struct { meta *metadata.Meta dir string indexCache storecache.IndexCache - chunkPool pool.BytesPool + chunkPool pool.Bytes extLset labels.Labels indexHeaderReader indexheader.Reader @@ -1393,7 +1423,7 @@ func newBucketBlock( bkt objstore.BucketReader, dir string, indexCache storecache.IndexCache, - chunkPool pool.BytesPool, + chunkPool pool.Bytes, indexHeadReader indexheader.Reader, p Partitioner, ) (b *bucketBlock, err error) { @@ -2228,7 +2258,7 @@ func (r *bucketChunkReader) preload() error { return offsets[i] < offsets[j] }) parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) { - return uint64(offsets[i]), uint64(offsets[i]) + maxChunkSize + return uint64(offsets[i]), uint64(offsets[i]) + EstimatedMaxChunkSize }) seq := seq @@ -2337,7 +2367,7 @@ func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges { ranges[idx] = byteRange{ // The byte range offset is required to be relative to the start of the read slice. offset: int(offsets[idx] - start), - length: maxChunkSize, + length: EstimatedMaxChunkSize, } if idx > 0 { @@ -2480,6 +2510,6 @@ func (s queryStats) merge(o *queryStats) *queryStats { } // NewDefaultChunkBytesPool returns a chunk bytes pool with default settings. -func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) { - return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) +func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) { + return pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes) } diff --git a/vendor/modules.txt b/vendor/modules.txt index b5aec8eaa6..e8222bb68b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -552,7 +552,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5 +# github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1 ## explicit github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader From ac27b3c86f04019f6bc69abbfa9d41bdb5ffcfe7 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 2 Mar 2021 09:34:30 +0100 Subject: [PATCH 2/2] Updated comment Signed-off-by: Marco Pracucci --- pkg/storegateway/bucket_index_metadata_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 0cb544403a..b65d1c4747 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -56,7 +56,7 @@ func NewBucketIndexMetadataFetcher( } } -// Fetch implements metadata.MetadataFetcher. +// Fetch implements block.MetadataFetcher. Not goroutine-safe. func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { f.metrics.ResetTx()