diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index cab037bf6832..a3fbe57abd92 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/util" ) @@ -320,6 +321,10 @@ func (s *storeMock) GetSeries(ctx context.Context, userID string, from, through panic("don't call me please") } +func (s *storeMock) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + return nil, nil +} + func (s *storeMock) Stop() { } diff --git a/pkg/storage/stores/composite_store.go b/pkg/storage/stores/composite_store.go index e24eda134f1a..2f537022e4fb 100644 --- a/pkg/storage/stores/composite_store.go +++ b/pkg/storage/stores/composite_store.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/series" "github.com/grafana/loki/pkg/util" ) @@ -25,6 +26,7 @@ type Store interface { LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) GetChunkFetcher(tm model.Time) *fetcher.Fetcher SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) + Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) Stop() } @@ -161,6 +163,22 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t return chunkIDs, fetchers, err } +func (c compositeStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + xs := make([]*stats.Stats, 0, len(c.stores)) + err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error { + x, err := store.Stats(innerCtx, userID, from, through, matchers...) + xs = append(xs, x) + return err + }) + + if err != nil { + return nil, err + + } + res := stats.MergeStats(xs...) + return &res, err +} + func (c compositeStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher { // find the schema with the lowest start _after_ tm j := sort.Search(len(c.stores), func(j int) bool { diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 6c8e5a0eae90..4d0a97fb0630 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/errors" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/series" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" @@ -107,6 +108,20 @@ func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string return c.index.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...) 
} +func (c *storeEntry) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + log, ctx := spanlogger.New(ctx, "SeriesStore.Stats") + defer log.Span.Finish() + + shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + return c.index.Stats(ctx, userID, from, through, matchers...) +} + func (c *storeEntry) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) { //nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time diff --git a/pkg/storage/stores/composite_store_test.go b/pkg/storage/stores/composite_store_test.go index 0a6bc5e371d8..a97fb078d0b9 100644 --- a/pkg/storage/stores/composite_store_test.go +++ b/pkg/storage/stores/composite_store_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" + "github.com/grafana/loki/pkg/storage/stores/index/stats" ) type mockStore int @@ -47,6 +48,10 @@ func (m mockStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher { return nil } +func (m mockStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + return nil, nil +} + func (m mockStore) Stop() {} func TestCompositeStore(t *testing.T) { diff --git a/pkg/storage/stores/index/stats/stats.go b/pkg/storage/stores/index/stats/stats.go new file mode 100644 index 000000000000..f6f67e816ea1 --- /dev/null +++ b/pkg/storage/stores/index/stats/stats.go @@ -0,0 +1,131 @@ +package stats + +import ( + "encoding/binary" + "sync" + + "github.com/prometheus/common/model" + "github.com/willf/bloom" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" +) + +var BloomPool PoolBloom + +type Stats = indexgatewaypb.IndexStatsResponse + +func MergeStats(xs ...*Stats) (s Stats) { + for _, x := range xs { + if x == nil { + continue + } + s.Streams += x.Streams + s.Chunks += x.Chunks + s.Bytes += x.Bytes + s.Entries += x.Entries + + } + return s +} + +type PoolBloom struct { + pool sync.Pool +} + +func (p *PoolBloom) Get() *Blooms { + if x := p.pool.Get(); x != nil { + return x.(*Blooms) + } + + return newBlooms() + +} + +func (p *PoolBloom) Put(x *Blooms) { + x.Streams.ClearAll() + x.Chunks.ClearAll() + x.stats = Stats{} + p.pool.Put(x) +} + +// These are very expensive in terms of memory usage, +// each requiring ~12.5MB. Therefore we heavily rely on pool usage. +// See https://hur.st/bloomfilter for play around with this idea. +// We use bloom filter per process per query to avoid double-counting duplicates +// when calculating statistics across multiple tsdb files, however +// we cannot guarantee this when querying across period config boundaries +// as the data is requested via separate calls to the underlying store, +// which may reside on a different process (index-gateway). +// This is an accepted fault and we may double-count some values which +// are on both sides of a schema line: +// streams+chunks and thus bytes/lines. 
+// To avoid this, we'd need significant refactoring +// to ensure we resolve statistics for all periods together +// and this doesn't seem worth it: the code paths for iterating across different +// stores are separate. +// Another option is to ship the bloom filter bitmaps sequentially to each +// store, but this is too inefficient (~12.5MB payloads). +// signed, @owen-d +func newBlooms() *Blooms { + // 1 million streams @ 1% error =~ 1.14MB + streams := bloom.NewWithEstimates(1e6, 0.01) + // 10 million chunks @ 1% error =~ 11.43MB + chunks := bloom.NewWithEstimates(10e6, 0.01) + return &Blooms{ + Streams: streams, + Chunks: chunks, + } +} + +// TODO(owen-d): shard this across a slice of smaller bloom filters to reduce +// lock contention +// Bloom filters for estimating duplicate statistics across both series +// and chunks within TSDB indices. These are used to calculate data topology +// statistics prior to running queries. +type Blooms struct { + sync.RWMutex + Streams, Chunks *bloom.BloomFilter + stats Stats +} + +func (b *Blooms) Stats() Stats { return b.stats } + +func (b *Blooms) AddStream(fp model.Fingerprint) { + key := make([]byte, 8) + binary.BigEndian.PutUint64(key, uint64(fp)) + b.add(b.Streams, key, func() { + b.stats.Streams++ + }) +} + +func (b *Blooms) AddChunk(fp model.Fingerprint, chk index.ChunkMeta) { + // fingerprint + mintime + maxtime + checksum + ln := 8 + 8 + 8 + 4 + key := make([]byte, ln) + binary.BigEndian.PutUint64(key, uint64(fp)) + binary.BigEndian.PutUint64(key[8:], uint64(chk.MinTime)) + binary.BigEndian.PutUint64(key[16:], uint64(chk.MaxTime)) + binary.BigEndian.PutUint32(key[24:], chk.Checksum) + b.add(b.Chunks, key, func() { + b.stats.Chunks++ + b.stats.Bytes += uint64(chk.KB << 10) + b.stats.Entries += uint64(chk.Entries) + }) +} + +func (b *Blooms) add(filter *bloom.BloomFilter, key []byte, update func()) { + b.RLock() + ok := filter.Test(key) + b.RUnlock() + + if ok { + return + } + + b.Lock() + defer b.Unlock() + if ok = filter.TestAndAdd(key); !ok { + update() + } +} diff --git a/pkg/storage/stores/tsdb/pool_test.go b/pkg/storage/stores/index/stats/stats_test.go similarity index 98% rename from pkg/storage/stores/tsdb/pool_test.go rename to pkg/storage/stores/index/stats/stats_test.go index 934faa0fa367..38784da180c0 100644 --- a/pkg/storage/stores/tsdb/pool_test.go +++ b/pkg/storage/stores/index/stats/stats_test.go @@ -1,4 +1,4 @@ -package tsdb +package stats import ( "sync" diff --git a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go index acd7387ce033..798b00fb20a2 100644 --- a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go +++ b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go @@ -238,6 +238,21 @@ func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *indexg return s.grpcClient.LabelValuesForMetricName(ctx, in, opts...) } +func (s *GatewayClient) GetStats(ctx context.Context, in *indexgatewaypb.IndexStatsRequest, opts ...grpc.CallOption) (*indexgatewaypb.IndexStatsResponse, error) { + if s.cfg.Mode == indexgateway.RingMode { + var ( + resp *indexgatewaypb.IndexStatsResponse + err error + ) + err = s.ringModeDo(ctx, func(client indexgatewaypb.IndexGatewayClient) error { + resp, err = client.GetStats(ctx, in, opts...) + return err + }) + return resp, err + } + return s.grpcClient.GetStats(ctx, in, opts...) 
+} + func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error { queryKeyQueryMap := make(map[string]index.Query, len(queries)) gatewayQueries := make([]*indexgatewaypb.IndexQuery, 0, len(queries)) diff --git a/pkg/storage/stores/series/series_index_gateway_store.go b/pkg/storage/stores/series/series_index_gateway_store.go index 8886f4616a98..29ebac70e25f 100644 --- a/pkg/storage/stores/series/series_index_gateway_store.go +++ b/pkg/storage/stores/series/series_index_gateway_store.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" ) @@ -29,6 +30,7 @@ type IndexGatewayClient interface { GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error) LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error) + GetStats(ctx context.Context, req *indexgatewaypb.IndexStatsRequest, opts ...grpc.CallOption) (*indexgatewaypb.IndexStatsResponse, error) } func NewIndexGatewayClientStore(client IndexGatewayClient, fallbackStore IndexStore) IndexStore { @@ -116,6 +118,26 @@ func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, return resp.Values, nil } +func (c *IndexGatewayClientStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + resp, err := c.client.GetStats(ctx, &indexgatewaypb.IndexStatsRequest{ + From: from, + Through: through, + Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(), + }) + if err != nil { + if isUnimplementedCallError(err) && c.fallbackStore != nil { + // Handle communication with older index gateways gracefully, by falling back to the index store calls. + // Note: this is likely a noop anyway since only + // tsdb+ enables this and the prior index returns an + // empty response. + return c.fallbackStore.Stats(ctx, userID, from, through, matchers...) 
+ } + return nil, err + } + + return resp, nil +} + func (c *IndexGatewayClientStore) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { // if there is no fallback store, we can't set the chunk filterer and index gateway would take care of filtering out data if c.fallbackStore != nil { diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index 5160145dbc40..5eb7093c7399 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/config" storageerrors "github.com/grafana/loki/pkg/storage/errors" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/extract" @@ -63,6 +64,7 @@ type IndexStore interface { GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) + Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) // SetChunkFilterer sets a chunk filter to be used when retrieving chunks. // This is only used for GetSeries implementation. // Todo we might want to pass it as a parameter to GetSeries instead. @@ -642,3 +644,8 @@ func (c *indexStore) convertChunkIDsToChunkRefs(_ context.Context, userID string return chunkSet, nil } + +// old index stores do not implement stats -- skip +func (c *indexStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + return nil, nil +} diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index 89c1aa1c905e..eebb2945032e 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/util" @@ -29,6 +30,7 @@ type IndexQuerier interface { GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) + Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) Stop() } @@ -230,3 +232,16 @@ func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *indexgatewa Values: names, }, nil } + +func (g *Gateway) GetStats(ctx context.Context, req 
*indexgatewaypb.IndexStatsRequest) (*indexgatewaypb.IndexStatsResponse, error) { + instanceID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + matchers, err := syntax.ParseMatchers(req.Matchers) + if err != nil { + return nil, err + } + + return g.indexQuerier.Stats(ctx, instanceID, req.From, req.Through, matchers...) +} diff --git a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go index 408cbf1297c3..cedf3b342092 100644 --- a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go +++ b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.pb.go @@ -614,6 +614,118 @@ func (m *IndexQuery) GetValueEqual() []byte { return nil } +type IndexStatsRequest struct { + From github_com_prometheus_common_model.Time `protobuf:"varint,1,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"` + Through github_com_prometheus_common_model.Time `protobuf:"varint,2,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"` + Matchers string `protobuf:"bytes,3,opt,name=matchers,proto3" json:"matchers,omitempty"` +} + +func (m *IndexStatsRequest) Reset() { *m = IndexStatsRequest{} } +func (*IndexStatsRequest) ProtoMessage() {} +func (*IndexStatsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{12} +} +func (m *IndexStatsRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *IndexStatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_IndexStatsRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *IndexStatsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexStatsRequest.Merge(m, src) +} +func (m *IndexStatsRequest) XXX_Size() int { + return m.Size() +} +func (m *IndexStatsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_IndexStatsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_IndexStatsRequest proto.InternalMessageInfo + +func (m *IndexStatsRequest) GetMatchers() string { + if m != nil { + return m.Matchers + } + return "" +} + +type IndexStatsResponse struct { + Streams uint64 `protobuf:"varint,1,opt,name=streams,proto3" json:"streams,omitempty"` + Chunks uint64 `protobuf:"varint,2,opt,name=chunks,proto3" json:"chunks,omitempty"` + Bytes uint64 `protobuf:"varint,3,opt,name=bytes,proto3" json:"bytes,omitempty"` + Entries uint64 `protobuf:"varint,4,opt,name=entries,proto3" json:"entries,omitempty"` +} + +func (m *IndexStatsResponse) Reset() { *m = IndexStatsResponse{} } +func (*IndexStatsResponse) ProtoMessage() {} +func (*IndexStatsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_33a7bd4603d312b2, []int{13} +} +func (m *IndexStatsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *IndexStatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_IndexStatsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *IndexStatsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexStatsResponse.Merge(m, src) +} +func (m *IndexStatsResponse) XXX_Size() int { + return m.Size() +} +func (m *IndexStatsResponse) 
XXX_DiscardUnknown() { + xxx_messageInfo_IndexStatsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_IndexStatsResponse proto.InternalMessageInfo + +func (m *IndexStatsResponse) GetStreams() uint64 { + if m != nil { + return m.Streams + } + return 0 +} + +func (m *IndexStatsResponse) GetChunks() uint64 { + if m != nil { + return m.Chunks + } + return 0 +} + +func (m *IndexStatsResponse) GetBytes() uint64 { + if m != nil { + return m.Bytes + } + return 0 +} + +func (m *IndexStatsResponse) GetEntries() uint64 { + if m != nil { + return m.Entries + } + return 0 +} + func init() { proto.RegisterType((*LabelValuesForMetricNameRequest)(nil), "indexgatewaypb.LabelValuesForMetricNameRequest") proto.RegisterType((*LabelNamesForMetricNameRequest)(nil), "indexgatewaypb.LabelNamesForMetricNameRequest") @@ -627,6 +739,8 @@ func init() { proto.RegisterType((*Row)(nil), "indexgatewaypb.Row") proto.RegisterType((*QueryIndexRequest)(nil), "indexgatewaypb.QueryIndexRequest") proto.RegisterType((*IndexQuery)(nil), "indexgatewaypb.IndexQuery") + proto.RegisterType((*IndexStatsRequest)(nil), "indexgatewaypb.IndexStatsRequest") + proto.RegisterType((*IndexStatsResponse)(nil), "indexgatewaypb.IndexStatsResponse") } func init() { @@ -634,58 +748,63 @@ func init() { } var fileDescriptor_33a7bd4603d312b2 = []byte{ - // 809 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x55, 0xcf, 0x6e, 0xfb, 0x44, - 0x10, 0xf6, 0x26, 0x69, 0xda, 0x4c, 0x03, 0x94, 0x2d, 0x2a, 0x91, 0xa1, 0x4e, 0x6a, 0x24, 0x1a, - 0x21, 0x11, 0x43, 0xe9, 0x05, 0x21, 0x0e, 0xa4, 0x40, 0x55, 0x51, 0xaa, 0xb2, 0x85, 0x0a, 0x4e, - 0x68, 0x93, 0x6e, 0x6c, 0x53, 0x3b, 0xeb, 0xae, 0x6d, 0xd2, 0xde, 0x78, 0x01, 0x24, 0x1e, 0x83, - 0xa7, 0x40, 0x1c, 0x7b, 0x2c, 0xb7, 0x8a, 0x43, 0x45, 0xd3, 0x0b, 0x17, 0xa4, 0x3e, 0x02, 0xf2, - 0xfa, 0x6f, 0x92, 0x36, 0x48, 0xa5, 0x97, 0xdf, 0xc9, 0x9e, 0x6f, 0xbe, 0xd9, 0x9d, 0x6f, 0x76, - 0x67, 0x16, 0x3e, 0xf5, 0x4e, 0x4d, 0xc3, 0x0f, 0xb8, 0xa0, 0x26, 0x93, 0x5f, 0xe6, 0x1b, 0xbe, - 0x65, 0x7b, 0x1e, 0x13, 0x86, 0x3d, 0x3c, 0x61, 0xe7, 0x26, 0x0d, 0xd8, 0x88, 0x5e, 0x4c, 0x18, - 0x5e, 0xcf, 0x48, 0xfe, 0x3a, 0x9e, 0xe0, 0x01, 0xc7, 0x2f, 0x4f, 0x7a, 0xd5, 0x77, 0x4d, 0x3b, - 0xb0, 0xc2, 0x5e, 0xa7, 0xcf, 0x5d, 0xc3, 0xe4, 0x26, 0x37, 0x24, 0xad, 0x17, 0x0e, 0xa4, 0x25, - 0x0d, 0xf9, 0x17, 0x87, 0xab, 0x6f, 0x44, 0x49, 0x38, 0xdc, 0x8c, 0x1d, 0xe9, 0x4f, 0xec, 0xd4, - 0x7f, 0x2e, 0x41, 0x73, 0x9f, 0xf6, 0x98, 0x73, 0x4c, 0x9d, 0x90, 0xf9, 0x9f, 0x73, 0xf1, 0x25, - 0x0b, 0x84, 0xdd, 0x3f, 0xa0, 0x2e, 0x23, 0xec, 0x2c, 0x64, 0x7e, 0x80, 0x9b, 0xb0, 0xec, 0x4a, - 0xf0, 0xfb, 0x21, 0x75, 0x59, 0x03, 0xb5, 0x50, 0xbb, 0x46, 0xc0, 0xcd, 0x78, 0x78, 0x1d, 0xc0, - 0x89, 0xd6, 0x88, 0xfd, 0x25, 0xe9, 0xaf, 0x49, 0x44, 0xba, 0x77, 0xa0, 0x32, 0x10, 0xdc, 0x6d, - 0x94, 0x5b, 0xa8, 0x5d, 0xee, 0x1a, 0x97, 0x37, 0x4d, 0xe5, 0xcf, 0x9b, 0xe6, 0x66, 0x41, 0x85, - 0x27, 0xb8, 0xcb, 0x02, 0x8b, 0x85, 0xbe, 0xd1, 0xe7, 0xae, 0xcb, 0x87, 0x86, 0xcb, 0x4f, 0x98, - 0xd3, 0xf9, 0xda, 0x76, 0x19, 0x91, 0xc1, 0x78, 0x0f, 0x16, 0x03, 0x4b, 0xf0, 0xd0, 0xb4, 0x1a, - 0x95, 0xa7, 0xad, 0x93, 0xc6, 0x63, 0x15, 0x96, 0x5c, 0x1a, 0xf4, 0x2d, 0x26, 0xfc, 0xc6, 0x82, - 0x4c, 0x36, 0xb3, 0xf5, 0x3f, 0x10, 0x68, 0xfb, 0x69, 0xe6, 0x4f, 0x2c, 0x47, 0xaa, 0xb7, 0xf4, - 0x4c, 0x7a, 0xcb, 0xff, 0x4f, 0xaf, 0xbe, 0x09, 0x2f, 0x49, 0x49, 0x84, 0xf9, 0x1e, 0x1f, 0xfa, - 0x0c, 0xaf, 0x41, 0xf5, 0x47, 0x79, 0xdc, 0x0d, 0xd4, 0x2a, 0xb7, 0x6b, 0x24, 0xb1, 0xf4, 0xdf, - 0x11, 0xe0, 0x5d, 0x16, 0xec, 0x58, 
0xe1, 0xf0, 0x94, 0xb0, 0x41, 0x2a, 0x38, 0xd5, 0x83, 0x9e, - 0x49, 0x4f, 0xe9, 0x19, 0xcf, 0xaf, 0x3c, 0x75, 0x7e, 0x1f, 0xc3, 0xea, 0x84, 0x82, 0x44, 0xf1, - 0xdb, 0x50, 0x11, 0x6c, 0x10, 0xeb, 0x5d, 0xde, 0xc2, 0x9d, 0xac, 0x0b, 0x32, 0xa6, 0xf4, 0xeb, - 0xbf, 0x21, 0x58, 0xd9, 0x65, 0xc1, 0x11, 0x13, 0x36, 0xf3, 0x5f, 0x44, 0xfd, 0x7b, 0xf0, 0x6a, - 0x21, 0xff, 0x44, 0xfd, 0x36, 0x54, 0x7d, 0x89, 0x24, 0xfa, 0xd7, 0x3a, 0x93, 0x13, 0xa5, 0x13, - 0xf3, 0xbb, 0x95, 0x28, 0x25, 0x92, 0x70, 0x75, 0x0f, 0xaa, 0x31, 0x8e, 0x07, 0x50, 0x95, 0xdd, - 0x9c, 0xc6, 0xaf, 0xe6, 0xf5, 0x93, 0x17, 0xeb, 0x90, 0xda, 0xa2, 0xfb, 0x61, 0xa2, 0xe7, 0xfd, - 0xe2, 0x74, 0x12, 0x74, 0x40, 0x87, 0xd4, 0x70, 0xf8, 0xa9, 0x6d, 0x14, 0xc7, 0x50, 0x1c, 0xf7, - 0xc9, 0x09, 0xf5, 0x02, 0x26, 0x48, 0xb2, 0xba, 0xfe, 0x1d, 0xe0, 0xaf, 0x42, 0x26, 0x2e, 0xf6, - 0xa2, 0xec, 0xb2, 0xec, 0x55, 0x58, 0x92, 0xe8, 0x17, 0xec, 0x22, 0x69, 0xb6, 0xcc, 0xc6, 0x9b, - 0x50, 0x11, 0x7c, 0xe4, 0x37, 0x4a, 0x49, 0x5e, 0x53, 0xba, 0x08, 0x1f, 0x11, 0x49, 0xd0, 0x3f, - 0x82, 0x32, 0xe1, 0x23, 0xac, 0x01, 0x08, 0x3a, 0x34, 0x99, 0x9c, 0x76, 0x72, 0xb5, 0x3a, 0x29, - 0x20, 0xf8, 0x35, 0x58, 0x90, 0xbd, 0x20, 0xcf, 0xa8, 0x4e, 0x62, 0x23, 0x2a, 0x6a, 0x31, 0xaf, - 0xf8, 0x56, 0x6c, 0xc3, 0x62, 0x04, 0xe6, 0x55, 0x55, 0xa7, 0x77, 0x97, 0x74, 0x19, 0x48, 0x52, - 0x6a, 0x74, 0xc1, 0x20, 0xc7, 0xf1, 0x9b, 0x50, 0x0b, 0x68, 0xcf, 0x61, 0x07, 0xf9, 0x24, 0xc9, - 0x81, 0xc8, 0x6b, 0x51, 0xdf, 0x3a, 0xce, 0x32, 0xaa, 0x91, 0x1c, 0xc0, 0xef, 0xc0, 0x4a, 0x9e, - 0xf9, 0xa1, 0x60, 0x03, 0xfb, 0x5c, 0x5e, 0x87, 0x3a, 0x99, 0xc1, 0x71, 0x1b, 0x5e, 0xc9, 0xb1, - 0xa3, 0x80, 0x8a, 0x40, 0x4e, 0xd1, 0x3a, 0x99, 0x86, 0xa3, 0x0a, 0x49, 0xd1, 0x9f, 0x9d, 0x85, - 0xd4, 0x91, 0xe3, 0xb1, 0x4e, 0x0a, 0xc8, 0xd6, 0x3f, 0x65, 0xa8, 0x4b, 0x01, 0xbb, 0xb1, 0x4e, - 0xfc, 0x0d, 0x40, 0x5e, 0x1c, 0xbc, 0x31, 0x5d, 0x84, 0x99, 0xc2, 0xa9, 0xfa, 0x3c, 0x4a, 0x7c, - 0xe6, 0xef, 0x21, 0xfc, 0x2d, 0x2c, 0x17, 0x1a, 0x19, 0xcf, 0x04, 0xcd, 0xce, 0x29, 0xf5, 0xad, - 0xb9, 0x9c, 0x78, 0x65, 0x5d, 0xc1, 0x04, 0x6a, 0x59, 0x8b, 0xe0, 0xd6, 0x03, 0x31, 0x13, 0xdd, - 0xaf, 0x6e, 0xcc, 0x61, 0x64, 0x6b, 0xfe, 0x00, 0xaf, 0x3f, 0xf2, 0x6a, 0xe0, 0xce, 0x74, 0xfc, - 0xfc, 0xe7, 0x45, 0x5d, 0x7f, 0x90, 0x5f, 0xd8, 0xcb, 0x81, 0xc6, 0x63, 0x2f, 0x36, 0x36, 0x1e, - 0x0c, 0x7e, 0xfc, 0x6d, 0xff, 0xcf, 0xdd, 0xba, 0xdb, 0x57, 0xb7, 0x9a, 0x72, 0x7d, 0xab, 0x29, - 0xf7, 0xb7, 0x1a, 0xfa, 0x69, 0xac, 0xa1, 0x5f, 0xc7, 0x1a, 0xba, 0x1c, 0x6b, 0xe8, 0x6a, 0xac, - 0xa1, 0xbf, 0xc6, 0x1a, 0xfa, 0x7b, 0xac, 0x29, 0xf7, 0x63, 0x0d, 0xfd, 0x72, 0xa7, 0x29, 0x57, - 0x77, 0x9a, 0x72, 0x7d, 0xa7, 0x29, 0xbd, 0xaa, 0xec, 0xf2, 0x0f, 0xfe, 0x0d, 0x00, 0x00, 0xff, - 0xff, 0x36, 0x8e, 0xd5, 0x13, 0x01, 0x09, 0x00, 0x00, + // 894 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x56, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0xdf, 0xb1, 0xb7, 0x4e, 0xfc, 0x62, 0xa0, 0x9d, 0xa2, 0x62, 0x2d, 0x74, 0x93, 0x2e, 0x12, + 0x89, 0x90, 0xf0, 0x42, 0xc9, 0x05, 0x21, 0x0e, 0xa4, 0x40, 0x14, 0x51, 0xaa, 0x32, 0x81, 0x0a, + 0x4e, 0x68, 0xec, 0x8c, 0x77, 0x97, 0xec, 0x7a, 0xb6, 0xb3, 0xb3, 0x4d, 0x73, 0xe3, 0x0b, 0x20, + 0xf1, 0x31, 0xf8, 0x14, 0x80, 0x38, 0xf5, 0x18, 0x6e, 0x15, 0x87, 0x8a, 0x38, 0x17, 0x8e, 0xfd, + 0x08, 0x68, 0xdf, 0xfe, 0xb5, 0x9d, 0x04, 0xa9, 0xcd, 0x25, 0x27, 0xcf, 0xfb, 0xbd, 0x3f, 0xf3, + 0x7e, 0x33, 0x6f, 0x7e, 0x5e, 0xf8, 0x2c, 0xde, 0xf7, 0xdc, 0x44, 0x4b, 0xc5, 0x3d, 0x81, 0xbf, + 0x22, 0x71, 0x13, 0x3f, 0x88, 0x63, 0xa1, 0xdc, 0x60, 
0xb2, 0x27, 0x1e, 0x7b, 0x5c, 0x8b, 0x03, + 0x7e, 0x38, 0x63, 0xc4, 0x43, 0xb7, 0x58, 0x0d, 0x62, 0x25, 0xb5, 0xa4, 0xaf, 0xce, 0x7a, 0xad, + 0xf7, 0xbc, 0x40, 0xfb, 0xe9, 0x70, 0x30, 0x92, 0x91, 0xeb, 0x49, 0x4f, 0xba, 0x18, 0x36, 0x4c, + 0xc7, 0x68, 0xa1, 0x81, 0xab, 0x3c, 0xdd, 0x7a, 0x33, 0x6b, 0x22, 0x94, 0x5e, 0xee, 0x28, 0x17, + 0xb9, 0xd3, 0xf9, 0xb9, 0x05, 0xab, 0x77, 0xf9, 0x50, 0x84, 0x0f, 0x78, 0x98, 0x8a, 0xe4, 0x0b, + 0xa9, 0xbe, 0x12, 0x5a, 0x05, 0xa3, 0x7b, 0x3c, 0x12, 0x4c, 0x3c, 0x4c, 0x45, 0xa2, 0xe9, 0x2a, + 0xac, 0x44, 0x08, 0xfe, 0x30, 0xe1, 0x91, 0xe8, 0x93, 0x35, 0xb2, 0xd1, 0x65, 0x10, 0x55, 0x71, + 0xf4, 0x26, 0x40, 0x98, 0xd5, 0xc8, 0xfd, 0x2d, 0xf4, 0x77, 0x11, 0x41, 0xf7, 0x1d, 0x30, 0xc7, + 0x4a, 0x46, 0xfd, 0xf6, 0x1a, 0xd9, 0x68, 0x6f, 0xb9, 0x4f, 0x9e, 0xad, 0x1a, 0x7f, 0x3f, 0x5b, + 0x5d, 0x6f, 0xb0, 0x88, 0x95, 0x8c, 0x84, 0xf6, 0x45, 0x9a, 0xb8, 0x23, 0x19, 0x45, 0x72, 0xe2, + 0x46, 0x72, 0x4f, 0x84, 0x83, 0x6f, 0x82, 0x48, 0x30, 0x4c, 0xa6, 0x3b, 0xb0, 0xa4, 0x7d, 0x25, + 0x53, 0xcf, 0xef, 0x9b, 0x2f, 0x56, 0xa7, 0xcc, 0xa7, 0x16, 0x2c, 0x47, 0x5c, 0x8f, 0x7c, 0xa1, + 0x92, 0xfe, 0x15, 0x6c, 0xb6, 0xb2, 0x9d, 0xbf, 0x08, 0xd8, 0x77, 0xcb, 0xce, 0x5f, 0xf0, 0x38, + 0x4a, 0xbe, 0xad, 0x0b, 0xe2, 0xdb, 0x7e, 0x39, 0xbe, 0xce, 0x3a, 0xbc, 0x82, 0x94, 0x98, 0x48, + 0x62, 0x39, 0x49, 0x04, 0xbd, 0x01, 0x9d, 0x47, 0x78, 0xdd, 0x7d, 0xb2, 0xd6, 0xde, 0xe8, 0xb2, + 0xc2, 0x72, 0xfe, 0x20, 0x40, 0xb7, 0x85, 0xbe, 0xe3, 0xa7, 0x93, 0x7d, 0x26, 0xc6, 0x25, 0xe1, + 0x92, 0x0f, 0xb9, 0x20, 0x3e, 0xad, 0x0b, 0xbc, 0xbf, 0xf6, 0xdc, 0xfd, 0x7d, 0x02, 0xd7, 0x67, + 0x18, 0x14, 0x8c, 0xdf, 0x01, 0x53, 0x89, 0x71, 0xce, 0x77, 0xe5, 0x36, 0x1d, 0x54, 0xaf, 0xa0, + 0x8a, 0x44, 0xbf, 0xf3, 0x1b, 0x81, 0xab, 0xdb, 0x42, 0xef, 0x0a, 0x15, 0x88, 0xe4, 0x32, 0xf2, + 0xdf, 0x81, 0x6b, 0x8d, 0xfe, 0x0b, 0xf6, 0x9b, 0xd0, 0x49, 0x10, 0x29, 0xf8, 0xdf, 0x18, 0xcc, + 0x2a, 0xca, 0x20, 0x8f, 0xdf, 0x32, 0xb3, 0x96, 0x58, 0x11, 0xeb, 0xc4, 0xd0, 0xc9, 0x71, 0x3a, + 0x86, 0x0e, 0xbe, 0xe6, 0x32, 0xff, 0x7a, 0x7d, 0x7e, 0x38, 0x58, 0xf7, 0x79, 0xa0, 0xb6, 0x3e, + 0x2a, 0xf8, 0x7c, 0xd0, 0x54, 0x27, 0xc5, 0xc7, 0x7c, 0xc2, 0xdd, 0x50, 0xee, 0x07, 0x6e, 0x53, + 0x86, 0xf2, 0xbc, 0x4f, 0xf7, 0x78, 0xac, 0x85, 0x62, 0x45, 0x75, 0xe7, 0x7b, 0xa0, 0x5f, 0xa7, + 0x42, 0x1d, 0xee, 0x64, 0xdd, 0x55, 0xdd, 0x5b, 0xb0, 0x8c, 0xe8, 0x97, 0xe2, 0xb0, 0x78, 0x6c, + 0x95, 0x4d, 0xd7, 0xc1, 0x54, 0xf2, 0x20, 0xe9, 0xb7, 0x8a, 0xbe, 0xe6, 0x78, 0x31, 0x79, 0xc0, + 0x30, 0xc0, 0xf9, 0x18, 0xda, 0x4c, 0x1e, 0x50, 0x1b, 0x40, 0xf1, 0x89, 0x27, 0x50, 0xed, 0xb0, + 0x5a, 0x8f, 0x35, 0x10, 0xfa, 0x3a, 0x5c, 0xc1, 0xb7, 0x80, 0x77, 0xd4, 0x63, 0xb9, 0x91, 0x1d, + 0x6a, 0xb3, 0xaf, 0x7c, 0x2a, 0x36, 0x61, 0x29, 0x03, 0xeb, 0x53, 0xb5, 0xe6, 0x77, 0xc7, 0x70, + 0x4c, 0x64, 0x65, 0x68, 0x36, 0x60, 0x50, 0xe3, 0xf4, 0x2d, 0xe8, 0x6a, 0x3e, 0x0c, 0xc5, 0xbd, + 0x5a, 0x49, 0x6a, 0x20, 0xf3, 0xfa, 0x3c, 0xf1, 0x1f, 0x54, 0x1d, 0x75, 0x59, 0x0d, 0xd0, 0x77, + 0xe1, 0x6a, 0xdd, 0xf9, 0x7d, 0x25, 0xc6, 0xc1, 0x63, 0x1c, 0x87, 0x1e, 0x5b, 0xc0, 0xe9, 0x06, + 0xbc, 0x56, 0x63, 0xbb, 0x9a, 0x2b, 0x8d, 0x2a, 0xda, 0x63, 0xf3, 0x70, 0x76, 0x42, 0x48, 0xfa, + 0xf3, 0x87, 0x29, 0x0f, 0x51, 0x1e, 0x7b, 0xac, 0x81, 0x38, 0xbf, 0x13, 0xb8, 0x86, 0x04, 0x76, + 0x35, 0xd7, 0x97, 0xf2, 0x89, 0x3c, 0x02, 0xda, 0x24, 0x50, 0x4c, 0x59, 0x1f, 0x96, 0x12, 0xad, + 0x04, 0x8f, 0x12, 0x24, 0x61, 0xb2, 0xd2, 0xcc, 0xd4, 0x72, 0x94, 0xa9, 0x44, 0x82, 0x5d, 0x99, + 0xac, 0xb0, 0xb2, 0x59, 0x19, 0x1e, 0x6a, 0x91, 0x6f, 0x60, 0xb2, 0xdc, 0xc8, 
0xea, 0x88, 0x89, + 0xc6, 0xb1, 0x30, 0xf3, 0x3a, 0x85, 0x79, 0xfb, 0x4f, 0x13, 0x7a, 0xb8, 0xf1, 0x76, 0x3e, 0x21, + 0xf4, 0x5b, 0x80, 0x7a, 0xac, 0xe8, 0xad, 0xf9, 0xf1, 0x59, 0x18, 0x39, 0xcb, 0x39, 0x2f, 0x24, + 0xe7, 0xf1, 0x3e, 0xa1, 0xdf, 0xc1, 0x4a, 0x43, 0x02, 0xe9, 0x42, 0xd2, 0xa2, 0xc2, 0x5b, 0x6f, + 0x9f, 0x1b, 0x93, 0x57, 0x76, 0x0c, 0xca, 0xa0, 0x5b, 0x89, 0x0b, 0x5d, 0x3b, 0x25, 0x67, 0x46, + 0x37, 0xad, 0x5b, 0xe7, 0x44, 0x54, 0x35, 0x7f, 0x84, 0x37, 0xce, 0xf8, 0xbf, 0xa5, 0x83, 0xf9, + 0xfc, 0xf3, 0xff, 0x98, 0xad, 0x9b, 0xa7, 0xc6, 0x37, 0xf6, 0x0a, 0xa1, 0x7f, 0xd6, 0xb7, 0x0e, + 0x75, 0x4f, 0x4d, 0x3e, 0xfb, 0xab, 0xe8, 0xff, 0x77, 0xdb, 0x85, 0xe5, 0x8c, 0x70, 0x36, 0x65, + 0x8b, 0x97, 0xbb, 0xf0, 0x84, 0x16, 0x2f, 0x77, 0x71, 0x48, 0x1d, 0x63, 0x6b, 0xf3, 0xe8, 0xd8, + 0x36, 0x9e, 0x1e, 0xdb, 0xc6, 0xf3, 0x63, 0x9b, 0xfc, 0x34, 0xb5, 0xc9, 0xaf, 0x53, 0x9b, 0x3c, + 0x99, 0xda, 0xe4, 0x68, 0x6a, 0x93, 0x7f, 0xa6, 0x36, 0xf9, 0x77, 0x6a, 0x1b, 0xcf, 0xa7, 0x36, + 0xf9, 0xe5, 0xc4, 0x36, 0x8e, 0x4e, 0x6c, 0xe3, 0xe9, 0x89, 0x6d, 0x0c, 0x3b, 0x28, 0xba, 0x1f, + 0xfe, 0x17, 0x00, 0x00, 0xff, 0xff, 0x62, 0xcc, 0xc6, 0x11, 0x90, 0x0a, 0x00, 0x00, } func (this *LabelValuesForMetricNameRequest) Equal(that interface{}) bool { @@ -1054,6 +1173,69 @@ func (this *IndexQuery) Equal(that interface{}) bool { } return true } +func (this *IndexStatsRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*IndexStatsRequest) + if !ok { + that2, ok := that.(IndexStatsRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.From.Equal(that1.From) { + return false + } + if !this.Through.Equal(that1.Through) { + return false + } + if this.Matchers != that1.Matchers { + return false + } + return true +} +func (this *IndexStatsResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*IndexStatsResponse) + if !ok { + that2, ok := that.(IndexStatsResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Streams != that1.Streams { + return false + } + if this.Chunks != that1.Chunks { + return false + } + if this.Bytes != that1.Bytes { + return false + } + if this.Entries != that1.Entries { + return false + } + return true +} func (this *LabelValuesForMetricNameRequest) GoString() string { if this == nil { return "nil" @@ -1202,6 +1384,31 @@ func (this *IndexQuery) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *IndexStatsRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&indexgatewaypb.IndexStatsRequest{") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n") + s = append(s, "Matchers: "+fmt.Sprintf("%#v", this.Matchers)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *IndexStatsResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&indexgatewaypb.IndexStatsResponse{") + s = append(s, "Streams: "+fmt.Sprintf("%#v", this.Streams)+",\n") + s = append(s, "Chunks: "+fmt.Sprintf("%#v", this.Chunks)+",\n") + s = append(s, "Bytes: "+fmt.Sprintf("%#v", this.Bytes)+",\n") + s = append(s, "Entries: "+fmt.Sprintf("%#v", this.Entries)+",\n") 
+ s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringGateway(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1231,6 +1438,7 @@ type IndexGatewayClient interface { GetSeries(ctx context.Context, in *GetSeriesRequest, opts ...grpc.CallOption) (*GetSeriesResponse, error) LabelNamesForMetricName(ctx context.Context, in *LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) LabelValuesForMetricName(ctx context.Context, in *LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*LabelResponse, error) + GetStats(ctx context.Context, in *IndexStatsRequest, opts ...grpc.CallOption) (*IndexStatsResponse, error) } type indexGatewayClient struct { @@ -1309,6 +1517,15 @@ func (c *indexGatewayClient) LabelValuesForMetricName(ctx context.Context, in *L return out, nil } +func (c *indexGatewayClient) GetStats(ctx context.Context, in *IndexStatsRequest, opts ...grpc.CallOption) (*IndexStatsResponse, error) { + out := new(IndexStatsResponse) + err := c.cc.Invoke(ctx, "/indexgatewaypb.IndexGateway/GetStats", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IndexGatewayServer is the server API for IndexGateway service. type IndexGatewayServer interface { /// QueryIndex reads the indexes required for given query & sends back the batch of rows @@ -1319,6 +1536,7 @@ type IndexGatewayServer interface { GetSeries(context.Context, *GetSeriesRequest) (*GetSeriesResponse, error) LabelNamesForMetricName(context.Context, *LabelNamesForMetricNameRequest) (*LabelResponse, error) LabelValuesForMetricName(context.Context, *LabelValuesForMetricNameRequest) (*LabelResponse, error) + GetStats(context.Context, *IndexStatsRequest) (*IndexStatsResponse, error) } // UnimplementedIndexGatewayServer can be embedded to have forward compatible implementations. 
@@ -1340,6 +1558,9 @@ func (*UnimplementedIndexGatewayServer) LabelNamesForMetricName(ctx context.Cont func (*UnimplementedIndexGatewayServer) LabelValuesForMetricName(ctx context.Context, req *LabelValuesForMetricNameRequest) (*LabelResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method LabelValuesForMetricName not implemented") } +func (*UnimplementedIndexGatewayServer) GetStats(ctx context.Context, req *IndexStatsRequest) (*IndexStatsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetStats not implemented") +} func RegisterIndexGatewayServer(s *grpc.Server, srv IndexGatewayServer) { s.RegisterService(&_IndexGateway_serviceDesc, srv) @@ -1438,6 +1659,24 @@ func _IndexGateway_LabelValuesForMetricName_Handler(srv interface{}, ctx context return interceptor(ctx, in, info, handler) } +func _IndexGateway_GetStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(IndexStatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexGatewayServer).GetStats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/indexgatewaypb.IndexGateway/GetStats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexGatewayServer).GetStats(ctx, req.(*IndexStatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _IndexGateway_serviceDesc = grpc.ServiceDesc{ ServiceName: "indexgatewaypb.IndexGateway", HandlerType: (*IndexGatewayServer)(nil), @@ -1458,6 +1697,10 @@ var _IndexGateway_serviceDesc = grpc.ServiceDesc{ MethodName: "LabelValuesForMetricName", Handler: _IndexGateway_LabelValuesForMetricName_Handler, }, + { + MethodName: "GetStats", + Handler: _IndexGateway_GetStats_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -1962,6 +2205,89 @@ func (m *IndexQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *IndexStatsRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *IndexStatsRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *IndexStatsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Matchers) > 0 { + i -= len(m.Matchers) + copy(dAtA[i:], m.Matchers) + i = encodeVarintGateway(dAtA, i, uint64(len(m.Matchers))) + i-- + dAtA[i] = 0x1a + } + if m.Through != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Through)) + i-- + dAtA[i] = 0x10 + } + if m.From != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.From)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *IndexStatsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *IndexStatsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *IndexStatsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Entries != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Entries)) + i-- + dAtA[i] = 0x20 + } + if m.Bytes != 0 { + i = 
encodeVarintGateway(dAtA, i, uint64(m.Bytes)) + i-- + dAtA[i] = 0x18 + } + if m.Chunks != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Chunks)) + i-- + dAtA[i] = 0x10 + } + if m.Streams != 0 { + i = encodeVarintGateway(dAtA, i, uint64(m.Streams)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintGateway(dAtA []byte, offset int, v uint64) int { offset -= sovGateway(v) base := offset @@ -2197,6 +2523,46 @@ func (m *IndexQuery) Size() (n int) { return n } +func (m *IndexStatsRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.From != 0 { + n += 1 + sovGateway(uint64(m.From)) + } + if m.Through != 0 { + n += 1 + sovGateway(uint64(m.Through)) + } + l = len(m.Matchers) + if l > 0 { + n += 1 + l + sovGateway(uint64(l)) + } + return n +} + +func (m *IndexStatsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Streams != 0 { + n += 1 + sovGateway(uint64(m.Streams)) + } + if m.Chunks != 0 { + n += 1 + sovGateway(uint64(m.Chunks)) + } + if m.Bytes != 0 { + n += 1 + sovGateway(uint64(m.Bytes)) + } + if m.Entries != 0 { + n += 1 + sovGateway(uint64(m.Entries)) + } + return n +} + func sovGateway(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -2359,6 +2725,31 @@ func (this *IndexQuery) String() string { }, "") return s } +func (this *IndexStatsRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&IndexStatsRequest{`, + `From:` + fmt.Sprintf("%v", this.From) + `,`, + `Through:` + fmt.Sprintf("%v", this.Through) + `,`, + `Matchers:` + fmt.Sprintf("%v", this.Matchers) + `,`, + `}`, + }, "") + return s +} +func (this *IndexStatsResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&IndexStatsResponse{`, + `Streams:` + fmt.Sprintf("%v", this.Streams) + `,`, + `Chunks:` + fmt.Sprintf("%v", this.Chunks) + `,`, + `Bytes:` + fmt.Sprintf("%v", this.Bytes) + `,`, + `Entries:` + fmt.Sprintf("%v", this.Entries) + `,`, + `}`, + }, "") + return s +} func valueToStringGateway(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -3815,6 +4206,258 @@ func (m *IndexQuery) Unmarshal(dAtA []byte) error { } return nil } +func (m *IndexStatsRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: IndexStatsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: IndexStatsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + m.From = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.From |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Through", wireType) + } + m.Through = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx 
>= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Through |= github_com_prometheus_common_model.Time(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGateway + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGateway + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Matchers = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *IndexStatsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: IndexStatsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: IndexStatsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Streams", wireType) + } + m.Streams = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Streams |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + m.Chunks = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Chunks |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Bytes", wireType) + } + m.Bytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Bytes |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) + } + m.Entries = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGateway + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Entries |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipGateway(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + 
return ErrInvalidLengthGateway + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGateway + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipGateway(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto index faf91540d8df..c89db84980c4 100644 --- a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto +++ b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/gateway.proto @@ -15,6 +15,8 @@ service IndexGateway { rpc LabelNamesForMetricName(LabelNamesForMetricNameRequest) returns (LabelResponse) {} rpc LabelValuesForMetricName(LabelValuesForMetricNameRequest) returns (LabelResponse) {} + + rpc GetStats(IndexStatsRequest) returns (IndexStatsResponse) {} } message LabelValuesForMetricNameRequest { @@ -107,3 +109,24 @@ message IndexQuery { bytes rangeValueStart = 4; bytes valueEqual = 5; } + +message IndexStatsRequest { + int64 from = 1 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + int64 through = 2 [ + (gogoproto.customtype) = "github.com/prometheus/common/model.Time", + (gogoproto.nullable) = false + ]; + string matchers = 3; + // TODO(owen-d): add shards to grpc calls so we don't have + // to extract via labels +} + +message IndexStatsResponse { + uint64 streams = 1; + uint64 chunks = 2; + uint64 bytes = 3; + uint64 entries = 4; +} diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 186367190fd1..41adf9a89de9 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client/util" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" "github.com/grafana/loki/pkg/util/wal" ) @@ -601,7 +602,7 @@ func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, thro } -func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) { +func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) { idx, ok := t.tenantIndex(userID, from, through) if !ok { return blooms, nil diff --git a/pkg/storage/stores/tsdb/index.go b/pkg/storage/stores/tsdb/index.go index df29eca95c2f..4dd7e3586015 100644 --- a/pkg/storage/stores/tsdb/index.go +++ b/pkg/storage/stores/tsdb/index.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -50,22 +51,7 @@ type Index interface { Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers 
...*labels.Matcher) ([]string, error) - Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) -} - -type Stats struct { - Streams uint64 - Chunks uint64 - Bytes uint64 - Entries uint64 -} - -func (s Stats) Merge(x Stats) Stats { - s.Streams += x.Streams - s.Chunks += x.Chunks - s.Bytes += x.Bytes - s.Entries += x.Entries - return s + Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) } type NoopIndex struct{} @@ -87,7 +73,7 @@ func (NoopIndex) LabelValues(ctx context.Context, userID string, from, through m return nil, nil } -func (NoopIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) { +func (NoopIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) { return nil, nil } diff --git a/pkg/storage/stores/tsdb/index_client.go b/pkg/storage/stores/tsdb/index_client.go index 8a5fb86e198b..53bb2fef6746 100644 --- a/pkg/storage/stores/tsdb/index_client.go +++ b/pkg/storage/stores/tsdb/index_client.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -108,6 +109,25 @@ func (c *IndexClient) LabelNamesForMetricName(ctx context.Context, userID string return c.idx.LabelNames(ctx, userID, from, through) } +func (c *IndexClient) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { + matchers = withoutNameLabel(matchers) + matchers, shard, err := c.shard(matchers...) + if err != nil { + return nil, err + } + + blooms := stats.BloomPool.Get() + defer stats.BloomPool.Put(blooms) + blooms, err = c.idx.Stats(ctx, userID, from, through, blooms, shard, matchers...) + + if err != nil { + return nil, err + } + res := blooms.Stats() + + return &res, nil +} + // SetChunkFilterer sets a chunk filter to be used when retrieving chunks. // This is only used for GetSeries implementation. // Todo we might want to pass it as a parameter to GetSeries instead. diff --git a/pkg/storage/stores/tsdb/index_shipper_querier.go b/pkg/storage/stores/tsdb/index_shipper_querier.go index 45c192535d06..1023fd4e195d 100644 --- a/pkg/storage/stores/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/tsdb/index_shipper_querier.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/indexshipper" shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" @@ -111,7 +112,7 @@ func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, fr return idx.LabelValues(ctx, userID, from, through, name, matchers...) 
 }
 
-func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
+func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) {
 	idx, err := i.indices(ctx, from, through, userID)
 	if err != nil {
 		return blooms, err
diff --git a/pkg/storage/stores/tsdb/lazy_index.go b/pkg/storage/stores/tsdb/lazy_index.go
index e29dcb4d1563..e5c9392734e1 100644
--- a/pkg/storage/stores/tsdb/lazy_index.go
+++ b/pkg/storage/stores/tsdb/lazy_index.go
@@ -7,6 +7,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )
 
@@ -65,7 +66,7 @@ func (f LazyIndex) LabelValues(ctx context.Context, userID string, from, through
 	return i.LabelValues(ctx, userID, from, through, name, matchers...)
 }
 
-func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
+func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) {
 	i, err := f()
 	if err != nil {
 		return nil, err
diff --git a/pkg/storage/stores/tsdb/multi_file_index.go b/pkg/storage/stores/tsdb/multi_file_index.go
index 834063fead2c..9ba520fb923d 100644
--- a/pkg/storage/stores/tsdb/multi_file_index.go
+++ b/pkg/storage/stores/tsdb/multi_file_index.go
@@ -10,6 +10,7 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )
 
@@ -236,9 +237,9 @@ func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, throu
 	return results, nil
 }
 
-func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
+func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) {
 	if blooms == nil {
-		blooms = BloomPool.Get()
+		blooms = stats.BloomPool.Get()
 	}
 
 	_, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
diff --git a/pkg/storage/stores/tsdb/multitenant.go b/pkg/storage/stores/tsdb/multitenant.go
index e252826ab107..945ab4e8d604 100644
--- a/pkg/storage/stores/tsdb/multitenant.go
+++ b/pkg/storage/stores/tsdb/multitenant.go
@@ -8,6 +8,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )
 
@@ -27,7 +28,7 @@ func NewMultiTenantIndex(idx Index) *MultiTenantIndex {
 }
 
 func withTenantLabelMatcher(userID string, matchers []*labels.Matcher) []*labels.Matcher {
-	cpy := make([]*labels.Matcher, len(matchers))
+	cpy := make([]*labels.Matcher, len(matchers), len(matchers)+1)
 	copy(cpy, matchers)
 	cpy = append(cpy, labels.MustNewMatcher(labels.MatchEqual, TenantLabel, userID))
 	return cpy
 }
@@ -89,6 +90,6
@@ func (m *MultiTenantIndex) LabelValues(ctx context.Context, userID string, from, return m.idx.LabelValues(ctx, userID, from, through, name, withTenantLabelMatcher(userID, matchers)...) } -func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) { +func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) { return m.idx.Stats(ctx, userID, from, through, blooms, shard, withTenantLabelMatcher(userID, matchers)...) } diff --git a/pkg/storage/stores/tsdb/pool.go b/pkg/storage/stores/tsdb/pool.go index 86bdb64f5d8a..0b02f5c11be6 100644 --- a/pkg/storage/stores/tsdb/pool.go +++ b/pkg/storage/stores/tsdb/pool.go @@ -1,12 +1,8 @@ package tsdb import ( - "encoding/binary" "sync" - "github.com/prometheus/common/model" - "github.com/willf/bloom" - "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -14,7 +10,6 @@ var ( ChunkMetasPool = &index.ChunkMetasPool // re-exporting SeriesPool PoolSeries ChunkRefsPool PoolChunkRefs - BloomPool PoolBloom ) type PoolSeries struct { @@ -50,89 +45,3 @@ func (p *PoolChunkRefs) Put(xs []ChunkRef) { //nolint:staticcheck p.pool.Put(xs) } - -type PoolBloom struct { - pool sync.Pool -} - -func (p *PoolBloom) Get() *StatsBlooms { - if x := p.pool.Get(); x != nil { - return x.(*StatsBlooms) - } - - return newStatsBlooms() - -} - -func (p *PoolBloom) Put(x *StatsBlooms) { - x.Streams.ClearAll() - x.Chunks.ClearAll() - x.stats = Stats{} - p.pool.Put(x) -} - -// These are very expensive in terms of memory usage, -// each requiring ~12.5MB. Therefore we heavily rely on pool usage. -// See https://hur.st/bloomfilter for play around with this idea. -func newStatsBlooms() *StatsBlooms { - // 1 million streams @ 1% error =~ 1.14MB - streams := bloom.NewWithEstimates(1e6, 0.01) - // 10 million chunks @ 1% error =~ 11.43MB - chunks := bloom.NewWithEstimates(10e6, 0.01) - return &StatsBlooms{ - Streams: streams, - Chunks: chunks, - } -} - -// TODO(owen-d): shard this across a slice of smaller bloom filters to reduce -// lock contention -// Bloom filters for estimating duplicate statistics across both series -// and chunks within TSDB indices. These are used to calculate data topology -// statistics prior to running queries. 
-type StatsBlooms struct { - sync.RWMutex - Streams, Chunks *bloom.BloomFilter - stats Stats -} - -func (b *StatsBlooms) Stats() Stats { return b.stats } - -func (b *StatsBlooms) AddStream(fp model.Fingerprint) { - key := make([]byte, 8) - binary.BigEndian.PutUint64(key, uint64(fp)) - b.add(b.Streams, key, func() { - b.stats.Streams++ - }) -} - -func (b *StatsBlooms) AddChunk(fp model.Fingerprint, chk index.ChunkMeta) { - // fingerprint + mintime + maxtime + checksum - ln := 8 + 8 + 8 + 4 - key := make([]byte, ln) - binary.BigEndian.PutUint64(key, uint64(fp)) - binary.BigEndian.PutUint64(key[8:], uint64(chk.MinTime)) - binary.BigEndian.PutUint64(key[16:], uint64(chk.MaxTime)) - binary.BigEndian.PutUint32(key[24:], chk.Checksum) - b.add(b.Chunks, key, func() { - b.stats.Chunks++ - b.stats.Bytes += uint64(chk.KB << 10) - b.stats.Entries += uint64(chk.Entries) - }) -} - -func (b *StatsBlooms) add(filter *bloom.BloomFilter, key []byte, update func()) { - b.RLock() - ok := filter.Test(key) - b.RUnlock() - - if ok { - return - } - - b.Lock() - defer b.Unlock() - if ok = filter.TestAndAdd(key); !ok { - update() - } -} diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index b71bd813dae3..c11af1dea881 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/index/stats" index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -262,9 +263,9 @@ func (i *TSDBIndex) Identifier(tenant string) SingleTenantTSDBIdentifier { } } -func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) { +func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *stats.Blooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*stats.Blooms, error) { if blooms == nil { - blooms = BloomPool.Get() + blooms = stats.BloomPool.Get() } queryBounds := newBounds(from, through) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 771552c9a285..c2be1a2d51a8 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores" + index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats" loki_util "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -254,6 +255,10 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, return [][]chunk.Chunk{refs}, []*fetcher.Fetcher{f}, nil } +func (m *mockChunkStore) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) { + return nil, nil +} + type mockChunkStoreClient struct { chunks []chunk.Chunk scfg config.SchemaConfig
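Taken together, the pieces in this diff form a small pipeline: each TSDB index feeds stream fingerprints and chunk metas into a pooled pair of bloom filters so duplicates across index files are counted once, and the per-period results are then combined with MergeStats, accepting the double counting across schema boundaries described in the stats.go comment. Below is a minimal sketch of that flow using only the APIs added above; the fingerprint and chunk values are made up for illustration.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/storage/stores/index/stats"
	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

func main() {
	// Borrow the (expensive, ~12.5MB) bloom filters from the pool and return them when done.
	blooms := stats.BloomPool.Get()
	defer stats.BloomPool.Put(blooms)

	fp := model.Fingerprint(12345) // hypothetical stream fingerprint
	chk := index.ChunkMeta{MinTime: 0, MaxTime: 1000, Checksum: 42, KB: 256, Entries: 1024}

	// Streams and chunks seen in more than one TSDB file are only counted once.
	blooms.AddStream(fp)
	blooms.AddChunk(fp, chk)
	blooms.AddChunk(fp, chk) // duplicate: filtered out by the chunk bloom filter

	perPeriod := blooms.Stats()

	// compositeStore.Stats collects one such result per period config and merges them.
	total := stats.MergeStats(&perPeriod)
	fmt.Println(total.Streams, total.Chunks, total.Bytes, total.Entries) // prints: 1 1 262144 1024
}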