Add LabelFilterer and Store wrapper (#4818)
* Add LabelFilterer and Store wrapper

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Address review comments

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
MichelHollands authored Nov 26, 2021
1 parent c53457f commit e69ceef
Showing 3 changed files with 160 additions and 5 deletions.
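
The new `LabelValueFilterer` hook introduced in `pkg/ingester` (see the interface in the diff below) lets the embedding application restrict or rewrite the label values returned by the ingester's `Label` endpoint. As a rough sketch of how such a filterer might look — the prefix-dropping type, its name, and the `internal_` prefix are hypothetical; only the interface shape comes from this commit:

```go
// Minimal sketch of a LabelValueFilterer implementation (not part of this
// commit). The interface shape mirrors the one added to pkg/ingester below;
// the prefix-dropping behaviour and the "internal_" prefix are hypothetical.
package main

import (
	"context"
	"fmt"
	"strings"
)

// LabelValueFilterer mirrors the interface introduced in this commit.
type LabelValueFilterer interface {
	Filter(ctx context.Context, labelName string, labelValues []string) ([]string, error)
}

// prefixDroppingFilterer removes every label value that starts with the
// configured prefix and keeps the rest unchanged.
type prefixDroppingFilterer struct {
	prefix string
}

func (f *prefixDroppingFilterer) Filter(_ context.Context, _ string, labelValues []string) ([]string, error) {
	var kept []string
	for _, v := range labelValues {
		if strings.HasPrefix(v, f.prefix) {
			continue
		}
		kept = append(kept, v)
	}
	return kept, nil
}

func main() {
	var f LabelValueFilterer = &prefixDroppingFilterer{prefix: "internal_"}
	values, err := f.Filter(context.Background(), "job", []string{"internal_compactor", "frontend"})
	if err != nil {
		panic(err)
	}
	fmt.Println(values) // [frontend]
}
```

Implementations are wired in through the new `Config.LabelFilterer` field (tagged `yaml:"-"`, so it can only be set programmatically) or via the `SetLabelFilterer` setter, both added in the diff below.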
34 changes: 32 additions & 2 deletions pkg/ingester/ingester.go
@@ -82,6 +82,7 @@ type Config struct {
WAL WALConfig `yaml:"wal,omitempty"`

ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
LabelFilterer LabelValueFilterer `yaml:"-"`

IndexShards int `yaml:"index_shards"`
}
@@ -125,12 +126,17 @@ func (cfg *Config) Validate() error {
}

if cfg.IndexShards <= 0 {
return fmt.Errorf("Invalid ingester index shard factor: %d", cfg.IndexShards)
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
}

return nil
}

// LabelValueFilterer filters the values returned for a given label name.
type LabelValueFilterer interface {
Filter(ctx context.Context, labelName string, labelValues []string) ([]string, error)
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
services.Service
@@ -173,6 +179,7 @@ type Ingester struct {
wal WAL

chunkFilter storage.RequestChunkFilterer
labelFilter LabelValueFilterer
}

// ChunkStore is the interface we need to store chunks.
@@ -245,13 +252,21 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
i.SetChunkFilterer(i.cfg.ChunkFilterer)
}

if i.cfg.LabelFilterer != nil {
i.SetLabelFilterer(i.cfg.LabelFilterer)
}

return i, nil
}

func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}

func (i *Ingester) SetLabelFilterer(labelFilter LabelValueFilterer) {
i.labelFilter = labelFilter
}

// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@@ -717,8 +732,23 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}
}

allValues := listutil.MergeStringLists(resp.Values, storeValues)

if req.Values && i.labelFilter != nil {
var filteredValues []string

filteredValues, err = i.labelFilter.Filter(ctx, req.Name, allValues)
if err != nil {
return nil, err
}

return &logproto.LabelResponse{
Values: filteredValues,
}, nil
}

return &logproto.LabelResponse{
-Values: listutil.MergeStringLists(resp.Values, storeValues),
+Values: allValues,
}, nil
}

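For readers skimming the diff: the `Label` change above merges in-memory and store values first, then applies the optional filterer only when the request asks for label values and a filterer is configured. A simplified, standalone restatement of that ordering — not the actual ingester code; `mergeStringLists` here is a plain sorted-union stand-in for `listutil.MergeStringLists`, and the `req.Values` check is omitted:

```go
// Simplified illustration of the merge-then-filter order used in
// Ingester.Label above; types and helpers here are stand-ins.
package main

import (
	"context"
	"fmt"
	"sort"
)

// filterFunc stands in for LabelValueFilterer.Filter.
type filterFunc func(ctx context.Context, labelName string, labelValues []string) ([]string, error)

// labelValues merges in-memory and store values, then applies the
// optional filter, mirroring the order in the diff above.
func labelValues(ctx context.Context, name string, inMemory, fromStore []string, filter filterFunc) ([]string, error) {
	merged := mergeStringLists(inMemory, fromStore)
	if filter == nil {
		return merged, nil
	}
	return filter(ctx, name, merged)
}

// mergeStringLists is a sorted, de-duplicated union of both slices.
func mergeStringLists(a, b []string) []string {
	seen := map[string]struct{}{}
	var out []string
	for _, v := range append(append([]string{}, a...), b...) {
		if _, ok := seen[v]; ok {
			continue
		}
		seen[v] = struct{}{}
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func main() {
	vals, _ := labelValues(context.Background(), "bar",
		[]string{"baz1", "baz2"}, []string{"val1", "val2"}, nil)
	fmt.Println(vals) // [baz1 baz2 val1 val2]
}
```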
130 changes: 128 additions & 2 deletions pkg/ingester/ingester_test.go
@@ -278,8 +278,8 @@ func (s *mockStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, nil
}

-func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
-return nil, nil, nil
+func (s *mockStore) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
+return nil, nil
}

func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
@@ -289,6 +289,41 @@ func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
func (s *mockStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {
}

// chunk.Store methods
func (s *mockStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
}

func (s *mockStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
return nil, nil
}

func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
return nil, nil, nil
}

func (s *mockStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
return []string{"val1", "val2"}, nil
}

func (s *mockStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}

func (s *mockStore) GetChunkFetcher(tm model.Time) *chunk.Fetcher {
return nil
}

func (s *mockStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
return nil
}

func (s *mockStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
return nil
}

func (s *mockStore) Stop() {}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
@@ -559,3 +594,94 @@ func Test_InMemoryLabels(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{"bar", "foo"}, res.Values)
}

func InMemoryLabels(t *testing.T, labelFilterer LabelValueFilterer, expectedValues []string) {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.QueryStore = true
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
i.labelFilter = labelFilterer
future := time.Now().Local().Add(time.Minute * 5)
i.periodicConfigs = []chunk.PeriodConfig{
{
From: chunk.DayTime{
Time: model.TimeFromUnix(future.Unix()),
},
},
}
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)

start := time.Now().Add(-5 * time.Minute)
end := time.Now()
res, err := i.Label(ctx, &logproto.LabelRequest{
Name: "bar",
Values: true,
Start: &start,
End: &end,
})

require.NoError(t, err)
require.Equal(t, expectedValues, res.Values)

res, err = i.Label(ctx, &logproto.LabelRequest{})
require.NoError(t, err)
require.Equal(t, []string{"bar", "foo"}, res.Values)
}

func Test_InMemoryLabels_WithoutLabelFilter(t *testing.T) {
expectedValues := []string{"baz1", "baz2", "val1", "val2"}

InMemoryLabels(t, nil, expectedValues)
}

// DummyLabelValueFilterer prepends "i" to each label value
type DummyLabelValueFilterer struct{}

func (*DummyLabelValueFilterer) Filter(ctx context.Context, labelName string, labelValues []string) ([]string, error) {
var updatedValues []string

for _, v := range labelValues {
updatedValues = append(updatedValues, fmt.Sprintf("i%v", v))
}

return updatedValues, nil
}

func Test_InMemoryLabels_WithLabelFilter(t *testing.T) {
labelFilter := DummyLabelValueFilterer{}
expectedValues := []string{"ibaz1", "ibaz2", "ival1", "ival2"}

InMemoryLabels(t, &labelFilter, expectedValues)
}
1 change: 0 additions & 1 deletion pkg/querier/querier.go
@@ -290,7 +290,6 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
}

results := append(ingesterValues, storeValues)

return &logproto.LabelResponse{
Values: listutil.MergeStringLists(results...),
}, nil
