Skip to content

Commit

Permalink
fix: grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed May 24, 2024
1 parent b897fc5 commit 2587657
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 84 deletions.
2 changes: 2 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ schema_config:

pattern_ingester:
enabled: true
metric_aggregation:
enabled: true

ruler:
alertmanager_url: http://localhost:9093
Expand Down
4 changes: 2 additions & 2 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type Config struct {
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
cfg.ClientConfig.RegisterFlags(fs)
cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")

fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")

cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
}

func (cfg *Config) Validate() error {
Expand Down
13 changes: 1 addition & 12 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,13 @@ func (i *instance) QuerySample(
return nil, err
}

typ, err := metric.ExtractMetricType(expr)
if err != nil || typ == metric.Unsupported {
return nil, err
}

var iters []iter.Iterator
err = i.forMatchingStreams(
selector.Matchers(),
func(stream *stream) error {
var iter iter.Iterator
var err error
if typ == metric.Bytes {
iter, err = stream.BytesIterator(ctx, expr, from, through, step)
} else if typ == metric.Count {
iter, err = stream.CountIterator(ctx, expr, from, through, step)
} else {
return fmt.Errorf("unsupported query operation")
}
iter, err = stream.SampleIterator(ctx, expr, from, through, step)

if err != nil {
return err
Expand Down
19 changes: 16 additions & 3 deletions pkg/pattern/metric/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/chunk"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -51,12 +52,23 @@ func (c *Chunks) Observe(bytes, count uint64, ts model.Time) {
func (c *Chunks) Iterator(
ctx context.Context,

Check warning on line 53 in pkg/pattern/metric/chunk.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
typ MetricType,
grouping *syntax.Grouping,
from, through, step model.Time,
) (iter.Iterator, error) {
if typ == Unsupported {
return nil, fmt.Errorf("unsupported metric type")
}

lbls := c.labels
if grouping != nil {
sort.Strings(grouping.Groups)
lbls = make(labels.Labels, 0, len(grouping.Groups))
for _, group := range grouping.Groups {
value := c.labels.Get(group)
lbls = append(lbls, labels.Label{Name: group, Value: value})
}
}

iters := make([]iter.Iterator, 0, len(c.chunks))
for _, chunk := range c.chunks {
samples, err := chunk.ForRangeAndType(typ, from, through, step)
Expand All @@ -68,9 +80,10 @@ func (c *Chunks) Iterator(
continue
}

iters = append(iters, iter.NewLabelsSlice(c.labels, samples))
iters = append(iters, iter.NewLabelsSlice(lbls, samples))
}
return iter.NewNonOverlappingLabelsIterator(c.labels, iters), nil

return iter.NewNonOverlappingLabelsIterator(lbls, iters), nil
}

// TODO(twhitney): These values should be float64s (to match prometheus samples) or int64s (to match pattern samples)
Expand Down Expand Up @@ -127,7 +140,7 @@ func (c *Chunk) spaceFor(ts model.Time) bool {
return ts.Sub(c.Samples[0].Timestamp) < chunk.MaxChunkTime
}

//TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method?
// TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method?
// ForRangeAndType returns samples with only the values
// in the given range [start:end] and aggregates them by step duration.
// start and end are in milliseconds since epoch. step is a duration in milliseconds.
Expand Down
110 changes: 110 additions & 0 deletions pkg/pattern/metric/chunk_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package metric

import (
"context"
"reflect"
"testing"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -327,3 +331,109 @@ func TestForRangeAndType(t *testing.T) {
})
}
}

func Test_Chunks_Iterator(t *testing.T) {
ctx := context.Background()
lbls := labels.Labels{
labels.Label{Name: "foo", Value: "bar"},
labels.Label{Name: "container", Value: "jar"},
}
chunks := Chunks{
chunks: []Chunk{
{
Samples: []MetricSample{
{Timestamp: 2, Bytes: 2, Count: 1},
{Timestamp: 4, Bytes: 4, Count: 3},
{Timestamp: 6, Bytes: 6, Count: 5},
},
mint: 2,
maxt: 6,
},
},
labels: lbls,
}

t.Run("without grouping", func(t *testing.T) {
it, err := chunks.Iterator(ctx, Bytes, nil, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, lbls.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, nil, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, lbls.String(), res.Series[0].GetLabels())
})

t.Run("grouping", func(t *testing.T) {
grouping := &syntax.Grouping{
Groups: []string{"container"},
Without: false,
}

expectedLabels := labels.Labels{
labels.Label{
Name: "container",
Value: "jar",
},
}

it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())
})

t.Run("grouping by a missing label", func(t *testing.T) {
grouping := &syntax.Grouping{
Groups: []string{"missing"},
Without: false,
}

expectedLabels := labels.Labels{
labels.Label{
Name: "missing",
Value: "",
},
}

it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())
})
}
22 changes: 12 additions & 10 deletions pkg/pattern/metric/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// TODO(twhitney): duplication with code in NewStepEvaluator
func ExtractMetricType(expr syntax.SampleExpr) (MetricType, error) {
func extractMetricType(expr syntax.SampleExpr) (MetricType, error) {
var typ MetricType
switch e := expr.(type) {
case *syntax.VectorAggregationExpr:
Expand Down Expand Up @@ -57,7 +57,6 @@ type SampleEvaluatorFactory interface {
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error)
}
Expand All @@ -66,18 +65,16 @@ type SampleEvaluatorFunc func(
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error)

func (s SampleEvaluatorFunc) NewStepEvaluator(
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
return s(ctx, nextEvaluatorFactory, expr, typ, from, through, step)
return s(ctx, nextEvaluatorFactory, expr, from, through, step)
}

type DefaultEvaluatorFactory struct {
Expand All @@ -94,9 +91,13 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator(
ctx context.Context,
evFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
metricType, err := extractMetricType(expr)
if err != nil || metricType == Unsupported {
return nil, err
}

switch e := expr.(type) {
case *syntax.VectorAggregationExpr:
if rangExpr, ok := e.Left.(*syntax.RangeAggregationExpr); ok && e.Operation == syntax.OpTypeSum {
Expand All @@ -106,12 +107,11 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator(
func(ctx context.Context,
_ SampleEvaluatorFactory,
_ syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
fromWithRangeAndOffset := from.Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset)
throughWithOffset := through.Add(-rangExpr.Left.Offset)
it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +129,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator(
if e.Grouping == nil {
return nil, errors.Errorf("aggregation operator '%q' without grouping", e.Operation)
}
nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, typ, from, through, step)
nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, from, through, step)
if err != nil {
return nil, err
}
Expand All @@ -145,7 +145,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator(
case *syntax.RangeAggregationExpr:
fromWithRangeAndOffset := from.Add(-e.Left.Interval).Add(-e.Left.Offset)
throughWithOffset := through.Add(-e.Left.Offset)
it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,6 +250,8 @@ type SeriesToSampleIterator struct {
lbls labels.Labels
}

// TODO: could this me a matrix iterator that returned multiple samples with
// different labels for the same timestamp?
func NewSeriesToSampleIterator(series *promql.Series) *SeriesToSampleIterator {
return &SeriesToSampleIterator{
floats: series.Floats,
Expand Down
4 changes: 0 additions & 4 deletions pkg/pattern/metric/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ func Test_SampleEvaluator(t *testing.T) {
expr, err := syntax.ParseSampleExpr(query)
require.NoError(t, err)

typ, err := ExtractMetricType(expr)
require.NoError(t, err)

evaluator, err := factory.NewStepEvaluator(
context.Background(),
factory,
expr.(syntax.SampleExpr),

Check failure on line 36 in pkg/pattern/metric/evaluator_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1040: type assertion to the same type: expr already has type syntax.SampleExpr (gosimple)
typ,
model.Time(now-fiveMin), model.Time(now), model.Time(fiveMin),
)

Expand Down
Loading

0 comments on commit 2587657

Please sign in to comment.