From 493407e59f25ccba26d935368a05a169ac7b86e0 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Thu, 27 May 2021 19:20:43 +0800 Subject: [PATCH] fix 24930 --- executor/parallel_apply.go | 7 +++++++ executor/parallel_apply_test.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/executor/parallel_apply.go b/executor/parallel_apply.go index d02ebac9e5349..636ec96ad2868 100644 --- a/executor/parallel_apply.go +++ b/executor/parallel_apply.go @@ -68,6 +68,7 @@ type ParallelNestedLoopApplyExec struct { // fields about concurrency control concurrency int started uint32 + drained uint32 // drained == true indicates there is no more data freeChkCh chan *chunk.Chunk resultChkCh chan result outerRowCh chan outerRow @@ -130,6 +131,11 @@ func (e *ParallelNestedLoopApplyExec) Open(ctx context.Context) error { // Next implements the Executor interface. func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { + if atomic.LoadUint32(&e.drained) == 1 { + req.Reset() + return nil + } + if atomic.CompareAndSwapUint32(&e.started, 0, 1) { e.workerWg.Add(1) go e.outerWorker(ctx) @@ -147,6 +153,7 @@ func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk } if result.chk == nil { // no more data req.Reset() + atomic.StoreUint32(&e.drained, 1) return nil } req.SwapColumns(result.chk) diff --git a/executor/parallel_apply_test.go b/executor/parallel_apply_test.go index c0ecb19783273..a72944ebda507 100644 --- a/executor/parallel_apply_test.go +++ b/executor/parallel_apply_test.go @@ -596,3 +596,14 @@ func (s *testSuite) TestApplyGoroutinePanic(c *C) { c.Assert(failpoint.Disable(panicPath), IsNil) } } + +func (s *testSuite) TestIssue24930(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set tidb_enable_parallel_apply=true") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(a int)") + tk.MustExec("create table t2(a int)") + tk.MustQuery(`select case when t1.a is null + then (select t2.a from t2 where t2.a = t1.a limit 1) else t1.a end a + from t1 where t1.a=1 order by a limit 1`).Check(testkit.Rows()) // can return an empty result instead of hanging forever +}