Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsdb/inverted index wiring #6252

Merged
merged 9 commits into from
Jun 3, 2022
5 changes: 3 additions & 2 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
Expand Down Expand Up @@ -495,7 +496,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, throu
}

func (s *testStore) GetSchemaConfigs() []config.PeriodConfig {
return nil
return defaultPeriodConfigs
}

func (s *testStore) Stop() {}
Expand Down
11 changes: 9 additions & 2 deletions pkg/ingester/index/bitprefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ type BitPrefixInvertedIndex struct {
shards []*indexShard
}

func ValidateBitPrefixShardFactor(factor uint32) error {
if requiredBits := index.NewShard(0, factor).RequiredBits(); 1<<requiredBits != factor {
return fmt.Errorf("Incompatible inverted index shard factor on ingester: It must be a power of two, got %d", factor)
}
return nil
}

func NewBitPrefixWithShards(totalShards uint32) (*BitPrefixInvertedIndex, error) {
if requiredBits := index.NewShard(0, totalShards).RequiredBits(); 1<<requiredBits != totalShards {
return nil, fmt.Errorf("Shard factor must be a power of two, got %d", totalShards)
if err := ValidateBitPrefixShardFactor(totalShards); err != nil {
return nil, err
}

shards := make([]*indexShard, totalShards)
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Interface interface {
Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error)
LabelNames(shard *astmapper.ShardAnnotation) ([]string, error)
LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error)
Delete(labels labels.Labels, fp model.Fingerprint)
}

// InvertedIndex implements a in-memory inverted index from label pairs to fingerprints.
Expand Down
123 changes: 123 additions & 0 deletions pkg/ingester/index/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package index

import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
)

type periodIndex struct {
time.Time
idx int // address of the index to use
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit confused about what this represents. it seems to be a type more than an address to me, ie 0 is a regular inverted index, whereas 1 is a tsdb inverted index. could we reflect that in the name and/or comment?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is this the index in the indices property on Multi? If so, and there can only be 2, I'm curious if instead of storing these in a fixed length slice with magic indices, maybe Multi should have a field for each type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point. I wrote it like this in case we had period configs like [boltdb, tsdb, boltdb, tsdb], in which case we'd want the first period to point to the index of the boltdb inverted index, the second to point to the tsdb inverted index and so on. Decoupling the period from the index it points to allows us to instantiate a maximum of one inverted index for each type, regardless of the number of period configs.

The way it's written now will allow us to add other index types in the future if necessary. Does that make sense?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does, but we could also come back and update this struct with the necessary types when we get to that point. just feels like we're overloading slice indexing as a mechanism for expressing type, whereas there are clearer ways to do that.

that being said I have no interest in dying on this hill, and I'm cool with keeping it this way if you think it's more flexible.

}

type Multi struct {
periods []periodIndex
indices []Interface
}

func NewMultiInvertedIndex(periods []config.PeriodConfig, indexShards uint32) (*Multi, error) {
var (
err error

ii Interface // always stored in 0th index
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does ii stand for invertedIndex? if so I might opt for the longer version of that variable as it's not that long.

bitPrefixed Interface // always stored in 1st index

periodIndices []periodIndex
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
)

for _, pd := range periods {
switch pd.IndexType {
case config.TSDBType:
if bitPrefixed == nil {
bitPrefixed, err = NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, err
Copy link
Contributor

@DylanGuedes DylanGuedes May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe you could wrap this error with errors.Wrap and add more details to the message, like which period erroed and its index type. Motivation:
there's a chain of calls that aren't wrapping errors that will end up here, so it might be hard to identify the root of the problem. The chain is:

  • GetOrCreateInstance is invoked at multiple places without wrapping errors and triggers newInstance
    • newInstance will invoke index.NewMultiInvertedIndex without wrapping errors
      • we end up here, with NewMultiInvertedIndex invoking NewBitPrefixWithShards

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call!

}
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 1, // tsdb inverted index is always stored in position one
})
default:
if ii == nil {
ii = NewWithShards(indexShards)
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 0, // regular inverted index is always stored in position zero
})
}
}

return &Multi{
periods: periodIndices,
indices: []Interface{ii, bitPrefixed},
}, nil
}

func (m *Multi) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) (result labels.Labels) {
for _, i := range m.indices {
if i != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for curiosity, why would an index inside m.indices be nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where there's only tsdb period configs or non-tsdb period configs (such as boltdb-shipper), only one of the sub-indices will be needed. If that's the case, we can avoid the overhead of running an unnecessary one.

result = i.Add(labels, fp)
}
}
return
}

func (m *Multi) Delete(labels labels.Labels, fp model.Fingerprint) {
for _, i := range m.indices {
if i != nil {
i.Delete(labels, fp)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change

}

func (m *Multi) Lookup(t time.Time, matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return m.indexFor(t).Lookup(matchers, shard)
}

func (m *Multi) LabelNames(t time.Time, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelNames(shard)
}

func (m *Multi) LabelValues(t time.Time, name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelValues(name, shard)
}

// Query planning is responsible for ensuring no query spans more than one inverted index.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment out of place?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended to put it here because most queries in loki use a from, through, but in this case we're using another part of the code base to ensure we'd never query more than one period at a time, so we don't need to account for the range from->through, only the first point (from).

// Therefore we don't need to account for both `from` and `through`.
func (m *Multi) indexFor(t time.Time) Interface {
for i := range m.periods {
if !m.periods[i].Time.After(t) && (i+1 == len(m.periods) || t.Before(m.periods[i+1].Time)) {
return m.indices[m.periods[i].idx]
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}
}
return noopInvertedIndex{}
}

type noopInvertedIndex struct{}

func (noopInvertedIndex) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) labels.Labels {
return nil
}

func (noopInvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) {}

func (noopInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return nil, nil
}

func (noopInvertedIndex) LabelNames(shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}

func (noopInvertedIndex) LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}
179 changes: 179 additions & 0 deletions pkg/ingester/index/multi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package index

import (
"sort"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

func MustParseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
panic(err)
}
return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
}

var testPeriodConfigs = []config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
{
From: MustParseDayTime("2022-01-01"),
IndexType: config.BoltDBShipperType,
},
{
From: MustParseDayTime("2023-01-01"),
IndexType: config.TSDBType,
},
}

// Only run the specific shard factor validation logic if a period config using
// tsdb exists
func TestIgnoresInvalidShardFactorWhenTSDBNotPresent(t *testing.T) {
factor := uint32(6)
_, err := NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
},
factor,
)
require.Nil(t, err)
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved

_, err = NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
},
factor,
)
require.Error(t, err)

DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}

func TestMultiIndexCreation(t *testing.T) {
multi, err := NewMultiInvertedIndex(testPeriodConfigs, uint32(2))
require.Nil(t, err)

x, _ := NewBitPrefixWithShards(2)
expected := &Multi{
periods: []periodIndex{
{
Time: testPeriodConfigs[0].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[1].From.Time.Time(),
idx: 1,
},
{
Time: testPeriodConfigs[2].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[3].From.Time.Time(),
idx: 1,
},
},
indices: []Interface{
NewWithShards(2),
x,
},
}
require.Equal(t, expected, multi)
}

func TestMultiIndex(t *testing.T) {
factor := uint32(32)
multi, err := NewMultiInvertedIndex(testPeriodConfigs, factor)
require.Nil(t, err)

lbs := []logproto.LabelAdapter{
{Name: "foo", Value: "foo"},
{Name: "bar", Value: "bar"},
{Name: "buzz", Value: "buzz"},
}
sort.Sort(logproto.FromLabelAdaptersToLabels(lbs))
fp := model.Fingerprint((logproto.FromLabelAdaptersToLabels(lbs).Hash()))

ls := multi.Add(lbs, fp)

// Lookup at a time corresponding to a non-tsdb periodconfig
// and ensure we use modulo hashing
expShard := labelsSeriesIDHash(logproto.FromLabelAdaptersToLabels(lbs)) % factor
ids, err := multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)

require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)

// Lookup at a time corresponding to a tsdb periodconfig
// and ensure we use bit prefix hashing
requiredBits := index.NewShard(0, factor).RequiredBits()
expShard = uint32(fp >> (64 - requiredBits))
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)

require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)

// Delete the entry
multi.Delete(ls, fp)

// Ensure deleted entry is not in modulo variant
ids, err = multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)

require.Nil(t, err)
require.Equal(t, 0, len(ids))

// Ensure deleted entry is not in bit prefix variant
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)

require.Nil(t, err)
require.Equal(t, 0, len(ids))
}
Loading