diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go index 977f396eb4767..bfff75375fd46 100644 --- a/pkg/planner/core/exhaust_physical_plans.go +++ b/pkg/planner/core/exhaust_physical_plans.go @@ -814,13 +814,13 @@ func buildIndexJoinInner2TableScan( return nil } rangeInfo := indexJoinPathRangeInfo(p.SCtx(), outerJoinKeys, indexJoinResult) - innerTask = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) + innerTask = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if !wrapper.hasDitryWrite { - innerTask2 = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) + innerTask2 = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) } ranges = indexJoinResult.chosenRanges } else { @@ -854,13 +854,13 @@ func buildIndexJoinInner2TableScan( } buffer.WriteString("]") rangeInfo := buffer.String() - innerTask = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) + innerTask = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt) // The index merge join's inner plan is different from index join, so we // should construct another inner plan for it. // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if !wrapper.hasDitryWrite { - innerTask2 = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) + innerTask2 = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt) } } var ( @@ -922,7 +922,7 @@ func buildIndexJoinInner2IndexScan( maxOneRow = ok && (sf.FuncName.L == ast.EQ) } } - innerTask := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) + innerTask := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow) failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) { if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL && innerTask != nil { failpoint.Return(constructIndexHashJoin(p, prop, outerIdx, innerTask, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager)) @@ -939,7 +939,7 @@ func buildIndexJoinInner2IndexScan( // Because we can't keep order for union scan, if there is a union scan in inner task, // we can't construct index merge join. if !wrapper.hasDitryWrite { - innerTask2 := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) + innerTask2 := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow) if innerTask2 != nil { joins = append(joins, constructIndexMergeJoin(p, prop, outerIdx, innerTask2, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager)...) } @@ -950,6 +950,7 @@ func buildIndexJoinInner2IndexScan( // constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin. func constructInnerTableScanTask( p *LogicalJoin, + prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, ranges ranger.Ranges, _ []*expression.Column, @@ -1022,47 +1023,47 @@ func constructInnerTableScanTask( ts.PlanPartInfo = copTask.physPlanPartInfo selStats := ts.StatsInfo().Scale(selectivity) ts.addPushedDownSelection(copTask, selStats) - return constructIndexJoinInnerSideTask(p, copTask, ds, nil, wrapper) + return constructIndexJoinInnerSideTask(p, prop, copTask, ds, nil, wrapper) } -func constructInnerByZippedChildren(zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan { +func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan { for i := len(zippedChildren) - 1; i >= 0; i-- { switch x := zippedChildren[i].(type) { case *LogicalUnionScan: - child = constructInnerUnionScan(x, child) + child = constructInnerUnionScan(prop, x, child) case *logicalop.LogicalProjection: - child = constructInnerProj(x, child) + child = constructInnerProj(prop, x, child) case *LogicalSelection: - child = constructInnerSel(x, child) + child = constructInnerSel(prop, x, child) case *LogicalAggregation: - child = constructInnerAgg(x, child) + child = constructInnerAgg(prop, x, child) } } return child } -func constructInnerAgg(logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan { +func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan { if logicalAgg == nil { return child } - physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), nil) + physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), prop) physicalHashAgg.SetSchema(logicalAgg.Schema().Clone()) physicalHashAgg.SetChildren(child) return physicalHashAgg } -func constructInnerSel(sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan { +func constructInnerSel(prop *property.PhysicalProperty, sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan { if sel == nil { return child } physicalSel := PhysicalSelection{ Conditions: sel.Conditions, - }.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), nil) + }.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), prop) physicalSel.SetChildren(child) return physicalSel } -func constructInnerProj(proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan { +func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan { if proj == nil { return child } @@ -1070,13 +1071,13 @@ func constructInnerProj(proj *logicalop.LogicalProjection, child base.PhysicalPl Exprs: proj.Exprs, CalculateNoDelay: proj.CalculateNoDelay, AvoidColumnEvaluator: proj.AvoidColumnEvaluator, - }.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), nil) + }.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop) physicalProj.SetChildren(child) physicalProj.SetSchema(proj.Schema()) return physicalProj } -func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan { +func constructInnerUnionScan(prop *property.PhysicalProperty, us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan { if us == nil { return reader } @@ -1085,7 +1086,7 @@ func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) bas physicalUnionScan := PhysicalUnionScan{ Conditions: us.Conditions, HandleCols: us.HandleCols, - }.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), nil) + }.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), prop) physicalUnionScan.SetChildren(reader) return physicalUnionScan } @@ -1142,6 +1143,7 @@ func getColsNDVLowerBoundFromHistColl(colUIDs []int64, histColl *statistics.Hist // constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin. func constructInnerIndexScanTask( p *LogicalJoin, + prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, path *util.AccessPath, ranges ranger.Ranges, @@ -1323,7 +1325,7 @@ func constructInnerIndexScanTask( logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err)) return nil } - return constructIndexJoinInnerSideTask(p, cop, ds, path, wrapper) + return constructIndexJoinInnerSideTask(p, prop, cop, ds, path, wrapper) } // construct the inner join task by inner child plan tree @@ -1334,7 +1336,7 @@ func constructInnerIndexScanTask( // There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols // // Step2: build other inner plan node to task -func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task { +func constructIndexJoinInnerSideTask(p *LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task { var la *LogicalAggregation var canPushAggToCop bool if len(wrapper.zippedChildren) > 0 { @@ -1351,7 +1353,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat // If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly. if !canPushAggToCop { result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask) - result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren, result.GetPlan())) + result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren, result.GetPlan())) return result } @@ -1403,7 +1405,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat streamAgg := basePhysicalAgg{ GroupByItems: newGbyItems, AggFuncs: newAggFuncs, - }.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), nil) + }.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), prop) streamAgg.SetSchema(la.Schema().Clone()) // change to keep order for index scan and dsCopTask if dsCopTask.indexPlan != nil { @@ -1422,7 +1424,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat // build hash agg, when the stream agg is illegal such as the order by prop is not matched if aggTask == nil { - physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), nil) + physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), prop) physicalHashAgg.SetSchema(la.Schema().Clone()) aggTask = physicalHashAgg.Attach2Task(dsCopTask) } @@ -1432,7 +1434,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat if !ok { return nil } - result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p)) + result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p)) return result } diff --git a/pkg/planner/core/issuetest/planner_issue_test.go b/pkg/planner/core/issuetest/planner_issue_test.go index d1b38296965b7..c4ae68acb6eb7 100644 --- a/pkg/planner/core/issuetest/planner_issue_test.go +++ b/pkg/planner/core/issuetest/planner_issue_test.go @@ -111,4 +111,9 @@ func TestIssue54535(t *testing.T) { " │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo", " └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11", " └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo")) + // test for issues/55169 + tk.MustExec("create table t1(col_1 int, index idx_1(col_1));") + tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));") + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) }