diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 970ea14c4c3f3..7fc096e18f836 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -206,9 +206,9 @@ func (e *IndexLookUpJoin) NextChunk(ctx context.Context, chk *chunk.Chunk) error outerRow := task.outerResult.GetRow(task.cursor) if e.innerIter.Len() == 0 { - err = e.resultGenerator.emitToChunk(outerRow, nil, chk) + err = e.resultGenerator.emit(outerRow, nil, chk) } else if e.innerIter.Current() != e.innerIter.End() { - err = e.resultGenerator.emitToChunk(outerRow, e.innerIter, chk) + err = e.resultGenerator.emit(outerRow, e.innerIter, chk) } if err != nil { return errors.Trace(err) diff --git a/executor/join.go b/executor/join.go index e2ae056ed11e5..8d91194f7ec67 100644 --- a/executor/join.go +++ b/executor/join.go @@ -356,7 +356,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R return false, joinResult } if hasNull { - err = e.resultGenerators[workerID].emitToChunk(outerRow, nil, joinResult.chk) + err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk) if err != nil { joinResult.err = errors.Trace(err) } @@ -365,7 +365,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0]) innerPtrs := e.hashTableValBufs[workerID] if len(innerPtrs) == 0 { - err = e.resultGenerators[workerID].emitToChunk(outerRow, nil, joinResult.chk) + err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk) if err != nil { joinResult.err = errors.Trace(err) } @@ -379,7 +379,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R } iter := chunk.NewIterator4Slice(innerRows) for iter.Begin(); iter.Current() != iter.End(); { - err = e.resultGenerators[workerID].emitToChunk(outerRow, iter, joinResult.chk) + err = e.resultGenerators[workerID].emit(outerRow, iter, joinResult.chk) if err != nil { joinResult.err = errors.Trace(err) return false, joinResult @@ -419,7 +419,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu } for i := range selected { if !selected[i] { // process unmatched outer rows - err = e.resultGenerators[workerID].emitToChunk(outerChk.GetRow(i), nil, joinResult.chk) + err = e.resultGenerators[workerID].emit(outerChk.GetRow(i), nil, joinResult.chk) if err != nil { joinResult.err = errors.Trace(err) return false, joinResult @@ -590,7 +590,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch if selected { return &outerRow, nil } else if e.outer { - err := e.resultGenerator.emitToChunk(outerRow, nil, chk) + err := e.resultGenerator.emit(outerRow, nil, chk) if err != nil || chk.NumRows() == e.maxChunkSize { return nil, errors.Trace(err) } @@ -648,7 +648,7 @@ func (e *NestedLoopApplyExec) NextChunk(ctx context.Context, chk *chunk.Chunk) ( e.innerIter.Begin() } - err = e.resultGenerator.emitToChunk(*e.outerRow, e.innerIter, chk) + err = e.resultGenerator.emit(*e.outerRow, e.innerIter, chk) if err != nil || chk.NumRows() == e.maxChunkSize { return errors.Trace(err) } diff --git a/executor/join_result_generators.go b/executor/join_result_generators.go index 7a08c488a0d4d..a11e7487a7aa8 100644 --- a/executor/join_result_generators.go +++ b/executor/join_result_generators.go @@ -35,7 +35,7 @@ var ( // joinResultGenerator is used to generate join results according the join type, see every implementor for detailed information. type joinResultGenerator interface { // emit tries to join an outer row with a batch of inner rows. - // When len(inners) == 0, it means that the outer row can not be joined with any inner row: + // When inners == nil or inners.Len() == 0, it means that the outer row can not be joined with any inner row: // 1. SemiJoin: unmatched outer row is ignored. // 2. AntiSemiJoin: unmatched outer row is appended to the result buffer. // 3. LeftOuterSemiJoin: unmatched outer row is appended with 0 and appended to the result buffer. @@ -43,13 +43,10 @@ type joinResultGenerator interface { // 5. LeftOuterJoin: unmatched outer row is joined with a row of NULLs and appended to the result buffer. // 6. RightOuterJoin: unmatched outer row is joined with a row of NULLs and appended to the result buffer. // 7. InnerJoin: unmatched outer row is ignored. - // When len(inner) != 0 but all the joined rows are filtered, this means that the outer row is unmatched and the above action is tacked as well. - // Otherwise, the outer row is matched and some joined rows is appended to the result buffer. - emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) - - // emitToChunk takes the same operation as emit, but the joined rows is appended to `chk` instead of a result buffer. + // When inners.Len != 0 but all the joined rows are filtered, this means that the outer row is unmatched and the above action is tacked as well. + // Otherwise, the outer row is matched and some joined rows is appended to the `chk`. // The size of `chk` is MaxChunkSize at most. - emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error + emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error } func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, @@ -57,8 +54,7 @@ func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, lhsColTypes, rhsColTypes []*types.FieldType) joinResultGenerator { base := baseJoinResultGenerator{ ctx: ctx, - filter: filter, - defaultInner: defaultInner, + conditions: filter, outerIsRight: outerIsRight, maxChunkSize: ctx.GetSessionVars().MaxChunkSize, } @@ -72,7 +68,7 @@ func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, if !outerIsRight { innerColTypes = rhsColTypes } - base.initDefaultChunkInner(innerColTypes) + base.initDefaultInner(innerColTypes, defaultInner) } switch joinType { case plan.SemiJoin: @@ -96,29 +92,19 @@ func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, // baseJoinResultGenerator is not thread-safe, // so we should build individual generator for every join goroutine. type baseJoinResultGenerator struct { - ctx sessionctx.Context - filter []expression.Expression - defaultChunkInner chunk.Row - outerIsRight bool - chk *chunk.Chunk - selected []bool - defaultInner Row - maxChunkSize int + ctx sessionctx.Context + conditions []expression.Expression + defaultInner chunk.Row + outerIsRight bool + chk *chunk.Chunk + selected []bool + maxChunkSize int } -func (outputer *baseJoinResultGenerator) initDefaultChunkInner(innerTypes []*types.FieldType) { +func (outputer *baseJoinResultGenerator) initDefaultInner(innerTypes []*types.FieldType, defaultInner Row) { mutableRow := chunk.MutRowFromTypes(innerTypes) - mutableRow.SetDatums(outputer.defaultInner[:len(innerTypes)]...) - outputer.defaultChunkInner = mutableRow.ToRow() -} - -// makeJoinRowToBuffer concatenates "lhs" and "rhs" to "buffer" and return that buffer. -// With the help of this function, we can make all of the joined rows to a consecutive -// memory buffer and explore the best cache performance. -func (outputer *baseJoinResultGenerator) makeJoinRowToBuffer(buffer []types.Datum, lhs, rhs Row) []types.Datum { - buffer = append(buffer, lhs...) - buffer = append(buffer, rhs...) - return buffer + mutableRow.SetDatums(defaultInner[:len(innerTypes)]...) + outputer.defaultInner = mutableRow.ToRow() } func (outputer *baseJoinResultGenerator) makeJoinRowToChunk(chk *chunk.Chunk, lhs, rhs chunk.Row) { @@ -128,39 +114,8 @@ func (outputer *baseJoinResultGenerator) makeJoinRowToChunk(chk *chunk.Chunk, lh chk.AppendPartialRow(lhs.Len(), rhs) } -// growResultBufferIfNecessary grows resultBuffer if necessary. -func (outputer *baseJoinResultGenerator) growResultBufferIfNecessary(resultBuffer []Row, numToAppend int) []Row { - length := len(resultBuffer) - if cap(resultBuffer)-length >= numToAppend { - return resultBuffer - } - newResultBuffer := make([]Row, length, length+numToAppend) - copy(newResultBuffer, resultBuffer) - return newResultBuffer -} - -// filterResult filters resultBuffer according to filter. -func (outputer *baseJoinResultGenerator) filterResult(resultBuffer []Row, originLen int) ([]Row, bool, error) { - if outputer.filter == nil { - return resultBuffer, len(resultBuffer) > originLen, nil - } - - curLen := originLen - for _, joinedRow := range resultBuffer[originLen:] { - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, joinedRow) - if err != nil { - return nil, false, errors.Trace(err) - } - if matched { - resultBuffer[curLen] = joinedRow - curLen++ - } - } - return resultBuffer[:curLen], curLen > originLen, nil -} - -func (outputer *baseJoinResultGenerator) filterChunk(input, output *chunk.Chunk) (matched bool, err error) { - outputer.selected, err = expression.VectorizedFilter(outputer.ctx, outputer.filter, chunk.NewIterator4Chunk(input), outputer.selected) +func (outputer *baseJoinResultGenerator) filter(input, output *chunk.Chunk) (matched bool, err error) { + outputer.selected, err = expression.VectorizedFilter(outputer.ctx, outputer.conditions, chunk.NewIterator4Chunk(input), outputer.selected) if err != nil { return false, errors.Trace(err) } @@ -179,44 +134,12 @@ type semiJoinResultGenerator struct { } // emit implements joinResultGenerator interface. -func (outputer *semiJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return resultBuffer, nil - } - // outer row can be joined with an inner row. - if len(outputer.filter) == 0 { - return append(resultBuffer, outer), nil - } - - buffer := make(Row, 0, len(inners[0])+len(outer)) - for _, inner := range inners { - if outputer.outerIsRight { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], inner, outer) - } else { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], outer, inner) - } - - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, buffer) - if err != nil { - return resultBuffer, errors.Trace(err) - } - if matched { - // outer row can be joined with an inner row. - return append(resultBuffer, outer), nil - } - } - // outer row can not be joined with any inner row. - return resultBuffer, nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *semiJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *semiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { if inners == nil || inners.Len() == 0 { return nil } defer inners.ReachEnd() - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chk.AppendPartialRow(0, outer) return nil } @@ -228,7 +151,7 @@ func (outputer *semiJoinResultGenerator) emitToChunk(outer chunk.Row, inners chu } else { outputer.makeJoinRowToChunk(outputer.chk, outer, inner) } - selected, err := expression.EvalBool(outputer.ctx, outputer.filter, outputer.chk.GetRow(0)) + selected, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { return errors.Trace(err) } @@ -245,45 +168,13 @@ type antiSemiJoinResultGenerator struct { } // emit implements joinResultGenerator interface. -func (outputer *antiSemiJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) (_ []Row, err error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return append(resultBuffer, outer), nil - } - // outer row can be joined with an inner row. - if len(outputer.filter) == 0 { - return resultBuffer, nil - } - - buffer := make(Row, 0, len(outer)+len(inners[0])) - for _, inner := range inners { - if outputer.outerIsRight { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], inner, outer) - } else { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], outer, inner) - } - - matched, err1 := expression.EvalBool(outputer.ctx, outputer.filter, buffer) - if err1 != nil { - return nil, errors.Trace(err1) - } - if matched { - // outer row can be joined with an inner row. - return resultBuffer, nil - } - } - // outer row can not be joined with any inner row. - return append(resultBuffer, outer), nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *antiSemiJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *antiSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { if inners == nil || inners.Len() == 0 { chk.AppendRow(outer) return nil } defer inners.ReachEnd() - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { return nil } @@ -295,7 +186,7 @@ func (outputer *antiSemiJoinResultGenerator) emitToChunk(outer chunk.Row, inners outputer.makeJoinRowToChunk(outputer.chk, outer, inner) } - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, outputer.chk.GetRow(0)) + matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { return errors.Trace(err) } @@ -312,36 +203,7 @@ type leftOuterSemiJoinResultGenerator struct { } // emit implements joinResultGenerator interface. -func (outputer *leftOuterSemiJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return outputer.emitUnMatchedOuter(outer, resultBuffer), nil - } - buffer := make(Row, 0, len(outer)+len(inners[0])) - // outer row can be joined with an inner row. - if len(outputer.filter) == 0 { - joinedRow := outputer.makeJoinRowToBuffer(buffer[:0], outer, Row{types.NewIntDatum(1)}) - return append(resultBuffer, joinedRow), nil - } - - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], outer, inner) - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, buffer) - if err != nil { - return resultBuffer, errors.Trace(err) - } - if matched { - // outer row can be joined with an inner row. - buffer = append(buffer[:len(outer)], types.NewDatum(true)) - return append(resultBuffer, buffer), nil - } - } - // outer row can not be joined with any inner row. - return outputer.emitUnMatchedOuter(outer, resultBuffer), nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *leftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *leftOuterSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { if inners == nil || inners.Len() == 0 { chk.AppendPartialRow(0, outer) chk.AppendInt64(outer.Len(), 0) @@ -349,7 +211,7 @@ func (outputer *leftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Row, i } defer inners.ReachEnd() - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chk.AppendPartialRow(0, outer) chk.AppendInt64(outer.Len(), 1) return nil @@ -358,7 +220,7 @@ func (outputer *leftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Row, i for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { outputer.chk.Reset() outputer.makeJoinRowToChunk(outputer.chk, outer, inner) - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, outputer.chk.GetRow(0)) + matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { return errors.Trace(err) } @@ -373,48 +235,12 @@ func (outputer *leftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Row, i return nil } -// emitUnMatchedOuter implements joinResultGenerator interface. -func (outputer *leftOuterSemiJoinResultGenerator) emitUnMatchedOuter(outer Row, resultBuffer []Row) []Row { - buffer := make(Row, 0, len(outer)+1) - joinedRow := outputer.makeJoinRowToBuffer(buffer, outer, Row{types.NewIntDatum(0)}) - return append(resultBuffer, joinedRow) -} - type antiLeftOuterSemiJoinResultGenerator struct { baseJoinResultGenerator } // emit implements joinResultGenerator interface. -func (outputer *antiLeftOuterSemiJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return outputer.emitUnMatchedOuter(outer, resultBuffer), nil - } - buffer := make(Row, 0, len(outer)+len(inners[0])) - // outer row can be joined with an inner row. - if len(outputer.filter) == 0 { - joinedRow := outputer.makeJoinRowToBuffer(buffer[:0], outer, Row{types.NewIntDatum(0)}) - return append(resultBuffer, joinedRow), nil - } - - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[:0], outer, inner) - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, buffer) - if err != nil { - return resultBuffer, errors.Trace(err) - } - if matched { - // outer row can be joined with an inner row. - buffer = append(buffer[:len(outer)], types.NewDatum(false)) - return append(resultBuffer, buffer), nil - } - } - // outer row can not be joined with any inner row. - return outputer.emitUnMatchedOuter(outer, resultBuffer), nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *antiLeftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *antiLeftOuterSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { // outer row can not be joined with any inner row. if inners == nil || inners.Len() == 0 { chk.AppendPartialRow(0, outer) @@ -424,7 +250,7 @@ func (outputer *antiLeftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Ro defer inners.ReachEnd() // outer row can be joined with an inner row. - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chk.AppendPartialRow(0, outer) chk.AppendInt64(outer.Len(), 0) return nil @@ -433,7 +259,7 @@ func (outputer *antiLeftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Ro for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { outputer.chk.Reset() outputer.makeJoinRowToChunk(outputer.chk, outer, inner) - matched, err := expression.EvalBool(outputer.ctx, outputer.filter, outputer.chk.GetRow(0)) + matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { return errors.Trace(err) } @@ -451,54 +277,21 @@ func (outputer *antiLeftOuterSemiJoinResultGenerator) emitToChunk(outer chunk.Ro return nil } -// emitUnMatchedOuter implements joinResultGenerator interface. -func (outputer *antiLeftOuterSemiJoinResultGenerator) emitUnMatchedOuter(outer Row, resultBuffer []Row) []Row { - buffer := make(Row, 0, len(outer)+1) - joinedRow := outputer.makeJoinRowToBuffer(buffer, outer, Row{types.NewIntDatum(1)}) - return append(resultBuffer, joinedRow) -} - type leftOuterJoinResultGenerator struct { baseJoinResultGenerator } // emit implements joinResultGenerator interface. -func (outputer *leftOuterJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return append(resultBuffer, makeJoinRow(outer, outputer.defaultInner)), nil - } - resultBuffer = outputer.growResultBufferIfNecessary(resultBuffer, len(inners)) - originLen := len(resultBuffer) - buffer := make([]types.Datum, 0, len(inners)*(len(outer)+len(inners[0]))) - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[len(buffer):], outer, inner) - resultBuffer = append(resultBuffer, buffer) - } - var matched bool - var err error - resultBuffer, matched, err = outputer.filterResult(resultBuffer, originLen) - if err != nil { - return nil, errors.Trace(err) - } - if !matched { - // outer row can not be joined with any inner row. - return append(resultBuffer, makeJoinRow(outer, outputer.defaultInner)), nil - } - return resultBuffer, nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *leftOuterJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *leftOuterJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { // outer row can not be joined with any inner row. if inners == nil || inners.Len() == 0 { chk.AppendPartialRow(0, outer) - chk.AppendPartialRow(outer.Len(), outputer.defaultChunkInner) + chk.AppendPartialRow(outer.Len(), outputer.defaultInner) return nil } outputer.chk.Reset() chkForJoin := outputer.chk - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chkForJoin = chk } numToAppend := outputer.maxChunkSize - chk.NumRows() @@ -506,11 +299,11 @@ func (outputer *leftOuterJoinResultGenerator) emitToChunk(outer chunk.Row, inner outputer.makeJoinRowToChunk(chkForJoin, outer, inners.Current()) inners.Next() } - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { return nil } // reach here, chkForJoin is outputer.chk - matched, err := outputer.filterChunk(chkForJoin, chk) + matched, err := outputer.filter(chkForJoin, chk) if err != nil { return errors.Trace(err) } @@ -518,7 +311,7 @@ func (outputer *leftOuterJoinResultGenerator) emitToChunk(outer chunk.Row, inner if !matched { // outer row can not be joined with any inner row. chk.AppendPartialRow(0, outer) - chk.AppendPartialRow(outer.Len(), outputer.defaultChunkInner) + chk.AppendPartialRow(outer.Len(), outputer.defaultInner) } return nil } @@ -528,42 +321,16 @@ type rightOuterJoinResultGenerator struct { } // emit implements joinResultGenerator interface. -func (outputer *rightOuterJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return append(resultBuffer, makeJoinRow(outputer.defaultInner, outer)), nil - } - resultBuffer = outputer.growResultBufferIfNecessary(resultBuffer, len(inners)) - originLen := len(resultBuffer) - buffer := make([]types.Datum, 0, len(inners)*(len(outer)+len(inners[0]))) - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[len(buffer):], inner, outer) - resultBuffer = append(resultBuffer, buffer) - } - var matched bool - var err error - resultBuffer, matched, err = outputer.filterResult(resultBuffer, originLen) - if err != nil { - return nil, errors.Trace(err) - } - // outer row can not be joined with any inner row. - if !matched { - return append(resultBuffer, makeJoinRow(outputer.defaultInner, outer)), nil - } - return resultBuffer, nil -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *rightOuterJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *rightOuterJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { // outer row can not be joined with any inner row. if inners == nil || inners.Len() == 0 { - chk.AppendPartialRow(0, outputer.defaultChunkInner) - chk.AppendPartialRow(outputer.defaultChunkInner.Len(), outer) + chk.AppendPartialRow(0, outputer.defaultInner) + chk.AppendPartialRow(outputer.defaultInner.Len(), outer) return nil } outputer.chk.Reset() chkForJoin := outputer.chk - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chkForJoin = chk } numToAppend := outputer.maxChunkSize - chk.NumRows() @@ -571,19 +338,19 @@ func (outputer *rightOuterJoinResultGenerator) emitToChunk(outer chunk.Row, inne outputer.makeJoinRowToChunk(chkForJoin, inners.Current(), outer) inners.Next() } - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { return nil } // reach here, chkForJoin is outputer.chk - matched, err := outputer.filterChunk(chkForJoin, chk) + matched, err := outputer.filter(chkForJoin, chk) if err != nil { return errors.Trace(err) } chkForJoin.Reset() // outer row can not be joined with any inner row. if !matched { - chk.AppendPartialRow(0, outputer.defaultChunkInner) - chk.AppendPartialRow(outputer.defaultChunkInner.Len(), outer) + chk.AppendPartialRow(0, outputer.defaultInner) + chk.AppendPartialRow(outputer.defaultInner.Len(), outer) } return nil } @@ -593,38 +360,13 @@ type innerJoinResultGenerator struct { } // emit implements joinResultGenerator interface. -func (outputer *innerJoinResultGenerator) emit(outer Row, inners []Row, resultBuffer []Row) ([]Row, error) { - // outer row can not be joined with any inner row. - if len(inners) == 0 { - return resultBuffer, nil - } - resultBuffer = outputer.growResultBufferIfNecessary(resultBuffer, len(inners)) - originLen := len(resultBuffer) - buffer := make([]types.Datum, 0, (len(outer)+len(inners[0]))*len(inners)) - if outputer.outerIsRight { - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[len(buffer):], inner, outer) - resultBuffer = append(resultBuffer, buffer) - } - } else { - for _, inner := range inners { - buffer = outputer.makeJoinRowToBuffer(buffer[len(buffer):], outer, inner) - resultBuffer = append(resultBuffer, buffer) - } - } - var err error - resultBuffer, _, err = outputer.filterResult(resultBuffer, originLen) - return resultBuffer, errors.Trace(err) -} - -// emitToChunk implements joinResultGenerator interface. -func (outputer *innerJoinResultGenerator) emitToChunk(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { +func (outputer *innerJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { if inners == nil || inners.Len() == 0 { return nil } outputer.chk.Reset() chkForJoin := outputer.chk - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { chkForJoin = chk } inner, numToAppend := inners.Current(), outputer.maxChunkSize-chk.NumRows() @@ -635,11 +377,11 @@ func (outputer *innerJoinResultGenerator) emitToChunk(outer chunk.Row, inners ch outputer.makeJoinRowToChunk(chkForJoin, outer, inner) } } - if len(outputer.filter) == 0 { + if len(outputer.conditions) == 0 { return nil } // reach here, chkForJoin is outputer.chk - _, err := outputer.filterChunk(chkForJoin, chk) + _, err := outputer.filter(chkForJoin, chk) if err != nil { return errors.Trace(err) } @@ -647,11 +389,3 @@ func (outputer *innerJoinResultGenerator) emitToChunk(outer chunk.Row, inners ch return nil } - -// makeJoinRow simply creates a new row that appends row b to row a. -func makeJoinRow(a Row, b Row) Row { - ret := make([]types.Datum, 0, len(a)+len(b)) - ret = append(ret, a...) - ret = append(ret, b...) - return ret -} diff --git a/executor/merge_join.go b/executor/merge_join.go index f3ab0a5fed40a..c8b528f468a2a 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -274,7 +274,7 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM } if cmpResult < 0 { - err = e.resultGenerator.emitToChunk(e.outerTable.row, nil, chk) + err = e.resultGenerator.emit(e.outerTable.row, nil, chk) if err != nil { return false, errors.Trace(err) } @@ -287,7 +287,7 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM continue } - err = e.resultGenerator.emitToChunk(e.outerTable.row, e.innerIter4Row, chk) + err = e.resultGenerator.emit(e.outerTable.row, e.innerIter4Row, chk) if err != nil { return false, errors.Trace(err) }