Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsdb/index sampling #6329

Merged
merged 7 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ require (
k8s.io/klog v1.0.0
)

require github.com/willf/bloom v2.0.3+incompatible

require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.3.0 // indirect
Expand Down Expand Up @@ -235,12 +237,14 @@ require (
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,10 @@ github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMU
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA=
github.com/willf/bloom v2.0.3+incompatible/go.mod h1:MmAltL9pDMNTrvUkxdg0k0q5I0suxmuwp3KbyrZLOZ8=
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf/go.mod h1:nxx7XRXbR9ykhnC8lXqQyJS0rfvJGxKyKw/sT1YOttg=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a/go.mod h1:vQQATAGxVK20DC1rRubTJbZDDhhpA4QfU02pMdPxGO4=
github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs=
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/stores/tsdb/head_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,14 @@ func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, thro

}

// Stats accumulates stream/chunk statistics for the given tenant into
// blooms. If no head exists for the tenant over this period, the
// accumulator is returned unchanged.
func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	if idx, ok := t.tenantIndex(userID, from, through); ok {
		return idx.Stats(ctx, userID, from, through, blooms, shard, matchers...)
	}
	return blooms, nil
}

// helper only used in building TSDBs
func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, chks index.ChunkMetas)) error {
for i, shard := range t.tenants {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/stores/tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ type Index interface {
Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error)
LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error)
Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error)
}

// Stats holds the cardinality and volume counts gathered while sampling
// TSDB indices: number of distinct streams, number of distinct chunks,
// and the total bytes and log entries contained in those chunks.
type Stats struct {
	Streams uint64
	Chunks  uint64
	Bytes   uint64
	Entries uint64
}

// Merge returns the field-wise sum of s and x without mutating either value.
func (s Stats) Merge(x Stats) Stats {
	return Stats{
		Streams: s.Streams + x.Streams,
		Chunks:  s.Chunks + x.Chunks,
		Bytes:   s.Bytes + x.Bytes,
		Entries: s.Entries + x.Entries,
	}
}

type NoopIndex struct{}
Expand All @@ -71,4 +87,8 @@ func (NoopIndex) LabelValues(ctx context.Context, userID string, from, through m
return nil, nil
}

// Stats is a no-op. It echoes the caller-supplied blooms accumulator
// (rather than nil) for consistency with the other Index implementations,
// so callers that reassign the returned value do not lose a pooled instance.
func (NoopIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	return blooms, nil
}

func (NoopIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {}
9 changes: 9 additions & 0 deletions pkg/storage/stores/tsdb/index_shipper_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,12 @@ func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, fr
}
return idx.LabelValues(ctx, userID, from, through, name, matchers...)
}

// Stats resolves the shipped indices covering [from, through] and folds
// their stream/chunk statistics into blooms. The accumulator is returned
// even on error so pooled instances are not dropped.
func (i *indexShipperQuerier) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	idx, err := i.indices(ctx, from, through, userID)
	if err != nil {
		return blooms, err
	}
	return idx.Stats(ctx, userID, from, through, blooms, shard, matchers...)
}
8 changes: 8 additions & 0 deletions pkg/storage/stores/tsdb/lazy_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,11 @@ func (f LazyIndex) LabelValues(ctx context.Context, userID string, from, through
}
return i.LabelValues(ctx, userID, from, through, name, matchers...)
}

// Stats delegates to the lazily-loaded index. The caller-supplied blooms
// accumulator is returned even when loading the index fails — matching
// the other Index implementations — so a pooled instance passed in by the
// caller is never dropped on the error path.
func (f LazyIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	i, err := f()
	if err != nil {
		return blooms, err
	}
	return i.Stats(ctx, userID, from, through, blooms, shard, matchers...)
}
11 changes: 11 additions & 0 deletions pkg/storage/stores/tsdb/multi_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,14 @@ func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, throu

return results, nil
}

// Stats gathers statistics from every underlying index, accumulating into
// a single shared StatsBlooms (fetched from BloomPool when the caller
// passes nil). The accumulator is returned alongside any error from the
// underlying indices.
func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	acc := blooms
	if acc == nil {
		acc = BloomPool.Get()
	}

	_, err := i.forIndices(ctx, from, through, func(ctx context.Context, idx Index) (interface{}, error) {
		return idx.Stats(ctx, userID, from, through, acc, shard, matchers...)
	})
	return acc, err
}
4 changes: 4 additions & 0 deletions pkg/storage/stores/tsdb/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,7 @@ func (m *MultiTenantIndex) LabelValues(ctx context.Context, userID string, from,
}
return m.idx.LabelValues(ctx, userID, from, through, name, withTenantLabelMatcher(userID, matchers)...)
}

// Stats scopes the statistics query to the tenant by injecting the tenant
// label matcher before delegating to the wrapped index.
func (m *MultiTenantIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	scoped := withTenantLabelMatcher(userID, matchers)
	return m.idx.Stats(ctx, userID, from, through, blooms, shard, scoped...)
}
91 changes: 91 additions & 0 deletions pkg/storage/stores/tsdb/pool.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package tsdb

import (
"encoding/binary"
"sync"

"github.com/prometheus/common/model"
"github.com/willf/bloom"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

var (
ChunkMetasPool = &index.ChunkMetasPool // re-exporting
SeriesPool PoolSeries
ChunkRefsPool PoolChunkRefs
BloomPool PoolBloom
)

type PoolSeries struct {
Expand Down Expand Up @@ -45,3 +50,89 @@ func (p *PoolChunkRefs) Put(xs []ChunkRef) {
//nolint:staticcheck
p.pool.Put(xs)
}

// PoolBloom pools StatsBlooms instances, which are expensive to allocate
// (~12.5MB each — see newStatsBlooms), so they are reused across queries.
type PoolBloom struct {
	pool sync.Pool
}

// Get returns a pooled StatsBlooms, allocating a fresh one only when the
// pool is empty.
func (p *PoolBloom) Get() *StatsBlooms {
	cached, ok := p.pool.Get().(*StatsBlooms)
	if !ok {
		return newStatsBlooms()
	}
	return cached
}

// Put resets x (clearing both bloom filters and zeroing the counts) and
// returns it to the pool for reuse.
func (p *PoolBloom) Put(x *StatsBlooms) {
	x.stats = Stats{}
	x.Streams.ClearAll()
	x.Chunks.ClearAll()
	p.pool.Put(x)
}

// newStatsBlooms allocates a fresh pair of bloom filters. These are very
// expensive in terms of memory usage, each pair requiring ~12.5MB, which is
// why we lean heavily on BloomPool for reuse. See https://hur.st/bloomfilter
// to play around with the sizing tradeoffs.
func newStatsBlooms() *StatsBlooms {
	return &StatsBlooms{
		// 1 million streams @ 1% error =~ 1.14MB
		Streams: bloom.NewWithEstimates(1e6, 0.01),
		// 10 million chunks @ 1% error =~ 11.43MB
		Chunks: bloom.NewWithEstimates(10e6, 0.01),
	}
}

// TODO(owen-d): shard this across a slice of smaller bloom filters to reduce
// lock contention
// Bloom filters for estimating duplicate statistics across both series
// and chunks within TSDB indices. These are used to calculate data topology
// statistics prior to running queries.
type StatsBlooms struct {
	sync.RWMutex // guards Streams, Chunks, and stats during Add* calls
	// Streams and Chunks track which fingerprints/chunk keys have already
	// been counted, so the same series or chunk seen via multiple indices
	// only contributes once to stats.
	Streams, Chunks *bloom.BloomFilter
	// stats accumulates the deduplicated totals; updated under the write lock.
	stats Stats
}

func (b *StatsBlooms) Stats() Stats { return b.stats }

// AddStream records a stream fingerprint, incrementing the stream count
// only the first time this fingerprint is observed.
func (b *StatsBlooms) AddStream(fp model.Fingerprint) {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], uint64(fp))
	b.add(b.Streams, key[:], func() {
		b.stats.Streams++
	})
}

// AddChunk records a chunk, keyed by (fingerprint, mintime, maxtime,
// checksum) so the same chunk referenced from multiple indices is only
// counted once. On first sight it bumps the chunk count and accumulates
// the chunk's byte and entry totals.
func (b *StatsBlooms) AddChunk(fp model.Fingerprint, chk index.ChunkMeta) {
	// fingerprint + mintime + maxtime + checksum
	ln := 8 + 8 + 8 + 4
	key := make([]byte, ln)
	binary.BigEndian.PutUint64(key, uint64(fp))
	binary.BigEndian.PutUint64(key[8:], uint64(chk.MinTime))
	binary.BigEndian.PutUint64(key[16:], uint64(chk.MaxTime))
	binary.BigEndian.PutUint32(key[24:], chk.Checksum)
	b.add(b.Chunks, key, func() {
		b.stats.Chunks++
		// Widen before shifting: `chk.KB << 10` would overflow KB's
		// narrower integer type for chunks >= 4GiB, silently corrupting
		// the byte total.
		b.stats.Bytes += uint64(chk.KB) << 10
		b.stats.Entries += uint64(chk.Entries)
	})
}

// add inserts key into filter and invokes update only the first time the
// key is observed. The fast path tests membership under the read lock;
// only on a miss is the write lock taken, where TestAndAdd re-checks and
// inserts atomically so concurrent callers racing on the same key run
// update exactly once. Bloom filters admit false positives, so counts may
// slightly undercount — never double-count — duplicates.
func (b *StatsBlooms) add(filter *bloom.BloomFilter, key []byte, update func()) {
	b.RLock()
	ok := filter.Test(key)
	b.RUnlock()

	if ok {
		return
	}

	b.Lock()
	defer b.Unlock()
	// TestAndAdd reports whether key was already present; update only on
	// first insertion.
	if ok = filter.TestAndAdd(key); !ok {
		update()
	}
}
47 changes: 47 additions & 0 deletions pkg/storage/stores/tsdb/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tsdb

import (
"sync"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

// TestStatsBloom_Stream verifies that concurrent AddStream calls over a
// small set of fingerprints are deduplicated to the distinct count.
func TestStatsBloom_Stream(t *testing.T) {
	sb := BloomPool.Get()

	var wg sync.WaitGroup
	wg.Add(40)
	for i := 0; i < 40; i++ {
		go func(x int) {
			defer wg.Done()
			sb.AddStream(model.Fingerprint(x % 2))
		}(i)
	}
	wg.Wait()

	// 40 adds over only two distinct fingerprints -> 2 unique streams.
	require.Equal(t, uint64(2), sb.stats.Streams)
}

// TestStatsBloom_Chunks verifies that concurrent AddChunk calls are
// deduplicated by (fingerprint, checksum) key and that byte/entry totals
// only accumulate for distinct chunks.
func TestStatsBloom_Chunks(t *testing.T) {
	sb := BloomPool.Get()

	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(x int) {
			defer wg.Done()
			sb.AddChunk(model.Fingerprint(x%2), index.ChunkMeta{
				Checksum: uint32(x) % 4,
				KB:       1,
				Entries:  1,
			})
		}(i)
	}
	wg.Wait()

	// The (fingerprint, checksum) pairs cycle through 4 distinct combinations.
	require.Equal(t, 4, int(sb.stats.Chunks))
	require.Equal(t, 4<<10, int(sb.stats.Bytes))
	require.Equal(t, 4, int(sb.stats.Entries))
}
27 changes: 27 additions & 0 deletions pkg/storage/stores/tsdb/single_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,30 @@ func (i *TSDBIndex) Identifier(tenant string) SingleTenantTSDBIdentifier {
Checksum: i.Checksum(),
}
}

// Stats walks the series matched by shard+matchers and folds every chunk
// overlapping [from, through] into the blooms accumulator (fetched from
// BloomPool when the caller passes nil). A stream is counted once, on its
// first overlapping chunk.
func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, blooms *StatsBlooms, shard *index.ShardAnnotation, matchers ...*labels.Matcher) (*StatsBlooms, error) {
	res := blooms
	if res == nil {
		res = BloomPool.Get()
	}
	bounds := newBounds(from, through)

	err := i.forSeries(ctx, shard,
		func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
			// TODO(owen-d): use logarithmic approach
			seen := false
			for _, chk := range chks {
				if !Overlap(bounds, chk) {
					continue
				}
				if !seen {
					res.AddStream(fp)
					seen = true
				}
				res.AddChunk(fp, chk)
			}
		},
		matchers...)

	return res, err
}
22 changes: 22 additions & 0 deletions vendor/github.com/spaolacci/murmur3/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions vendor/github.com/spaolacci/murmur3/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions vendor/github.com/spaolacci/murmur3/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading