Skip to content

Commit

Permalink
[query] sort, topk, bottomk fixes for instant queries (#2792)
Browse files Browse the repository at this point in the history
* fixes most of topk, bottomk ordered tests when instant queries are used

* formatted some code and extracted instant logic into separate method

* Changes after review.
Small optimization in appendValuesInstant.

* fixed imports.
made valueAndMeta struct package private.

* Optimized instant queries - now only single block will be created.
OrderedFlush() now sorts the results instead popping them one by one.
FloatHeap won't be created if it's not used.

* improved how max series count is calculated

* removed unused method

* initial implementation

* Fixes NaN issues with m3 queries for sort and topk/bottomk.
FloatHeap now supports adding NaN values.
Uncommented topk, bottomk and sort compatibility tests (now they pass).

* Retain old KeepNans flag for non instant queries

* Changes after code review.

* Small cleanup.

* Added copyright header to sort_test.go.
equalsPairs now checks individual struct elements instead of their printed values.

* better init

* Removed Min() function in favour of using this logic inline.
Used append in place when indices are not needed.
Documented and extracted some comparison functions for better readability and clearness.

* Updated doc comment
  • Loading branch information
soundvibe authored Nov 5, 2020
1 parent 158ab10 commit 4129dbb
Show file tree
Hide file tree
Showing 18 changed files with 766 additions and 133 deletions.
4 changes: 2 additions & 2 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ type FilterConfiguration struct {

// ResultOptions are the result options for query.
type ResultOptions struct {
// KeepNans keeps NaNs before returning query results.
// KeepNaNs keeps NaNs before returning query results.
// The default is false, which matches Prometheus
KeepNans bool `yaml:"keepNans"`
KeepNaNs bool `yaml:"keepNans"`
}

// QueryConfiguration is the query configuration.
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ func TestTagOptionsConfig(t *testing.T) {

func TestKeepNaNsDefault(t *testing.T) {
r := ResultOptions{
KeepNans: true,
KeepNaNs: true,
}
assert.Equal(t, true, r.KeepNans)
assert.Equal(t, true, r.KeepNaNs)

r = ResultOptions{}
assert.Equal(t, false, r.KeepNans)
assert.Equal(t, false, r.KeepNaNs)
}
9 changes: 7 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,20 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handleroptions.AddWarningHeaders(w, result.Meta)
h.promReadMetrics.fetchSuccess.Inc(1)

keepNaNs := h.opts.Config().ResultOptions.KeepNaNs
if !keepNaNs {
keepNaNs = result.Meta.KeepNaNs
}

if h.instant {
renderResultsInstantaneousJSON(w, result, h.opts.Config().ResultOptions.KeepNans)
renderResultsInstantaneousJSON(w, result, keepNaNs)
return
}

err = RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsedOptions.Params.Start,
End: parsedOptions.Params.End,
KeepNaNs: h.opts.Config().ResultOptions.KeepNans,
KeepNaNs: h.opts.Config().ResultOptions.KeepNaNs,
})

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func parseRequest(
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.SeriesLimit,
LimitMaxDocs: fetchOpts.DocsLimit,
Instantaneous: instantaneous,
}}

restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType()
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func newTestSetup(
fetchOptsBuilder := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg)
tagOpts := models.NewTagOptions()
limitsConfig := config.LimitsConfiguration{}
keepNans := false
keepNaNs := false

opts := options.EmptyHandlerOptions().
SetEngine(engine).
Expand All @@ -219,7 +219,7 @@ func newTestSetup(
SetConfig(config.Configuration{
Limits: limitsConfig,
ResultOptions: config.ResultOptions{
KeepNans: keepNans,
KeepNaNs: keepNaNs,
},
})

Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func TestMultipleRead(t *testing.T) {
handlerOpts := options.EmptyHandlerOptions().SetEngine(engine).
SetConfig(config.Configuration{
ResultOptions: config.ResultOptions{
KeepNans: true,
KeepNaNs: true,
},
})

Expand Down Expand Up @@ -454,7 +454,7 @@ func TestReadWithOptions(t *testing.T) {
handlerOpts := options.EmptyHandlerOptions().SetEngine(engine).
SetConfig(config.Configuration{
ResultOptions: config.ResultOptions{
KeepNans: true,
KeepNaNs: true,
},
})

Expand Down
2 changes: 2 additions & 0 deletions src/query/block/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type ResultMetadata struct {
Warnings Warnings
// Resolutions is a list of resolutions for series obtained by this query.
Resolutions []time.Duration
// KeepNaNs indicates if NaNs should be kept when returning results.
KeepNaNs bool
}

// NewResultMetadata creates a new result metadata.
Expand Down
184 changes: 145 additions & 39 deletions src/query/functions/aggregation/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,36 @@ const (
TopKType = "topk"
)

type takeFunc func(values []float64, buckets [][]int) []float64
type valueAndMeta struct {
val float64
seriesMeta block.SeriesMeta
}

type takeFunc func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64
type takeInstantFunc func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta

// NewTakeOp creates a new takeK operation
func NewTakeOp(
opType string,
params NodeParams,
) (parser.Params, error) {
takeTop := opType == TopKType
if !takeTop && opType != BottomKType {
return baseOp{}, fmt.Errorf("operator not supported: %s", opType)
}

var fn takeFunc
k := int(params.Parameter)
if k < 1 {
fn = func(values []float64, buckets [][]int) []float64 {
return takeNone(values, buckets)
}
} else {
heap := utils.NewFloatHeap(takeTop, k)
fn = func(values []float64, buckets [][]int) []float64 {
return takeFn(heap, values, buckets)
}
fn := func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {
return takeFn(heap, values, buckets)
}

return newTakeOp(params, opType, fn), nil
fnInstant := func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta {
return takeInstantFn(heap, values, buckets, seriesMetas)
}
return newTakeOp(params, opType, k, fn, fnInstant), nil
}

// takeOp stores required properties for take ops
type takeOp struct {
params NodeParams
opType string
takeFunc takeFunc
params NodeParams
opType string
k int
takeFunc takeFunc
takeInstantFunc takeInstantFunc
}

// OpType for the operator
Expand All @@ -95,11 +92,13 @@ func (o takeOp) Node(
}
}

func newTakeOp(params NodeParams, opType string, takeFunc takeFunc) takeOp {
func newTakeOp(params NodeParams, opType string, k int, takeFunc takeFunc, takeInstantFunc takeInstantFunc) takeOp {
return takeOp{
params: params,
opType: opType,
takeFunc: takeFunc,
params: params,
opType: opType,
k: k,
takeFunc: takeFunc,
takeInstantFunc: takeInstantFunc,
}
}

Expand All @@ -126,6 +125,12 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
return nil, err
}

instantaneous := queryCtx.Options.Instantaneous
takeTop := n.op.opType == TopKType
if !takeTop && n.op.opType != BottomKType {
return nil, fmt.Errorf("operator not supported: %s", n.op.opType)
}

params := n.op.params
meta := b.Meta()
seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta())
Expand All @@ -136,8 +141,23 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
seriesMetas,
)

// retain original metadatas
builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta())
seriesCount := maxSeriesCount(buckets)
if instantaneous {
heapSize := seriesCount
if n.op.k < seriesCount {
heapSize = n.op.k
}

heap := utils.NewFloatHeap(takeTop, heapSize)
return n.processBlockInstantaneous(heap, queryCtx, meta, stepIter, seriesMetas, buckets)
}

if n.op.k >= seriesCount {
return b, nil
}

heap := utils.NewFloatHeap(takeTop, n.op.k)
builder, err := n.controller.BlockBuilder(queryCtx, meta, seriesMetas)
if err != nil {
return nil, err
}
Expand All @@ -147,34 +167,95 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
}

for index := 0; stepIter.Next(); index++ {
step := stepIter.Current()
values := step.Values()
aggregatedValues := n.op.takeFunc(values, buckets)
if err := builder.AppendValues(index, aggregatedValues); err != nil {
values := stepIter.Current().Values()
if err := builder.AppendValues(index, n.op.takeFunc(heap, values, buckets)); err != nil {
return nil, err
}
}

if err = stepIter.Err(); err != nil {
return nil, err
}

return builder.Build(), nil
}

// shortcut to return empty when taking <= 0 values
func takeNone(values []float64, buckets [][]int) []float64 {
util.Memset(values, math.NaN())
return values
func maxSeriesCount(buckets [][]int) int {
result := 0

for _, bucket := range buckets {
if len(bucket) > result {
result = len(bucket)
}
}

return result
}

func (n *takeNode) processBlockInstantaneous(
heap utils.FloatHeap,
queryCtx *models.QueryContext,
metadata block.Metadata,
stepIter block.StepIter,
seriesMetas []block.SeriesMeta,
buckets [][]int) (block.Block, error) {
ixLastStep := stepIter.StepCount() - 1 //we only care for the last step values for the instant query
for i := 0; i <= ixLastStep; i++ {
if !stepIter.Next() {
return nil, fmt.Errorf("invalid step count; expected %d got %d", stepIter.StepCount(), i+1)
}
}
metadata.ResultMetadata.KeepNaNs = true
values := stepIter.Current().Values()
takenSortedValues := n.op.takeInstantFunc(heap, values, buckets, seriesMetas)
blockValues, blockSeries := mapToValuesAndSeriesMetas(takenSortedValues)

//adjust bounds to contain single step
time, err := metadata.Bounds.TimeForIndex(ixLastStep)
if err != nil {
return nil, err
}
metadata.Bounds = models.Bounds{
Start: time,
Duration: metadata.Bounds.StepSize,
StepSize: metadata.Bounds.StepSize,
}

blockBuilder, err := n.controller.BlockBuilder(queryCtx, metadata, blockSeries)
if err != nil {
return nil, err
}
if err = blockBuilder.AddCols(1); err != nil {
return nil, err
}
if err := blockBuilder.AppendValues(0, blockValues); err != nil {
return nil, err
}
if err = stepIter.Err(); err != nil {
return nil, err
}
return blockBuilder.Build(), nil
}

func mapToValuesAndSeriesMetas(takenSortedValues []valueAndMeta) ([]float64, []block.SeriesMeta) {
blockValues := make([]float64, 0, len(takenSortedValues))
blockSeries := make([]block.SeriesMeta, 0, len(takenSortedValues))
for _, sortedValue := range takenSortedValues {
blockValues = append(blockValues, sortedValue.val)
blockSeries = append(blockSeries, sortedValue.seriesMeta)
}
return blockValues, blockSeries
}

func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {
cap := heap.Cap()
capacity := heap.Cap()
if capacity < 1 {
util.Memset(values, math.NaN())
return values
}
for _, bucket := range buckets {
// If this bucket's length is less than or equal to the heap's
// capacity do not need to clear any values from the input vector,
// as they are all included in the output.
if len(bucket) <= cap {
if len(bucket) <= capacity {
continue
}

Expand All @@ -198,3 +279,28 @@ func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {

return values
}

func takeInstantFn(heap utils.FloatHeap, values []float64, buckets [][]int, metas []block.SeriesMeta) []valueAndMeta {
var result = make([]valueAndMeta, 0, heap.Cap())
if heap.Cap() < 1 {
return result
}
for _, bucket := range buckets {
for _, idx := range bucket {
val := values[idx]
heap.Push(val, idx)
}

valIndexPairs := heap.OrderedFlush()
for _, pair := range valIndexPairs {
prevIndex := pair.Index
prevMeta := metas[prevIndex]

result = append(result, valueAndMeta{
val: pair.Val,
seriesMeta: prevMeta,
})
}
}
return result
}
Loading

0 comments on commit 4129dbb

Please sign in to comment.