Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse FetcherMetrics from Thanos #3892

Merged
merged 2 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.7.0
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
go.etcd.io/bbolt v1.3.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1287,8 +1287,8 @@ github.com/thanos-io/thanos v0.13.1-0.20210108102609-f85e4003ba51/go.mod h1:kPvI
github.com/thanos-io/thanos v0.13.1-0.20210204123931-82545cdd16fe/go.mod h1:ZLDGYRNkgM+FCwYNOD+6tOV+DE2fpjzfV6iqXyOgFIw=
github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117 h1:+rTDtekRPNMsLgCpReU13zJfBEScxGB+w0N4rHGWnRA=
github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117/go.mod h1:kdqFpzdkveIKpNNECVJd75RPvgsAifQgJymwCdfev1w=
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5 h1:5/zDK+wXZfFNOkYCKUk33AZNEmU0ev15dcG5G73/3FE=
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU=
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1 h1:ebr5jjRA6al28bNWhouwHC7hQqC1wexo2uac1+utOus=
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
143 changes: 23 additions & 120 deletions pkg/storegateway/bucket_index_metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"
)

// BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Cortex bucket index.
type BucketIndexMetadataFetcher struct {
userID string
Expand All @@ -28,7 +31,7 @@ type BucketIndexMetadataFetcher struct {
logger log.Logger
filters []block.MetadataFilter
modifiers []block.MetadataModifier
metrics *fetcherMetrics
metrics *block.FetcherMetrics
}

func NewBucketIndexMetadataFetcher(
Expand All @@ -49,37 +52,37 @@ func NewBucketIndexMetadataFetcher(
logger: logger,
filters: filters,
modifiers: modifiers,
metrics: newFetcherMetrics(reg),
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil),
}
}

// Fetch implements metadata.MetadataFetcher.
func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.resetTx()
f.metrics.ResetTx()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought(non-blocking): The upstream ResetTx function is not goroutine safe. Would it make sense to propagate that warning here with a comment indicating the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole MetadataFetcher.Fetch() is not required to be goroutine-safe (in Thanos it's not implemented in a goroutine-safe way too). I've updated the function comment.


// Check whether the user belongs to the shard.
if len(f.strategy.FilterUsers(ctx, []string{f.userID})) != 1 {
f.metrics.submit()
f.metrics.Submit()
return nil, nil, nil
}

// Track duration and sync counters only if wasn't filtered out by the sharding strategy.
start := time.Now()
defer func() {
f.metrics.syncDuration.Observe(time.Since(start).Seconds())
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
f.metrics.syncFailures.Inc()
f.metrics.SyncFailures.Inc()
}
}()
f.metrics.syncs.Inc()
f.metrics.Syncs.Inc()

// Fetch the bucket index.
idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, bucketindex.ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
f.metrics.synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.Submit()

return nil, nil, nil
}
Expand All @@ -88,14 +91,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.
// because unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query
// will fail anyway in the querier (the querier fails in the querier if bucket index is corrupted).
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
f.metrics.synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.Submit()

return nil, nil, nil
}
if err != nil {
f.metrics.synced.WithLabelValues(failedMeta).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1)
f.metrics.Submit()

return nil, nil, errors.Wrapf(err, "read bucket index")
}
Expand All @@ -111,9 +114,9 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.

// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok {
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.synced)
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced)
} else {
err = filter.Filter(ctx, metas, f.metrics.synced)
err = filter.Filter(ctx, metas, f.metrics.Synced)
}

if err != nil {
Expand All @@ -123,13 +126,13 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.

for _, m := range f.modifiers {
// NOTE: modifier can update modified metric accordingly to the reason of the modification.
if err := m.Modify(ctx, metas, f.metrics.modified); err != nil {
if err := m.Modify(ctx, metas, f.metrics.Modified); err != nil {
return nil, nil, errors.Wrap(err, "modify metas")
}
}

f.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
f.metrics.submit()
f.metrics.Synced.WithLabelValues(block.LoadedMeta).Set(float64(len(metas)))
f.metrics.Submit()

return metas, nil, nil
}
Expand All @@ -138,103 +141,3 @@ func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Met
// Unused by the store-gateway.
callback(nil, errors.New("UpdateOnChange is unsupported"))
}

const (
fetcherSubSys = "blocks_meta"

corruptedMeta = "corrupted-meta-json"
noMeta = "no-meta-json"
loadedMeta = "loaded"
failedMeta = "failed"
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"

// Synced label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)

// fetcherMetrics is a copy of Thanos internal fetcherMetrics. These metrics have been copied from
// Thanos in order to track the same exact metrics in our own custom metadata fetcher implementation.
type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram

synced *extprom.TxGaugeVec
modified *extprom.TxGaugeVec
}

func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
var m fetcherMetrics

m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "syncs_total",
Help: "Total blocks metadata synchronization attempts",
})
m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "sync_failures_total",
Help: "Total blocks metadata synchronization failures",
})
m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Subsystem: fetcherSubSys,
Name: "sync_duration_seconds",
Help: "Duration of the blocks metadata synchronization in seconds",
Buckets: []float64{0.01, 1, 10, 100, 1000},
})
m.synced = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "synced",
Help: "Number of block metadata synced",
},
[]string{"state"},
[]string{corruptedMeta},
[]string{corruptedBucketIndex},
[]string{noMeta},
[]string{noBucketIndex},
[]string{loadedMeta},
[]string{tooFreshMeta},
[]string{failedMeta},
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of blocks whose metadata changed",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
)
return &m
}

func (s *fetcherMetrics) submit() {
s.synced.Submit()
s.modified.Submit()
}

func (s *fetcherMetrics) resetTx() {
s.synced.ResetTx()
s.modified.ResetTx()
}
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type BucketStores struct {
indexCache storecache.IndexCache

// Chunks bytes pool shared across all tenants.
chunksPool pool.BytesPool
chunksPool pool.Bytes

// Partitioner shared across all tenants.
partitioner store.Partitioner
Expand Down
9 changes: 3 additions & 6 deletions pkg/storegateway/chunk_bytes_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,19 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/pool"
)

const (
maxChunkSize = 16000
"github.com/thanos-io/thanos/pkg/store"
)

type chunkBytesPool struct {
pool *pool.BucketedBytesPool
pool *pool.BucketedBytes

// Metrics.
requestedBytes prometheus.Counter
returnedBytes prometheus.Counter
}

func newChunkBytesPool(maxChunkPoolBytes uint64, reg prometheus.Registerer) (*chunkBytesPool, error) {
upstream, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
upstream, err := pool.NewBucketedBytes(store.EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/storegateway/chunk_bytes_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store"
)

func TestChunkBytesPool_Get(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
p, err := newChunkBytesPool(0, reg)
require.NoError(t, err)

_, err = p.Get(maxChunkSize - 1)
_, err = p.Get(store.EstimatedMaxChunkSize - 1)
require.NoError(t, err)

_, err = p.Get(maxChunkSize + 1)
_, err = p.Get(store.EstimatedMaxChunkSize + 1)
require.NoError(t, err)

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
Expand All @@ -30,5 +31,5 @@ func TestChunkBytesPool_Get(t *testing.T) {
# HELP cortex_bucket_store_chunk_pool_returned_bytes_total Total bytes returned by the chunk bytes pool.
# TYPE cortex_bucket_store_chunk_pool_returned_bytes_total counter
cortex_bucket_store_chunk_pool_returned_bytes_total %d
`, maxChunkSize*2, maxChunkSize*3))))
`, store.EstimatedMaxChunkSize*2, store.EstimatedMaxChunkSize*3))))
}
2 changes: 1 addition & 1 deletion pkg/storegateway/metadata_fetcher_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta
}

if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
delete(metas, mark.ID)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/metadata_fetcher_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -100,7 +101,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
require.NoError(t, f.Filter(ctx, inputMetas, synced))
}

assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta)))
assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(block.MarkedForDeletionMeta)))
assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks())
}
Loading