Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan/executor: handle null for anti join properly #8706

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) error {
for {
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
selected, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
selected, _, err := expression.EvalBool(e.ctx, e.filters, e.inputRow)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,15 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
matched, _, err := e.joiner.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(false, outerRow, chk)
}
task.cursor++
task.hasMatch = false
Expand Down
19 changes: 11 additions & 8 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,13 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
Expand All @@ -451,7 +451,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
for iter.Begin(); iter.Current() != iter.End(); {
matched, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
matched, _, err := e.joiners[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
Expand All @@ -468,7 +468,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
if !hasMatch {
e.joiners[workerID].onMissMatch(outerRow, joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerRow, joinResult.chk)
}
return true, joinResult
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
e.joiners[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
e.joiners[workerID].onMissMatch(false, outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -634,6 +634,7 @@ type NestedLoopApplyExec struct {
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool
isNull bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -691,7 +692,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
e.joiner.onMissMatch(outerRow, chk)
e.joiner.onMissMatch(false, outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
Expand Down Expand Up @@ -739,13 +740,14 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.joiner.onMissMatch(*e.outerRow, chk)
e.joiner.onMissMatch(e.isNull, *e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false
e.isNull = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
Expand All @@ -758,8 +760,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

matched, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
matched, isNull, err := e.joiner.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched
e.isNull = e.isNull || isNull

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
Expand Down
89 changes: 89 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,3 +988,92 @@ func (s *testSuite2) TestHashJoin(c *C) {
innerExecInfo = row[3][4].(string)
c.Assert(innerExecInfo[len(innerExecInfo)-1:], LessEqual, "5")
}

func (s *testSuite) TestNotInAntiJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int)")
tk.MustExec("insert into t values(null, 1, 0), (1, 2, 0)")
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 2",
))
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 2",
))
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.b)").Check(testkit.Rows(
"<nil> 1",
))
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.a)").Check(testkit.Rows(
"<nil> 1",
))
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1, null, 0), (2, 1, 0)")
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.b)").Check(testkit.Rows(
"2 1",
))
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.a)").Check(testkit.Rows())
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1, null, 0), (2, 1, 0), (null, 2, 0)")
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a not in (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select b from t t2 where t1.b = t2.a)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2)").Check(testkit.Rows())
tk.MustQuery("select a, b from t t1 where a != all (select a from t t2 where t1.b = t2.b)").Check(testkit.Rows(
"1 <nil>",
))
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.b)").Check(testkit.Rows(
"<nil> 2",
))
tk.MustQuery("select a, b from t t1 where not exists (select * from t t2 where t1.a = t2.a)").Check(testkit.Rows(
"<nil> 2",
))
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1, null, 0), (2, null, 0)")
tk.MustQuery("select a, b from t t1 where b not in (select a from t t2)").Check(testkit.Rows())
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(null, 1, 1), (2, 2, 2), (3, null, 3), (4, 4, 3)")
tk.MustQuery("select a, b, a not in (select b from t) from t order by a").Check(testkit.Rows(
"<nil> 1 <nil>",
"2 2 0",
"3 <nil> <nil>",
"4 4 0",
))
tk.MustQuery("select a, c, a not in (select c from t) from t order by a").Check(testkit.Rows(
"<nil> 1 <nil>",
"2 2 0",
"3 3 0",
"4 3 1",
))
}
Loading