store: Expose bucket index operation duration histogram #2725

Merged: 7 commits, Aug 10, 2020
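This PR adds two bucket-store histograms, thanos_bucket_store_cached_series_fetch_duration_seconds and thanos_bucket_store_cached_postings_fetch_duration_seconds, and observes them around PreloadSeries and fetchPostings with the client_golang timer helper. The sketch below shows that timer pattern in isolation; the package, the metric name example_fetch_duration_seconds, and the timedFetch function are illustrative stand-ins, not code from this PR.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// A histogram declared the same way the PR declares its own: promauto wires the
// collector into the given registerer at construction time.
var fetchDuration = promauto.With(prometheus.DefaultRegisterer).NewHistogram(prometheus.HistogramOpts{
	Name:    "example_fetch_duration_seconds", // illustrative name, not one of the PR's metrics
	Help:    "Time spent in a fetch operation.",
	Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

// timedFetch mirrors the shape of fetchPostings/PreloadSeries after this PR:
// start a timer, defer the observation, then do the work.
func timedFetch() {
	timer := prometheus.NewTimer(fetchDuration)
	defer timer.ObserveDuration()

	time.Sleep(10 * time.Millisecond) // stand-in for the bucket and cache work
}

func main() {
	timedFetch()
}

Once exported, the usual histogram_quantile over the metric's _bucket series gives fetch-latency percentiles per store.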
2 changes: 1 addition & 1 deletion pkg/cacheutil/memcached_client.go
@@ -466,7 +466,7 @@ func (c *memcachedClient) resolveAddrs() error {

// If some of the dns resolution fails, log the error.
if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
level.Error(c.logger).Log("msg", "failed to resolve addresses for storeAPIs", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
}
// Fail in case no server address is resolved.
servers := c.dnsProvider.Addresses()
2 changes: 2 additions & 0 deletions pkg/objstore/objstore.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/runutil"
)

@@ -257,6 +258,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"}),

lastSuccessfulUploadTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_objstore_bucket_last_successful_upload_time",
Help: "Second timestamp of the last successful upload to the bucket.",
41 changes: 34 additions & 7 deletions pkg/store/bucket.go
@@ -111,6 +111,9 @@ type bucketStoreMetrics struct {
cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter

seriesFetchDuration prometheus.Histogram
postingsFetchDuration prometheus.Histogram
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -221,6 +224,18 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Compressed size of postings stored into cache.",
})

m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_cached_series_fetch_duration_seconds",
Help: "Time it takes to fetch series from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_cached_postings_fetch_duration_seconds",
Help: "Time it takes to fetch postings from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

return &m
}

@@ -473,7 +488,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
lset := labels.FromMap(meta.Thanos.Labels)
h := lset.Hash()

indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
indexHeaderReader, err := indexheader.NewBinaryReader(
ctx,
s.logger,
s.bkt,
s.dir,
meta.ULID,
s.postingOffsetsInMemSampling,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
@@ -486,14 +508,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", meta.ULID),
s.metrics,
meta,
s.bkt,
dir,
s.indexCache,
s.chunkPool,
indexHeaderReader,
s.partitioner,
s.metrics.seriesRefetches,
s.enablePostingsCompression,
)
if err != nil {
@@ -1258,6 +1280,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// state for the block on local disk.
type bucketBlock struct {
logger log.Logger
metrics *bucketStoreMetrics
bkt objstore.BucketReader
meta *metadata.Meta
dir string
@@ -1272,8 +1295,6 @@ type bucketBlock struct {

partitioner partitioner

seriesRefetches prometheus.Counter

enablePostingsCompression bool

// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
@@ -1284,26 +1305,26 @@
func newBucketBlock(
ctx context.Context,
logger log.Logger,
metrics *bucketStoreMetrics,
meta *metadata.Meta,
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
enablePostingsCompression bool,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
metrics: metrics,
bkt: bkt,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
enablePostingsCompression: enablePostingsCompression,
}

@@ -1610,6 +1631,9 @@ type postingPtr struct {
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
defer timer.ObserveDuration()

var ptrs []postingPtr

output := make([]index.Postings, len(keys))
@@ -1827,6 +1851,9 @@ func (it *bigEndianPostings) length() int {
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()

// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1877,7 +1904,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
}

// Inefficient, but should be rare.
r.block.seriesRefetches.Inc()
r.block.metrics.seriesRefetches.Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
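Beyond the two histograms, the bucket.go changes also replace the single seriesRefetches counter threaded into newBucketBlock with the whole *bucketStoreMetrics struct, so future per-block metrics can be added without widening the constructor again. A rough sketch of that shape under assumed names (componentMetrics, component, and doFetch are illustrative, not Thanos types):

package metricsdemo

import "github.com/prometheus/client_golang/prometheus"

// componentMetrics plays the role of bucketStoreMetrics: one struct owning
// every collector the component may need.
type componentMetrics struct {
	refetches     prometheus.Counter
	fetchDuration prometheus.Histogram
}

// component plays the role of bucketBlock: it keeps a pointer to the shared
// metrics struct instead of individual counters.
type component struct {
	metrics *componentMetrics
}

// newComponent mirrors the new newBucketBlock signature: the metrics struct is
// passed once, up front.
func newComponent(metrics *componentMetrics) *component {
	return &component{metrics: metrics}
}

// doFetch shows how call sites change: block.seriesRefetches.Inc() becomes
// block.metrics.seriesRefetches.Inc(), and new histograms are reachable the
// same way without touching the constructor.
func (c *component) doFetch() {
	timer := prometheus.NewTimer(c.metrics.fetchDuration)
	defer timer.ObserveDuration()
	c.metrics.refetches.Inc()
}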
37 changes: 20 additions & 17 deletions pkg/store/bucket_test.go
@@ -30,15 +30,15 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/encoding"

"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -52,7 +52,6 @@ import (
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/atomic"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)
@@ -209,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
},
}

b, err := newBucketBlock(context.Background(), log.NewNopLogger(), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, true)
b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
testutil.Ok(t, err)

cases := []struct {
@@ -921,10 +920,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
ULID: ulid.MustNew(1, nil),
},
},
bkt: bkt,
seriesRefetches: s.seriesRefetches,
logger: log.NewNopLogger(),
indexCache: noopCache{},
bkt: bkt,
logger: log.NewNopLogger(),
metrics: s,
indexCache: noopCache{},
}

buf := encoding.Encbuf{}
@@ -1130,6 +1129,7 @@ func benchmarkExpandedPostings(
t.Run(c.name, func(t testutil.TB) {
b := &bucketBlock{
logger: log.NewNopLogger(),
metrics: newBucketStoreMetrics(nil),
indexHeaderReader: r,
indexCache: noopCache{},
bkt: bkt,
@@ -1228,15 +1228,16 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))

m := newBucketStoreMetrics(nil)
b := &bucketBlock{
indexCache: noopCache{},
logger: logger,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
seriesRefetches: promauto.With(nil).NewCounter(prometheus.CounterOpts{}),
indexCache: noopCache{},
logger: logger,
metrics: m,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
blocks = append(blocks, b)
}
@@ -1289,7 +1290,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request

for _, b := range blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
testutil.Equals(t, 0.0, promtest.ToFloat64(b.seriesRefetches))
testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
}
}
}
@@ -1393,6 +1394,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b1 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
@@ -1431,6 +1433,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
b2 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
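The tests above construct their blocks with newBucketStoreMetrics(nil), which works because promauto.With tolerates a nil Registerer: the collectors are created and usable, just never registered or exported. A small sketch of that behaviour (the metric name example_refetches_total is illustrative):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// With a nil registerer the counter is fully functional but not exported
	// anywhere, which is what lets tests assert on b.metrics.seriesRefetches.
	refetches := promauto.With(nil).NewCounter(prometheus.CounterOpts{
		Name: "example_refetches_total",
	})
	refetches.Inc()

	fmt.Println(testutil.ToFloat64(refetches)) // prints 1
}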