diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go
index c5f7e6fa6d12c..d503f9b143d86 100644
--- a/pkg/planner/core/exhaust_physical_plans.go
+++ b/pkg/planner/core/exhaust_physical_plans.go
@@ -1375,12 +1375,131 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
 	if usedStats != nil && usedStats.GetUsedInfo(is.physicalTableID) != nil {
 		is.usedStatsInfo = usedStats.GetUsedInfo(is.physicalTableID)
 	}
+<<<<<<< HEAD
 	finalStats := ds.tableStats.ScaleByExpectCnt(rowCount)
 	is.addPushedDownSelection(cop, ds, tmpPath, finalStats)
 	t := cop.convertToRootTask(ds.SCtx())
 	reader := t.p
 	t.p = p.constructInnerByWrapper(wrapper, reader)
 	return t
+=======
+	finalStats := ds.TableStats.ScaleByExpectCnt(rowCount)
+	if err := is.addPushedDownSelection(cop, ds, tmpPath, finalStats); err != nil {
+		logutil.BgLogger().Warn("unexpected error happened during addPushedDownSelection function", zap.Error(err))
+		return nil
+	}
+	return constructIndexJoinInnerSideTask(p, cop, ds, path, wrapper)
+}
+
+// constructIndexJoinInnerSideTask builds the inner-side task of an index join from the inner child plan tree.
+// The construction has two parts: logical plan -> physical plan, then physical plan -> task.
+// Step 1: decide whether the aggregation can be pushed down to the coprocessor.
+//
+//	Step 1.1: if the aggregation can be pushed down, build a copTask and attach the aggregation to it.
+//	There are two kinds of aggregation: stream agg and hash agg. Stream agg additionally depends on conditions such as the group-by columns.
+//
+// Step 2: build the remaining inner plan nodes on top of the task.
+func constructIndexJoinInnerSideTask(p *LogicalJoin, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
+	var la *LogicalAggregation
+	var canPushAggToCop bool
+	if len(wrapper.zippedChildren) > 0 {
+		la, canPushAggToCop = wrapper.zippedChildren[len(wrapper.zippedChildren)-1].(*LogicalAggregation)
+		if la != nil && la.HasDistinct() {
+			// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
+			// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
+			if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown {
+				canPushAggToCop = false
+			}
+		}
+	}
+
+	// If the bottom plan is not an aggregation, or the aggregation cannot be pushed to the coprocessor, construct a root task directly.
+	if !canPushAggToCop {
+		result := dsCopTask.ConvertToRootTask(ds.SCtx()).(*RootTask)
+		result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren, result.GetPlan()))
+		return result
+	}
+
+	// Try stream aggregation first.
+	// Stream aggregation is chosen if the following conditions are met:
+	// 1. Stream agg is forced by the /*+ stream_agg() */ hint.
+	// 2. The other conditions, copied from getStreamAggs() in exhaust_physical_plans.go, hold.
+	_, preferStream := la.ResetHintIfConflicted()
+	for _, aggFunc := range la.AggFuncs {
+		if aggFunc.Mode == aggregation.FinalMode {
+			preferStream = false
+			break
+		}
+	}
+	// A group-by expression such as a + b cannot make use of any input order.
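+	// GetGroupByCols keeps only the group-by items that are plain columns, so a length
+	// mismatch with GroupByItems below means some item is such an expression.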
+	groupByCols := la.GetGroupByCols()
+	if len(groupByCols) != len(la.GroupByItems) {
+		preferStream = false
+	}
+	if la.HasDistinct() && !la.DistinctArgsMeetsProperty() {
+		preferStream = false
+	}
+	// The sort items (index columns) must be a superset of the group-by items.
+	if path != nil && path.Index != nil && !path.Index.MVIndex &&
+		ds.TableInfo.GetPartitionInfo() == nil {
+		if len(path.IdxCols) < len(groupByCols) {
+			preferStream = false
+		} else {
+			sctx := p.SCtx()
+			for i, groupbyCol := range groupByCols {
+				if path.IdxColLens[i] != types.UnspecifiedLength ||
+					!groupbyCol.EqualByExprAndID(sctx.GetExprCtx().GetEvalCtx(), path.IdxCols[i]) {
+					preferStream = false
+				}
+			}
+		}
+	} else {
+		preferStream = false
+	}
+
+	// Build the physical aggregation and attach it to the task.
+	var aggTask base.Task
+	// Build the stream agg and make the data source keep order.
+	if preferStream {
+		newGbyItems := make([]expression.Expression, len(la.GroupByItems))
+		copy(newGbyItems, la.GroupByItems)
+		newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs))
+		copy(newAggFuncs, la.AggFuncs)
+		streamAgg := basePhysicalAgg{
+			GroupByItems: newGbyItems,
+			AggFuncs:     newAggFuncs,
+		}.initForStream(la.SCtx(), la.StatsInfo(), la.QueryBlockOffset(), nil)
+		streamAgg.SetSchema(la.Schema().Clone())
+		// Switch the index scan and dsCopTask to keep order.
+		if dsCopTask.indexPlan != nil {
+			// Locate the index scan inside dsCopTask.indexPlan.
+			physicalIndexScan, _ := dsCopTask.indexPlan.(*PhysicalIndexScan)
+			if physicalIndexScan == nil && len(dsCopTask.indexPlan.Children()) == 1 {
+				physicalIndexScan, _ = dsCopTask.indexPlan.Children()[0].(*PhysicalIndexScan)
+			}
+			if physicalIndexScan != nil {
+				physicalIndexScan.KeepOrder = true
+				dsCopTask.keepOrder = true
+				aggTask = streamAgg.Attach2Task(dsCopTask)
+			}
+		}
+	}
+
+	// Fall back to hash agg when the stream agg is not applicable, e.g. when the order property is not matched.
+	if aggTask == nil {
+		physicalHashAgg := NewPhysicalHashAgg(la, la.StatsInfo(), nil)
+		physicalHashAgg.SetSchema(la.Schema().Clone())
+		aggTask = physicalHashAgg.Attach2Task(dsCopTask)
+	}
+
+	// Build the remaining inner plan nodes on top of the aggregation task.
+	result, ok := aggTask.(*RootTask)
+	if !ok {
+		return nil
+	}
+	result.SetPlan(constructInnerByZippedChildren(wrapper.zippedChildren[0:len(wrapper.zippedChildren)-1], result.p))
+	return result
+>>>>>>> 194711a43cc (planner: fix index out of range in constructIndexJoinInnerSideTask (#54534))
 }
 
 var symmetricOp = map[string]string{
diff --git a/pkg/planner/core/issuetest/BUILD.bazel b/pkg/planner/core/issuetest/BUILD.bazel
index a08f63c0df355..d184351de60fe 100644
--- a/pkg/planner/core/issuetest/BUILD.bazel
+++ b/pkg/planner/core/issuetest/BUILD.bazel
@@ -10,6 +10,7 @@ go_test(
     data = glob(["testdata/**"]),
     flaky = True,
     race = "on",
+    shard_count = 3,
     deps = [
         "//pkg/parser",
         "//pkg/planner",
diff --git a/pkg/planner/core/issuetest/planner_issue_test.go b/pkg/planner/core/issuetest/planner_issue_test.go
index 772a66b644388..f47eb20aa270b 100644
--- a/pkg/planner/core/issuetest/planner_issue_test.go
+++ b/pkg/planner/core/issuetest/planner_issue_test.go
@@ -85,3 +85,29 @@ func Test53726(t *testing.T) {
 		" └─TableReader_11 2.00 root data:TableFullScan_10",
 		" └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false"))
 }
+
+func TestIssue54535(t *testing.T) {
+	// Test for the tidb_enable_inl_join_inner_multi_pattern system variable.
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set session tidb_enable_inl_join_inner_multi_pattern='ON'")
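+	// The derived table tmp aggregates tb by (b1, b2); with the variable above set to ON,
+	// the aggregation is planned on the inner side of the index join, which exercises
+	// constructIndexJoinInnerSideTask.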
tk.MustExec("create table ta(a1 int, a2 int, a3 int, index idx_a(a1))") + tk.MustExec("create table tb(b1 int, b2 int, b3 int, index idx_b(b1))") + tk.MustExec("analyze table ta") + tk.MustExec("analyze table tb") + + tk.MustQuery("explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1"). + Check(testkit.Rows( + "Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9", + "└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)", + " ├─TableReader_43(Build) 9990.00 root data:Selection_42", + " │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))", + " │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", + " └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1", + " └─IndexLookUp_15 79840080.00 root ", + " ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))", + " │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo", + " └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11", + " └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo")) +}