diff --git a/executor/executor_test.go b/executor/executor_test.go index 09bcca022ca44..4598bb9eb17fa 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8815,6 +8815,20 @@ func (s *testSerialSuite) TestIssue28650(c *C) { } } +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} + func (s *testSerialSuite) TestIssue29498(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/join.go b/executor/join.go index 01e0ef3653f8f..694d475309eeb 100644 --- a/executor/join.go +++ b/executor/join.go @@ -216,9 +216,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { return } if !hasWaitedForBuild { + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + probeSideResult.Reset() + } + }) if probeSideResult.NumRows() == 0 && !e.useOuterToBuild { e.finished.Store(true) - return } emptyBuild, buildErr := e.wait4BuildSide() if buildErr != nil { @@ -260,6 +264,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) { func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) { defer close(chkCh) var err error + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + err = errors.Errorf("issue30289 build return error") + e.buildFinished <- errors.Trace(err) + return + } + }) for { if e.finished.Load().(bool) { return diff --git a/executor/shuffle.go b/executor/shuffle.go index 9ad4ff522e4cd..cf2a3a225cd28 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -141,17 +141,29 @@ func (e *ShuffleExec) Close() error { if !e.prepared { for _, w := range e.workers { for _, r := range w.receivers { - close(r.inputHolderCh) - close(r.inputCh) + if r.inputHolderCh != nil { + close(r.inputHolderCh) + } + if r.inputCh != nil { + close(r.inputCh) + } } - close(w.outputHolderCh) + if w.outputHolderCh != nil { + close(w.outputHolderCh) + } + } + if e.outputCh != nil { + close(e.outputCh) } - close(e.outputCh) } - close(e.finishCh) + if e.finishCh != nil { + close(e.finishCh) + } for _, w := range e.workers { for _, r := range w.receivers { - for range r.inputCh { + if r.inputCh != nil { + for range r.inputCh { + } } } // close child executor of each worker @@ -159,7 +171,9 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - for range e.outputCh { // workers exit before `e.outputCh` is closed. + if e.outputCh != nil { + for range e.outputCh { // workers exit before `e.outputCh` is closed. + } } e.executed = false