Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: Fix SMJ hint, support SMJ with descending order (#14505) #14664

Merged
merged 4 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
retTypes(rightExec),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
desc: v.Desc,
}

leftKeys := v.LeftKeys
Expand Down
10 changes: 7 additions & 3 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type MergeJoinExec struct {
compareFuncs []expression.CompareFunc
joiner joiner
isOuterJoin bool
desc bool

prepared bool
outerIdx int
Expand Down Expand Up @@ -297,21 +298,24 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM
}

cmpResult := -1
if e.outerTable.selected[e.outerTable.row.Idx()] && len(e.innerRows) > 0 {
if e.desc {
cmpResult = 1
}
if e.outerTable.selected[e.outerTable.row.Idx()] && e.innerIter4Row.Len() > 0 {
cmpResult, err = e.compare(e.outerTable.row, e.innerIter4Row.Current())
if err != nil {
return false, err
}
}

if cmpResult > 0 {
if (cmpResult > 0 && !e.desc) || (cmpResult < 0 && e.desc) {
if err = e.fetchNextInnerRows(); err != nil {
return false, err
}
continue
}

if cmpResult < 0 {
if (cmpResult < 0 && !e.desc) || (cmpResult > 0 && e.desc) {
e.joiner.onMissMatch(false, e.outerTable.row, chk)
if err != nil {
return false, err
Expand Down
35 changes: 35 additions & 0 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,41 @@ func (s *testSuite1) TestMergeJoin(c *C) {
"1",
"0",
))

// Test TIDB_SMJ for join with order by desc, see https://github.com/pingcap/tidb/issues/14483
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t (a int, key(a))")
tk.MustExec("create table t1 (a int, key(a))")
tk.MustExec("insert into t values (1), (2), (3)")
tk.MustExec("insert into t1 values (1), (2), (3)")
tk.MustQuery("select /*+ TIDB_SMJ(t1, t2) */ t.a from t, t1 where t.a = t1.a order by t1.a desc").Check(testkit.Rows(
"3", "2", "1"))
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, key(a), key(b))")
tk.MustExec("insert into t values (1,1),(1,2),(1,3),(2,1),(2,2),(3,1),(3,2),(3,3)")
tk.MustQuery("select /*+ TIDB_SMJ(t1, t2) */ t1.a from t t1, t t2 where t1.a = t2.b order by t1.a desc").Check(testkit.Rows(
"3", "3", "3", "3", "3", "3",
"2", "2", "2", "2", "2", "2",
"1", "1", "1", "1", "1", "1", "1", "1", "1"))

tk.MustExec("drop table if exists s")
tk.MustExec("create table s (a int)")
tk.MustExec("insert into s values (4), (1), (3), (2)")
tk.MustQuery("explain select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"Projection_27 12487.50 root test.s.a1",
"└─MergeJoin_28 12487.50 root inner join, left key:test.s.a1, right key:test.s.a2",
" ├─Sort_29 9990.00 root test.s.a1:desc",
" │ └─TableReader_21 9990.00 root data:Selection_20",
" │ └─Selection_20 9990.00 cop not(isnull(test.s.a))",
" │ └─TableScan_19 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─Sort_31 9990.00 root test.s.a2:desc",
" └─TableReader_26 9990.00 root data:Selection_25",
" └─Selection_25 9990.00 cop not(isnull(test.s.a))",
" └─TableScan_24 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo",
))
tk.MustQuery("select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"4", "3", "2", "1"))
}

func (s *testSuite1) Test3WaysMergeJoin(c *C) {
Expand Down
11 changes: 7 additions & 4 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (p *LogicalJoin) moveEqualToOtherConditions(offsets []int) []expression.Exp

// Only if the input required prop is the prefix fo join keys, we can pass through this property.
func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty) ([]*property.PhysicalProperty, bool) {
lProp := property.NewPhysicalProperty(property.RootTaskType, p.LeftKeys, false, math.MaxFloat64, false)
rProp := property.NewPhysicalProperty(property.RootTaskType, p.RightKeys, false, math.MaxFloat64, false)
all, desc := prop.AllSameOrder()
lProp := property.NewPhysicalProperty(property.RootTaskType, p.LeftKeys, desc, math.MaxFloat64, false)
rProp := property.NewPhysicalProperty(property.RootTaskType, p.RightKeys, desc, math.MaxFloat64, false)
if !prop.IsEmpty() {
// sort merge join fits the cases of massive ordered data, so desc scan is always expensive.
all, desc := prop.AllSameOrder()
if !all || desc {
if !all {
return nil, false
}
if !prop.IsPrefix(lProp) && !prop.IsPrefix(rProp) {
Expand Down Expand Up @@ -156,6 +156,8 @@ func (p *LogicalJoin) getMergeJoin(prop *property.PhysicalProperty) []PhysicalPl
reqProps[1].ExpectedCnt = p.children[1].statsInfo().RowCount * expCntScale
}
mergeJoin.childrenReqProps = reqProps
_, desc := prop.AllSameOrder()
mergeJoin.Desc = desc
joins = append(joins, mergeJoin)
}
}
Expand Down Expand Up @@ -239,6 +241,7 @@ func (p *LogicalJoin) getEnforcedMergeJoin(prop *property.PhysicalProperty) []Ph
LeftKeys: leftKeys,
RightKeys: rightKeys,
OtherConditions: p.OtherConditions,
Desc: desc,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt))
enforcedPhysicalMergeJoin.SetSchema(p.schema)
enforcedPhysicalMergeJoin.childrenReqProps = []*property.PhysicalProperty{lProp, rProp}
Expand Down
2 changes: 2 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ type PhysicalMergeJoin struct {

LeftKeys []*expression.Column
RightKeys []*expression.Column
// Desc means whether inner child keep desc order.
Desc bool
}

// PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause.
Expand Down