From 1c5d55632309c781405479c8424cced26b122533 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Fri, 12 Oct 2018 19:18:42 +0800 Subject: [PATCH] plan,executor: support IndexJoin over UnionScan (#7877) --- executor/builder.go | 30 ++++++++-- executor/index_lookup_join_test.go | 76 ++++++++++++++++-------- planner/core/exhaust_physical_plans.go | 49 ++++++++++------ planner/core/find_best_task.go | 11 ++-- planner/core/physical_plan_test.go | 81 +++++++------------------- 5 files changed, 136 insertions(+), 111 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index e11e0e8fc2dda..5b4dee82e4481 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -705,18 +705,22 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor { } func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor { - src := b.build(v.Children()[0]) + reader := b.build(v.Children()[0]) if b.err != nil { b.err = errors.Trace(b.err) return nil } - us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)} + return b.buildUnionScanFromReader(reader, v) +} + +func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor { + us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)} // Get the handle column index of the below plannercore. // We can guarantee that there must be only one col in the map. for _, cols := range v.Children()[0].Schema().TblID2Handle { us.belowHandleIndex = cols[0].Index } - switch x := src.(type) { + switch x := reader.(type) { case *TableReaderExecutor: us.desc = x.desc us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) @@ -753,7 +757,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E b.err = us.buildAndSortAddedRows() default: // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting. - return src + return reader } if b.err != nil { b.err = errors.Trace(b.err) @@ -1864,10 +1868,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) case *plannercore.PhysicalIndexLookUpReader: return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + case *plannercore.PhysicalUnionScan: + return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) } return nil, errors.New("Wrong plan type for dataReaderBuilder") } +func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + builder.Plan = v.Children()[0] + reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + if err != nil { + return nil, errors.Trace(err) + } + e := builder.buildUnionScanFromReader(reader, v) + if e == nil { + return nil, builder.err + } + us := e.(*UnionScanExec) + us.snapshotChunkBuffer = us.newFirstChunk() + return us, nil +} + func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index 30cc7b490768f..5b91d81a589e6 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -38,30 +38,56 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) { rs.Close() } -func (s *testSuite) TestInapplicableIndexJoinHint(c *C) { +func (s *testSuite) TestIndexJoinUnionScan(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec(`drop table if exists t1, t2;`) - tk.MustExec(`create table t1(a bigint, b bigint);`) - tk.MustExec(`create table t2(a bigint, b bigint);`) - tk.MustQuery(`select /*+ TIDB_INLJ(t1, t2) */ * from t1, t2;`).Check(testkit.Rows()) - tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable without column equal ON condition`)) - tk.MustQuery(`select /*+ TIDB_INLJ(t1, t2) */ * from t1 join t2 on t1.a=t2.a;`).Check(testkit.Rows()) - tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable`)) - - tk.MustExec(`drop table if exists t1, t2;`) - tk.MustExec(`create table t1(a bigint, b bigint, index idx_a(a));`) - tk.MustExec(`create table t2(a bigint, b bigint);`) - tk.MustQuery(`select /*+ TIDB_INLJ(t1) */ * from t1 left join t2 on t1.a=t2.a;`).Check(testkit.Rows()) - tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t1) */ is inapplicable`)) - tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 right join t2 on t1.a=t2.a;`).Check(testkit.Rows()) - tk.MustQuery(`show warnings;`).Check(testkit.Rows(`Warning 1815 Optimizer Hint /*+ TIDB_INLJ(t2) */ is inapplicable`)) -} - -func (s *testSuite) TestIndexJoinOverflow(c *C) { - tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec(`drop table if exists t1, t2`) - tk.MustExec(`create table t1(a int)`) - tk.MustExec(`insert into t1 values (-1)`) - tk.MustExec(`create table t2(a int unsigned, index idx(a));`) - tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a = t2.a;`).Check(testkit.Rows()) + tk.MustExec("create table t1(id int primary key, a int)") + tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))") + tk.MustExec("insert into t2 values (1,1,1),(4,2,4)") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(2,2)") + tk.MustExec("insert into t2 values(2,2,2), (3,3,3)") + // TableScan below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id", + "├─UnionScan_12 10000.00 root ", + "│ └─TableReader_14 10000.00 root data:TableScan_13", + "│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + "└─UnionScan_10 10.00 root ", + " └─TableReader_9 10.00 root data:TableScan_8", + " └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "2 2 2 2 2", + )) + // IndexLookUp below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a", + "├─UnionScan_13 10000.00 root ", + "│ └─TableReader_15 10000.00 root data:TableScan_14", + "│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + "└─UnionScan_11 10.00 root ", + " └─IndexLookUp_10 10.00 root ", + " ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo", + " └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2 2 2 2", + "2 2 4 2 4", + )) + // IndexScan below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "Projection_7 12500.00 root test.t1.a, test.t2.a", + "└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a", + " ├─UnionScan_12 10000.00 root ", + " │ └─TableReader_14 10000.00 root data:TableScan_13", + " │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + " └─UnionScan_10 10.00 root ", + " └─IndexReader_9 10.00 root index:IndexScan_8", + " └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2", + "2 2", + )) + tk.MustExec("rollback") } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 7039968647b35..31ce139cecbf8 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -411,20 +411,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou innerJoinKeys = p.LeftJoinKeys outerJoinKeys = p.RightJoinKeys } - x, ok := innerChild.(*DataSource) - if !ok { + ds, isDataSource := innerChild.(*DataSource) + us, isUnionScan := innerChild.(*LogicalUnionScan) + if !isDataSource && !isUnionScan { return nil } + if isUnionScan { + ds = us.Children()[0].(*DataSource) + } var tblPath *accessPath - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { tblPath = path break } } - if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil { + if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil { keyOff2IdxOff := make([]int, len(innerJoinKeys)) - pkCol := x.getPKIsHandleCol() pkMatched := false for i, key := range innerJoinKeys { if !key.Equal(nil, pkCol) { @@ -435,7 +438,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou keyOff2IdxOff[i] = 0 } if pkMatched { - innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys) + innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us) // Since the primary key means one value corresponding to exact one row, this will always be a no worse one // comparing to other index. return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff) @@ -448,12 +451,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou remainedOfBest []expression.Expression keyOff2IdxOff []int ) - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { continue } indexInfo := path.index - ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys) + ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys) // We choose the index by the number of used columns of the range, the much the better. // Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid. // But obviously when the range is nil, we don't need index join. @@ -466,20 +469,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou } } if bestIndexInfo != nil { - innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys) + innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us) return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff) } return nil } // constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan { - var ranges []*ranger.Range - if pk != nil { - ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) - } else { - ranges = ranger.FullIntRange(false) - } +func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { + ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) ts := PhysicalTableScan{ Table: ds.tableInfo, Columns: ds.Columns, @@ -504,11 +502,23 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col selStats := ts.stats.Scale(selectionFactor) ts.addPushedDownSelection(copTask, selStats) t := finishCopTask(ds.ctx, copTask) - return t.plan() + reader := t.plan() + return p.constructInnerUnionScan(us, reader) +} + +func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan { + if us == nil { + return reader + } + // Use `reader.stats` instead of `us.stats` because it should be more accurate. No need to specify + // childrenReqProps now since we have got reader already. + physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, reader.statsInfo(), nil) + physicalUnionScan.SetChildren(reader) + return physicalUnionScan } // constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan { +func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { is := PhysicalIndexScan{ Table: ds.tableInfo, TableAsName: ds.TableAsName, @@ -550,7 +560,8 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64} is.addPushedDownSelection(cop, ds, math.MaxFloat64, path) t := finishCopTask(ds.ctx, cop) - return t.plan() + reader := t.plan() + return p.constructInnerUnionScan(us, reader) } // buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 5c3c670b35279..55b13cefe185a 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -79,6 +79,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task, // findBestTask implements LogicalPlan interface. func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) { + // If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. + if prop == nil { + return nil, nil + } // Look up the task with this prop in the task map. // It's used to reduce double counting. bestTask = p.getTask(prop) @@ -329,10 +334,8 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida // findBestTask implements the PhysicalPlan interface. // It will enumerate all the available indices and choose a plan with least cost. func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) { - // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself. - // So here we do nothing. - // TODO: Add a special prop to handle IndexJoin's inner plan. - // Then we can remove forceToTableScan and forceToIndexScan. + // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. if prop == nil { return nil, nil } diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 458fe62fba065..acebb9240577d 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/util/testleak" "golang.org/x/net/context" ) @@ -1305,7 +1304,7 @@ func (s *testPlanSuite) TestIndexLookupCartesianJoin(c *C) { c.Assert(lastWarn.Err.Error(), Equals, "[planner:1815]Optimizer Hint /*+ TIDB_INLJ(t1, t2) */ is inapplicable without column equal ON condition") } -func (s *testPlanSuite) TestDoSubquery(c *C) { +func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { defer testleak.AfterTest(c)() store, dom, err := newStoreWithBootstrap() c.Assert(err, IsNil) @@ -1321,71 +1320,35 @@ func (s *testPlanSuite) TestDoSubquery(c *C) { sql string best string }{ + // Test Index Join + UnionScan + TableScan. { - sql: "do 1 in (select a from t)", - best: "LeftHashJoin{Dual->TableReader(Table(t))}->Projection", - }, - } - for _, tt := range tests { - comment := Commentf("for %s", tt.sql) - stmt, err := s.ParseOneStmt(tt.sql, "", "") - c.Assert(err, IsNil, comment) - p, err := core.Optimize(se, stmt, s.is) - c.Assert(err, IsNil) - c.Assert(core.ToString(p), Equals, tt.best, comment) - } -} - -func (s *testPlanSuite) TestUnmatchedTableInHint(c *C) { - defer testleak.AfterTest(c)() - store, dom, err := newStoreWithBootstrap() - c.Assert(err, IsNil) - defer func() { - dom.Close() - store.Close() - }() - se, err := session.CreateSession4Test(store) - c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test") - c.Assert(err, IsNil) - tests := []struct { - sql string - warning string - }{ - { - sql: "SELECT /*+ TIDB_SMJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a", - warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_SMJ(t3, t4) */. Maybe you can use the table alias name", - }, - { - sql: "SELECT /*+ TIDB_HJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a", - warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_HJ(t3, t4) */. Maybe you can use the table alias name", - }, - { - sql: "SELECT /*+ TIDB_INLJ(t3, t4) */ * from t t1, t t2 where t1.a = t2.a", - warning: "[planner:1815]There are no matching table names for (t3, t4) in optimizer hint /*+ TIDB_INLJ(t3, t4) */. Maybe you can use the table alias name", + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(t1.a,t2.a)", }, + // Test Index Join + UnionScan + DoubleRead. { - sql: "SELECT /*+ TIDB_SMJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a", - warning: "", + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(t1.a,t2.c)", }, + // Test Index Join + UnionScan + IndexScan. { - sql: "SELECT /*+ TIDB_SMJ(t3, t4) */ * from t t1, t t2, t t3 where t1.a = t2.a and t2.a = t3.a", - warning: "[planner:1815]There are no matching table names for (t4) in optimizer hint /*+ TIDB_SMJ(t3, t4) */. Maybe you can use the table alias name", + sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(t1.a,t2.c)->Projection", }, } - for _, test := range tests { - se.GetSessionVars().StmtCtx.SetWarnings(nil) - stmt, err := s.ParseOneStmt(test.sql, "", "") + for i, tt := range tests { + comment := Commentf("case:%v sql:%s", i, tt.sql) + stmt, err := s.ParseOneStmt(tt.sql, "", "") + c.Assert(err, IsNil, comment) + err = se.NewTxn() c.Assert(err, IsNil) - _, err = core.Optimize(se, stmt, s.is) + // Make txn not read only. + txn, err := se.Txn(true) c.Assert(err, IsNil) - warnings := se.GetSessionVars().StmtCtx.GetWarnings() - if test.warning == "" { - c.Assert(len(warnings), Equals, 0) - } else { - c.Assert(len(warnings), Equals, 1) - c.Assert(warnings[0].Level, Equals, stmtctx.WarnLevelWarning) - c.Assert(warnings[0].Err.Error(), Equals, test.warning) - } + txn.Set(kv.Key("AAA"), []byte("BBB")) + se.StmtCommit() + p, err := core.Optimize(se, stmt, s.is) + c.Assert(err, IsNil) + c.Assert(core.ToString(p), Equals, tt.best, comment) } }