Skip to content

Commit

Permalink
plan/executor: make semi joins null and empty aware (#9051) (#9449)
Browse files (browse the repository at this point in the history)
  • Loading branch information
eurekaka authored and zz-jason committed Feb 25, 2019
1 parent 225421b commit e58cae4
Show file tree
Hide file tree
Showing 22 changed files with 687 additions and 128 deletions.
38 changes: 19 additions & 19 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ set @@session.tidb_opt_insubquery_unfold = 0;
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
StreamAgg_12 1.00 root funcs:sum(col_0)
└─Projection_33 10000.00 root cast(5_aux_0)
└─MergeJoin_26 10000.00 root left outer semi join, left key:test.t1.c1, right key:test.t2.c1
├─TableReader_19 10000.00 root data:TableScan_18
│ └─TableScan_18 10000.00 cop table:t1, range:[-inf,+inf], keep order:true, stats:pseudo
└─IndexReader_21 10000.00 root index:IndexScan_20
└─IndexScan_20 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─Projection_21 10000.00 root cast(5_aux_0)
└─HashLeftJoin_18 10000.00 root left outer semi join, inner:TableReader_17, other cond:eq(test.t1.c1, test.t2.c1)
├─TableReader_20 10000.00 root data:TableScan_19
│ └─TableScan_19 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_17 10000.00 root data:TableScan_16
└─TableScan_16 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
explain select 1 in (select c2 from t2) from t1;
id count task operator info
Projection_6 10000.00 root 5_aux_0
Expand Down Expand Up @@ -217,25 +217,25 @@ subgraph cluster12{
node [style=filled, color=lightgrey]
color=black
label = "root"
"StreamAgg_12" -> "Projection_33"
"Projection_33" -> "MergeJoin_26"
"MergeJoin_26" -> "TableReader_19"
"MergeJoin_26" -> "IndexReader_21"
"StreamAgg_12" -> "Projection_21"
"Projection_21" -> "HashLeftJoin_18"
"HashLeftJoin_18" -> "TableReader_20"
"HashLeftJoin_18" -> "TableReader_17"
}
subgraph cluster18{
subgraph cluster19{
node [style=filled, color=lightgrey]
color=black
label = "cop"
"TableScan_18"
"TableScan_19"
}
subgraph cluster20{
subgraph cluster16{
node [style=filled, color=lightgrey]
color=black
label = "cop"
"IndexScan_20"
"TableScan_16"
}
"TableReader_19" -> "TableScan_18"
"IndexReader_21" -> "IndexScan_20"
"TableReader_20" -> "TableScan_19"
"TableReader_17" -> "TableScan_16"
}

explain format="dot" select 1 in (select c2 from t2) from t1;
Expand Down Expand Up @@ -272,7 +272,7 @@ create table t(a int primary key, b int, c int, index idx(b));
explain select t.c in (select count(*) from t s ignore index(idx), t t1 where s.a = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -285,7 +285,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.a = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand All @@ -297,7 +297,7 @@ Projection_11 10000.00 root 9_aux_0
explain select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.c = t1.a) from t;
id count task operator info
Projection_11 10000.00 root 9_aux_0
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, equal:[eq(test.t.c, count(*))]
└─Apply_13 10000.00 root left outer semi join, inner:StreamAgg_20, other cond:eq(test.t.c, count(*))
├─TableReader_15 10000.00 root data:TableScan_14
│ └─TableScan_14 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_20 1.00 root funcs:count(1)
Expand Down
32 changes: 32 additions & 0 deletions cmd/explaintest/r/select.result
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,35 @@ Projection_9 10000.00 root or(and(and(le(col_count, 1), eq(t1.a, col_firstrow)),
└─Projection_27 10000.00 root t2.a, t2.a, cast(isnull(t2.a))
└─TableReader_24 10000.00 root data:TableScan_23
└─TableScan_23 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
drop table if exists t;
create table t(a int, b int);
drop table if exists s;
create table s(a varchar(20), b varchar(20));
explain select a in (select a from s where s.b = t.b) from t;
id count task operator info
Projection_9 10000.00 root 6_aux_0
└─HashLeftJoin_10 10000.00 root left outer semi join, inner:Projection_14, equal:[eq(cast(test.t.b), cast(test.s.b))], other cond:eq(cast(test.t.a), cast(test.s.a))
├─Projection_11 10000.00 root test.t.a, test.t.b, cast(test.t.b)
│ └─TableReader_13 10000.00 root data:TableScan_12
│ └─TableScan_12 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
└─Projection_14 10000.00 root test.s.a, test.s.b, cast(test.s.b)
└─TableReader_16 10000.00 root data:TableScan_15
└─TableScan_15 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo
explain select a in (select a+b from t t2 where t2.b = t1.b) from t t1;
id count task operator info
Projection_7 10000.00 root 6_aux_0
└─HashLeftJoin_8 10000.00 root left outer semi join, inner:TableReader_12, equal:[eq(t1.b, t2.b)], other cond:eq(t1.a, plus(t2.a, t2.b))
├─TableReader_10 10000.00 root data:TableScan_9
│ └─TableScan_9 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableScan_11
└─TableScan_11 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
drop table t;
create table t(a int not null, b int);
explain select a in (select a from t t2 where t2.b = t1.b) from t t1;
id count task operator info
Projection_7 10000.00 root 6_aux_0
└─HashLeftJoin_8 10000.00 root left outer semi join, inner:TableReader_12, equal:[eq(t1.b, t2.b) eq(t1.a, t2.a)]
├─TableReader_10 10000.00 root data:TableScan_9
│ └─TableScan_9 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_12 10000.00 root data:TableScan_11
└─TableScan_11 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
10 changes: 10 additions & 0 deletions cmd/explaintest/t/select.test
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,13 @@ drop table if exists t;
create table t(a int, b int);
explain select a != any (select a from t t2) from t t1;
explain select a = all (select a from t t2) from t t1;

drop table if exists t;
create table t(a int, b int);
drop table if exists s;
create table s(a varchar(20), b varchar(20));
explain select a in (select a from s where s.b = t.b) from t;
explain select a in (select a+b from t t2 where t2.b = t1.b) from t t1;
drop table t;
create table t(a int not null, b int);
explain select a in (select a from t t2 where t2.b = t1.b) from t t1;
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 5 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type lookUpJoinTask struct {
doneCh chan error
cursor int
hasMatch bool
hasNull bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -234,18 +235,20 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
matched, isNull, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
task.hasNull = task.hasNull || isNull
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(task.hasNull, outerRow, chk)
}
task.cursor++
task.hasMatch = false
task.hasNull = false
}
if chk.NumRows() == e.maxChunkSize {
return nil
Expand Down
23 changes: 13 additions & 10 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB
return true, keyBuf, nil
}
}

keyBuf = keyBuf[:0]
keyBuf, err = codec.HashChunkRow(e.ctx.GetSessionVars().StmtCtx, keyBuf, row, allTypes, keyColIdx)
if err != nil {
Expand Down Expand Up @@ -408,13 +407,13 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
Expand All @@ -424,14 +423,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
innerRows = append(innerRows, matchedInner)
}
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
hasMatch, hasNull := false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
matched, isNull, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
hasMatch = hasMatch || matched
hasNull = hasNull || isNull

if joinResult.chk.NumRows() == e.maxChunkSize {
ok := true
Expand All @@ -443,7 +443,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
if !hasMatch {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(hasNull, outerRow, joinResult.chk)
}
return true, joinResult
}
Expand Down Expand Up @@ -471,7 +471,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
e.joiners[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -609,6 +609,7 @@ type NestedLoopApplyExec struct {
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool
hasNull bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -666,7 +667,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(false, outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
Expand Down Expand Up @@ -714,13 +715,14 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.joiner.onMissMatch(*e.outerRow, chk)
e.joiner.onMissMatch(e.hasNull, *e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false
e.hasNull = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
Expand All @@ -733,8 +735,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

matched, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
matched, isNull, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched
e.hasNull = e.hasNull || isNull

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit e58cae4

Please sign in to comment.