From 4129dbb1ed246c0e6fa4b1b277c35d2d32820db9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=BBinas?= Date: Thu, 5 Nov 2020 10:18:18 +0200 Subject: [PATCH] [query] sort, topk, bottomk fixes for instant queries (#2792) * fixes most of topk, bottomk ordered tests when instant queries are used * formatted some code and extracted instant logic into separate method * Changes after review. Small optimization in appendValuesInstant. * fixed imports. made valueAndMeta struct package private. * Optimized instant queries - now only single block will be created. OrderedFlush() now sorts the results instead popping them one by one. FloatHeap won't be created if it's not used. * improved how max series count is calculated * removed unused method * initial implementation * Fixes NaN issues with m3 queries for sort and topk/bottomk. FloatHeap now supports adding NaN values. Uncommented topk, bottomk and sort compatibility tests (now they pass). * Retain old KeepNans flag for non instant queries * Changes after code review. * Small cleanup. * Added copyright header to sort_test.go. equalsPairs now checks individual struct elements instead of their printed values. * better init * Removed Min() function in favour of using this logic inline. Used append in place when indices are not needed. Documented and extracted some comparison functions for better readability and clearness. 
* Updated doc comment --- src/cmd/services/m3query/config/config.go | 4 +- .../services/m3query/config/config_test.go | 6 +- .../api/v1/handler/prometheus/native/read.go | 9 +- .../handler/prometheus/native/read_common.go | 1 + .../v1/handler/prometheus/native/read_test.go | 4 +- .../v1/handler/prometheus/remote/read_test.go | 4 +- src/query/block/meta.go | 2 + src/query/functions/aggregation/take.go | 184 ++++++++++++++---- src/query/functions/aggregation/take_test.go | 72 +++++++ src/query/functions/linear/sort.go | 141 +++++++++++++- src/query/functions/linear/sort_test.go | 141 ++++++++++++++ src/query/functions/utils/heap.go | 54 +++-- src/query/functions/utils/heap_test.go | 135 +++++++++++++ src/query/models/query_context.go | 3 + src/query/parser/promql/matchers.go | 5 +- src/query/parser/promql/parse_test.go | 6 +- .../compatibility/testdata/aggregators.test | 85 ++++---- .../compatibility/testdata/functions.test | 43 ++-- 18 files changed, 766 insertions(+), 133 deletions(-) create mode 100644 src/query/functions/linear/sort_test.go diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 3d1a0adfcf..e1da5d5e13 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -203,9 +203,9 @@ type FilterConfiguration struct { // ResultOptions are the result options for query. type ResultOptions struct { - // KeepNans keeps NaNs before returning query results. + // KeepNaNs keeps NaNs before returning query results. // The default is false, which matches Prometheus - KeepNans bool `yaml:"keepNans"` + KeepNaNs bool `yaml:"keepNans"` } // QueryConfiguration is the query configuration. 
diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index acb787da8c..88bb05855d 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -185,10 +185,10 @@ func TestTagOptionsConfig(t *testing.T) { func TestKeepNaNsDefault(t *testing.T) { r := ResultOptions{ - KeepNans: true, + KeepNaNs: true, } - assert.Equal(t, true, r.KeepNans) + assert.Equal(t, true, r.KeepNaNs) r = ResultOptions{} - assert.Equal(t, false, r.KeepNans) + assert.Equal(t, false, r.KeepNaNs) } diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 9db53457b1..cbb72f4ad3 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -141,15 +141,20 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { handleroptions.AddWarningHeaders(w, result.Meta) h.promReadMetrics.fetchSuccess.Inc(1) + keepNaNs := h.opts.Config().ResultOptions.KeepNaNs + if !keepNaNs { + keepNaNs = result.Meta.KeepNaNs + } + if h.instant { - renderResultsInstantaneousJSON(w, result, h.opts.Config().ResultOptions.KeepNans) + renderResultsInstantaneousJSON(w, result, keepNaNs) return } err = RenderResultsJSON(w, result, RenderResultsOptions{ Start: parsedOptions.Params.Start, End: parsedOptions.Params.End, - KeepNaNs: h.opts.Config().ResultOptions.KeepNans, + KeepNaNs: h.opts.Config().ResultOptions.KeepNaNs, }) if err != nil { diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index 44cb14662a..f86ba1a304 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -101,6 +101,7 @@ func parseRequest( QueryContextOptions: models.QueryContextOptions{ LimitMaxTimeseries: fetchOpts.SeriesLimit, LimitMaxDocs: 
fetchOpts.DocsLimit, + Instantaneous: instantaneous, }} restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType() diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index 5ae88efae2..c1b3bffca9 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -208,7 +208,7 @@ func newTestSetup( fetchOptsBuilder := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg) tagOpts := models.NewTagOptions() limitsConfig := config.LimitsConfiguration{} - keepNans := false + keepNaNs := false opts := options.EmptyHandlerOptions(). SetEngine(engine). @@ -219,7 +219,7 @@ func newTestSetup( SetConfig(config.Configuration{ Limits: limitsConfig, ResultOptions: config.ResultOptions{ - KeepNans: keepNans, + KeepNaNs: keepNaNs, }, }) diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index c8c2504211..9ded0bdc53 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -376,7 +376,7 @@ func TestMultipleRead(t *testing.T) { handlerOpts := options.EmptyHandlerOptions().SetEngine(engine). SetConfig(config.Configuration{ ResultOptions: config.ResultOptions{ - KeepNans: true, + KeepNaNs: true, }, }) @@ -454,7 +454,7 @@ func TestReadWithOptions(t *testing.T) { handlerOpts := options.EmptyHandlerOptions().SetEngine(engine). SetConfig(config.Configuration{ ResultOptions: config.ResultOptions{ - KeepNans: true, + KeepNaNs: true, }, }) diff --git a/src/query/block/meta.go b/src/query/block/meta.go index 85bdc98347..0e58ad24b3 100644 --- a/src/query/block/meta.go +++ b/src/query/block/meta.go @@ -68,6 +68,8 @@ type ResultMetadata struct { Warnings Warnings // Resolutions is a list of resolutions for series obtained by this query. 
Resolutions []time.Duration + // KeepNaNs indicates if NaNs should be kept when returning results. + KeepNaNs bool } // NewResultMetadata creates a new result metadata. diff --git a/src/query/functions/aggregation/take.go b/src/query/functions/aggregation/take.go index eab7ceee2c..098cea41d9 100644 --- a/src/query/functions/aggregation/take.go +++ b/src/query/functions/aggregation/take.go @@ -39,39 +39,36 @@ const ( TopKType = "topk" ) -type takeFunc func(values []float64, buckets [][]int) []float64 +type valueAndMeta struct { + val float64 + seriesMeta block.SeriesMeta +} + +type takeFunc func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 +type takeInstantFunc func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta // NewTakeOp creates a new takeK operation func NewTakeOp( opType string, params NodeParams, ) (parser.Params, error) { - takeTop := opType == TopKType - if !takeTop && opType != BottomKType { - return baseOp{}, fmt.Errorf("operator not supported: %s", opType) - } - - var fn takeFunc k := int(params.Parameter) - if k < 1 { - fn = func(values []float64, buckets [][]int) []float64 { - return takeNone(values, buckets) - } - } else { - heap := utils.NewFloatHeap(takeTop, k) - fn = func(values []float64, buckets [][]int) []float64 { - return takeFn(heap, values, buckets) - } + fn := func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 { + return takeFn(heap, values, buckets) } - - return newTakeOp(params, opType, fn), nil + fnInstant := func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta { + return takeInstantFn(heap, values, buckets, seriesMetas) + } + return newTakeOp(params, opType, k, fn, fnInstant), nil } // takeOp stores required properties for take ops type takeOp struct { - params NodeParams - opType string - takeFunc takeFunc + params NodeParams + opType string + k int + takeFunc takeFunc + 
takeInstantFunc takeInstantFunc } // OpType for the operator @@ -95,11 +92,13 @@ func (o takeOp) Node( } } -func newTakeOp(params NodeParams, opType string, takeFunc takeFunc) takeOp { +func newTakeOp(params NodeParams, opType string, k int, takeFunc takeFunc, takeInstantFunc takeInstantFunc) takeOp { return takeOp{ - params: params, - opType: opType, - takeFunc: takeFunc, + params: params, + opType: opType, + k: k, + takeFunc: takeFunc, + takeInstantFunc: takeInstantFunc, } } @@ -126,6 +125,12 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, return nil, err } + instantaneous := queryCtx.Options.Instantaneous + takeTop := n.op.opType == TopKType + if !takeTop && n.op.opType != BottomKType { + return nil, fmt.Errorf("operator not supported: %s", n.op.opType) + } + params := n.op.params meta := b.Meta() seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta()) @@ -136,8 +141,23 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, seriesMetas, ) - // retain original metadatas - builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta()) + seriesCount := maxSeriesCount(buckets) + if instantaneous { + heapSize := seriesCount + if n.op.k < seriesCount { + heapSize = n.op.k + } + + heap := utils.NewFloatHeap(takeTop, heapSize) + return n.processBlockInstantaneous(heap, queryCtx, meta, stepIter, seriesMetas, buckets) + } + + if n.op.k >= seriesCount { + return b, nil + } + + heap := utils.NewFloatHeap(takeTop, n.op.k) + builder, err := n.controller.BlockBuilder(queryCtx, meta, seriesMetas) if err != nil { return nil, err } @@ -147,34 +167,95 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, } for index := 0; stepIter.Next(); index++ { - step := stepIter.Current() - values := step.Values() - aggregatedValues := n.op.takeFunc(values, buckets) - if err := builder.AppendValues(index, aggregatedValues); err != nil { + values := 
stepIter.Current().Values() + if err := builder.AppendValues(index, n.op.takeFunc(heap, values, buckets)); err != nil { return nil, err } } - if err = stepIter.Err(); err != nil { return nil, err } - return builder.Build(), nil } -// shortcut to return empty when taking <= 0 values -func takeNone(values []float64, buckets [][]int) []float64 { - util.Memset(values, math.NaN()) - return values +func maxSeriesCount(buckets [][]int) int { + result := 0 + + for _, bucket := range buckets { + if len(bucket) > result { + result = len(bucket) + } + } + + return result +} + +func (n *takeNode) processBlockInstantaneous( + heap utils.FloatHeap, + queryCtx *models.QueryContext, + metadata block.Metadata, + stepIter block.StepIter, + seriesMetas []block.SeriesMeta, + buckets [][]int) (block.Block, error) { + ixLastStep := stepIter.StepCount() - 1 //we only care for the last step values for the instant query + for i := 0; i <= ixLastStep; i++ { + if !stepIter.Next() { + return nil, fmt.Errorf("invalid step count; expected %d got %d", stepIter.StepCount(), i+1) + } + } + metadata.ResultMetadata.KeepNaNs = true + values := stepIter.Current().Values() + takenSortedValues := n.op.takeInstantFunc(heap, values, buckets, seriesMetas) + blockValues, blockSeries := mapToValuesAndSeriesMetas(takenSortedValues) + + //adjust bounds to contain single step + time, err := metadata.Bounds.TimeForIndex(ixLastStep) + if err != nil { + return nil, err + } + metadata.Bounds = models.Bounds{ + Start: time, + Duration: metadata.Bounds.StepSize, + StepSize: metadata.Bounds.StepSize, + } + + blockBuilder, err := n.controller.BlockBuilder(queryCtx, metadata, blockSeries) + if err != nil { + return nil, err + } + if err = blockBuilder.AddCols(1); err != nil { + return nil, err + } + if err := blockBuilder.AppendValues(0, blockValues); err != nil { + return nil, err + } + if err = stepIter.Err(); err != nil { + return nil, err + } + return blockBuilder.Build(), nil +} + +func 
mapToValuesAndSeriesMetas(takenSortedValues []valueAndMeta) ([]float64, []block.SeriesMeta) { + blockValues := make([]float64, 0, len(takenSortedValues)) + blockSeries := make([]block.SeriesMeta, 0, len(takenSortedValues)) + for _, sortedValue := range takenSortedValues { + blockValues = append(blockValues, sortedValue.val) + blockSeries = append(blockSeries, sortedValue.seriesMeta) + } + return blockValues, blockSeries } func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 { - cap := heap.Cap() + capacity := heap.Cap() + if capacity < 1 { + util.Memset(values, math.NaN()) + return values + } for _, bucket := range buckets { // If this bucket's length is less than or equal to the heap's // capacity do not need to clear any values from the input vector, // as they are all included in the output. - if len(bucket) <= cap { + if len(bucket) <= capacity { continue } @@ -198,3 +279,28 @@ func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 { return values } + +func takeInstantFn(heap utils.FloatHeap, values []float64, buckets [][]int, metas []block.SeriesMeta) []valueAndMeta { + var result = make([]valueAndMeta, 0, heap.Cap()) + if heap.Cap() < 1 { + return result + } + for _, bucket := range buckets { + for _, idx := range bucket { + val := values[idx] + heap.Push(val, idx) + } + + valIndexPairs := heap.OrderedFlush() + for _, pair := range valIndexPairs { + prevIndex := pair.Index + prevMeta := metas[prevIndex] + + result = append(result, valueAndMeta{ + val: pair.Val, + seriesMeta: prevMeta, + }) + } + } + return result +} diff --git a/src/query/functions/aggregation/take_test.go b/src/query/functions/aggregation/take_test.go index c0a5bef13b..854879062c 100644 --- a/src/query/functions/aggregation/take_test.go +++ b/src/query/functions/aggregation/take_test.go @@ -21,9 +21,11 @@ package aggregation import ( + "fmt" "math" "testing" + "github.com/m3db/m3/src/query/block" 
"github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/functions/utils" "github.com/m3db/m3/src/query/models" @@ -35,6 +37,66 @@ import ( "github.com/stretchr/testify/require" ) +func TestTakeInstantFn(t *testing.T) { + valuesMin := []float64{1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1} + buckets := [][]int{{0, 1, 2, 3}, {4}, {5, 6, 7, 8}} + + var ( + seriesMetasTakeOrdered = []block.SeriesMeta{ + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "0"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "1"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "2"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "0"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "1"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "0"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "1"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "0"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "1"}, {N: "group", V: "canary"}})}, + } + ) + + expectedMin := []valueAndMeta{ + {val: 1.1, seriesMeta: seriesMetasTakeOrdered[0]}, + {val: 2.1, seriesMeta: seriesMetasTakeOrdered[1]}, + {val: 3.1, seriesMeta: seriesMetasTakeOrdered[2]}, + + {val: 5.1, seriesMeta: seriesMetasTakeOrdered[4]}, + + {val: 6.1, seriesMeta: seriesMetasTakeOrdered[5]}, + {val: 7.1, seriesMeta: seriesMetasTakeOrdered[6]}, + {val: 
8.1, seriesMeta: seriesMetasTakeOrdered[7]}, + } + + size := 3 + minHeap := utils.NewFloatHeap(false, size) + actual := takeInstantFn(minHeap, valuesMin, buckets, seriesMetasTakeOrdered) //9 + + actualString := fmt.Sprint(actual) + expectedString := fmt.Sprint(expectedMin) + + assert.EqualValues(t, expectedString, actualString) + + valuesMax := []float64{1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1} + expectedMax := []valueAndMeta{ + {val: 4.1, seriesMeta: seriesMetasTakeOrdered[3]}, + {val: 3.1, seriesMeta: seriesMetasTakeOrdered[2]}, + {val: 2.1, seriesMeta: seriesMetasTakeOrdered[1]}, + + {val: 5.1, seriesMeta: seriesMetasTakeOrdered[4]}, + + {val: 9.1, seriesMeta: seriesMetasTakeOrdered[8]}, + {val: 8.1, seriesMeta: seriesMetasTakeOrdered[7]}, + {val: 7.1, seriesMeta: seriesMetasTakeOrdered[6]}, + } + + maxHeap := utils.NewFloatHeap(true, size) + actual = takeInstantFn(maxHeap, valuesMax, buckets, seriesMetasTakeOrdered) + actualString = fmt.Sprint(actual) + expectedString = fmt.Sprint(expectedMax) + + assert.EqualValues(t, expectedString, actualString) +} + func TestTakeFn(t *testing.T) { valuesMin := []float64{1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1} buckets := [][]int{{0, 1, 2, 3}, {4}, {5, 6, 7}} @@ -98,6 +160,7 @@ func TestTakeTopFunctionFilteringWithoutA(t *testing.T) { }) require.NoError(t, err) sink := processTakeOp(t, op) + expected := [][]float64{ // Taking bottomk(1) of first two series, keeping both series {0, math.NaN(), math.NaN(), math.NaN(), math.NaN()}, @@ -139,3 +202,12 @@ func TestTakeTopFunctionFilteringWithoutALessThanOne(t *testing.T) { test.EqualsWithNansWithDelta(t, expected, sink.Values, math.Pow10(-5)) assert.Equal(t, bounds, sink.Meta.Bounds) } + +func TestTakeOpParamIsNaN(t *testing.T) { + op, err := NewTakeOp(TopKType, NodeParams{ + Parameter: math.NaN(), + }) + require.NoError(t, err) + assert.True(t, op.(takeOp).k < 0) +} + diff --git a/src/query/functions/linear/sort.go b/src/query/functions/linear/sort.go index 
b0f7b88665..9d696b9d04 100644 --- a/src/query/functions/linear/sort.go +++ b/src/query/functions/linear/sort.go @@ -20,13 +20,148 @@ package linear -const ( - // NB: Because Prometheus's sort and sort_desc only look at the last value, - // these functions are essentially noops in M3 as we don't support instant queries. +import ( + "fmt" + "sort" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/functions/utils" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" +) +const ( // SortType returns timeseries elements sorted by their values, in ascending order. SortType = "sort" // SortDescType is the same as sort, but sorts in descending order. SortDescType = "sort_desc" ) + +type sortOp struct { + opType string + lessFn lessFn +} + +// OpType for the operator +func (o sortOp) OpType() string { + return o.opType +} + +// String representation +func (o sortOp) String() string { + return fmt.Sprintf("type: %s", o.opType) +} + +type sortNode struct { + op sortOp + controller *transform.Controller +} + +type valueAndMeta struct { + val float64 + seriesMeta block.SeriesMeta +} + +type lessFn func (i, j float64) bool + +// Node creates an execution node +func (o sortOp) Node( + controller *transform.Controller, + _ transform.Options, +) transform.OpNode { + return &sortNode{ + op: o, + controller: controller, + } +} + +func (n *sortNode) Params() parser.Params { + return n.op +} + +func (n *sortNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) error { + return transform.ProcessSimpleBlock(n, n.controller, queryCtx, ID, b) +} + +func (n *sortNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID, b block.Block) (block.Block, error) { + if !queryCtx.Options.Instantaneous { + return b, nil + } + stepIter, err := b.StepIter() + if err != nil { + return nil, err + } + + meta := b.Meta() + seriesMetas := utils.FlattenMetadata(meta, 
stepIter.SeriesMeta()) + return n.processInstantBlock(queryCtx, stepIter, meta, seriesMetas) +} + +func (n *sortNode) processInstantBlock(queryCtx *models.QueryContext, stepIter block.StepIter, meta block.Metadata, seriesMetas []block.SeriesMeta) (block.Block, error) { + ixLastStep := stepIter.StepCount() - 1 //we only care for the last step values for the instant query + for i := 0; i <= ixLastStep; i++ { + if !stepIter.Next() { + return nil, fmt.Errorf("invalid step count; expected %d got %d", stepIter.StepCount(), i+1) + } + } + values := stepIter.Current().Values() + meta.ResultMetadata.KeepNaNs = true + valuesToSort := make([]valueAndMeta, len(values)) + for i, value := range values { + valuesToSort[i] = valueAndMeta{ + val: value, + seriesMeta: seriesMetas[i], + } + } + + sort.Slice(valuesToSort, func(i, j int) bool { + return n.op.lessFn(valuesToSort[i].val, valuesToSort[j].val) + }) + + for i, sorted := range valuesToSort { + values[i] = sorted.val + seriesMetas[i] = sorted.seriesMeta + } + + //adjust bounds to contain single step + time, err := meta.Bounds.TimeForIndex(ixLastStep) + if err != nil { + return nil, err + } + meta.Bounds = models.Bounds{ + Start: time, + Duration: meta.Bounds.StepSize, + StepSize: meta.Bounds.StepSize, + } + + blockBuilder, err := n.controller.BlockBuilder(queryCtx, meta, seriesMetas) + if err != nil { + return nil, err + } + if err = blockBuilder.AddCols(1); err != nil { + return nil, err + } + if err := blockBuilder.AppendValues(0, values); err != nil { + return nil, err + } + if err = stepIter.Err(); err != nil { + return nil, err + } + return blockBuilder.Build(), nil +} + +func NewSortOp(opType string) (parser.Params, error) { + ascending := opType == SortType + if !ascending && opType != SortDescType { + return nil, fmt.Errorf("operator not supported: %s", opType) + } + + lessFn := utils.GreaterWithNaNs + if ascending { + lessFn = utils.LesserWithNaNs + } + + return sortOp{opType, lessFn}, nil +} diff --git 
a/src/query/functions/linear/sort_test.go b/src/query/functions/linear/sort_test.go new file mode 100644 index 0000000000..e1a4d46b3e --- /dev/null +++ b/src/query/functions/linear/sort_test.go @@ -0,0 +1,141 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package linear + +import ( + "math" + "testing" + "time" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestShouldFailWhenOpTypeIsInvalid(t *testing.T) { + _, err := NewSortOp("sortAsc") + require.Error(t, err) +} + +func TestSortAscInstant(t *testing.T) { + op, err := NewSortOp(SortType) + require.NoError(t, err) + + sink := processSortOp(t, op, true) + + expected := [][]float64{ + {100}, + {200}, + {300}, + {400}, + {500}, + {600}, + {700}, + {800}, + {math.NaN()}, + } + + assert.Equal(t, seriesMetas, sink.Metas) + test.EqualsWithNansWithDelta(t, expected, sink.Values, math.Pow10(-5)) +} + +func TestSortDescInstant(t *testing.T) { + op, err := NewSortOp(SortDescType) + require.NoError(t, err) + + sink := processSortOp(t, op, true) + + expected := [][]float64{ + {800}, + {700}, + {600}, + {500}, + {400}, + {300}, + {200}, + {100}, + {math.NaN()}, + } + + assert.Equal(t, seriesMetas, sink.Metas) + test.EqualsWithNansWithDelta(t, expected, sink.Values, math.Pow10(-5)) +} + +func TestSortNop(t *testing.T) { + op, err := NewSortOp(SortDescType) + require.NoError(t, err) + + sink := processSortOp(t, op, false) + + expected := v + + assert.Equal(t, seriesMetas, sink.Metas) + test.EqualsWithNansWithDelta(t, expected, sink.Values, math.Pow10(-5)) +} + +var ( + seriesMetas = []block.SeriesMeta{ + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "0"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "1"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "2"}, {N: "group", 
V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "0"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "api-server"}, {N: "instance", V: "1"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "0"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "1"}, {N: "group", V: "production"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "0"}, {N: "group", V: "canary"}})}, + {Tags: test.StringTagsToTags(test.StringTags{{N: "job", V: "app-server"}, {N: "instance", V: "1"}, {N: "group", V: "canary"}})}, + } + + v = [][]float64{ + {60, 70, 80, 90, 100}, + {150, 160, 170, 180, 200}, + {180, 210, 240, 270, 300}, + {240, 280, 320, 360, 400}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {300, 350, 400, 450, 500}, + {360, 420, 480, 540, 600}, + {320, 390, 460, 530, 700}, + {480, 560, 640, 720, 800}, + } + + bounds = models.Bounds{ + Start: time.Now(), + Duration: time.Minute * 5, + StepSize: time.Minute, + } +) + +func processSortOp(t *testing.T, op parser.Params, instant bool) *executor.SinkNode { + bl := test.NewBlockFromValuesWithSeriesMeta(bounds, seriesMetas, v) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + node := op.(sortOp).Node(c, transform.Options{}) + queryContext := models.NoopQueryContext() + queryContext.Options.Instantaneous = instant + err := node.Process(queryContext, parser.NodeID(0), bl) + require.NoError(t, err) + return sink +} diff --git a/src/query/functions/utils/heap.go b/src/query/functions/utils/heap.go index 1e3078025a..1587a1da00 100644 --- a/src/query/functions/utils/heap.go +++ b/src/query/functions/utils/heap.go @@ -22,6 +22,8 @@ package utils import ( "container/heap" + "math" + "sort" ) // 
ValueIndexPair is a pair of float value and index at which it exists @@ -33,17 +35,38 @@ type ValueIndexPair struct { type lessFn func(ValueIndexPair, ValueIndexPair) bool func maxHeapLess(i, j ValueIndexPair) bool { - if i.Val == j.Val { + if equalWithNaNs(i.Val, j.Val) { return i.Index > j.Index } - return i.Val < j.Val + return i.Val < j.Val || lesserIfNaNs(i.Val, j.Val) } func minHeapLess(i, j ValueIndexPair) bool { - if i.Val == j.Val { + if equalWithNaNs(i.Val, j.Val) { return i.Index > j.Index } - return i.Val > j.Val + return i.Val > j.Val || lesserIfNaNs(i.Val, j.Val) +} + +// Compares two floats for equality with NaNs taken into account. +func equalWithNaNs(i,j float64) bool { + return i == j || math.IsNaN(i) && math.IsNaN(j) +} + +// Compares NaNs. +// Basically, we do not want to add NaNs to the heap when it has reached it's cap so this fn should be used to prevent this. +func lesserIfNaNs(i,j float64) bool { + return math.IsNaN(i) && !math.IsNaN(j) +} + +// Compares two float64 values which one is lesser with NaNs. NaNs are always sorted away. +func LesserWithNaNs(i, j float64) bool { + return i < j || math.IsNaN(j) && !math.IsNaN(i) +} + +// Compares two float64 values which one is greater with NaNs. NaNs are always sorted away. +func GreaterWithNaNs(i, j float64) bool { + return i > j || math.IsNaN(j) && !math.IsNaN(i) } // FloatHeap is a heap that can be given a maximum size @@ -82,7 +105,7 @@ func NewFloatHeap(isMaxHeap bool, capacity int) FloatHeap { } // Push pushes a value and index pair to the heap -func (fh FloatHeap) Push(value float64, index int) { +func (fh *FloatHeap) Push(value float64, index int) { h := fh.floatHeap // If capacity is zero or negative, allow infinite size heap if fh.capacity > 0 { @@ -99,8 +122,8 @@ func (fh FloatHeap) Push(value float64, index int) { // NB(arnikola): unfortunately, can't just replace first // element as it may not respect internal order. 
Need to // run heap.Fix() to rectify this - if fh.isMaxHeap && value > peek.Val || - (!fh.isMaxHeap && value < peek.Val) { + if (fh.isMaxHeap && GreaterWithNaNs(value, peek.Val)) || + (!fh.isMaxHeap && LesserWithNaNs(value, peek.Val)) { h.heap[0] = ValueIndexPair{ Val: value, Index: index, @@ -122,12 +145,12 @@ func (fh FloatHeap) Push(value float64, index int) { } // Len returns the current length of the heap -func (fh FloatHeap) Len() int { +func (fh *FloatHeap) Len() int { return fh.floatHeap.Len() } // Cap returns the capacity of the heap -func (fh FloatHeap) Cap() int { +func (fh *FloatHeap) Cap() int { return fh.capacity } @@ -137,14 +160,23 @@ func (fh *FloatHeap) Reset() { } // Flush flushes the float heap and resets it. Does not guarantee order. -func (fh FloatHeap) Flush() []ValueIndexPair { +func (fh *FloatHeap) Flush() []ValueIndexPair { values := fh.floatHeap.heap fh.Reset() return values } +// OrderedFlush flushes the float heap and returns values in order. +func (fh *FloatHeap) OrderedFlush() []ValueIndexPair { + flushed := fh.Flush() + sort.Slice(flushed, func(i, j int) bool { + return !fh.floatHeap.less(flushed[i], flushed[j]) //reverse sort + }) + return flushed +} + // Peek reveals the top value of the heap without mutating the heap. 
-func (fh FloatHeap) Peek() (ValueIndexPair, bool) { +func (fh *FloatHeap) Peek() (ValueIndexPair, bool) { h := fh.floatHeap.heap if len(h) == 0 { return ValueIndexPair{}, false diff --git a/src/query/functions/utils/heap_test.go b/src/query/functions/utils/heap_test.go index c8a8e0f842..79e2a305fa 100644 --- a/src/query/functions/utils/heap_test.go +++ b/src/query/functions/utils/heap_test.go @@ -21,10 +21,13 @@ package utils import ( + "math" "math/rand" "sort" "testing" + "github.com/m3db/m3/src/query/test" + "github.com/stretchr/testify/assert" ) @@ -216,3 +219,135 @@ func TestNegativeCapacityHeap(t *testing.T) { assert.Equal(t, testArray[pair.Index], pair.Val) } } + +func equalPairs(t *testing.T, expected, actual []ValueIndexPair) { + assert.Equal(t, len(expected), len(actual)) + for i, e := range expected { + test.EqualsWithNans(t, e.Val, actual[i].Val) + assert.Equal(t, e.Index, actual[i].Index) + } +} + +func TestFlushOrdered(t *testing.T) { + maxHeap := NewFloatHeap(true, 3) + + maxHeap.Push(0.1, 0) + maxHeap.Push(1.1, 1) + maxHeap.Push(2.1, 2) + maxHeap.Push(3.1, 3) + + actualMax := maxHeap.OrderedFlush() + + assert.Equal(t, []ValueIndexPair{ + {Val: 3.1, Index: 3}, + {Val: 2.1, Index: 2}, + {Val: 1.1, Index: 1}, + }, actualMax) + assert.Equal(t, 0, maxHeap.Len()) + + minHeap := NewFloatHeap(false, 3) + minHeap.Push(0.1, 0) + minHeap.Push(1.1, 1) + minHeap.Push(2.1, 2) + minHeap.Push(3.1, 3) + + actualMin := minHeap.OrderedFlush() + + assert.Equal(t, []ValueIndexPair{ + {Val: 0.1, Index: 0}, + {Val: 1.1, Index: 1}, + {Val: 2.1, Index: 2}, + }, actualMin) + assert.Equal(t, 0, minHeap.Len()) +} + +func TestFlushOrderedWhenRandomInsertionOrder(t *testing.T) { + maxHeap := NewFloatHeap(true, 3) + + maxHeap.Push(math.NaN(), 4) + maxHeap.Push(0.1, 0) + maxHeap.Push(2.1, 2) + maxHeap.Push(1.1, 1) + maxHeap.Push(3.1, 3) + maxHeap.Push(math.NaN(), 5) + + actualMax := maxHeap.OrderedFlush() + + assert.Equal(t, []ValueIndexPair{ + {Val: 3.1, Index: 3}, + {Val: 2.1, 
Index: 2},
+	{Val: 1.1, Index: 1},
+	}, actualMax)
+	assert.Equal(t, 0, maxHeap.Len())
+
+	minHeap := NewFloatHeap(false, 3)
+	minHeap.Push(math.NaN(), 4)
+	minHeap.Push(0.1, 0)
+	minHeap.Push(2.1, 2)
+	minHeap.Push(1.1, 1)
+	minHeap.Push(3.1, 3)
+	minHeap.Push(math.NaN(), 5)
+
+	actualMin := minHeap.OrderedFlush()
+
+	assert.Equal(t, []ValueIndexPair{
+	{Val: 0.1, Index: 0},
+	{Val: 1.1, Index: 1},
+	{Val: 2.1, Index: 2},
+	}, actualMin)
+	assert.Equal(t, 0, minHeap.Len())
+}
+
+func TestFlushOrderedWhenRandomInsertionOrderAndTakeNaNs(t *testing.T) {
+	maxHeap := NewFloatHeap(true, 3)
+	maxHeap.Push(math.NaN(), 4)
+	maxHeap.Push(1.1, 1)
+	maxHeap.Push(3.1, 3)
+	maxHeap.Push(math.NaN(), 5)
+
+	actualMax := maxHeap.OrderedFlush()
+
+	equalPairs(t, []ValueIndexPair{
+	{Val: 3.1, Index: 3},
+	{Val: 1.1, Index: 1},
+	{Val: math.NaN(), Index: 4},
+	}, actualMax)
+	assert.Equal(t, 0, maxHeap.Len())
+
+	minHeap := NewFloatHeap(false, 3)
+	minHeap.Push(math.NaN(), 4)
+	minHeap.Push(0.1, 0)
+	minHeap.Push(2.1, 2)
+	minHeap.Push(math.NaN(), 5)
+
+	actualMin := minHeap.OrderedFlush()
+
+	equalPairs(t, []ValueIndexPair{
+	{Val: 0.1, Index: 0},
+	{Val: 2.1, Index: 2},
+	{Val: math.NaN(), Index: 4},
+	}, actualMin)
+	assert.Equal(t, 0, minHeap.Len())
+}
+
+func TestSortLesserWithNaNs(t *testing.T) {
+	actual := []float64{ 5.0, 4.1, math.NaN(), 8.6, 0.1 }
+	expected := []float64{ 0.1, 4.1, 5.0, 8.6, math.NaN() }
+
+	sort.Slice(actual, func(i, j int) bool {
+	return LesserWithNaNs(actual[i], actual[j])
+	})
+
+	test.EqualsWithNans(t, expected, actual)
+}
+
+func TestSortGreaterWithNaNs(t *testing.T) {
+	actual := []float64{ 5.0, 4.1, math.NaN(), 8.6, 0.1 }
+	expected := []float64{ 8.6, 5.0, 4.1, 0.1, math.NaN() }
+
+	sort.Slice(actual, func(i, j int) bool {
+	return GreaterWithNaNs(actual[i], actual[j])
+	})
+
+	test.EqualsWithNans(t, expected, actual)
+}
diff --git a/src/query/models/query_context.go b/src/query/models/query_context.go
index 0da1e3daf5..3522b05833 100644
--- 
a/src/query/models/query_context.go +++ b/src/query/models/query_context.go @@ -44,6 +44,9 @@ type QueryContextOptions struct { LimitMaxDocs int // RequireExhaustive results in an error if the query exceeds the series limit. RequireExhaustive bool + // Instantaneous indicates an instant query. + Instantaneous bool + // RestrictFetchType restricts the query fetches. RestrictFetchType *RestrictFetchTypeQueryContextOptions } diff --git a/src/query/parser/promql/matchers.go b/src/query/parser/promql/matchers.go index 50b965761c..c25e618a61 100644 --- a/src/query/parser/promql/matchers.go +++ b/src/query/parser/promql/matchers.go @@ -295,10 +295,11 @@ func NewFunctionExpr( p, err = scalar.NewTimeOp(tagOptions) return p, true, err - // NB: no-ops. case linear.SortType, linear.SortDescType: - return nil, false, err + p, err = linear.NewSortOp(name) + return p, true, err + // NB: no-ops. case scalar.ScalarType: return nil, false, err diff --git a/src/query/parser/promql/parse_test.go b/src/query/parser/promql/parse_test.go index 656d578a05..9e7aef9ee6 100644 --- a/src/query/parser/promql/parse_test.go +++ b/src/query/parser/promql/parse_test.go @@ -303,10 +303,12 @@ func TestSort(t *testing.T) { require.NoError(t, err) transforms, edges, err := p.DAG() require.NoError(t, err) - assert.Len(t, transforms, 1) + assert.Len(t, transforms, 2) assert.Equal(t, transforms[0].Op.OpType(), functions.FetchType) assert.Equal(t, transforms[0].ID, parser.NodeID("0")) - assert.Len(t, edges, 0) + assert.Equal(t, transforms[1].Op.OpType(), tt.expectedType) + assert.Equal(t, transforms[1].ID, parser.NodeID("1")) + assert.Len(t, edges, 1) }) } } diff --git a/src/query/test/compatibility/testdata/aggregators.test b/src/query/test/compatibility/testdata/aggregators.test index 8eb9cbcc7e..ebcbcc9fd5 100644 --- a/src/query/test/compatibility/testdata/aggregators.test +++ b/src/query/test/compatibility/testdata/aggregators.test @@ -159,64 +159,63 @@ load 5m http_requests{job="app-server", 
instance="1", group="canary"} 0+80x10 foo 3+0x10 -# FAILING issue #12. All topk and bottomk tests are failing. -#eval_ordered instant at 50m topk(3, http_requests) -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="production", instance="1", job="app-server"} 600 +eval_ordered instant at 50m topk(3, http_requests) + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="production", instance="1", job="app-server"} 600 -#eval_ordered instant at 50m topk((3), (http_requests)) -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="production", instance="1", job="app-server"} 600 +eval_ordered instant at 50m topk((3), (http_requests)) + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="production", instance="1", job="app-server"} 600 -#eval_ordered instant at 50m topk(5, http_requests{group="canary",job="app-server"}) -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="canary", instance="0", job="app-server"} 700 +eval_ordered instant at 50m topk(5, http_requests{group="canary",job="app-server"}) + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="canary", instance="0", job="app-server"} 700 -#eval_ordered instant at 50m bottomk(3, http_requests) -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="1", job="api-server"} 200 -# http_requests{group="canary", instance="0", job="api-server"} 300 +eval_ordered instant at 50m bottomk(3, http_requests) + http_requests{group="production", instance="0", 
job="api-server"} 100 + http_requests{group="production", instance="1", job="api-server"} 200 + http_requests{group="canary", instance="0", job="api-server"} 300 -#eval_ordered instant at 50m bottomk(5, http_requests{group="canary",job="app-server"}) -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="canary", instance="1", job="app-server"} 800 +eval_ordered instant at 50m bottomk(5, http_requests{group="canary",job="app-server"}) + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="canary", instance="1", job="app-server"} 800 eval instant at 50m topk by (group) (1, http_requests) http_requests{group="production", instance="1", job="app-server"} 600 http_requests{group="canary", instance="1", job="app-server"} 800 -#eval instant at 50m bottomk by (group) (2, http_requests) -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="1", job="api-server"} 200 +eval instant at 50m bottomk by (group) (2, http_requests) + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="production", instance="1", job="api-server"} 200 -#eval_ordered instant at 50m bottomk by (group) (2, http_requests{group="production"}) -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="1", job="api-server"} 200 +eval_ordered instant at 50m bottomk by (group) (2, http_requests{group="production"}) + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="production", instance="1", job="api-server"} 200 # Test NaN is sorted away from the top/bottom. 
-#eval_ordered instant at 50m topk(3, http_requests{job="api-server",group="production"}) -# http_requests{job="api-server", instance="1", group="production"} 200 -# http_requests{job="api-server", instance="0", group="production"} 100 -# http_requests{job="api-server", instance="2", group="production"} NaN +eval_ordered instant at 50m topk(3, http_requests{job="api-server",group="production"}) + http_requests{job="api-server", instance="1", group="production"} 200 + http_requests{job="api-server", instance="0", group="production"} 100 + http_requests{job="api-server", instance="2", group="production"} NaN -#eval_ordered instant at 50m bottomk(3, http_requests{job="api-server",group="production"}) -# http_requests{job="api-server", instance="0", group="production"} 100 -# http_requests{job="api-server", instance="1", group="production"} 200 -# http_requests{job="api-server", instance="2", group="production"} NaN +eval_ordered instant at 50m bottomk(3, http_requests{job="api-server",group="production"}) + http_requests{job="api-server", instance="0", group="production"} 100 + http_requests{job="api-server", instance="1", group="production"} 200 + http_requests{job="api-server", instance="2", group="production"} NaN # Test topk and bottomk allocate min(k, input_vector) for results vector -#eval_ordered instant at 50m bottomk(9999999999, http_requests{job="app-server",group="canary"}) -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="canary", instance="1", job="app-server"} 800 - -#eval_ordered instant at 50m topk(9999999999, http_requests{job="api-server",group="production"}) -# http_requests{job="api-server", instance="1", group="production"} 200 -# http_requests{job="api-server", instance="0", group="production"} 100 -# http_requests{job="api-server", instance="2", group="production"} NaN +eval_ordered instant at 50m bottomk(9999999999, http_requests{job="app-server",group="canary"}) + http_requests{group="canary", 
instance="0", job="app-server"} 700 + http_requests{group="canary", instance="1", job="app-server"} 800 + +eval_ordered instant at 50m topk(9999999999, http_requests{job="api-server",group="production"}) + http_requests{job="api-server", instance="1", group="production"} 200 + http_requests{job="api-server", instance="0", group="production"} 100 + http_requests{job="api-server", instance="2", group="production"} NaN # Bug #5276. #eval_ordered instant at 50m topk(scalar(foo), http_requests) diff --git a/src/query/test/compatibility/testdata/functions.test b/src/query/test/compatibility/testdata/functions.test index e311bdc461..49de42bb1a 100644 --- a/src/query/test/compatibility/testdata/functions.test +++ b/src/query/test/compatibility/testdata/functions.test @@ -342,28 +342,27 @@ load 5m http_requests{job="app-server", instance="0", group="canary"} 0+70x10 http_requests{job="app-server", instance="1", group="canary"} 0+80x10 -# FAILING issue #28: -#eval_ordered instant at 50m sort(http_requests) -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="production", instance="1", job="api-server"} 200 -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="production", instance="0", job="app-server"} 500 -# http_requests{group="production", instance="1", job="app-server"} 600 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="canary", instance="2", job="api-server"} NaN - -#eval_ordered instant at 50m sort_desc(http_requests) -# http_requests{group="canary", instance="1", job="app-server"} 800 -# http_requests{group="canary", instance="0", job="app-server"} 700 -# http_requests{group="production", instance="1", job="app-server"} 600 -# http_requests{group="production", instance="0", job="app-server"} 500 -# 
http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="0", job="api-server"} 300 -# http_requests{group="production", instance="1", job="api-server"} 200 -# http_requests{group="production", instance="0", job="api-server"} 100 -# http_requests{group="canary", instance="2", job="api-server"} NaN +eval_ordered instant at 50m sort(http_requests) + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="production", instance="1", job="api-server"} 200 + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="production", instance="0", job="app-server"} 500 + http_requests{group="production", instance="1", job="app-server"} 600 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="canary", instance="2", job="api-server"} NaN + +eval_ordered instant at 50m sort_desc(http_requests) + http_requests{group="canary", instance="1", job="app-server"} 800 + http_requests{group="canary", instance="0", job="app-server"} 700 + http_requests{group="production", instance="1", job="app-server"} 600 + http_requests{group="production", instance="0", job="app-server"} 500 + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="0", job="api-server"} 300 + http_requests{group="production", instance="1", job="api-server"} 200 + http_requests{group="production", instance="0", job="api-server"} 100 + http_requests{group="canary", instance="2", job="api-server"} NaN # Tests for holt_winters clear