Skip to content

Commit

Permalink
expression: Support stddev_pop function (pingcap#19195) (pingcap#19540)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Sep 2, 2020
1 parent dfc45a6 commit 11a9b55
Show file tree
Hide file tree
Showing 11 changed files with 430 additions and 15 deletions.
46 changes: 46 additions & 0 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func Build(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal
return buildBitXor(aggFuncDesc, ordinal)
case ast.AggFuncBitAnd:
return buildBitAnd(aggFuncDesc, ordinal)
case ast.AggFuncVarPop:
return buildVarPop(aggFuncDesc, ordinal)
case ast.AggFuncStddevPop:
return buildStdDevPop(aggFuncDesc, ordinal)
}
return nil
}
Expand Down Expand Up @@ -362,6 +366,48 @@ func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
return &bitAndUint64{baseBitAggFunc{base}}
}

// buildVarPop builds the AggFunc implementation for function "VAR_POP".
func buildVarPop(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseVarPopAggFunc{
baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
},
}
switch aggFuncDesc.Mode {
case aggregation.DedupMode:
return nil
default:
if aggFuncDesc.HasDistinct {
return &varPop4DistinctFloat64{base}
}
return &varPop4Float64{base}
}
}

// buildStdDevPop builds the AggFunc implementation for function "STD()/STDDEV()/STDDEV_POP()"
func buildStdDevPop(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseStdDevPopAggFunc{
varPop4Float64{
baseVarPopAggFunc{
baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
},
},
},
}
switch aggFuncDesc.Mode {
case aggregation.DedupMode:
return nil
default:
if aggFuncDesc.HasDistinct {
return &stdDevPop4DistinctFloat64{base}
}
return &stdDevPop4Float64{base}
}
}

// buildRowNumber builds the AggFunc implementation for function "ROW_NUMBER".
func buildRowNumber(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseAggFunc{
Expand Down
55 changes: 55 additions & 0 deletions executor/aggfuncs/func_stddevpop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"math"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

type baseStdDevPopAggFunc struct {
varPop4Float64
}

type stdDevPop4Float64 struct {
baseStdDevPopAggFunc
}

func (e *stdDevPop4Float64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
variance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, math.Sqrt(variance))
return nil
}

type stdDevPop4DistinctFloat64 struct {
baseStdDevPopAggFunc
}

func (e *stdDevPop4DistinctFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopDistinctFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
variance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, math.Sqrt(variance))
return nil
}
25 changes: 25 additions & 0 deletions executor/aggfuncs/func_stddevpop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package aggfuncs_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
)

func (s *testSuite) TestMergePartialResult4Stddevpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncStddevPop, mysql.TypeDouble, 5, 1.4142135623730951, 0.816496580927726, 1.3169567191065923),
}
for _, test := range tests {
s.testMergePartialResult(c, test)
}
}

func (s *testSuite) TestStddevpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncStddevPop, mysql.TypeDouble, 5, nil, 1.4142135623730951),
}
for _, test := range tests {
s.testAggFunc(c, test)
}
}
169 changes: 169 additions & 0 deletions executor/aggfuncs/func_varpop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/set"
)

type baseVarPopAggFunc struct {
baseAggFunc
}

type varPop4Float64 struct {
baseVarPopAggFunc
}

type partialResult4VarPopFloat64 struct {
count int64
sum float64
variance float64
}

func (e *varPop4Float64) AllocPartialResult() (pr PartialResult) {
return PartialResult(&partialResult4VarPopFloat64{})
}

func (e *varPop4Float64) ResetPartialResult(pr PartialResult) {
p := (*partialResult4VarPopFloat64)(pr)
p.count = 0
p.sum = 0
p.variance = 0
}

func (e *varPop4Float64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
variance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, variance)
return nil
}

func calculateIntermediate(count int64, sum float64, input float64, variance float64) float64 {
t := float64(count)*input - sum
variance += (t * t) / (float64(count * (count - 1)))
return variance
}

func (e *varPop4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4VarPopFloat64)(pr)
for _, row := range rowsInGroup {
input, isNull, err := e.args[0].EvalReal(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull {
continue
}
p.count++
p.sum += input
if p.count > 1 {
p.variance = calculateIntermediate(p.count, p.sum, input, p.variance)
}
}
return nil
}

func calculateMerge(srcCount, dstCount int64, srcSum, dstSum, srcVariance, dstVariance float64) float64 {
srcCountFloat64 := float64(srcCount)
dstCountFloat64 := float64(dstCount)

t := (srcCountFloat64/dstCountFloat64)*dstSum - srcSum
dstVariance += srcVariance + ((dstCountFloat64/srcCountFloat64)/(dstCountFloat64+srcCountFloat64))*t*t
return dstVariance
}

func (e *varPop4Float64) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) (err error) {
p1, p2 := (*partialResult4VarPopFloat64)(src), (*partialResult4VarPopFloat64)(dst)
if p1.count == 0 {
return nil
}
if p2.count == 0 {
p2.count = p1.count
p2.sum = p1.sum
p2.variance = p1.variance
return nil
}
if p2.count != 0 && p1.count != 0 {
p2.variance = calculateMerge(p1.count, p2.count, p1.sum, p2.sum, p1.variance, p2.variance)
p2.count += p1.count
p2.sum += p1.sum
}
return nil
}

type varPop4DistinctFloat64 struct {
baseVarPopAggFunc
}

type partialResult4VarPopDistinctFloat64 struct {
count int64
sum float64
variance float64
valSet set.Float64Set
}

func (e *varPop4DistinctFloat64) AllocPartialResult() (pr PartialResult) {
p := new(partialResult4VarPopDistinctFloat64)
p.count = 0
p.sum = 0
p.variance = 0
p.valSet = set.NewFloat64Set()
return PartialResult(p)
}

func (e *varPop4DistinctFloat64) ResetPartialResult(pr PartialResult) {
p := (*partialResult4VarPopDistinctFloat64)(pr)
p.count = 0
p.sum = 0
p.variance = 0
p.valSet = set.NewFloat64Set()
}

func (e *varPop4DistinctFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopDistinctFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
variance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, variance)
return nil
}

func (e *varPop4DistinctFloat64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4VarPopDistinctFloat64)(pr)
for _, row := range rowsInGroup {
input, isNull, err := e.args[0].EvalReal(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull || p.valSet.Exist(input) {
continue
}
p.valSet.Insert(input)
p.count++
p.sum += input
if p.count > 1 {
p.variance = calculateIntermediate(p.count, p.sum, input, p.variance)
}
}
return nil
}
26 changes: 26 additions & 0 deletions executor/aggfuncs/func_varpop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package aggfuncs_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
)

func (s *testSuite) TestMergePartialResult4Varpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncVarPop, mysql.TypeDouble, 5, types.NewFloat64Datum(float64(2)), types.NewFloat64Datum(float64(2)/float64(3)), types.NewFloat64Datum(float64(59)/float64(8)-float64(19*19)/float64(8*8))),
}
for _, test := range tests {
s.testMergePartialResult(c, test)
}
}

func (s *testSuite) TestVarpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncVarPop, mysql.TypeDouble, 5, nil, types.NewFloat64Datum(float64(2))),
}
for _, test := range tests {
s.testAggFunc(c, test)
}
}
Loading

0 comments on commit 11a9b55

Please sign in to comment.