Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: avoid nil PhysicalProperty when to build agg #55201

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,13 +814,13 @@ func buildIndexJoinInner2TableScan(
return nil
}
rangeInfo := indexJoinPathRangeInfo(p.SCtx(), outerJoinKeys, indexJoinResult)
innerTask = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
innerTask = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, indexJoinResult.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
ranges = indexJoinResult.chosenRanges
} else {
Expand Down Expand Up @@ -854,13 +854,13 @@ func buildIndexJoinInner2TableScan(
}
buffer.WriteString("]")
rangeInfo := buffer.String()
innerTask = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
innerTask = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
// The index merge join's inner plan is different from index join, so we
// should construct another inner plan for it.
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 = constructInnerTableScanTask(p, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
innerTask2 = constructInnerTableScanTask(p, prop, wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
}
}
var (
Expand Down Expand Up @@ -922,7 +922,7 @@ func buildIndexJoinInner2IndexScan(
maxOneRow = ok && (sf.FuncName.L == ast.EQ)
}
}
innerTask := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
innerTask := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL && innerTask != nil {
failpoint.Return(constructIndexHashJoin(p, prop, outerIdx, innerTask, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager))
Expand All @@ -939,7 +939,7 @@ func buildIndexJoinInner2IndexScan(
// Because we can't keep order for union scan, if there is a union scan in inner task,
// we can't construct index merge join.
if !wrapper.hasDitryWrite {
innerTask2 := constructInnerIndexScanTask(p, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
innerTask2 := constructInnerIndexScanTask(p, prop, wrapper, indexJoinResult.chosenPath, indexJoinResult.chosenRanges.Range(), indexJoinResult.chosenRemained, innerJoinKeys, indexJoinResult.idxOff2KeyOff, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
if innerTask2 != nil {
joins = append(joins, constructIndexMergeJoin(p, prop, outerIdx, innerTask2, indexJoinResult.chosenRanges, keyOff2IdxOff, indexJoinResult.chosenPath, indexJoinResult.lastColManager)...)
}
Expand All @@ -950,6 +950,7 @@ func buildIndexJoinInner2IndexScan(
// constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func constructInnerTableScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
wrapper *indexJoinInnerChildWrapper,
ranges ranger.Ranges,
_ []*expression.Column,
Expand Down Expand Up @@ -1022,61 +1023,61 @@ func constructInnerTableScanTask(
ts.PlanPartInfo = copTask.physPlanPartInfo
selStats := ts.StatsInfo().Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
return constructIndexJoinInnerSideTask(p, copTask, ds, nil, wrapper)
return constructIndexJoinInnerSideTask(p, prop, copTask, ds, nil, wrapper)
}

func constructInnerByZippedChildren(zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerByZippedChildren(prop *property.PhysicalProperty, zippedChildren []base.LogicalPlan, child base.PhysicalPlan) base.PhysicalPlan {
for i := len(zippedChildren) - 1; i >= 0; i-- {
switch x := zippedChildren[i].(type) {
case *LogicalUnionScan:
child = constructInnerUnionScan(x, child)
child = constructInnerUnionScan(prop, x, child)
case *logicalop.LogicalProjection:
child = constructInnerProj(x, child)
child = constructInnerProj(prop, x, child)
case *LogicalSelection:
child = constructInnerSel(x, child)
child = constructInnerSel(prop, x, child)
case *LogicalAggregation:
child = constructInnerAgg(x, child)
child = constructInnerAgg(prop, x, child)
}
}
return child
}

func constructInnerAgg(logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerAgg(prop *property.PhysicalProperty, logicalAgg *LogicalAggregation, child base.PhysicalPlan) base.PhysicalPlan {
if logicalAgg == nil {
return child
}
physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), nil)
physicalHashAgg := NewPhysicalHashAgg(logicalAgg, logicalAgg.StatsInfo(), prop)
physicalHashAgg.SetSchema(logicalAgg.Schema().Clone())
physicalHashAgg.SetChildren(child)
return physicalHashAgg
}

func constructInnerSel(sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerSel(prop *property.PhysicalProperty, sel *LogicalSelection, child base.PhysicalPlan) base.PhysicalPlan {
if sel == nil {
return child
}
physicalSel := PhysicalSelection{
Conditions: sel.Conditions,
}.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), nil)
}.Init(sel.SCtx(), sel.StatsInfo(), sel.QueryBlockOffset(), prop)
physicalSel.SetChildren(child)
return physicalSel
}

func constructInnerProj(proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan {
func constructInnerProj(prop *property.PhysicalProperty, proj *logicalop.LogicalProjection, child base.PhysicalPlan) base.PhysicalPlan {
if proj == nil {
return child
}
physicalProj := PhysicalProjection{
Exprs: proj.Exprs,
CalculateNoDelay: proj.CalculateNoDelay,
AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), nil)
}.Init(proj.SCtx(), proj.StatsInfo(), proj.QueryBlockOffset(), prop)
physicalProj.SetChildren(child)
physicalProj.SetSchema(proj.Schema())
return physicalProj
}

func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan {
func constructInnerUnionScan(prop *property.PhysicalProperty, us *LogicalUnionScan, reader base.PhysicalPlan) base.PhysicalPlan {
if us == nil {
return reader
}
Expand All @@ -1085,7 +1086,7 @@ func constructInnerUnionScan(us *LogicalUnionScan, reader base.PhysicalPlan) bas
physicalUnionScan := PhysicalUnionScan{
Conditions: us.Conditions,
HandleCols: us.HandleCols,
}.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), nil)
}.Init(us.SCtx(), reader.StatsInfo(), us.QueryBlockOffset(), prop)
physicalUnionScan.SetChildren(reader)
return physicalUnionScan
}
Expand Down Expand Up @@ -1142,6 +1143,7 @@ func getColsNDVLowerBoundFromHistColl(colUIDs []int64, histColl *statistics.Hist
// constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
func constructInnerIndexScanTask(
p *LogicalJoin,
prop *property.PhysicalProperty,
wrapper *indexJoinInnerChildWrapper,
path *util.AccessPath,
ranges ranger.Ranges,
Expand Down Expand Up @@ -1323,7 +1325,7 @@ func constructInnerIndexScanTask(
logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err))
return nil
}
return constructIndexJoinInnerSideTask(p, cop, ds, path, wrapper)
return constructIndexJoinInnerSideTask(p, prop, cop, ds, path, wrapper)
}

// construct the inner join task by inner child plan tree
Expand All @@ -1334,7 +1336,7 @@ func constructInnerIndexScanTask(
// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols
//
// Step2: build other inner plan node to task
func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
func constructIndexJoinInnerSideTask(p *LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
Expand All @@ -1351,7 +1353,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
// If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly.
if !canPushAggToCop {
result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask)
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren, result.GetPlan()))
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren, result.GetPlan()))
return result
}

Expand Down Expand Up @@ -1403,7 +1405,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
streamAgg := basePhysicalAgg{
GroupByItems: newGbyItems,
AggFuncs: newAggFuncs,
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), nil)
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), prop)
streamAgg.SetSchema(la.Schema().Clone())
// change to keep order for index scan and dsCopTask
if dsCopTask.indexPlan != nil {
Expand All @@ -1422,7 +1424,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat

// build hash agg, when the stream agg is illegal such as the order by prop is not matched
if aggTask == nil {
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), nil)
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), prop)
physicalHashAgg.SetSchema(la.Schema().Clone())
aggTask = physicalHashAgg.Attach2Task(dsCopTask)
}
Expand All @@ -1432,7 +1434,7 @@ func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *Dat
if !ok {
return nil
}
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
result.SetPlan(constructInnerByZippedChildren(prop, wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
return result
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,9 @@ func TestIssue54535(t *testing.T) {
" │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo",
" └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11",
" └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo"))
// test for issues/55169
tk.MustExec("create table t1(col_1 int, index idx_1(col_1));")
tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));")
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows())
}