Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB/index-sampling-wiring #6338

Merged
merged 5 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -320,6 +321,10 @@ func (s *storeMock) GetSeries(ctx context.Context, userID string, from, through
panic("don't call me please")
}

// Stats satisfies the Store interface for tests. The stats path is not
// exercised by this mock, so it returns an empty (nil) result with no error.
func (s *storeMock) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	return nil, nil
}

// Stop is a no-op for the mock store.
func (s *storeMock) Stop() {
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/util"
)
Expand All @@ -25,6 +26,7 @@ type Store interface {
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
GetChunkFetcher(tm model.Time) *fetcher.Fetcher
SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error)
Stop()
}

Expand Down Expand Up @@ -161,6 +163,22 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t
return chunkIDs, fetchers, err
}

// Stats collects index statistics for the given tenant and time range by
// querying every underlying store whose period overlaps [from, through] and
// summing the per-store results via stats.MergeStats (nil per-store results
// are skipped there).
//
// NOTE(review): because each period store is queried independently, entities
// straddling a schema-config boundary may be double-counted — presumably an
// accepted limitation; confirm against the dedupe notes in stores/index/stats.
func (c compositeStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	xs := make([]*stats.Stats, 0, len(c.stores))
	err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
		x, err := store.Stats(innerCtx, userID, from, through, matchers...)
		xs = append(xs, x)
		return err
	})
	if err != nil {
		return nil, err
	}

	res := stats.MergeStats(xs...)
	return &res, nil
}

func (c compositeStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher {
// find the schema with the lowest start _after_ tm
j := sort.Search(len(c.stores), func(j int) bool {
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
Expand Down Expand Up @@ -107,6 +108,20 @@ func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string
return c.index.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}

// Stats returns index statistics for the tenant over [from, through],
// delegating to the underlying index after validating (and possibly
// clamping) the time range. When validation signals a shortcut — the range
// does not intersect this store's window — it returns (nil, nil).
func (c *storeEntry) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	log, ctx := spanlogger.New(ctx, "SeriesStore.Stats")
	defer log.Span.Finish()

	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	if err != nil {
		return nil, err
	}
	if shortcut {
		// Nothing to query within this store's active window.
		return nil, nil
	}

	return c.index.Stats(ctx, userID, from, through, matchers...)
}

func (c *storeEntry) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) {
//nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/stores/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
)

type mockStore int
Expand Down Expand Up @@ -47,6 +48,10 @@ func (m mockStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher {
return nil
}

// Stats satisfies the Store interface for tests; the stats path is not
// exercised by this mock, so it returns an empty (nil) result with no error.
func (m mockStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	return nil, nil
}

// Stop is a no-op for the mock store.
func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
Expand Down
131 changes: 131 additions & 0 deletions pkg/storage/stores/index/stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package stats

import (
"encoding/binary"
"sync"

"github.com/prometheus/common/model"
"github.com/willf/bloom"

"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// BloomPool is the process-wide pool of Blooms used by stats collection;
// see PoolBloom and the comment above newBlooms for why pooling matters.
var BloomPool PoolBloom

// Stats aliases the index-gateway protobuf response so accumulated
// statistics can be returned over the wire without conversion.
type Stats = indexgatewaypb.IndexStatsResponse

// MergeStats sums the provided per-store statistics into a single Stats
// value. Nil entries are skipped, so callers may pass results from stores
// that do not implement stats collection (they return nil).
func MergeStats(xs ...*Stats) (s Stats) {
	for _, x := range xs {
		if x == nil {
			continue
		}
		s.Streams += x.Streams
		s.Chunks += x.Chunks
		s.Bytes += x.Bytes
		s.Entries += x.Entries
	}
	return s
}

// PoolBloom pools Blooms instances (~12.5MB each; see the comment above
// newBlooms) so repeated stats queries reuse the bloom-filter bitmaps
// instead of reallocating them.
type PoolBloom struct {
	pool sync.Pool
}

// Get returns a pooled Blooms instance when one is available, falling back
// to allocating a fresh one. Callers must hand it back via Put when done.
func (p *PoolBloom) Get() *Blooms {
	b, ok := p.pool.Get().(*Blooms)
	if !ok {
		return newBlooms()
	}
	return b
}

// Put scrubs all of x's state — both bloom-filter bitmaps and the
// accumulated stats — before returning it to the pool, so the next Get
// starts from a clean slate.
func (p *PoolBloom) Put(x *Blooms) {
	x.Streams.ClearAll()
	x.Chunks.ClearAll()
	x.stats = Stats{}
	p.pool.Put(x)
}

// These are very expensive in terms of memory usage,
// each requiring ~12.5MB. Therefore we heavily rely on pool usage.
// See https://hur.st/bloomfilter for play around with this idea.
// We use bloom filter per process per query to avoid double-counting duplicates
// when calculating statistics across multiple tsdb files, however
// we cannot guarantee this when querying across period config boundaries
// as the data is requested via separate calls to the underlying store,
// which may reside on a different process (index-gateway).
// This is an accepted fault and we may double-count some values which
// are on both sides of a schema line:
// streams+chunks and thus bytes/lines.
// To avoid this, we'd need significant refactoring
// to ensure we resolve statistics for all periods together
// and this doesn't seem worth it: the code paths for iterating across different
// stores are separate.
// Another option is to ship the bloom filter bitmaps sequentially to each
// store, but this is too inefficient (~12.5MB payloads).
// signed, @owen-d
// newBlooms allocates a Blooms whose filters are sized for 1e6 streams and
// 1e7 chunks at a 1% false-positive rate.
func newBlooms() *Blooms {
	return &Blooms{
		// 1 million streams @ 1% error =~ 1.14MB
		Streams: bloom.NewWithEstimates(1e6, 0.01),
		// 10 million chunks @ 1% error =~ 11.43MB
		Chunks: bloom.NewWithEstimates(10e6, 0.01),
	}
}

// TODO(owen-d): shard this across a slice of smaller bloom filters to reduce
// lock contention
// Bloom filters for estimating duplicate statistics across both series
// and chunks within TSDB indices. These are used to calculate data topology
// statistics prior to running queries.
type Blooms struct {
	sync.RWMutex
	// Streams/Chunks remember which keys were already counted; stats holds
	// the deduplicated running totals (mutated only under the write lock).
	Streams, Chunks *bloom.BloomFilter
	stats Stats
}

// Stats returns the deduplicated totals accumulated so far.
// NOTE(review): reads b.stats without taking the lock — safe only if callers
// do not invoke Stats concurrently with AddStream/AddChunk; confirm callers.
func (b *Blooms) Stats() Stats { return b.stats }

// AddStream records the stream fingerprint, incrementing the stream count
// only the first time this fingerprint is observed.
func (b *Blooms) AddStream(fp model.Fingerprint) {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], uint64(fp))
	b.add(b.Streams, key[:], func() {
		b.stats.Streams++
	})
}

// AddChunk records a chunk, keyed by fingerprint + time range + checksum so
// identical chunk refs seen in multiple index files are counted once. On
// first sight it bumps the chunk count and accumulates byte/entry totals.
func (b *Blooms) AddChunk(fp model.Fingerprint, chk index.ChunkMeta) {
	// fingerprint + mintime + maxtime + checksum
	ln := 8 + 8 + 8 + 4
	key := make([]byte, ln)
	binary.BigEndian.PutUint64(key, uint64(fp))
	binary.BigEndian.PutUint64(key[8:], uint64(chk.MinTime))
	binary.BigEndian.PutUint64(key[16:], uint64(chk.MaxTime))
	binary.BigEndian.PutUint32(key[24:], chk.Checksum)
	b.add(b.Chunks, key, func() {
		b.stats.Chunks++
		// KB -> bytes. Widen to uint64 *before* shifting: the original
		// uint64(chk.KB << 10) shifts in KB's narrower type first, which can
		// overflow for large chunk sizes before the conversion happens.
		b.stats.Bytes += uint64(chk.KB) << 10
		b.stats.Entries += uint64(chk.Entries)
	})
}

// add invokes update at most once per unique key: it first probes the filter
// under the cheap read lock and bails on a hit; only on a miss does it take
// the write lock, where TestAndAdd re-checks membership to close the race
// window between dropping the read lock and acquiring the write lock.
func (b *Blooms) add(filter *bloom.BloomFilter, key []byte, update func()) {
	b.RLock()
	seen := filter.Test(key)
	b.RUnlock()
	if seen {
		return
	}

	b.Lock()
	defer b.Unlock()
	if !filter.TestAndAdd(key) {
		update()
	}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tsdb
package stats

import (
"sync"
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,21 @@ func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *indexg
return s.grpcClient.LabelValuesForMetricName(ctx, in, opts...)
}

// GetStats forwards an index-stats request to the index gateway. In ring
// mode the call is routed through ringModeDo (which picks gateway instances
// from the ring); otherwise it goes straight to the configured gRPC client.
func (s *GatewayClient) GetStats(ctx context.Context, in *indexgatewaypb.IndexStatsRequest, opts ...grpc.CallOption) (*indexgatewaypb.IndexStatsResponse, error) {
	if s.cfg.Mode != indexgateway.RingMode {
		return s.grpcClient.GetStats(ctx, in, opts...)
	}

	var resp *indexgatewaypb.IndexStatsResponse
	err := s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error {
		var callErr error
		resp, callErr = client.GetStats(ctx, in, opts...)
		return callErr
	})
	return resp, err
}

func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error {
queryKeyQueryMap := make(map[string]index.Query, len(queries))
gatewayQueries := make([]*indexgatewaypb.IndexQuery, 0, len(queries))
Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

Expand All @@ -29,6 +30,7 @@ type IndexGatewayClient interface {
GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error)
LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
GetStats(ctx context.Context, req *indexgatewaypb.IndexStatsRequest, opts ...grpc.CallOption) (*indexgatewaypb.IndexStatsResponse, error)
}

func NewIndexGatewayClientStore(client IndexGatewayClient, fallbackStore IndexStore) IndexStore {
Expand Down Expand Up @@ -116,6 +118,26 @@ func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context,
return resp.Values, nil
}

// Stats asks the index gateway for index statistics over [from, through]
// restricted by the given matchers. When the gateway predates the GetStats
// RPC, it degrades gracefully by querying the local fallback store instead.
func (c *IndexGatewayClientStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	req := &indexgatewaypb.IndexStatsRequest{
		From:     from,
		Through:  through,
		Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
	}

	resp, err := c.client.GetStats(ctx, req)
	if err == nil {
		return resp, nil
	}

	if isUnimplementedCallError(err) && c.fallbackStore != nil {
		// Handle communication with older index gateways gracefully, by falling
		// back to the index store calls.
		// Note: this is likely a noop anyway since only
		// tsdb+ enables this and the prior index returns an
		// empty response.
		return c.fallbackStore.Stats(ctx, userID, from, through, matchers...)
	}
	return nil, err
}

func (c *IndexGatewayClientStore) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
// if there is no fallback store, we can't set the chunk filterer and index gateway would take care of filtering out data
if c.fallbackStore != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
storageerrors "github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/extract"
Expand Down Expand Up @@ -63,6 +64,7 @@ type IndexStore interface {
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error)
// SetChunkFilterer sets a chunk filter to be used when retrieving chunks.
// This is only used for GetSeries implementation.
// Todo we might want to pass it as a parameter to GetSeries instead.
Expand Down Expand Up @@ -642,3 +644,8 @@ func (c *indexStore) convertChunkIDsToChunkRefs(_ context.Context, userID string

return chunkSet, nil
}

// old index stores do not implement stats -- skip
// Stats returns (nil, nil) for legacy (non-TSDB) index schemas; callers
// treat a nil *stats.Stats as "no data" (MergeStats skips nil entries).
func (c *indexStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	return nil, nil
}
15 changes: 15 additions & 0 deletions pkg/storage/stores/shipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
Expand All @@ -29,6 +30,7 @@ type IndexQuerier interface {
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error)
Stop()
}

Expand Down Expand Up @@ -230,3 +232,16 @@ func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *indexgatewa
Values: names,
}, nil
}

// GetStats serves the index gateway's GetStats RPC: it resolves the tenant
// from the request context, parses the LogQL label matchers from the
// request, and delegates to the underlying index querier.
func (g *Gateway) GetStats(ctx context.Context, req *indexgatewaypb.IndexStatsRequest) (*indexgatewaypb.IndexStatsResponse, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	matchers, err := syntax.ParseMatchers(req.Matchers)
	if err != nil {
		return nil, err
	}

	return g.indexQuerier.Stats(ctx, userID, req.From, req.Through, matchers...)
}
Loading