Skip to content

Commit

Permalink
Tsdb/inverted index wiring (#6252)
Browse files Browse the repository at this point in the history
* multi inverted index

* delete in inverted index signature

* multi inverted index

* inverted index testing + skip nil indices

* wires up period config aware multi-inverted-idx through ingesters

* better inverted index validation

* apply shipper defaults to last relevant period config, not current one.

* more verbose error
  • Loading branch information
owen-d authored Jun 3, 2022
1 parent ea0a524 commit 4903df6
Show file tree
Hide file tree
Showing 15 changed files with 507 additions and 63 deletions.
5 changes: 3 additions & 2 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
Expand Down Expand Up @@ -495,7 +496,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, throu
}

func (s *testStore) GetSchemaConfigs() []config.PeriodConfig {
return nil
return defaultPeriodConfigs
}

func (s *testStore) Stop() {}
Expand Down
11 changes: 9 additions & 2 deletions pkg/ingester/index/bitprefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ type BitPrefixInvertedIndex struct {
shards []*indexShard
}

func ValidateBitPrefixShardFactor(factor uint32) error {
if requiredBits := index.NewShard(0, factor).RequiredBits(); 1<<requiredBits != factor {
return fmt.Errorf("Incompatible inverted index shard factor on ingester: It must be a power of two, got %d", factor)
}
return nil
}

func NewBitPrefixWithShards(totalShards uint32) (*BitPrefixInvertedIndex, error) {
if requiredBits := index.NewShard(0, totalShards).RequiredBits(); 1<<requiredBits != totalShards {
return nil, fmt.Errorf("Shard factor must be a power of two, got %d", totalShards)
if err := ValidateBitPrefixShardFactor(totalShards); err != nil {
return nil, err
}

shards := make([]*indexShard, totalShards)
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Interface interface {
Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error)
LabelNames(shard *astmapper.ShardAnnotation) ([]string, error)
LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error)
Delete(labels labels.Labels, fp model.Fingerprint)
}

// InvertedIndex implements a in-memory inverted index from label pairs to fingerprints.
Expand Down
124 changes: 124 additions & 0 deletions pkg/ingester/index/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package index

import (
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
)

type periodIndex struct {
time.Time
idx int // address of the index to use
}

type Multi struct {
periods []periodIndex
indices []Interface
}

func NewMultiInvertedIndex(periods []config.PeriodConfig, indexShards uint32) (*Multi, error) {
var (
err error

ii Interface // always stored in 0th index
bitPrefixed Interface // always stored in 1st index

periodIndices []periodIndex
)

for _, pd := range periods {
switch pd.IndexType {
case config.TSDBType:
if bitPrefixed == nil {
bitPrefixed, err = NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, errors.Wrapf(err, "creating tsdb inverted index for period starting %v", pd.From)
}
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 1, // tsdb inverted index is always stored in position one
})
default:
if ii == nil {
ii = NewWithShards(indexShards)
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 0, // regular inverted index is always stored in position zero
})
}
}

return &Multi{
periods: periodIndices,
indices: []Interface{ii, bitPrefixed},
}, nil
}

func (m *Multi) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) (result labels.Labels) {
for _, i := range m.indices {
if i != nil {
result = i.Add(labels, fp)
}
}
return
}

func (m *Multi) Delete(labels labels.Labels, fp model.Fingerprint) {
for _, i := range m.indices {
if i != nil {
i.Delete(labels, fp)
}
}

}

func (m *Multi) Lookup(t time.Time, matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return m.indexFor(t).Lookup(matchers, shard)
}

func (m *Multi) LabelNames(t time.Time, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelNames(shard)
}

func (m *Multi) LabelValues(t time.Time, name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelValues(name, shard)
}

// Query planning is responsible for ensuring no query spans more than one inverted index.
// Therefore we don't need to account for both `from` and `through`.
func (m *Multi) indexFor(t time.Time) Interface {
for i := range m.periods {
if !m.periods[i].Time.After(t) && (i+1 == len(m.periods) || t.Before(m.periods[i+1].Time)) {
return m.indices[m.periods[i].idx]
}
}
return noopInvertedIndex{}
}

type noopInvertedIndex struct{}

func (noopInvertedIndex) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) labels.Labels {
return nil
}

func (noopInvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) {}

func (noopInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return nil, nil
}

func (noopInvertedIndex) LabelNames(shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}

func (noopInvertedIndex) LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}
179 changes: 179 additions & 0 deletions pkg/ingester/index/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package index

import (
"sort"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

func MustParseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
panic(err)
}
return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
}

var testPeriodConfigs = []config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
{
From: MustParseDayTime("2022-01-01"),
IndexType: config.BoltDBShipperType,
},
{
From: MustParseDayTime("2023-01-01"),
IndexType: config.TSDBType,
},
}

// Only run the specific shard factor validation logic if a period config using
// tsdb exists
func TestIgnoresInvalidShardFactorWhenTSDBNotPresent(t *testing.T) {
factor := uint32(6)
_, err := NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
},
factor,
)
require.Nil(t, err)

_, err = NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
},
factor,
)
require.Error(t, err)

}

func TestMultiIndexCreation(t *testing.T) {
multi, err := NewMultiInvertedIndex(testPeriodConfigs, uint32(2))
require.Nil(t, err)

x, _ := NewBitPrefixWithShards(2)
expected := &Multi{
periods: []periodIndex{
{
Time: testPeriodConfigs[0].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[1].From.Time.Time(),
idx: 1,
},
{
Time: testPeriodConfigs[2].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[3].From.Time.Time(),
idx: 1,
},
},
indices: []Interface{
NewWithShards(2),
x,
},
}
require.Equal(t, expected, multi)
}

func TestMultiIndex(t *testing.T) {
factor := uint32(32)
multi, err := NewMultiInvertedIndex(testPeriodConfigs, factor)
require.Nil(t, err)

lbs := []logproto.LabelAdapter{
{Name: "foo", Value: "foo"},
{Name: "bar", Value: "bar"},
{Name: "buzz", Value: "buzz"},
}
sort.Sort(logproto.FromLabelAdaptersToLabels(lbs))
fp := model.Fingerprint((logproto.FromLabelAdaptersToLabels(lbs).Hash()))

ls := multi.Add(lbs, fp)

// Lookup at a time corresponding to a non-tsdb periodconfig
// and ensure we use modulo hashing
expShard := labelsSeriesIDHash(logproto.FromLabelAdaptersToLabels(lbs)) % factor
ids, err := multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)

require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)

// Lookup at a time corresponding to a tsdb periodconfig
// and ensure we use bit prefix hashing
requiredBits := index.NewShard(0, factor).RequiredBits()
expShard = uint32(fp >> (64 - requiredBits))
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)

require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)

// Delete the entry
multi.Delete(ls, fp)

// Ensure deleted entry is not in modulo variant
ids, err = multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)

require.Nil(t, err)
require.Equal(t, 0, len(ids))

// Ensure deleted entry is not in bit prefix variant
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)

require.Nil(t, err)
require.Equal(t, 0, len(ids))
}
Loading

0 comments on commit 4903df6

Please sign in to comment.