Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] sort, topk, bottomk fixes for instant queries #2792

Merged
merged 21 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ type FilterConfiguration struct {

// ResultOptions are the result options for query.
type ResultOptions struct {
// KeepNans keeps NaNs before returning query results.
// KeepNaNs keeps NaNs before returning query results.
// The default is false, which matches Prometheus.
KeepNans bool `yaml:"keepNans"`
KeepNaNs bool `yaml:"keepNans"`
}

// QueryConfiguration is the query configuration.
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ func TestTagOptionsConfig(t *testing.T) {

func TestKeepNaNsDefault(t *testing.T) {
r := ResultOptions{
KeepNans: true,
KeepNaNs: true,
}
assert.Equal(t, true, r.KeepNans)
assert.Equal(t, true, r.KeepNaNs)

r = ResultOptions{}
assert.Equal(t, false, r.KeepNans)
assert.Equal(t, false, r.KeepNaNs)
}
9 changes: 7 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,20 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handleroptions.AddWarningHeaders(w, result.Meta)
h.promReadMetrics.fetchSuccess.Inc(1)

keepNaNs := h.opts.Config().ResultOptions.KeepNaNs
if !keepNaNs {
keepNaNs = result.Meta.KeepNaNs
}

if h.instant {
renderResultsInstantaneousJSON(w, result, h.opts.Config().ResultOptions.KeepNans)
renderResultsInstantaneousJSON(w, result, keepNaNs)
return
}

err = RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsedOptions.Params.Start,
End: parsedOptions.Params.End,
KeepNaNs: h.opts.Config().ResultOptions.KeepNans,
KeepNaNs: h.opts.Config().ResultOptions.KeepNaNs,
})

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func parseRequest(
QueryContextOptions: models.QueryContextOptions{
LimitMaxTimeseries: fetchOpts.SeriesLimit,
LimitMaxDocs: fetchOpts.DocsLimit,
Instantaneous: instantaneous,
}}

restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType()
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func newTestSetup(
fetchOptsBuilder := handleroptions.NewFetchOptionsBuilder(fetchOptsBuilderCfg)
tagOpts := models.NewTagOptions()
limitsConfig := config.LimitsConfiguration{}
keepNans := false
keepNaNs := false

opts := options.EmptyHandlerOptions().
SetEngine(engine).
Expand All @@ -219,7 +219,7 @@ func newTestSetup(
SetConfig(config.Configuration{
Limits: limitsConfig,
ResultOptions: config.ResultOptions{
KeepNans: keepNans,
KeepNaNs: keepNaNs,
},
})

Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func TestMultipleRead(t *testing.T) {
handlerOpts := options.EmptyHandlerOptions().SetEngine(engine).
SetConfig(config.Configuration{
ResultOptions: config.ResultOptions{
KeepNans: true,
KeepNaNs: true,
},
})

Expand Down Expand Up @@ -454,7 +454,7 @@ func TestReadWithOptions(t *testing.T) {
handlerOpts := options.EmptyHandlerOptions().SetEngine(engine).
SetConfig(config.Configuration{
ResultOptions: config.ResultOptions{
KeepNans: true,
KeepNaNs: true,
},
})

Expand Down
2 changes: 2 additions & 0 deletions src/query/block/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type ResultMetadata struct {
Warnings Warnings
// Resolutions is a list of resolutions for series obtained by this query.
Resolutions []time.Duration
// KeepNaNs indicates if NaNs should be kept when returning results.
KeepNaNs bool
}

// NewResultMetadata creates a new result metadata.
Expand Down
184 changes: 145 additions & 39 deletions src/query/functions/aggregation/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,36 @@ const (
TopKType = "topk"
)

type takeFunc func(values []float64, buckets [][]int) []float64
// valueAndMeta pairs a single sample value with the metadata of the series
// that value was taken from, so the two stay together when results are
// reordered by the instant take functions.
type valueAndMeta struct {
// val is the sample value.
val float64
// seriesMeta is the metadata of the series that produced val.
seriesMeta block.SeriesMeta
}

// takeFunc applies a take operation to the values of one step. It receives
// the heap to work with, the step values, and the buckets of series indices,
// and returns the values to append for that step.
type takeFunc func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64
// takeInstantFunc is the instant-query counterpart of takeFunc. In addition
// to the step values and buckets it receives the series metadata, and it
// returns each taken value together with the metadata of its series.
type takeInstantFunc func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta

// NewTakeOp creates a new takeK operation
func NewTakeOp(
opType string,
params NodeParams,
) (parser.Params, error) {
takeTop := opType == TopKType
if !takeTop && opType != BottomKType {
return baseOp{}, fmt.Errorf("operator not supported: %s", opType)
}

var fn takeFunc
k := int(params.Parameter)
if k < 1 {
fn = func(values []float64, buckets [][]int) []float64 {
return takeNone(values, buckets)
}
} else {
heap := utils.NewFloatHeap(takeTop, k)
fn = func(values []float64, buckets [][]int) []float64 {
return takeFn(heap, values, buckets)
}
fn := func(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {
return takeFn(heap, values, buckets)
}

return newTakeOp(params, opType, fn), nil
fnInstant := func(heap utils.FloatHeap, values []float64, buckets [][]int, seriesMetas []block.SeriesMeta) []valueAndMeta {
return takeInstantFn(heap, values, buckets, seriesMetas)
}
return newTakeOp(params, opType, k, fn, fnInstant), nil
}

// takeOp stores required properties for take ops
type takeOp struct {
params NodeParams
opType string
takeFunc takeFunc
params NodeParams
opType string
k int
takeFunc takeFunc
takeInstantFunc takeInstantFunc
}

// OpType for the operator
Expand All @@ -95,11 +92,13 @@ func (o takeOp) Node(
}
}

func newTakeOp(params NodeParams, opType string, takeFunc takeFunc) takeOp {
func newTakeOp(params NodeParams, opType string, k int, takeFunc takeFunc, takeInstantFunc takeInstantFunc) takeOp {
return takeOp{
params: params,
opType: opType,
takeFunc: takeFunc,
params: params,
opType: opType,
k: k,
takeFunc: takeFunc,
takeInstantFunc: takeInstantFunc,
}
}

Expand All @@ -126,6 +125,12 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
return nil, err
}

instantaneous := queryCtx.Options.Instantaneous
takeTop := n.op.opType == TopKType
if !takeTop && n.op.opType != BottomKType {
return nil, fmt.Errorf("operator not supported: %s", n.op.opType)
}

params := n.op.params
meta := b.Meta()
seriesMetas := utils.FlattenMetadata(meta, stepIter.SeriesMeta())
Expand All @@ -136,8 +141,23 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
seriesMetas,
)

// retain original metadatas
builder, err := n.controller.BlockBuilder(queryCtx, meta, stepIter.SeriesMeta())
seriesCount := maxSeriesCount(buckets)
if instantaneous {
heapSize := seriesCount
if n.op.k < seriesCount {
heapSize = n.op.k
}

heap := utils.NewFloatHeap(takeTop, heapSize)
return n.processBlockInstantaneous(heap, queryCtx, meta, stepIter, seriesMetas, buckets)
}

if n.op.k >= seriesCount {
return b, nil
}

heap := utils.NewFloatHeap(takeTop, n.op.k)
builder, err := n.controller.BlockBuilder(queryCtx, meta, seriesMetas)
if err != nil {
return nil, err
}
Expand All @@ -147,34 +167,95 @@ func (n *takeNode) ProcessBlock(queryCtx *models.QueryContext, ID parser.NodeID,
}

for index := 0; stepIter.Next(); index++ {
step := stepIter.Current()
values := step.Values()
aggregatedValues := n.op.takeFunc(values, buckets)
if err := builder.AppendValues(index, aggregatedValues); err != nil {
values := stepIter.Current().Values()
if err := builder.AppendValues(index, n.op.takeFunc(heap, values, buckets)); err != nil {
return nil, err
}
}

if err = stepIter.Err(); err != nil {
return nil, err
}

return builder.Build(), nil
}

// shortcut to return empty when taking <= 0 values
func takeNone(values []float64, buckets [][]int) []float64 {
util.Memset(values, math.NaN())
return values
// maxSeriesCount reports the length of the longest bucket, i.e. the largest
// number of series indices held by any single bucket. It returns 0 when
// there are no buckets or every bucket is empty.
func maxSeriesCount(buckets [][]int) int {
	longest := 0
	for i := 0; i < len(buckets); i++ {
		if size := len(buckets[i]); size > longest {
			longest = size
		}
	}
	return longest
}

// processBlockInstantaneous evaluates the take operation for an instant
// query. Only the values of the last step are considered: the iterator is
// advanced to that step, the instant take function selects the values (and
// their series metadata) to keep, and a new single-step block is built from
// the selection.
//
// heap is the heap handed to the instant take function, metadata is the
// source block metadata (received by value and adjusted locally),
// seriesMetas are the series metadatas indexed by series position, and
// buckets group series indices.
//
// It returns an error if the iterator has no steps, ends before the reported
// step count is reached, reports an iteration error, or if the bounds lookup
// or block building fails.
func (n *takeNode) processBlockInstantaneous(
	heap utils.FloatHeap,
	queryCtx *models.QueryContext,
	metadata block.Metadata,
	stepIter block.StepIter,
	seriesMetas []block.SeriesMeta,
	buckets [][]int,
) (block.Block, error) {
	stepCount := stepIter.StepCount()
	// Without at least one step there is no "last step" to read, and
	// Current() must not be consulted before a successful Next().
	if stepCount < 1 {
		return nil, fmt.Errorf("invalid step count; expected at least 1 got %d", stepCount)
	}

	// We only care for the last step values for the instant query.
	ixLastStep := stepCount - 1
	for i := 0; i <= ixLastStep; i++ {
		if stepIter.Next() {
			continue
		}
		// Prefer the iterator's own error over the generic message so the
		// underlying cause is not masked.
		if err := stepIter.Err(); err != nil {
			return nil, err
		}
		// i steps were consumed successfully before Next() returned false.
		return nil, fmt.Errorf("invalid step count; expected %d got %d", stepCount, i)
	}
	// Surface iteration errors before doing any block building work.
	if err := stepIter.Err(); err != nil {
		return nil, err
	}

	// Flag the result so NaNs are kept when it is returned; the flag travels
	// with the result metadata.
	metadata.ResultMetadata.KeepNaNs = true
	values := stepIter.Current().Values()
	takenSortedValues := n.op.takeInstantFunc(heap, values, buckets, seriesMetas)
	blockValues, blockSeries := mapToValuesAndSeriesMetas(takenSortedValues)

	// Adjust bounds to contain the single (last) step only.
	lastStepTime, err := metadata.Bounds.TimeForIndex(ixLastStep)
	if err != nil {
		return nil, err
	}
	metadata.Bounds = models.Bounds{
		Start:    lastStepTime,
		Duration: metadata.Bounds.StepSize,
		StepSize: metadata.Bounds.StepSize,
	}

	blockBuilder, err := n.controller.BlockBuilder(queryCtx, metadata, blockSeries)
	if err != nil {
		return nil, err
	}
	if err := blockBuilder.AddCols(1); err != nil {
		return nil, err
	}
	if err := blockBuilder.AppendValues(0, blockValues); err != nil {
		return nil, err
	}
	return blockBuilder.Build(), nil
}

// mapToValuesAndSeriesMetas splits a list of value/metadata pairs into two
// parallel slices: the values and the series metadatas, both in the same
// order as the input. Both returned slices are non-nil even for empty input.
func mapToValuesAndSeriesMetas(takenSortedValues []valueAndMeta) ([]float64, []block.SeriesMeta) {
	count := len(takenSortedValues)
	vals := make([]float64, count)
	metas := make([]block.SeriesMeta, count)
	for i := range takenSortedValues {
		vals[i] = takenSortedValues[i].val
		metas[i] = takenSortedValues[i].seriesMeta
	}
	return vals, metas
}

func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {
cap := heap.Cap()
capacity := heap.Cap()
if capacity < 1 {
util.Memset(values, math.NaN())
return values
}
for _, bucket := range buckets {
// If this bucket's length is less than or equal to the heap's
// capacity do not need to clear any values from the input vector,
// as they are all included in the output.
if len(bucket) <= cap {
if len(bucket) <= capacity {
continue
}

Expand All @@ -198,3 +279,28 @@ func takeFn(heap utils.FloatHeap, values []float64, buckets [][]int) []float64 {

return values
}

func takeInstantFn(heap utils.FloatHeap, values []float64, buckets [][]int, metas []block.SeriesMeta) []valueAndMeta {
var result = make([]valueAndMeta, 0, heap.Cap())
if heap.Cap() < 1 {
return result
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
}
for _, bucket := range buckets {
for _, idx := range bucket {
val := values[idx]
heap.Push(val, idx)
}

valIndexPairs := heap.OrderedFlush()
for _, pair := range valIndexPairs {
prevIndex := pair.Index
prevMeta := metas[prevIndex]

result = append(result, valueAndMeta{
val: pair.Val,
seriesMeta: prevMeta,
})
}
}
return result
}
Loading