Add metrics to range mapper #6030

Merged (2 commits) on Apr 29, 2022
4 changes: 1 addition & 3 deletions pkg/logql/downstream.go
@@ -46,17 +46,15 @@ type DownstreamEngine struct {
timeout time.Duration
downstreamable Downstreamable
limits Limits
- metrics *ShardingMetrics
}

// NewDownstreamEngine constructs a *DownstreamEngine
- func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits, logger log.Logger) *DownstreamEngine {
+ func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, limits Limits, logger log.Logger) *DownstreamEngine {
opts.applyDefault()
return &DownstreamEngine{
logger: logger,
timeout: opts.Timeout,
downstreamable: downstreamable,
- metrics: metrics,
limits: limits,
}
}
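For callers, this API change is mechanical: sharding metrics no longer flow through the engine, so the metrics argument is simply dropped. A minimal sketch of an updated call site, assuming the caller already has a Downstreamable and a Limits implementation (myDownstreamer and myLimits are placeholders, not part of this diff):

	opts := EngineOpts{Timeout: 30 * time.Second} // Timeout is the only option used by the constructor shown above
	engine := NewDownstreamEngine(opts, myDownstreamer, myLimits, log.NewNopLogger())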
11 changes: 6 additions & 5 deletions pkg/logql/downstream_test.go
@@ -14,7 +14,8 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

- var nilMetrics = NewShardingMetrics(nil)
+ var nilShardMetrics = NewShardMapperMetrics(nil)
+ var nilRangeMetrics = NewRangeMapperMetrics(nil)

func TestMappingEquivalence(t *testing.T) {
var (
@@ -61,7 +62,7 @@ func TestMappingEquivalence(t *testing.T) {

opts := EngineOpts{}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
- sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, nilMetrics, NoLimits, log.NewNopLogger())
+ sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
params := NewLiteralParams(
@@ -77,7 +78,7 @@ func TestMappingEquivalence(t *testing.T) {
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

- mapper, err := NewShardMapper(shards, nilMetrics)
+ mapper, err := NewShardMapper(shards, nilShardMetrics)
require.Nil(t, err)
_, mapped, err := mapper.Parse(tc.query)
require.Nil(t, err)
@@ -260,7 +261,7 @@ func TestRangeMappingEquivalence(t *testing.T) {

opts := EngineOpts{}
regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger())
- downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger())
+ downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")
@@ -282,7 +283,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
require.Nil(t, err)

// Downstream engine - split by range
- rangeMapper, err := NewRangeMapper(tc.splitByInterval)
+ rangeMapper, err := NewRangeMapper(tc.splitByInterval, nilRangeMetrics)
require.Nil(t, err)
noop, rangeExpr, err := rangeMapper.Parse(tc.query)
require.Nil(t, err)
81 changes: 81 additions & 0 deletions pkg/logql/mapper_metrics.go
@@ -0,0 +1,81 @@
package logql

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// expression type used in metrics
const (
StreamsKey = "streams"
MetricsKey = "metrics"
)

// parsing evaluation result used in metrics
const (
SuccessKey = "success"
FailureKey = "failure"
NoopKey = "noop"
)

// MapperMetrics is the metrics wrapper used in logql mapping (shard and range)
type MapperMetrics struct {
DownstreamQueries *prometheus.CounterVec // downstream queries total, partitioned by streams/metrics
ParsedQueries *prometheus.CounterVec // parsed ASTs total, partitioned by success/failure/noop
DownstreamFactor prometheus.Histogram // per request downstream factor
}

func newMapperMetrics(registerer prometheus.Registerer, mapper string) *MapperMetrics {
return &MapperMetrics{
DownstreamQueries: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_shards_total",
Help: "Number of downstream queries by expression type",
ConstLabels: prometheus.Labels{"mapper": mapper},
}, []string{"type"}),
ParsedQueries: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_sharding_parsed_queries_total",
Help: "Number of parsed queries by evaluation type",
ConstLabels: prometheus.Labels{"mapper": mapper},
}, []string{"type"}),
DownstreamFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "query_frontend_shard_factor",
Help: "Number of downstream queries per request",
Buckets: prometheus.LinearBuckets(0, 16, 4),
ConstLabels: prometheus.Labels{"mapper": mapper},
}),
Review comment on lines +30 to +48 (@ssncferreira, Contributor/Author, Apr 26, 2022):

Since these metric names are now shared amongst all mappers, the names should be updated to make them more general, i.e., replacing "shards" with "downstream". However, this would be a breaking change. It is my preferred option, but I don't know whether there is a process for "deprecating" metric names, nor what the real impact on current users would be (e.g., dashboards would stop working).

An alternative would be to remove this generalization by keeping exclusive metric names for the shard mapper, the range mapper, and any future mappers. Wdyt?

Reply (Contributor): We can create a PR or an issue for that and merge it during 3.0.

}
}

// downstreamRecorder wraps a vector & histogram, providing an easy way to increment downstream counts
// and unify them into histogram entries.
// NOT SAFE FOR CONCURRENT USE! We avoid introducing mutex locking here
// because AST mapping is single threaded.
type downstreamRecorder struct {
done bool
total int
*MapperMetrics
}

// downstreamRecorder constructs a recorder using the underlying metrics.
func (m *MapperMetrics) downstreamRecorder() *downstreamRecorder {
return &downstreamRecorder{
MapperMetrics: m,
}
}

// Add increments the downstream count and tracks it for the eventual histogram entry.
func (r *downstreamRecorder) Add(x int, key string) {
r.total += x
r.DownstreamQueries.WithLabelValues(key).Add(float64(x))
}

// Finish idempotently records a histogram entry with the total downstream factor.
func (r *downstreamRecorder) Finish() {
if !r.done {
r.done = true
r.DownstreamFactor.Observe(float64(r.total))
}
}
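Read as a whole, this new file establishes the pattern the mappers follow: each mapper owns a long-lived *MapperMetrics, takes a short-lived downstreamRecorder per request (single-threaded by design, per the comment above), increments it while splitting, and flushes it exactly once. A usage sketch built only from the API above; the registerer and counts are illustrative:

	metrics := newMapperMetrics(prometheus.DefaultRegisterer, "range")
	recorder := metrics.downstreamRecorder()
	recorder.Add(4, MetricsKey) // four split subqueries for one request
	recorder.Add(1, MetricsKey) // plus a remainder interval
	recorder.Finish()           // observes a downstream factor of 5, exactly once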
67 changes: 45 additions & 22 deletions pkg/logql/rangemapper.go
@@ -6,6 +6,7 @@ import (

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/logql/syntax"
util_log "github.com/grafana/loki/pkg/util/log"
@@ -51,19 +52,25 @@ var splittableRangeVectorOp = map[string]struct{}{
// using the same rules as above.
type RangeMapper struct {
splitByInterval time.Duration
+ metrics *MapperMetrics
}

// NewRangeMapper creates a new RangeMapper instance with the given duration as
// split interval. The interval must be greater than 0.
- func NewRangeMapper(interval time.Duration) (RangeMapper, error) {
+ func NewRangeMapper(interval time.Duration, metrics *MapperMetrics) (RangeMapper, error) {
if interval <= 0 {
return RangeMapper{}, fmt.Errorf("cannot create RangeMapper with splitByInterval <= 0; got %s", interval)
}
return RangeMapper{
splitByInterval: interval,
+ metrics: metrics,
}, nil
}

+ func NewRangeMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
+ return newMapperMetrics(registerer, "range")
+ }

// Parse parses the given LogQL query string into a sample expression and
// applies the rewrite rules for splitting it into a sample expression that can
// be executed by the downstream engine.
@@ -75,23 +82,36 @@ func (m RangeMapper) Parse(query string) (bool, syntax.Expr, error) {
return true, nil, err
}

+ recorder := m.metrics.downstreamRecorder()
+
if !isSplittableByRange(origExpr) {
+ m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc()
return true, origExpr, nil
}

- modExpr, err := m.Map(origExpr, nil)
+ modExpr, err := m.Map(origExpr, nil, recorder)
if err != nil {
+ m.metrics.ParsedQueries.WithLabelValues(FailureKey).Inc()
return true, nil, err
}

- return origExpr.String() == modExpr.String(), modExpr, err
+ noop := origExpr.String() == modExpr.String()
+ if noop {
+ m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc()
+ } else {
+ m.metrics.ParsedQueries.WithLabelValues(SuccessKey).Inc()
+ }
+
+ recorder.Finish() // only record metrics for successful mappings
+
+ return noop, modExpr, err
}

// Map rewrites sample expression expr and returns the resultant sample expression to be executed by the downstream engine
// It is called recursively on the expression tree.
// The function takes an optional vector aggregation as second argument, that
// is pushed down to the downstream expression.
- func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) {
+ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, recorder *downstreamRecorder) (syntax.SampleExpr, error) {
// immediately clone the passed expr to avoid mutating the original
expr, err := clone(expr)
if err != nil {
@@ -100,15 +120,15 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect

switch e := expr.(type) {
case *syntax.VectorAggregationExpr:
- return m.mapVectorAggregationExpr(e)
+ return m.mapVectorAggregationExpr(e, recorder)
case *syntax.RangeAggregationExpr:
- return m.mapRangeAggregationExpr(e, vectorAggrPushdown), nil
+ return m.mapRangeAggregationExpr(e, vectorAggrPushdown, recorder), nil
case *syntax.BinOpExpr:
- lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown)
+ lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown, recorder)
if err != nil {
return nil, err
}
- rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown)
+ rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder)
if err != nil {
return nil, err
}
@@ -158,7 +178,7 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// Example:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
- func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr {
+ func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
Operation: operation,
@@ -174,7 +194,7 @@ func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrid
}
return &syntax.BinOpExpr{
SampleExpr: &syntax.VectorAggregationExpr{
- Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval),
+ Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval, recorder),
Grouping: &syntax.Grouping{
Without: true,
},
@@ -194,7 +214,7 @@ func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregatio
// => min without (bytes_over_time({job="bar"} [1m]) ++ bytes_over_time({job="bar"} [1m] offset 1m))
// min by (app) (bytes_over_time({job="bar"} [2m])
// => min without (min by (app) (bytes_over_time({job="bar"} [1m])) ++ min by (app) (bytes_over_time({job="bar"} [1m] offset 1m)))
- func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration) syntax.SampleExpr {
+ func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
grouping := expr.Grouping
if expr.Grouping == nil {
grouping = &syntax.Grouping{
@@ -206,13 +226,13 @@ func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregatio
downstream = vectorAggrPushdown
}
return &syntax.VectorAggregationExpr{
- Left: m.mapConcatSampleExpr(downstream, rangeInterval),
+ Left: m.mapConcatSampleExpr(downstream, rangeInterval, recorder),
Grouping: grouping,
Operation: op,
}
}

// appendDownstream adds expression expr with a range interval 'interval' and offset 'offset' to the downstreams list.
// Returns the updated downstream ConcatSampleExpr.
func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, interval time.Duration, offset time.Duration) *ConcatSampleExpr {
sampleExpr, _ := clone(expr)
@@ -237,7 +257,7 @@ func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, int
// mapConcatSampleExpr transform expr in multiple downstream subexpressions split by offset range interval
// rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression
// will have an unnecessary aggregation operation
- func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration) syntax.SampleExpr {
+ func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
splitCount := int(rangeInterval / m.splitByInterval)

if splitCount == 0 {
@@ -249,16 +269,19 @@ func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval t
for split = 0; split < splitCount; split++ {
downstreams = appendDownstream(downstreams, expr, m.splitByInterval, time.Duration(split)*m.splitByInterval)
}
+ recorder.Add(splitCount, MetricsKey)

// Add the remainder offset interval
if rangeInterval%m.splitByInterval != 0 {
offset := time.Duration(split) * m.splitByInterval
downstreams = appendDownstream(downstreams, expr, rangeInterval-offset, offset)
+ recorder.Add(1, MetricsKey)
}

return downstreams
}

- func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) {
+ func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, recorder *downstreamRecorder) (syntax.SampleExpr, error) {
rangeInterval := getRangeInterval(expr)

// in case the interval is smaller than the configured split interval,
@@ -277,7 +300,7 @@ func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
}

// Split the vector aggregation's inner expression
- lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown)
+ lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown, recorder)
if err != nil {
return nil, err
}
@@ -297,7 +320,7 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
// contains the initial query which will be the downstream expression with a split range interval.
// Example: `sum by (a) (bytes_over_time)`
// Is mapped to `sum by (a) (sum without downstream<sum by (a) (bytes_over_time)>++downstream<sum by (a) (bytes_over_time)>++...)`
- func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) syntax.SampleExpr {
+ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, recorder *downstreamRecorder) syntax.SampleExpr {
rangeInterval := getRangeInterval(expr)

// in case the interval is smaller than the configured split interval,
Expand All @@ -313,15 +336,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
}
switch expr.Operation {
case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount, syntax.OpRangeTypeSum:
- return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval)
+ return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval, recorder)
case syntax.OpRangeTypeMax:
- return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval)
+ return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval, recorder)
case syntax.OpRangeTypeMin:
- return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval)
+ return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval, recorder)
case syntax.OpRangeTypeRate:
- return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval)
+ return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder)
case syntax.OpRangeTypeBytesRate:
- return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval)
+ return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval, recorder)
default:
// this should not be reachable.
// If an operation is splittable it should have an optimization listed.
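End to end, the wiring now mirrors the test setup in the next file: build the metrics once per process, hand them to each mapper instance, and let Parse record the outcome. A sketch under those assumptions; the split interval and query are illustrative:

	metrics := NewRangeMapperMetrics(prometheus.DefaultRegisterer)
	mapper, err := NewRangeMapper(time.Minute, metrics) // rejects intervals <= 0
	if err != nil {
		// handle the configuration error
	}
	noop, mappedExpr, err := mapper.Parse(`sum(rate({app="foo"}[4m]))`)
	// noop reports whether the rewrite changed the query; ParsedQueries and
	// the downstream-factor histogram are updated as a side effect, and
	// mappedExpr is the expression handed to the downstream engine.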
8 changes: 4 additions & 4 deletions pkg/logql/rangemapper_test.go
@@ -8,7 +8,7 @@ import (
)

func Test_SplitRangeInterval(t *testing.T) {
- rvm, err := NewRangeMapper(2 * time.Second)
+ rvm, err := NewRangeMapper(2*time.Second, nilShardMetrics)
require.NoError(t, err)

for _, tc := range []struct {
@@ -50,7 +50,7 @@ func Test_SplitRangeInterval(t *testing.T) {
}

func Test_SplitRangeVectorMapping(t *testing.T) {
- rvm, err := NewRangeMapper(time.Minute)
+ rvm, err := NewRangeMapper(time.Minute, nilShardMetrics)
require.NoError(t, err)

for _, tc := range []struct {
@@ -972,7 +972,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
}

func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
- rvm, err := NewRangeMapper(time.Minute)
+ rvm, err := NewRangeMapper(time.Minute, nilShardMetrics)
require.NoError(t, err)

for _, tc := range []struct {
@@ -1050,7 +1050,7 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
}

func Test_FailQuery(t *testing.T) {
- rvm, err := NewRangeMapper(2 * time.Minute)
+ rvm, err := NewRangeMapper(2*time.Minute, nilShardMetrics)
require.NoError(t, err)
_, _, err = rvm.Parse(`{app="foo"} |= "err"`)
require.Error(t, err)