Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix index out of range in constructIndexJoinInnerSideTask (#54534) #55145

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,12 +1375,131 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
if usedStats != nil && usedStats.GetUsedInfo(is.physicalTableID) != nil {
is.usedStatsInfo = usedStats.GetUsedInfo(is.physicalTableID)
}
<<<<<<< HEAD
finalStats := ds.tableStats.ScaleByExpectCnt(rowCount)
is.addPushedDownSelection(cop, ds, tmpPath, finalStats)
t := cop.convertToRootTask(ds.SCtx())
reader := t.p
t.p = p.constructInnerByWrapper(wrapper, reader)
return t
=======
finalStats := ds.TableStats.ScaleByExpectCnt(rowCount)
if err := is.addPushedDownSelection(cop, ds, tmpPath, finalStats); err != nil {
logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err))
return nil
}
return constructIndexJoinInnerSideTask(p, cop, ds, path, wrapper)
}

// constructIndexJoinInnerSideTask builds the task for the inner side of an index join
// from the data source's cop task (dsCopTask) and the logical operators collected in
// wrapper.zippedChildren.
//
// The logic has two parts: logical plan -> physical plan, and physical plan -> task.
//
// Step 1: decide whether an aggregation can be pushed down to the coprocessor.
// Only the last element of wrapper.zippedChildren is examined; it qualifies when it is a
// *LogicalAggregation and, if it has DISTINCT, the session allows distinct agg push-down.
//
// Step 1.1: if the aggregation can be pushed down, build a physical aggregation and attach
// it to dsCopTask. There are two kinds of aggregation: stream agg and hash agg. Stream agg
// is only tried when a series of conditions hold (see the checks on preferStream below);
// otherwise hash agg is used.
//
// Step 2: build the remaining inner plan nodes on top of the resulting task.
//
// When no aggregation is pushed down, dsCopTask is converted to a root task directly.
// The function returns nil if the task obtained after attaching the aggregation is not
// a *RootTask.
func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
// canPushAggToCop starts out as the result of the type assertion: true only when the
// last zipped child is a *LogicalAggregation.
la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*LogicalAggregation)
if la != nil && la.HasDistinct() {
// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown {
canPushAggToCop = false
}
}
}

// If the bottom plan is not aggregation or the aggregation can't be pushed to coprocessor, we will construct a root task directly.
// All zipped children are then stacked on top of the root task's plan.
if !canPushAggToCop {
result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask)
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren, result.GetPlan()))
return result
}

// Try stream aggregation first.
// We will choose the stream aggregation if the following conditions are met:
// 1. Force hint stream agg by /*+ stream_agg() */
// 2. Other conditions copy from getStreamAggs() in exhaust_physical_plans.go
// From here on preferStream is only ever cleared, never set back to true.
_, preferStream := la.ResetHintIfConflicted()
// Any aggregate function already in final mode rules out stream agg.
for _, aggFunc := range la.AggFuncs {
if aggFunc.Mode == aggregation.FinalMode {
preferStream = false
break
}
}
// group by a + b is not interested in any order.
// NOTE(review): presumably GetGroupByCols returns only the group-by items that are plain
// columns, so a length mismatch means some item is an expression; confirm.
groupByCols := la.GetGroupByCols()
if len(groupByCols) != len(la.GroupByItems) {
preferStream = false
}
if la.HasDistinct() && !la.DistinctArgsMeetsProperty() {
preferStream = false
}
// sort items must be the super set of group by items
// Stream agg is only considered for a non-nil path with a non-MV index on a table
// without partition info; every other case falls into the else branch below.
if path != nil && path.Index != nil && !path.Index.MVIndex &&
ds.TableInfo.GetPartitionInfo() == nil {
// This length check keeps path.IdxCols[i] in range inside the loop below.
// NOTE(review): path.IdxColLens[i] is indexed with the same i; this assumes IdxColLens
// is at least as long as IdxCols; confirm against where AccessPath is built.
if len(path.IdxCols) < len(groupByCols) {
preferStream = false
} else {
sctx := p.SCtx()
// The i-th group-by column must equal the i-th index column, and that index column
// must have UnspecifiedLength. The loop does not stop at the first mismatch; the
// outcome is the same since preferStream is only cleared.
for i, groupbyCol := range groupByCols {
if path.IdxColLens[i] != types.UnspecifiedLength ||
!groupbyCol.EqualByExprAndID(sctx.GetExprCtx().GetEvalCtx(), path.IdxCols[i]) {
preferStream = false
}
}
}
} else {
preferStream = false
}

// build physical agg and attach to task
// aggTask stays nil until one of the two aggregation kinds has been attached.
var aggTask base.Task
// build stream agg and change ds keep order to true
if preferStream {
// Copy the group-by items and agg funcs into fresh slices so the physical agg does
// not share the logical aggregation's slices.
newGbyItems := make([]expression.Expression, len(la.GroupByItems))
copy(newGbyItems, la.GroupByItems)
newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs))
copy(newAggFuncs, la.AggFuncs)
streamAgg := basePhysicalAgg{
GroupByItems: newGbyItems,
AggFuncs: newAggFuncs,
}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), nil)
streamAgg.SetSchema(la.Schema().Clone())
// change to keep order for index scan and dsCopTask
if dsCopTask.indexPlan != nil {
// get the index scan from dsCopTask.indexPlan
// The index scan is either indexPlan itself or the only child of indexPlan.
physicalIndexScan, _ := dsCopTask.indexPlan.(*PhysicalIndexScan)
if physicalIndexScan == nil && len(dsCopTask.indexPlan.Children()) == 1 {
physicalIndexScan, _ = dsCopTask.indexPlan.Children()[0].(*PhysicalIndexScan)
}
// Keep-order is switched on before the stream agg is attached. If no index scan is
// found, aggTask stays nil and the hash agg fallback below is used.
if physicalIndexScan != nil {
physicalIndexScan.KeepOrder = true
dsCopTask.keepOrder = true
aggTask = streamAgg.Attach2Task(dsCopTask)
}
}
}

// build hash agg, when the stream agg is illegal such as the order by prop is not matched
if aggTask == nil {
physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), nil)
physicalHashAgg.SetSchema(la.Schema().Clone())
aggTask = physicalHashAgg.Attach2Task(dsCopTask)
}

// build other inner plan node to task
// Give up (return nil) when attaching the aggregation did not yield a *RootTask.
result, ok := aggTask.(*RootTask)
if !ok {
return nil
}
// The last zipped child is the aggregation handled above, so only the children before
// it are stacked on top of the result's plan.
result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
return result
>>>>>>> 194711a43cc (planner: fix index out of range in constructIndexJoinInnerSideTask (#54534))
}

var symmetricOp = map[string]string{
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_test(
data = glob(["testdata/**"]),
flaky = True,
race = "on",
shard_count = 3,
deps = [
"//pkg/parser",
"//pkg/planner",
Expand Down
26 changes: 26 additions & 0 deletions pkg/planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,29 @@ func Test53726(t *testing.T) {
" └─TableReader_11 2.00 root data:TableFullScan_10",
" └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false"))
}

// TestIssue54535 runs EXPLAIN on an index join whose inner side is an aggregation
// grouped by (b1, b2) over a table indexed on b1 only, with the
// tidb_enable_inl_join_inner_multi_pattern system variable turned on, and checks
// the resulting plan text.
func TestIssue54535(t *testing.T) {
	kit := testkit.NewTestKit(t, testkit.CreateMockStore(t))

	// Session setup and schema, executed in order.
	setupStmts := []string{
		"use test",
		"set session tidb_enable_inl_join_inner_multi_pattern='ON'",
		"create table ta(a1 int, a2 int, a3 int, index idx_a(a1))",
		"create table tb(b1 int, b2 int, b3 int, index idx_b(b1))",
		"analyze table ta",
		"analyze table tb",
	}
	for _, stmt := range setupStmts {
		kit.MustExec(stmt)
	}

	const explainSQL = "explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1"

	got := kit.MustQuery(explainSQL)
	// The inner side is expected to be a hash aggregation on top of an index lookup.
	got.Check(testkit.Rows(
		"Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9",
		"└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)",
		" ├─TableReader_43(Build) 9990.00 root data:Selection_42",
		" │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))",
		" │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo",
		" └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1",
		" └─IndexLookUp_15 79840080.00 root ",
		" ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))",
		" │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo",
		" └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11",
		" └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo",
	))
}