Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: copy row data instead of refereeing chunk.Row in some window functions. (#11678) #11823

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func buildLeadLag(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) baseLeadLag
args: aggFuncDesc.Args,
ordinal: ordinal,
}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, valueEvaluator: buildValueEvaluator(aggFuncDesc.RetTp)}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, retTp: aggFuncDesc.RetTp}
}

func buildLead(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
Expand Down
24 changes: 11 additions & 13 deletions executor/aggfuncs/func_cume_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,33 @@ type cumeDist struct {
}

type partialResult4CumeDist struct {
curIdx int
lastRank int
rows []chunk.Row
partialResult4Rank
cum int64
}

func (r *cumeDist) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Rank{})
return PartialResult(&partialResult4CumeDist{})
}

func (r *cumeDist) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Rank)(pr)
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.reset()
p.cum = 0
}

func (r *cumeDist) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4CumeDist)(pr)
p.rows = append(p.rows, rowsInGroup...)
p.partialResult4Rank.updatePartialResult(rowsInGroup, false, r.compareRows)
return nil
}

func (r *cumeDist) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4CumeDist)(pr)
numRows := len(p.rows)
for p.lastRank < numRows && r.compareRows(p.rows[p.curIdx], p.rows[p.lastRank]) == 0 {
p.lastRank++
numRows := int64(len(p.results))
for p.cum < numRows && p.results[p.cum] == p.results[p.curIdx] {
p.cum++
}
p.curIdx++
chk.AppendFloat64(r.ordinal, float64(p.lastRank)/float64(numRows))
chk.AppendFloat64(r.ordinal, float64(p.cum)/float64(numRows))
return nil
}
188 changes: 156 additions & 32 deletions executor/aggfuncs/func_lead_lag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,200 @@
package aggfuncs

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

type baseLeadLag struct {
baseAggFunc
valueEvaluator // TODO: move it to partial result when parallel execution is supported.

defaultExpr expression.Expression
offset uint64
retTp *types.FieldType
}

type partialResult4LeadLag struct {
rows []chunk.Row
curIdx uint64
type circleBuf struct {
buf []valueExtractor
head, tail int
size int
}

func (v *baseLeadLag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4LeadLag{})
func (cb *circleBuf) reset() {
cb.buf = cb.buf[:0]
cb.head, cb.tail = 0, 0
}

func (v *baseLeadLag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4LeadLag)(pr)
p.rows = p.rows[:0]
p.curIdx = 0
func (cb *circleBuf) append(e valueExtractor) {
if len(cb.buf) < cb.size {
cb.buf = append(cb.buf, e)
cb.tail++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
cb.buf[cb.tail] = e
cb.tail++
}
}

func (v *baseLeadLag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4LeadLag)(pr)
p.rows = append(p.rows, rowsInGroup...)
return nil
func (cb *circleBuf) get() (e valueExtractor) {
if len(cb.buf) < cb.size {
e = cb.buf[cb.head]
cb.head++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
e = cb.buf[cb.tail]
cb.tail++
}
return e
}

type partialResult4Lead struct {
seenRows uint64
curIdx int
extractors []valueExtractor
defaultExtractors circleBuf
defaultConstExtractor valueExtractor
}

const maxDefaultExtractorBufferSize = 1000

type lead struct {
baseLeadLag
}

func (v *lead) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lead{
defaultExtractors: circleBuf{
// Do not use v.offset directly since v.offset is defined by user
// and may larger than a table size.
buf: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
size: int(v.offset),
},
})
}

func (v *lead) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lead)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors.reset()
p.defaultConstExtractor = nil
}

func (v *lead) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lead)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows > v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
if v.offset > 0 {
if !v.defaultExpr.ConstItem() {
// We must cache the results of last v.offset lines.
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors.append(e)
} else if p.defaultConstExtractor == nil {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, chunk.Row{})
if err != nil {
return err
}
p.defaultConstExtractor = e
}
}
}
return nil
}

func (v *lead) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx+v.offset < uint64(len(p.rows)) {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx+v.offset])
p := (*partialResult4Lead)(pr)
var e valueExtractor
if p.curIdx < len(p.extractors) {
e = p.extractors[p.curIdx]
} else {
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
if !v.defaultExpr.ConstItem() {
e = p.defaultExtractors.get()
} else {
e = p.defaultConstExtractor
}
}
if err != nil {
return err
}
v.appendResult(chk, v.ordinal)
e.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}

type partialResult4Lag struct {
seenRows uint64
curIdx uint64
extractors []valueExtractor
defaultExtractors []valueExtractor
}

type lag struct {
baseLeadLag
}

func (v *lag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lag{
defaultExtractors: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
})
}

func (v *lag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lag)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors = p.defaultExtractors[:0]
}

func (v *lag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lag)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows <= v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors = append(p.defaultExtractors, e)
}
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
return nil
}

func (v *lag) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx >= v.offset {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx-v.offset])
p := (*partialResult4Lag)(pr)
var e valueExtractor
if p.curIdx < v.offset {
e = p.defaultExtractors[p.curIdx]
} else {
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
if err != nil {
return err
e = p.extractors[p.curIdx-v.offset]
}
v.appendResult(chk, v.ordinal)
e.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}
Loading