From ea153e2673b6257064298570806740e56ddac49e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 6 Aug 2024 15:56:24 +0800 Subject: [PATCH] This is an automated cherry-pick of #55201 Signed-off-by: ti-chi-bot --- pkg/planner/core/exhaust_physical_plans.go | 177 ++++++++++++++++++ .../core/issuetest/planner_issue_test.go | 34 ++++ 2 files changed, 211 insertions(+) diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go index 6be47c298ea21..64fd6ad83073a 100644 --- a/pkg/planner/core/exhaust_physical_plans.go +++ b/pkg/planner/core/exhaust_physical_plans.go @@ -880,14 +880,24 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan( if helper == nil { return nil } +<<<<<<< HEAD rangeInfo := helper.buildRangeDecidedByInformation(helper.chosenPath.IdxCols, outerJoinKeys) innerTask = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) +======= + rangeInfo := indexJoinPathRangeInfo(p.SCtx(), outerJoinKeys, indexJoinResult) + innerTask = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. 
+<<<<<<< HEAD if us == nil { innerTask2 = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) +======= + if !wrapper.hasDitryWrite { + innerTask2 = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } ranges = helper.chosenRanges } else { @@ -921,13 +931,22 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan( } buffer.WriteString("]") rangeInfo := buffer.String() +<<<<<<< HEAD innerTask = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) +======= + innerTask = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. 
+<<<<<<< HEAD if us == nil { innerTask2 = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) +======= + if !wrapper.hasDitryWrite { + innerTask2 = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } } var ( @@ -989,7 +1008,11 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( maxOneRow = ok && (sf.FuncName.L == ast.EQ) } } +<<<<<<< HEAD innerTask := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, helper.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) +======= + innerTask := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) { if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL { failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)) @@ -1003,8 +1026,13 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan( // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. 
+<<<<<<< HEAD if us == nil { innerTask2 := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, innerJoinKeys, helper.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) +======= + if !wrapper.hasDitryWrite { + innerTask2 := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) if innerTask2 != nil { joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...) } @@ -1058,7 +1086,13 @@ func (ijHelper *indexJoinBuildHelper) buildRangeDecidedByInformation(idxCols []* } // constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin. 
+<<<<<<< HEAD func (p *LogicalJoin) constructInnerTableScanTask( +======= +func constructInnerTableScanTask( + p *LogicalJoin, + prop *property.PhysicalProperty, +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) wrapper *indexJoinInnerChildWrapper, ranges ranger.Ranges, _ []*expression.Column, @@ -1131,6 +1165,7 @@ func (p *LogicalJoin) constructInnerTableScanTask( ts.PartitionInfo = copTask.partitionInfo selStats := ts.StatsInfo().Scale(selectivity) ts.addPushedDownSelection(copTask, selStats) +<<<<<<< HEAD t := copTask.convertToRootTask(ds.SCtx()) reader := t.p t.p = p.constructInnerByWrapper(wrapper, reader) @@ -1141,6 +1176,22 @@ func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrappe if !p.SCtx().GetSessionVars().EnableINLJoinInnerMultiPattern { if wrapper.us != nil { return p.constructInnerUnionScan(wrapper.us, child) +======= + return constructIndexJoinInnerSideTask(p, prop, copTask, ds, nil, wrapper) +} + +func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan { + for i := len(zippedChildren) - 1; i >= 0; i-- { + switch x := zippedChildren[i].(type) { + case *LogicalUnionScan: + child = constructInnerUnionScan(prop, x, child) + case *logicalop.LogicalProjection: + child = constructInnerProj(prop, x, child) + case *LogicalSelection: + child = constructInnerSel(prop, x, child) + case *LogicalAggregation: + child = constructInnerAgg(prop, x, child) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } return child } @@ -1154,18 +1205,40 @@ func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrappe return child } +<<<<<<< HEAD func (*LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan { +======= +func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) 
base.PhysicalPlan { + if logicalAgg == nil { + return child + } + physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), prop) + physicalHashAgg.SetSchema(logicalAgg.Schema().Clone()) + physicalHashAgg.SetChildren(child) + return physicalHashAgg +} + +func constructInnerSel(prop *property.PhysicalProperty, sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan { +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) if sel == nil { return child } physicalSel := PhysicalSelection{ Conditions: sel.Conditions, +<<<<<<< HEAD }.Init(sel.SCtx(), sel.StatsInfo(), sel.SelectBlockOffset(), nil) +======= + }.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), prop) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) physicalSel.SetChildren(child) return physicalSel } +<<<<<<< HEAD func (*LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan { +======= +func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan { +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) if proj == nil { return child } @@ -1173,21 +1246,35 @@ func (*LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPl Exprs: proj.Exprs, CalculateNoDelay: proj.CalculateNoDelay, AvoidColumnEvaluator: proj.AvoidColumnEvaluator, +<<<<<<< HEAD }.Init(proj.SCtx(), proj.StatsInfo(), proj.SelectBlockOffset(), nil) +======= + }.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) physicalProj.SetChildren(child) return physicalProj } +<<<<<<< HEAD func (*LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan { +======= +func constructInnerUnionScan(prop *property.PhysicalProperty, us *LogicalUnionScan, reader base.PhysicalPlan) 
base.PhysicalPlan { +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) if us == nil { return reader } // Use `reader.StatsInfo()` instead of `us.StatsInfo()` because it should be more accurate. No need to specify // childrenReqProps now since we have got reader already. physicalUnionScan := PhysicalUnionScan{ +<<<<<<< HEAD Conditions: us.conditions, HandleCols: us.handleCols, }.Init(us.SCtx(), reader.StatsInfo(), us.SelectBlockOffset(), nil) +======= + Conditions: us.Conditions, + HandleCols: us.HandleCols, + }.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), prop) +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) physicalUnionScan.SetChildren(reader) return physicalUnionScan } @@ -1242,7 +1329,13 @@ func getColsNDVLowerBoundFromHistColl(colUIDs []int64, histColl *statistics.Hist } // constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin. +<<<<<<< HEAD func (p *LogicalJoin) constructInnerIndexScanTask( +======= +func constructInnerIndexScanTask( + p *LogicalJoin, + prop *property.PhysicalProperty, +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) wrapper *indexJoinInnerChildWrapper, path *util.AccessPath, ranges ranger.Ranges, @@ -1419,6 +1512,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask( if usedStats != nil && usedStats[is.physicalTableID] != nil { is.usedStatsInfo = usedStats[is.physicalTableID] } +<<<<<<< HEAD finalStats := ds.tableStats.ScaleByExpectCnt(rowCount) is.addPushedDownSelection(cop, ds, tmpPath, finalStats) t := cop.convertToRootTask(ds.SCtx()) @@ -1465,11 +1559,41 @@ func (cwc *ColWithCmpFuncManager) CompareRow(lhs, rhs chunk.Row) int { ret := cwc.compareFuncs[i](lhs, col.Index, rhs, col.Index) if ret != 0 { return ret +======= + finalStats := ds.TableStats.ScaleByExpectCnt(rowCount) + if err := is.addPushedDownSelection(cop, ds, tmpPath, finalStats); err != nil { + 
logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err)) + return nil + } + return constructIndexJoinInnerSideTask(p, prop, cop, ds, path, wrapper) +} + +// construct the inner join task by inner child plan tree +// The logic includes two parts: logicalplan->physicalplan, physicalplan->task +// Step1: whether agg can be pushed down to coprocessor +// +// Step1.1: If the agg can be pushed down to coprocessor, we will build a copTask and attach the agg to the copTask +// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols +// +// Step2: build other inner plan node to task +func constructIndexJoinInnerSideTask(p *LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task { + var la *LogicalAggregation + var canPushAggToCop bool + if len(wrapper.zippedChildren) > 0 { + la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*LogicalAggregation) + if la != nil && la.HasDistinct() { + // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. + // If AllowDistinctAggPushDown is set to true, we should not consider RootTask. + if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown { + canPushAggToCop = false + } +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } } return 0 } +<<<<<<< HEAD // BuildRangesByRow will build range of the given row. It will eval each function's arg then call BuildRange. 
func (cwc *ColWithCmpFuncManager) BuildRangesByRow(ctx sessionctx.Context, row chunk.Row) ([]*ranger.Range, error) { exprs := make([]expression.Expression, len(cwc.OpType)) @@ -1484,6 +1608,13 @@ func (cwc *ColWithCmpFuncManager) BuildRangesByRow(ctx sessionctx.Context, row c return nil, err } exprs = append(exprs, newExpr) // nozero +======= + // If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly. + if !canPushAggToCop { + result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask) + result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren, result.GetPlan())) + return result +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } // We already limit range mem usage when buildTemplateRange for inner table of IndexJoin in optimizer phase, so we // don't need and shouldn't limit range mem usage when we refill inner ranges during the execution phase. @@ -1567,6 +1698,52 @@ func (ijHelper *indexJoinBuildHelper) resetContextForIndex(innerKeys []*expressi ijHelper.curNotUsedIndexCols = append(ijHelper.curNotUsedIndexCols, idxCol) ijHelper.curNotUsedColLens = append(ijHelper.curNotUsedColLens, colLens[i]) } +<<<<<<< HEAD +======= + + // build physical agg and attach to task + var aggTask base.Task + // build stream agg and change ds keep order to true + if preferStream { + newGbyItems := make([]expression.Expression, len(la.GroupByItems)) + copy(newGbyItems, la.GroupByItems) + newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs)) + copy(newAggFuncs, la.AggFuncs) + streamAgg := basePhysicalAgg{ + GroupByItems: newGbyItems, + AggFuncs: newAggFuncs, + }.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), prop) + streamAgg.SetSchema(la.Schema().Clone()) + // change to keep order for index scan and dsCopTask + if dsCopTask.indexPlan != nil { + // get the index scan from dsCopTask.indexPlan + physicalIndexScan, _ := 
dsCopTask.indexPlan.(*PhysicalIndexScan) + if physicalIndexScan == nil && len(dsCopTask.indexPlan.Children()) == 1 { + physicalIndexScan, _ = dsCopTask.indexPlan.Children()[0].(*PhysicalIndexScan) + } + if physicalIndexScan != nil { + physicalIndexScan.KeepOrder = true + dsCopTask.keepOrder = true + aggTask = streamAgg.Attach2Task(dsCopTask) + } + } + } + + // build hash agg, when the stream agg is illegal such as the order by prop is not matched + if aggTask == nil { + physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), prop) + physicalHashAgg.SetSchema(la.Schema().Clone()) + aggTask = physicalHashAgg.Attach2Task(dsCopTask) + } + + // build other inner plan node to task + result, ok := aggTask.(*RootTask) + if !ok { + return nil + } + result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p)) + return result +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201)) } // findUsefulEqAndInFilters analyzes the pushedDownConds held by inner child and split them to three parts. 
diff --git a/pkg/planner/core/issuetest/planner_issue_test.go b/pkg/planner/core/issuetest/planner_issue_test.go index 772a66b644388..d1327f83133f0 100644 --- a/pkg/planner/core/issuetest/planner_issue_test.go +++ b/pkg/planner/core/issuetest/planner_issue_test.go @@ -85,3 +85,37 @@ func Test53726(t *testing.T) { " └─TableReader_11 2.00 root data:TableFullScan_10", " └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false")) } +<<<<<<< HEAD +======= + +func TestIssue54535(t *testing.T) { + // test for tidb_enable_inl_join_inner_multi_pattern system variable + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set session tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustExec("create table ta(a1 int, a2 int, a3 int, index idx_a(a1))") + tk.MustExec("create table tb(b1 int, b2 int, b3 int, index idx_b(b1))") + tk.MustExec("analyze table ta") + tk.MustExec("analyze table tb") + + tk.MustQuery("explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1"). 
+ Check(testkit.Rows( + "Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9", + "└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)", + " ├─TableReader_43(Build) 9990.00 root data:Selection_42", + " │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))", + " │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", + " └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1", + " └─IndexLookUp_15 79840080.00 root ", + " ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))", + " │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo", + " └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11", + " └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo")) + // test for issues/55169 + tk.MustExec("create table t1(col_1 int, index idx_1(col_1));") + tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));") + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) +} +>>>>>>> de943d1a2ca (planner: avoid nil PhysicalProperty when to build agg (#55201))