Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55201
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed Aug 6, 2024
1 parent 70bfd90 commit ea153e2
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 0 deletions.
177 changes: 177 additions & 0 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,14 +880,24 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
if helper == nil {
return nil
}
<<<<<<< HEAD
rangeInfo := helper.buildRangeDecidedByInformation(helper.chosenPath.IdxCols, outerJoinKeys)
innerTask = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
=======
rangeInfo := indexJoinPathRangeInfo(p.SCtx(), outerJoinKeys, indexJoinResult)
innerTask = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
<<<<<<< HEAD
if us == nil {
innerTask2 = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
=======
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}
ranges = helper.chosenRanges
} else {
Expand Down Expand Up @@ -921,13 +931,22 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
}
buffer.WriteString("]")
rangeInfo := buffer.String()
<<<<<<< HEAD
innerTask = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
=======
innerTask = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
<<<<<<< HEAD
if us == nil {
innerTask2 = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
=======
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}
}
var (
Expand Down Expand Up @@ -989,7 +1008,11 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
maxOneRow = ok && (sf.FuncName.L == ast.EQ)
}
}
<<<<<<< HEAD
innerTask := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, helper.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
=======
innerTask := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL {
failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager))
Expand All @@ -1003,8 +1026,13 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
<<<<<<< HEAD
if us == nil {
innerTask2 := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, helper.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
=======
if !wrapper.hasDitryWrite {
innerTask2 := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
if innerTask2 != nil {
joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...)
}
Expand Down Expand Up @@ -1058,7 +1086,13 @@ func (ijHelper *indexJoinBuildHelper) buildRangeDecidedByInformation(idxCols []*
}

// constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
<<<<<<< HEAD
func (p *LogicalJoin) constructInnerTableScanTask(
=======
func constructInnerTableScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
wrapper *indexJoinInnerChildWrapper,
ranges ranger.Ranges,
_ []*expression.Column,
Expand Down Expand Up @@ -1131,6 +1165,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
ts.PartitionInfo = copTask.partitionInfo
selStats := ts.StatsInfo().Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
<<<<<<< HEAD
t := copTask.convertToRootTask(ds.SCtx())
reader := t.p
t.p = p.constructInnerByWrapper(wrapper, reader)
Expand All @@ -1141,6 +1176,22 @@ func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrappe
if !p.SCtx().GetSessionVars().EnableINLJoinInnerMultiPattern {
if wrapper.us != nil {
return p.constructInnerUnionScan(wrapper.us, child)
=======
return constructIndexJoinInnerSideTask(p, prop, copTask, ds, nil, wrapper)
}

func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan {
for i := len(zippedChildren) - 1; i >= 0; i-- {
switch x := zippedChildren[i].(type) {
case *LogicalUnionScan:
child = constructInnerUnionScan(prop, x, child)
case *logicalop.LogicalProjection:
child = constructInnerProj(prop, x, child)
case *LogicalSelection:
child = constructInnerSel(prop, x, child)
case *LogicalAggregation:
child = constructInnerAgg(prop, x, child)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}
return child
}
Expand All @@ -1154,40 +1205,76 @@ func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrappe
return child
}

<<<<<<< HEAD
func (*LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan {
=======
func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan {
if logicalAgg == nil {
return child
}
physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), prop)
physicalHashAgg.SetSchema(logicalAgg.Schema().Clone())
physicalHashAgg.SetChildren(child)
return physicalHashAgg
}

func constructInnerSel(prop *property.PhysicalProperty, sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan {
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
if sel == nil {
return child
}
physicalSel := PhysicalSelection{
Conditions: sel.Conditions,
<<<<<<< HEAD
}.Init(sel.SCtx(), sel.StatsInfo(), sel.SelectBlockOffset(), nil)
=======
}.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), prop)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
physicalSel.SetChildren(child)
return physicalSel
}

<<<<<<< HEAD
func (*LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan {
=======
func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan {
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
if proj == nil {
return child
}
physicalProj := PhysicalProjection{
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
<<<<<<< HEAD
}.Init(proj.SCtx(), proj.StatsInfo(), proj.SelectBlockOffset(), nil)
=======
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
physicalProj.SetChildren(child)
return physicalProj
}

<<<<<<< HEAD
func (*LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
=======
func constructInnerUnionScan(prop *property.PhysicalProperty, us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan {
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
if us == nil {
return reader
}
// Use `reader.StatsInfo()` instead of `us.StatsInfo()` because it should be more accurate. No need to specify
// childrenReqProps now since we have got reader already.
physicalUnionScan := PhysicalUnionScan{
<<<<<<< HEAD
Conditions: us.conditions,
HandleCols: us.handleCols,
}.Init(us.SCtx(), reader.StatsInfo(), us.SelectBlockOffset(), nil)
=======
Conditions: us.Conditions,
HandleCols: us.HandleCols,
}.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), prop)
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
physicalUnionScan.SetChildren(reader)
return physicalUnionScan
}
Expand Down Expand Up @@ -1242,7 +1329,13 @@ func getColsNDVLowerBoundFromHistColl(colUIDs []int64, histColl *statistics.Hist
}

// constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
<<<<<<< HEAD
func (p *LogicalJoin) constructInnerIndexScanTask(
=======
func constructInnerIndexScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
wrapper *indexJoinInnerChildWrapper,
path *util.AccessPath,
ranges ranger.Ranges,
Expand Down Expand Up @@ -1419,6 +1512,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
if usedStats != nil && usedStats[is.physicalTableID] != nil {
is.usedStatsInfo = usedStats[is.physicalTableID]
}
<<<<<<< HEAD
finalStats := ds.tableStats.ScaleByExpectCnt(rowCount)
is.addPushedDownSelection(cop, ds, tmpPath, finalStats)
t := cop.convertToRootTask(ds.SCtx())
Expand Down Expand Up @@ -1465,11 +1559,41 @@ func (cwc *ColWithCmpFuncManager) CompareRow(lhs, rhs chunk.Row) int {
ret := cwc.compareFuncs[i](lhs, col.Index, rhs, col.Index)
if ret != 0 {
return ret
=======
finalStats := ds.TableStats.ScaleByExpectCnt(rowCount)
if err := is.addPushedDownSelection(cop, ds, tmpPath, finalStats); err != nil {
logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err))
return nil
}
return constructIndexJoinInnerSideTask(p, prop, cop, ds, path, wrapper)
}

// construct the inner join task by inner child plan tree
// The Logical include two parts: logicalplan->physicalplan, physicalplan->task
// Step1: whether agg can be pushed down to coprocessor
//
// Step1.1: If the agg can be pushded down to coprocessor, we will build a copTask and attach the agg to the copTask
// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols
//
// Step2: build other inner plan node to task
func constructIndexJoinInnerSideTask(p *LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*LogicalAggregation)
if la != nil && la.HasDistinct() {
// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown {
canPushAggToCop = false
}
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}
}
return 0
}

<<<<<<< HEAD
// BuildRangesByRow will build range of the given row. It will eval each function's arg then call BuildRange.
func (cwc *ColWithCmpFuncManager) BuildRangesByRow(ctx sessionctx.Context, row chunk.Row) ([]*ranger.Range, error) {
exprs := make([]expression.Expression, len(cwc.OpType))
Expand All @@ -1484,6 +1608,13 @@ func (cwc *ColWithCmpFuncManager) BuildRangesByRow(ctx sessionctx.Context, row c
return nil, err
}
exprs = append(exprs, newExpr) // nozero
=======
// If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly.
if !canPushAggToCop {
result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask)
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren, result.GetPlan()))
return result
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}
// We already limit range mem usage when buildTemplateRange for inner table of IndexJoin in optimizer phase, so we
// don't need and shouldn't limit range mem usage when we refill inner ranges during the execution phase.
Expand Down Expand Up @@ -1567,6 +1698,52 @@ func (ijHelper *indexJoinBuildHelper) resetContextForIndex(innerKeys []*expressi
ijHelper.curNotUsedIndexCols = append(ijHelper.curNotUsedIndexCols, idxCol)
ijHelper.curNotUsedColLens = append(ijHelper.curNotUsedColLens, colLens[i])
}
<<<<<<< HEAD
=======

// build physical agg and attach to task
var aggTask base.Task
// build stream agg and change ds keep order to true
if preferStream {
newGbyItems := make([]expression.Expression, len(la.GroupByItems))
copy(newGbyItems, la.GroupByItems)
newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs))
copy(newAggFuncs, la.AggFuncs)
streamAgg := basePhysicalAgg{
GroupByItems: newGbyItems,
AggFuncs: newAggFuncs,
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), prop)
streamAgg.SetSchema(la.Schema().Clone())
// change to keep order for index scan and dsCopTask
if dsCopTask.indexPlan != nil {
// get the index scan from dsCopTask.indexPlan
physicalIndexScan, _ := dsCopTask.indexPlan.(*PhysicalIndexScan)
if physicalIndexScan == nil && len(dsCopTask.indexPlan.Children()) == 1 {
physicalIndexScan, _ = dsCopTask.indexPlan.Children()[0].(*PhysicalIndexScan)
}
if physicalIndexScan != nil {
physicalIndexScan.KeepOrder = true
dsCopTask.keepOrder = true
aggTask = streamAgg.Attach2Task(dsCopTask)
}
}
}

// build hash agg, when the stream agg is illegal such as the order by prop is not matched
if aggTask == nil {
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), prop)
physicalHashAgg.SetSchema(la.Schema().Clone())
aggTask = physicalHashAgg.Attach2Task(dsCopTask)
}

// build other inner plan node to task
result, ok := aggTask.(*RootTask)
if !ok {
return nil
}
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
return result
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))
}

// findUsefulEqAndInFilters analyzes the pushedDownConds held by inner child and split them to three parts.
Expand Down
34 changes: 34 additions & 0 deletions pkg/planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,37 @@ func Test53726(t *testing.T) {
" └─TableReader_11 2.00 root data:TableFullScan_10",
" └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false"))
}
<<<<<<< HEAD
=======

func TestIssue54535(t *testing.T) {
// test for tidb_enable_inl_join_inner_multi_pattern system variable
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set session tidb_enable_inl_join_inner_multi_pattern='ON'")
tk.MustExec("create table ta(a1 int, a2 int, a3 int, index idx_a(a1))")
tk.MustExec("create table tb(b1 int, b2 int, b3 int, index idx_b(b1))")
tk.MustExec("analyze table ta")
tk.MustExec("analyze table tb")

tk.MustQuery("explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1").
Check(testkit.Rows(
"Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9",
"└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)",
" ├─TableReader_43(Build) 9990.00 root data:Selection_42",
" │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))",
" │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo",
" └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1",
" └─IndexLookUp_15 79840080.00 root ",
" ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))",
" │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo",
" └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11",
" └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo"))
// test for issues/55169
tk.MustExec("create table t1(col_1 int, index idx_1(col_1));")
tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));")
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
}
>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))

0 comments on commit ea153e2

Please sign in to comment.