-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor/join : use shallow copy for join. #7433
Changes from 34 commits
a576cc0
79b74e7
d9fe98b
8fa2ad8
33ae5a0
255c64b
0973373
055a41a
c5eeff9
e7c6c64
ce8eb4f
499b6f9
2a295a1
9d82447
ef85948
45b4631
5bf279f
9690506
8db639f
7a55ff5
05c1273
66a133c
dadb047
b4192e4
4096997
b802941
c5cfdf1
947f9d4
24ab90e
2b8d896
3f82d2b
604e49d
e5f4cbe
abbc2c9
e1dd31d
593b31c
3a6fbb7
600fdc3
f4fbd70
23eaf1e
e9ef7dd
21b5417
0aadbf6
0de2063
c681658
f8ccdf2
c7b2301
1e8a9f0
b939b3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,7 +91,7 @@ func newJoiner(ctx sessionctx.Context, joinType plan.JoinType, | |
colTypes := make([]*types.FieldType, 0, len(lhsColTypes)+len(rhsColTypes)) | ||
colTypes = append(colTypes, lhsColTypes...) | ||
colTypes = append(colTypes, rhsColTypes...) | ||
base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize) | ||
base.mutRow = chunk.MutRowFromTypes(colTypes) | ||
base.selected = make([]bool, 0, chunk.InitialCapacity) | ||
if joinType == plan.LeftOuterJoin || joinType == plan.RightOuterJoin { | ||
innerColTypes := lhsColTypes | ||
|
@@ -124,7 +124,7 @@ type baseJoiner struct { | |
conditions []expression.Expression | ||
defaultInner chunk.Row | ||
outerIsRight bool | ||
chk *chunk.Chunk | ||
mutRow chunk.MutRow | ||
selected []bool | ||
maxChunkSize int | ||
} | ||
|
@@ -142,6 +142,16 @@ func (j *baseJoiner) makeJoinRowToChunk(chk *chunk.Chunk, lhs, rhs chunk.Row) { | |
chk.AppendPartialRow(lhs.Len(), rhs) | ||
} | ||
|
||
// makeJoinRow combines inner, outer row into mutRow. | ||
// combines will uses shadow copy inner and outer row data to mutRow. | ||
func (j *baseJoiner) makeJoinRow(isRightJoin bool, inner, outer chunk.Row) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
if !isRightJoin { | ||
inner, outer = outer, inner | ||
} | ||
j.mutRow.ShadowCopyPartialRow(0, inner) | ||
j.mutRow.ShadowCopyPartialRow(inner.Len(), outer) | ||
} | ||
|
||
func (j *baseJoiner) filter(input, output *chunk.Chunk) (matched bool, err error) { | ||
j.selected, err = expression.VectorizedFilter(j.ctx, j.conditions, chunk.NewIterator4Chunk(input), j.selected) | ||
if err != nil { | ||
|
@@ -173,14 +183,9 @@ func (j *semiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chu | |
} | ||
|
||
for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { | ||
j.chk.Reset() | ||
if j.outerIsRight { | ||
j.makeJoinRowToChunk(j.chk, inner, outer) | ||
} else { | ||
j.makeJoinRowToChunk(j.chk, outer, inner) | ||
} | ||
j.makeJoinRow(j.outerIsRight, inner, outer) | ||
|
||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) | ||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.mutRow.ToRow()) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
|
@@ -212,14 +217,9 @@ func (j *antiSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk | |
} | ||
|
||
for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { | ||
j.chk.Reset() | ||
if j.outerIsRight { | ||
j.makeJoinRowToChunk(j.chk, inner, outer) | ||
} else { | ||
j.makeJoinRowToChunk(j.chk, outer, inner) | ||
} | ||
j.makeJoinRow(j.outerIsRight, inner, outer) | ||
|
||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) | ||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.mutRow.ToRow()) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
|
@@ -252,10 +252,9 @@ func (j *leftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, | |
} | ||
|
||
for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { | ||
j.chk.Reset() | ||
j.makeJoinRowToChunk(j.chk, outer, inner) | ||
j.makeJoinRow(false, inner, outer) | ||
|
||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) | ||
matched, err = expression.EvalBool(j.ctx, j.conditions, j.mutRow.ToRow()) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
|
@@ -295,10 +294,9 @@ func (j *antiLeftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Itera | |
} | ||
|
||
for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { | ||
j.chk.Reset() | ||
j.makeJoinRowToChunk(j.chk, outer, inner) | ||
matched, err := expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) | ||
j.makeJoinRow(false, inner, outer) | ||
|
||
matched, err := expression.EvalBool(j.ctx, j.conditions, j.mutRow.ToRow()) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
|
@@ -330,25 +328,7 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk | |
if inners.Len() == 0 { | ||
return false, nil | ||
} | ||
|
||
j.chk.Reset() | ||
chkForJoin := j.chk | ||
if len(j.conditions) == 0 { | ||
chkForJoin = chk | ||
} | ||
|
||
numToAppend := j.maxChunkSize - chk.NumRows() | ||
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- { | ||
j.makeJoinRowToChunk(chkForJoin, outer, inners.Current()) | ||
inners.Next() | ||
} | ||
if len(j.conditions) == 0 { | ||
return true, nil | ||
} | ||
|
||
// reach here, chkForJoin is j.chk | ||
matched, err := j.filter(chkForJoin, chk) | ||
return matched, errors.Trace(err) | ||
return j.tryToMatchInnerAndOuter(false, outer, inners, chk) | ||
} | ||
|
||
func (j *leftOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { | ||
|
@@ -366,24 +346,7 @@ func (j *rightOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, ch | |
return false, nil | ||
} | ||
|
||
j.chk.Reset() | ||
chkForJoin := j.chk | ||
if len(j.conditions) == 0 { | ||
chkForJoin = chk | ||
} | ||
|
||
numToAppend := j.maxChunkSize - chk.NumRows() | ||
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- { | ||
j.makeJoinRowToChunk(chkForJoin, inners.Current(), outer) | ||
inners.Next() | ||
} | ||
if len(j.conditions) == 0 { | ||
return true, nil | ||
} | ||
|
||
// reach here, chkForJoin is j.chk | ||
matched, err := j.filter(chkForJoin, chk) | ||
return matched, errors.Trace(err) | ||
return j.tryToMatchInnerAndOuter(true, outer, inners, chk) | ||
} | ||
|
||
func (j *rightOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { | ||
|
@@ -400,26 +363,29 @@ func (j *innerJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *ch | |
if inners.Len() == 0 { | ||
return false, nil | ||
} | ||
j.chk.Reset() | ||
chkForJoin := j.chk | ||
if len(j.conditions) == 0 { | ||
chkForJoin = chk | ||
} | ||
inner, numToAppend := inners.Current(), j.maxChunkSize-chk.NumRows() | ||
for ; inner != inners.End() && numToAppend > 0; inner, numToAppend = inners.Next(), numToAppend-1 { | ||
if j.outerIsRight { | ||
j.makeJoinRowToChunk(chkForJoin, inner, outer) | ||
} else { | ||
j.makeJoinRowToChunk(chkForJoin, outer, inner) | ||
|
||
return j.tryToMatchInnerAndOuter(j.outerIsRight, outer, inners, chk) | ||
} | ||
|
||
// tryToMatchInnerAndOuter does 2 things: | ||
// 1. Combine outer and inner row to join row. | ||
// 2. Check whether the join row matches the join conditions, if so, append it to the `outChk`. | ||
func (j *baseJoiner) tryToMatchInnerAndOuter(isRight bool, outer chunk.Row, inners chunk.Iterator, outChk *chunk.Chunk) (bool, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the execution logic of this function can be split to 2 stages:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we also can use this pattern in selectExec and other match then output execs as well if it works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly, Nice catch! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should use an individual issue to trace and record the enhancement of the cache locality in the execution engine. But it's not easy to find the potential optimization opportunities without reading and analyzing the code. It may take us a long time to finish it. |
||
match := false | ||
numToAppend := j.maxChunkSize - outChk.NumRows() | ||
for inner := inners.Current(); inner != inners.End() && numToAppend > 0; inner, numToAppend = inners.Next(), numToAppend-1 { | ||
j.makeJoinRow(isRight, inner, outer) | ||
|
||
matched, err := expression.VectorizedFilterRow(j.ctx, j.conditions, j.mutRow.ToRow()) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
if matched { | ||
match = true | ||
outChk.AppendRow(j.mutRow.ToRow()) | ||
} | ||
} | ||
if len(j.conditions) == 0 { | ||
return true, nil | ||
} | ||
|
||
// reach here, chkForJoin is j.chk | ||
matched, err := j.filter(chkForJoin, chk) | ||
return matched, errors.Trace(err) | ||
return match, nil | ||
} | ||
|
||
func (j *innerJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -236,29 +236,37 @@ func VectorizedFilter(ctx sessionctx.Context, filters []Expression, iterator *ch | |
for i, numRows := 0, iterator.Len(); i < numRows; i++ { | ||
selected = append(selected, true) | ||
} | ||
for _, filter := range filters { | ||
isIntType := true | ||
if filter.GetType().EvalType() != types.ETInt { | ||
isIntType = false | ||
var err error | ||
for row := iterator.Begin(); row != iterator.End(); row = iterator.Next() { | ||
if !selected[row.Idx()] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not needed because it must be false now. And why change this function? It is not vectorized anymore. |
||
continue | ||
} | ||
selected[row.Idx()], err = VectorizedFilterRow(ctx, filters, row) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
for row := iterator.Begin(); row != iterator.End(); row = iterator.Next() { | ||
if !selected[row.Idx()] { | ||
continue | ||
} | ||
return selected, nil | ||
} | ||
|
||
// VectorizedFilterRow applies a list of filters to a row. | ||
func VectorizedFilterRow(ctx sessionctx.Context, filters []Expression, row chunk.Row) (bool, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why call it |
||
selected := true | ||
for _, filter := range filters { | ||
isTypeInt := filter.GetType().EvalType() == types.ETInt | ||
if isTypeInt { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can line 269~284 be extracted as a function? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, if we reuse line 269~284 in the function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe wen can extract line 271~285 to a function named |
||
filterResult, isNull, err := filter.EvalInt(ctx, row) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
if isIntType { | ||
filterResult, isNull, err := filter.EvalInt(ctx, row) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
selected[row.Idx()] = selected[row.Idx()] && !isNull && (filterResult != 0) | ||
} else { | ||
// TODO: should rewrite the filter to `cast(expr as SIGNED) != 0` and always use `EvalInt`. | ||
bVal, err := EvalBool(ctx, []Expression{filter}, row) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
selected[row.Idx()] = selected[row.Idx()] && bVal | ||
selected = selected && !isNull && (filterResult != 0) | ||
} else { | ||
// TODO: should rewrite the filter to `cast(expr as SIGNED) != 0` and always use `EvalInt`. | ||
bVal, err := EvalBool(ctx, []Expression{filter}, row) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
} | ||
selected = selected && bVal | ||
} | ||
} | ||
return selected, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
s/mutRow/shadowRow/
?