diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 568fa22f65134..1703a034a07a2 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -46,17 +46,15 @@ type DownstreamEngine struct { timeout time.Duration downstreamable Downstreamable limits Limits - metrics *ShardingMetrics } // NewDownstreamEngine constructs a *DownstreamEngine -func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits, logger log.Logger) *DownstreamEngine { +func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, limits Limits, logger log.Logger) *DownstreamEngine { opts.applyDefault() return &DownstreamEngine{ logger: logger, timeout: opts.Timeout, downstreamable: downstreamable, - metrics: metrics, limits: limits, } } diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index c8ffebfcf11fa..b584d721b22c9 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -14,7 +14,8 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -var nilMetrics = NewShardingMetrics(nil) +var nilShardMetrics = NewShardMapperMetrics(nil) +var nilRangeMetrics = NewRangeMapperMetrics(nil) func TestMappingEquivalence(t *testing.T) { var ( @@ -61,7 +62,7 @@ func TestMappingEquivalence(t *testing.T) { opts := EngineOpts{} regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) - sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, nilMetrics, NoLimits, log.NewNopLogger()) + sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) t.Run(tc.query, func(t *testing.T) { params := NewLiteralParams( @@ -77,7 +78,7 @@ func TestMappingEquivalence(t *testing.T) { qry := regular.Query(params) ctx := user.InjectOrgID(context.Background(), "fake") - mapper, err := NewShardMapper(shards, nilMetrics) + mapper, err := NewShardMapper(shards, nilShardMetrics) require.Nil(t, err) _, mapped, err := mapper.Parse(tc.query) require.Nil(t, err) @@ 
-260,7 +261,7 @@ func TestRangeMappingEquivalence(t *testing.T) { opts := EngineOpts{} regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger()) - downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger()) + downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, NoLimits, log.NewNopLogger()) t.Run(tc.query, func(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "fake") @@ -282,7 +283,7 @@ func TestRangeMappingEquivalence(t *testing.T) { require.Nil(t, err) // Downstream engine - split by range - rangeMapper, err := NewRangeMapper(tc.splitByInterval) + rangeMapper, err := NewRangeMapper(tc.splitByInterval, nilRangeMetrics) require.Nil(t, err) noop, rangeExpr, err := rangeMapper.Parse(tc.query) require.Nil(t, err) diff --git a/pkg/logql/mapper_metrics.go b/pkg/logql/mapper_metrics.go new file mode 100644 index 0000000000000..1e9e8d0df6d13 --- /dev/null +++ b/pkg/logql/mapper_metrics.go @@ -0,0 +1,81 @@ +package logql + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// expression type used in metrics +const ( + StreamsKey = "streams" + MetricsKey = "metrics" +) + +// parsing evaluation result used in metrics +const ( + SuccessKey = "success" + FailureKey = "failure" + NoopKey = "noop" +) + +// MapperMetrics is the metrics wrapper used in logql mapping (shard and range) +type MapperMetrics struct { + DownstreamQueries *prometheus.CounterVec // downstream queries total, partitioned by streams/metrics + ParsedQueries *prometheus.CounterVec // parsed ASTs total, partitioned by success/failure/noop + DownstreamFactor prometheus.Histogram // per request downstream factor +} + +func newMapperMetrics(registerer prometheus.Registerer, mapper string) *MapperMetrics { + return &MapperMetrics{ + DownstreamQueries: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", 
+ Name: "query_frontend_shards_total", + Help: "Number of downstream queries by expression type", + ConstLabels: prometheus.Labels{"mapper": mapper}, + }, []string{"type"}), + ParsedQueries: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "query_frontend_sharding_parsed_queries_total", + Help: "Number of parsed queries by evaluation type", + ConstLabels: prometheus.Labels{"mapper": mapper}, + }, []string{"type"}), + DownstreamFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki", + Name: "query_frontend_shard_factor", + Help: "Number of downstream queries per request", + Buckets: prometheus.LinearBuckets(0, 16, 4), + ConstLabels: prometheus.Labels{"mapper": mapper}, + }), + } +} + +// downstreamRecorder wraps a vector & histogram, providing an easy way to increment downstream counts +// and unify them into histogram entries. +// NOT SAFE FOR CONCURRENT USE! We avoid introducing mutex locking here +// because AST mapping is single threaded. +type downstreamRecorder struct { + done bool + total int + *MapperMetrics +} + +// downstreamRecorder constructs a recorder using the underlying metrics. +func (m *MapperMetrics) downstreamRecorder() *downstreamRecorder { + return &downstreamRecorder{ + MapperMetrics: m, + } +} + +// Add increments both the downstream count and tracks it for the eventual histogram entry. +func (r *downstreamRecorder) Add(x int, key string) { + r.total += x + r.DownstreamQueries.WithLabelValues(key).Add(float64(x)) +} + +// Finish idempotently records a histogram entry with the total downstream factor. 
+func (r *downstreamRecorder) Finish() { + if !r.done { + r.done = true + r.DownstreamFactor.Observe(float64(r.total)) + } +} diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index f7ac61d824880..86cf2d00274ed 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/logql/syntax" util_log "github.com/grafana/loki/pkg/util/log" @@ -51,19 +52,25 @@ var splittableRangeVectorOp = map[string]struct{}{ // using the same rules as above. type RangeMapper struct { splitByInterval time.Duration + metrics *MapperMetrics } // NewRangeMapper creates a new RangeMapper instance with the given duration as // split interval. The interval must be greater than 0. -func NewRangeMapper(interval time.Duration) (RangeMapper, error) { +func NewRangeMapper(interval time.Duration, metrics *MapperMetrics) (RangeMapper, error) { if interval <= 0 { return RangeMapper{}, fmt.Errorf("cannot create RangeMapper with splitByInterval <= 0; got %s", interval) } return RangeMapper{ splitByInterval: interval, + metrics: metrics, }, nil } +func NewRangeMapperMetrics(registerer prometheus.Registerer) *MapperMetrics { + return newMapperMetrics(registerer, "range") +} + // Parse parses the given LogQL query string into a sample expression and // applies the rewrite rules for splitting it into a sample expression that can // be executed by the downstream engine. 
@@ -75,23 +82,36 @@ func (m RangeMapper) Parse(query string) (bool, syntax.Expr, error) { return true, nil, err } + recorder := m.metrics.downstreamRecorder() + if !isSplittableByRange(origExpr) { + m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc() return true, origExpr, nil } - modExpr, err := m.Map(origExpr, nil) + modExpr, err := m.Map(origExpr, nil, recorder) if err != nil { + m.metrics.ParsedQueries.WithLabelValues(FailureKey).Inc() return true, nil, err } - return origExpr.String() == modExpr.String(), modExpr, err + noop := origExpr.String() == modExpr.String() + if noop { + m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc() + } else { + m.metrics.ParsedQueries.WithLabelValues(SuccessKey).Inc() + } + + recorder.Finish() // only record metrics for successful mappings + + return noop, modExpr, err } // Map rewrites sample expression expr and returns the resultant sample expression to be executed by the downstream engine // It is called recursively on the expression tree. // The function takes an optional vector aggregation as second argument, that // is pushed down to the downstream expression. 
-func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) { +func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, recorder *downstreamRecorder) (syntax.SampleExpr, error) { // immediately clone the passed expr to avoid mutating the original expr, err := clone(expr) if err != nil { @@ -100,15 +120,15 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect switch e := expr.(type) { case *syntax.VectorAggregationExpr: - return m.mapVectorAggregationExpr(e) + return m.mapVectorAggregationExpr(e, recorder) case *syntax.RangeAggregationExpr: - return m.mapRangeAggregationExpr(e, vectorAggrPushdown), nil + return m.mapRangeAggregationExpr(e, vectorAggrPushdown, recorder), nil case *syntax.BinOpExpr: - lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown) + lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown, recorder) if err != nil { return nil, err } - rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown) + rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder) if err != nil { return nil, err } @@ -158,7 +178,7 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool { // Example: // rate({app="foo"}[2m]) // => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120) -func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr { +func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{ Left: expr.Left, Operation: operation, @@ -174,7 +194,7 @@ func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, 
overrid } return &syntax.BinOpExpr{ SampleExpr: &syntax.VectorAggregationExpr{ - Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval), + Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval, recorder), Grouping: &syntax.Grouping{ Without: true, }, @@ -194,7 +214,7 @@ func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrid // => min without (bytes_over_time({job="bar"} [1m]) ++ bytes_over_time({job="bar"} [1m] offset 1m)) // min by (app) (bytes_over_time({job="bar"} [2m]) // => min without (min by (app) (bytes_over_time({job="bar"} [1m])) ++ min by (app) (bytes_over_time({job="bar"} [1m] offset 1m))) -func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration) syntax.SampleExpr { +func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { grouping := expr.Grouping if expr.Grouping == nil { grouping = &syntax.Grouping{ @@ -206,13 +226,13 @@ func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregatio downstream = vectorAggrPushdown } return &syntax.VectorAggregationExpr{ - Left: m.mapConcatSampleExpr(downstream, rangeInterval), + Left: m.mapConcatSampleExpr(downstream, rangeInterval, recorder), Grouping: grouping, Operation: op, } } -// appendDownstream adds expression expr with a range interval 'interval' and offset 'offset' to the downstreams list. +// appendDownstream adds expression expr with a range interval 'interval' and offset 'offset' to the downstreams list. // Returns the updated downstream ConcatSampleExpr. 
func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, interval time.Duration, offset time.Duration) *ConcatSampleExpr { sampleExpr, _ := clone(expr) @@ -237,7 +257,7 @@ func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, int // mapConcatSampleExpr transform expr in multiple downstream subexpressions split by offset range interval // rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression // will have an unnecessary aggregation operation -func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration) syntax.SampleExpr { +func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { splitCount := int(rangeInterval / m.splitByInterval) if splitCount == 0 { @@ -249,16 +269,19 @@ func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval t for split = 0; split < splitCount; split++ { downstreams = appendDownstream(downstreams, expr, m.splitByInterval, time.Duration(split)*m.splitByInterval) } + recorder.Add(splitCount, MetricsKey) + // Add the remainder offset interval if rangeInterval%m.splitByInterval != 0 { offset := time.Duration(split) * m.splitByInterval downstreams = appendDownstream(downstreams, expr, rangeInterval-offset, offset) + recorder.Add(1, MetricsKey) } return downstreams } -func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) { +func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, recorder *downstreamRecorder) (syntax.SampleExpr, error) { rangeInterval := getRangeInterval(expr) // in case the interval is smaller than the configured split interval, @@ -277,7 +300,7 @@ func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr } // Split the vector aggregation's inner expression - lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown) + 
lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown, recorder) if err != nil { return nil, err } @@ -297,7 +320,7 @@ func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr // contains the initial query which will be the downstream expression with a split range interval. // Example: `sum by (a) (bytes_over_time)` // Is mapped to `sum by (a) (sum without downstream++downstream++...)` -func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) syntax.SampleExpr { +func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, recorder *downstreamRecorder) syntax.SampleExpr { rangeInterval := getRangeInterval(expr) // in case the interval is smaller than the configured split interval, @@ -313,15 +336,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, } switch expr.Operation { case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount, syntax.OpRangeTypeSum: - return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval) + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval, recorder) case syntax.OpRangeTypeMax: - return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval) + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval, recorder) case syntax.OpRangeTypeMin: - return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval) + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval, recorder) case syntax.OpRangeTypeRate: - return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval) + return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder) case 
syntax.OpRangeTypeBytesRate: - return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval) + return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval, recorder) default: // this should not be reachable. // If an operation is splittable it should have an optimization listed. diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index 9c77a03f22b0e..e3ed1ac726252 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -8,7 +8,7 @@ import ( ) func Test_SplitRangeInterval(t *testing.T) { - rvm, err := NewRangeMapper(2 * time.Second) + rvm, err := NewRangeMapper(2*time.Second, nilRangeMetrics) require.NoError(t, err) for _, tc := range []struct { @@ -50,7 +50,7 @@ func Test_SplitRangeInterval(t *testing.T) { } func Test_SplitRangeVectorMapping(t *testing.T) { - rvm, err := NewRangeMapper(time.Minute) + rvm, err := NewRangeMapper(time.Minute, nilRangeMetrics) require.NoError(t, err) for _, tc := range []struct { @@ -972,7 +972,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) { } func Test_SplitRangeVectorMapping_Noop(t *testing.T) { - rvm, err := NewRangeMapper(time.Minute) + rvm, err := NewRangeMapper(time.Minute, nilRangeMetrics) require.NoError(t, err) for _, tc := range []struct { @@ -1050,7 +1050,7 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { } func Test_FailQuery(t *testing.T) { - rvm, err := NewRangeMapper(2 * time.Minute) + rvm, err := NewRangeMapper(2*time.Minute, nilRangeMetrics) require.NoError(t, err) _, _, err = rvm.Parse(`{app="foo"} |= "err"`) require.Error(t, err) diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 0cb9ef1af5f1d..2d06c8d8735ab 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -6,86 +6,20 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" 
"github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/querier/astmapper" util_log "github.com/grafana/loki/pkg/util/log" ) -// keys used in metrics -const ( - StreamsKey = "streams" - MetricsKey = "metrics" - SuccessKey = "success" - FailureKey = "failure" - NoopKey = "noop" -) - -// ShardingMetrics is the metrics wrapper used in shard mapping -type ShardingMetrics struct { - Shards *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric) - ShardFactor prometheus.Histogram // per request shard factor - parsed *prometheus.CounterVec // parsed ASTs total, partitioned by (success/failure/noop) -} - -func NewShardingMetrics(registerer prometheus.Registerer) *ShardingMetrics { - return &ShardingMetrics{ - Shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "query_frontend_shards_total", - }, []string{"type"}), - parsed: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "query_frontend_sharding_parsed_queries_total", - }, []string{"type"}), - ShardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Namespace: "loki", - Name: "query_frontend_shard_factor", - Help: "Number of shards per request", - Buckets: prometheus.LinearBuckets(0, 16, 4), // 16 is the default shard factor for later schemas - }), - } -} - -// shardRecorder constructs a recorder using the underlying metrics. -func (m *ShardingMetrics) shardRecorder() *shardRecorder { - return &shardRecorder{ - ShardingMetrics: m, - } -} - -// shardRecorder wraps a vector & histogram, providing an easy way to increment sharding counts. -// and unify them into histogram entries. -// NOT SAFE FOR CONCURRENT USE! We avoid introducing mutex locking here -// because AST mapping is single threaded. -type shardRecorder struct { - done bool - total int - *ShardingMetrics -} - -// Add increments both the shard count and tracks it for the eventual histogram entry. 
-func (r *shardRecorder) Add(x int, key string) { - r.total += x - r.Shards.WithLabelValues(key).Add(float64(x)) -} - -// Finish idemptotently records a histogram entry with the total shard factor. -func (r *shardRecorder) Finish() { - if !r.done { - r.done = true - r.ShardFactor.Observe(float64(r.total)) - } -} - -func badASTMapping(got syntax.Expr) error { - return fmt.Errorf("bad AST mapping: expected SampleExpr, but got (%T)", got) +type ShardMapper struct { + shards int + metrics *MapperMetrics } -func NewShardMapper(shards int, metrics *ShardingMetrics) (ShardMapper, error) { +func NewShardMapper(shards int, metrics *MapperMetrics) (ShardMapper, error) { if shards < 2 { - return ShardMapper{}, fmt.Errorf("Cannot create ShardMapper with <2 shards. Received %d", shards) + return ShardMapper{}, fmt.Errorf("cannot create ShardMapper with <2 shards. Received %d", shards) } return ShardMapper{ shards: shards, @@ -93,9 +27,8 @@ func NewShardMapper(shards int, metrics *ShardingMetrics) (ShardMapper, error) { }, nil } -type ShardMapper struct { - shards int - metrics *ShardingMetrics +func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics { + return newMapperMetrics(registerer, "shard") } func (m ShardMapper) Parse(query string) (noop bool, expr syntax.Expr, err error) { @@ -104,11 +37,11 @@ func (m ShardMapper) Parse(query string) (noop bool, expr syntax.Expr, err error return false, nil, err } - recorder := m.metrics.shardRecorder() + recorder := m.metrics.downstreamRecorder() mapped, err := m.Map(parsed, recorder) if err != nil { - m.metrics.parsed.WithLabelValues(FailureKey).Inc() + m.metrics.ParsedQueries.WithLabelValues(FailureKey).Inc() return false, nil, err } @@ -116,9 +49,9 @@ func (m ShardMapper) Parse(query string) (noop bool, expr syntax.Expr, err error mappedStr := mapped.String() noop = originalStr == mappedStr if noop { - m.metrics.parsed.WithLabelValues(NoopKey).Inc() + m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc() } else 
{ - m.metrics.parsed.WithLabelValues(SuccessKey).Inc() + m.metrics.ParsedQueries.WithLabelValues(SuccessKey).Inc() } recorder.Finish() // only record metrics for successful mappings @@ -126,7 +59,7 @@ func (m ShardMapper) Parse(query string) (noop bool, expr syntax.Expr, err error return noop, mapped, err } -func (m ShardMapper) Map(expr syntax.Expr, r *shardRecorder) (syntax.Expr, error) { +func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr, error) { // immediately clone the passed expr to avoid mutating the original expr, err := syntax.Clone(expr) if err != nil { @@ -169,7 +102,7 @@ func (m ShardMapper) Map(expr syntax.Expr, r *shardRecorder) (syntax.Expr, error } } -func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *shardRecorder) syntax.LogSelectorExpr { +func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstreamRecorder) syntax.LogSelectorExpr { var head *ConcatLogSelectorExpr for i := m.shards - 1; i >= 0; i-- { head = &ConcatLogSelectorExpr{ @@ -188,7 +121,7 @@ func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *shardRec return head } -func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *shardRecorder) syntax.SampleExpr { +func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder) syntax.SampleExpr { var head *ConcatSampleExpr for i := m.shards - 1; i >= 0; i-- { head = &ConcatSampleExpr{ @@ -209,7 +142,7 @@ func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *shardRecorder) syn // technically, std{dev,var} are also parallelizable if there is no cross-shard merging // in descendent nodes in the AST. This optimization is currently avoided for simplicity. 
-func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, r *shardRecorder) (syntax.SampleExpr, error) { +func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, error) { // if this AST contains unshardable operations, don't shard this at this level, // but attempt to shard a child node. if !expr.Shardable() { @@ -285,7 +218,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr } } -func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *shardRecorder) (syntax.SampleExpr, error) { +func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder) (syntax.SampleExpr, error) { subMapped, err := m.Map(expr.Left, r) if err != nil { return nil, err @@ -295,10 +228,10 @@ func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *shard return &cpy, nil } -func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *shardRecorder) syntax.SampleExpr { +func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) syntax.SampleExpr { if hasLabelModifier(expr) { - // if an expr can modify labels this means multiple shards can returns the same labelset. - // When this happens the merge strategy needs to be different than a simple concatenation. + // if an expr can modify labels this means multiple shards can return the same labelset. + // When this happens the merge strategy needs to be different from a simple concatenation. // For instance for rates we need to sum data from different shards but same series. // Since we currently support only concatenation as merge strategy, we skip those queries. 
return expr @@ -329,3 +262,7 @@ func hasLabelModifier(expr *syntax.RangeAggregationExpr) bool { } return false } + +func badASTMapping(got syntax.Expr) error { + return fmt.Errorf("bad AST mapping: expected SampleExpr, but got (%T)", got) +} diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 7030656b10615..c9c30824f159c 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -51,7 +51,7 @@ func TestShardedStringer(t *testing.T) { } func TestMapSampleExpr(t *testing.T) { - m, err := NewShardMapper(2, nilMetrics) + m, err := NewShardMapper(2, nilShardMetrics) require.Nil(t, err) for _, tc := range []struct { @@ -106,13 +106,13 @@ func TestMapSampleExpr(t *testing.T) { }, } { t.Run(tc.in.String(), func(t *testing.T) { - require.Equal(t, tc.out, m.mapSampleExpr(tc.in, nilMetrics.shardRecorder())) + require.Equal(t, tc.out, m.mapSampleExpr(tc.in, nilShardMetrics.downstreamRecorder())) }) } } func TestMappingStrings(t *testing.T) { - m, err := NewShardMapper(2, nilMetrics) + m, err := NewShardMapper(2, nilShardMetrics) require.Nil(t, err) for _, tc := range []struct { in string @@ -260,7 +260,7 @@ func TestMappingStrings(t *testing.T) { ast, err := syntax.ParseExpr(tc.in) require.Nil(t, err) - mapped, err := m.Map(ast, nilMetrics.shardRecorder()) + mapped, err := m.Map(ast, nilShardMetrics.downstreamRecorder()) require.Nil(t, err) require.Equal(t, removeWhiteSpace(tc.out), removeWhiteSpace(mapped.String())) @@ -269,7 +269,7 @@ func TestMappingStrings(t *testing.T) { } func TestMapping(t *testing.T) { - m, err := NewShardMapper(2, nilMetrics) + m, err := NewShardMapper(2, nilShardMetrics) require.Nil(t, err) for _, tc := range []struct { @@ -1109,7 +1109,7 @@ func TestMapping(t *testing.T) { ast, err := syntax.ParseExpr(tc.in) require.Equal(t, tc.err, err) - mapped, err := m.Map(ast, nilMetrics.shardRecorder()) + mapped, err := m.Map(ast, nilShardMetrics.downstreamRecorder()) require.Equal(t, tc.err, err) 
require.Equal(t, tc.expr.String(), mapped.String()) diff --git a/pkg/querier/queryrange/metrics.go b/pkg/querier/queryrange/metrics.go index 25a9d883b94e8..81c3ed025251c 100644 --- a/pkg/querier/queryrange/metrics.go +++ b/pkg/querier/queryrange/metrics.go @@ -10,16 +10,28 @@ import ( type Metrics struct { *queryrangebase.InstrumentMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics - *logql.ShardingMetrics + *MiddlewareMapperMetrics *SplitByMetrics *LogResultCacheMetrics } +type MiddlewareMapperMetrics struct { + shardMapper *logql.MapperMetrics + rangeMapper *logql.MapperMetrics +} + +func NewMiddlewareMapperMetrics(registerer prometheus.Registerer) *MiddlewareMapperMetrics { + return &MiddlewareMapperMetrics{ + shardMapper: logql.NewShardMapperMetrics(registerer), + rangeMapper: logql.NewRangeMapperMetrics(registerer), + } +} + func NewMetrics(registerer prometheus.Registerer) *Metrics { return &Metrics{ InstrumentMiddlewareMetrics: queryrangebase.NewInstrumentMiddlewareMetrics(registerer), RetryMiddlewareMetrics: queryrangebase.NewRetryMiddlewareMetrics(registerer), - ShardingMetrics: logql.NewShardingMetrics(registerer), + MiddlewareMapperMetrics: NewMiddlewareMapperMetrics(registerer), SplitByMetrics: NewSplitByMetrics(registerer), LogResultCacheMetrics: NewLogResultCacheMetrics(registerer), } diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index bdd742ddda83b..abab1186caf5f 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -33,7 +33,7 @@ func NewQueryShardMiddleware( logger log.Logger, confs ShardingConfigs, middlewareMetrics *queryrangebase.InstrumentMiddlewareMetrics, - shardingMetrics *logql.ShardingMetrics, + shardingMetrics *logql.MapperMetrics, limits Limits, ) queryrangebase.Middleware { noshards := !hasShards(confs) @@ -68,14 +68,14 @@ func newASTMapperware( confs ShardingConfigs, next queryrangebase.Handler, logger log.Logger, - metrics 
*logql.ShardingMetrics, + metrics *logql.MapperMetrics, limits logql.Limits, ) *astMapperware { return &astMapperware{ confs: confs, logger: log.With(logger, "middleware", "QueryShard.astMapperware"), next: next, - ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits, logger), + ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, limits, logger), metrics: metrics, } } @@ -85,7 +85,7 @@ type astMapperware struct { logger log.Logger next queryrangebase.Handler ng *logql.DownstreamEngine - metrics *logql.ShardingMetrics + metrics *logql.MapperMetrics } func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { @@ -262,7 +262,7 @@ func NewSeriesQueryShardMiddleware( logger log.Logger, confs ShardingConfigs, middlewareMetrics *queryrangebase.InstrumentMiddlewareMetrics, - shardingMetrics *logql.ShardingMetrics, + shardingMetrics *logql.MapperMetrics, limits queryrangebase.Limits, merger queryrangebase.Merger, ) queryrangebase.Middleware { @@ -294,7 +294,7 @@ type seriesShardingHandler struct { confs ShardingConfigs logger log.Logger next queryrangebase.Handler - metrics *logql.ShardingMetrics + metrics *logql.MapperMetrics limits queryrangebase.Limits merger queryrangebase.Merger } @@ -316,8 +316,8 @@ func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Reques return nil, fmt.Errorf("expected *LokiSeriesRequest, got (%T)", r) } - ss.metrics.Shards.WithLabelValues("series").Inc() - ss.metrics.ShardFactor.Observe(float64(conf.RowShards)) + ss.metrics.DownstreamQueries.WithLabelValues("series").Inc() + ss.metrics.DownstreamFactor.Observe(float64(conf.RowShards)) requests := make([]queryrangebase.Request, 0, conf.RowShards) for i := 0; i < int(conf.RowShards); i++ { diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 4dc7ec5bc0d6f..96e2c107d02f7 100644 --- 
a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -23,7 +23,7 @@ import ( ) var ( - nilShardingMetrics = logql.NewShardingMetrics(nil) + nilShardingMetrics = logql.NewShardMapperMetrics(nil) defaultReq = func() *LokiRequest { return &LokiRequest{ Limit: 100, diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 79bbc706011a0..825e9e7ba7f4e 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -281,7 +281,7 @@ func NewLogFilterTripperware( log, schema.Configs, metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware - metrics.ShardingMetrics, + metrics.MiddlewareMapperMetrics.shardMapper, limits, ), ) @@ -333,7 +333,7 @@ func NewSeriesTripperware( log, schema.Configs, metrics.InstrumentMiddlewareMetrics, - metrics.ShardingMetrics, + metrics.MiddlewareMapperMetrics.shardMapper, limits, codec, ), @@ -438,7 +438,7 @@ func NewMetricTripperware( log, schema.Configs, metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware - metrics.ShardingMetrics, + metrics.MiddlewareMapperMetrics.shardMapper, limits, ), ) @@ -480,12 +480,12 @@ func NewInstantMetricTripperware( if cfg.ShardedQueries { queryRangeMiddleware = append(queryRangeMiddleware, - NewSplitByRangeMiddleware(log, limits, nil), + NewSplitByRangeMiddleware(log, limits, metrics.MiddlewareMapperMetrics.rangeMapper), NewQueryShardMiddleware( log, schema.Configs, metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware - metrics.ShardingMetrics, + metrics.MiddlewareMapperMetrics.shardMapper, limits, ), ) diff --git a/pkg/querier/queryrange/split_by_range.go b/pkg/querier/queryrange/split_by_range.go index 43890cf1e92fd..a0d33649584ce 100644 --- a/pkg/querier/queryrange/split_by_range.go +++ b/pkg/querier/queryrange/split_by_range.go @@ -20,21 +20,22 @@ import ( ) type splitByRange 
struct { - logger log.Logger - next queryrangebase.Handler - limits Limits - - ng *logql.DownstreamEngine + logger log.Logger + next queryrangebase.Handler + limits Limits + ng *logql.DownstreamEngine + metrics *logql.MapperMetrics } // NewSplitByRangeMiddleware creates a new Middleware that splits log requests by the range interval. -func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql.ShardingMetrics) queryrangebase.Middleware { +func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql.MapperMetrics) queryrangebase.Middleware { return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { return &splitByRange{ - logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"), - next: next, - limits: limits, - ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits, logger), + logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"), + next: next, + limits: limits, + ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, limits, logger), + metrics: metrics, } }) } @@ -53,7 +54,7 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) ( return s.next.Do(ctx, request) } - mapper, err := logql.NewRangeMapper(interval) + mapper, err := logql.NewRangeMapper(interval, s.metrics) if err != nil { return nil, err }