Skip to content

Commit

Permalink
Support SMJ on Desc
Browse files Browse the repository at this point in the history
  • Loading branch information
ichn-hu committed Jan 20, 2020
1 parent b25c824 commit 69fa7a8
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 8 deletions.
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
retTypes(rightExec),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
desc: v.Desc,
}

leftKeys := v.LeftJoinKeys
Expand Down
8 changes: 6 additions & 2 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type MergeJoinExec struct {
compareFuncs []expression.CompareFunc
joiner joiner
isOuterJoin bool
desc bool

prepared bool
outerIdx int
Expand Down Expand Up @@ -350,21 +351,24 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM
}

cmpResult := -1
if e.desc {
cmpResult = 1
}
if e.outerTable.selected[e.outerTable.row.Idx()] && e.innerIter4Row.Len() > 0 {
cmpResult, err = e.compare(e.outerTable.row, e.innerIter4Row.Current())
if err != nil {
return false, err
}
}

if cmpResult > 0 {
if (cmpResult > 0 && !e.desc) || (cmpResult < 0 && e.desc) {
if err = e.fetchNextInnerRows(); err != nil {
return false, err
}
continue
}

if cmpResult < 0 {
if (cmpResult < 0 && !e.desc) || (cmpResult > 0 && e.desc) {
e.joiner.onMissMatch(false, e.outerTable.row, chk)
if err != nil {
return false, err
Expand Down
34 changes: 34 additions & 0 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,40 @@ func (s *testSuite2) TestMergeJoin(c *C) {
"1",
"0",
))

// Test TIDB_SMJ for join with order by desc, see https://github.com/pingcap/tidb/issues/14483
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t (a int, key(a))")
tk.MustExec("create table t1 (a int, key(a))")
tk.MustExec("insert into t values (1), (2), (3)")
tk.MustExec("insert into t1 values (1), (2), (3)")
tk.MustQuery("select /*+ TIDB_SMJ(t1, t2) */ t.a from t, t1 where t.a = t1.a order by t1.a desc").Check(testkit.Rows(
"3", "2", "1"))
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, key(a), key(b))")
tk.MustExec("insert into t values (1,1),(1,2),(1,3),(2,1),(2,2),(3,1),(3,2),(3,3)")
tk.MustQuery("select /*+ TIDB_SMJ(t1, t2) */ t1.a from t t1, t t2 where t1.a = t2.b order by t1.a desc").Check(testkit.Rows(
"3", "3", "3", "3", "3", "3",
"2", "2", "2", "2", "2", "2",
"1", "1", "1", "1", "1", "1", "1", "1", "1"))

tk.MustExec("drop table if exists s")
tk.MustExec("create table s (a int)")
tk.MustExec("insert into s values (4), (1), (3), (2)")
tk.MustQuery("explain select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"Projection_27 12487.50 root test.s.a",
"└─MergeJoin_28 12487.50 root inner join, left key:test.s.a, right key:test.s.a",
" ├─Sort_29 9990.00 root test.s.a:desc",
" │ └─TableReader_21 9990.00 root data:Selection_20",
" │ └─Selection_20 9990.00 cop[tikv] not(isnull(test.s.a))",
" │ └─TableScan_19 10000.00 cop[tikv] table:s, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─Sort_31 9990.00 root test.s.a:desc",
" └─TableReader_26 9990.00 root data:Selection_25",
" └─Selection_25 9990.00 cop[tikv] not(isnull(test.s.a))",
" └─TableScan_24 10000.00 cop[tikv] table:s, range:[-inf,+inf], keep order:false, stats:pseudo"))
tk.MustQuery("select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"4", "3", "2", "1"))
}

func (s *testSuite2) Test3WaysMergeJoin(c *C) {
Expand Down
12 changes: 7 additions & 5 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ func (p *LogicalJoin) moveEqualToOtherConditions(offsets []int) []expression.Exp

// Only if the input required prop is a prefix of the join keys can we pass this property through.
func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty) ([]*property.PhysicalProperty, bool) {
lProp := property.NewPhysicalProperty(property.RootTaskType, p.LeftJoinKeys, false, math.MaxFloat64, false)
rProp := property.NewPhysicalProperty(property.RootTaskType, p.RightJoinKeys, false, math.MaxFloat64, false)
all, desc := prop.AllSameOrder()
lProp := property.NewPhysicalProperty(property.RootTaskType, p.LeftJoinKeys, desc, math.MaxFloat64, false)
rProp := property.NewPhysicalProperty(property.RootTaskType, p.RightJoinKeys, desc, math.MaxFloat64, false)
if !prop.IsEmpty() {
// sort merge join fits the cases of massive ordered data; a required order that mixes asc and desc items cannot be passed through to the children.
all, desc := prop.AllSameOrder()
if !all || desc {
if !all {
return nil, false
}
if !prop.IsPrefix(lProp) && !prop.IsPrefix(rProp) {
Expand Down Expand Up @@ -163,6 +163,8 @@ func (p *LogicalJoin) GetMergeJoin(prop *property.PhysicalProperty, schema *expr
reqProps[1].ExpectedCnt = p.children[1].statsInfo().RowCount * expCntScale
}
mergeJoin.childrenReqProps = reqProps
_, desc := prop.AllSameOrder()
mergeJoin.Desc = desc
joins = append(joins, mergeJoin)
}
}
Expand Down Expand Up @@ -248,7 +250,7 @@ func (p *LogicalJoin) getEnforcedMergeJoin(prop *property.PhysicalProperty) []Ph
RightJoinKeys: rightKeys,
OtherConditions: p.OtherConditions,
}
enforcedPhysicalMergeJoin := PhysicalMergeJoin{basePhysicalJoin: baseJoin}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset)
enforcedPhysicalMergeJoin := PhysicalMergeJoin{basePhysicalJoin: baseJoin, Desc: desc}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset)
enforcedPhysicalMergeJoin.SetSchema(p.schema)
enforcedPhysicalMergeJoin.childrenReqProps = []*property.PhysicalProperty{lProp, rProp}
enforcedPhysicalMergeJoin.initCompareFuncs()
Expand Down
2 changes: 2 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ type PhysicalMergeJoin struct {
basePhysicalJoin

CompareFuncs []expression.CompareFunc
// Desc indicates whether the inner child keeps descending order.
Desc bool
}

// PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@
},
{
"SQL": "select max(a) from (select t1.a from t t1 join t t2 on t1.a=t2.a) t",
"Best": "IndexMergeJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->Limit->StreamAgg"
"Best": "MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->Limit->StreamAgg"
}
]
},
Expand Down

0 comments on commit 69fa7a8

Please sign in to comment.