From ac0893fba54dc046f4ff0aac67ed029e1382fce6 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 17:43:58 +0800 Subject: [PATCH 1/4] Fix forever hanging when HashAgg is called by apply --- executor/aggregate.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/aggregate.go b/executor/aggregate.go index aa7e42b5c0ff3..871cf4e5b69e6 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -163,6 +163,7 @@ type HashAggExec struct { isChildReturnEmpty bool childResult *chunk.Chunk + executed bool } // HashAggInput indicates the input of hash agg exec. @@ -614,10 +615,14 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error } }) + if e.executed { + return nil + } for !chk.IsFull() { e.finalInputCh <- chk result, ok := <-e.finalOutputCh if !ok { // all finalWorkers exited + e.executed = true if chk.NumRows() > 0 { // but there are some data left return nil } From 7d182255f2a55fb4c18643f1b022d79f724cc675 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 18:31:54 +0800 Subject: [PATCH 2/4] fix test --- executor/aggregate.go | 1 + executor/aggregate_test.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 871cf4e5b69e6..31b0906c722ed 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -210,6 +210,7 @@ func (e *HashAggExec) Close() error { } close(e.finalOutputCh) } + e.executed = false close(e.finishCh) for _, ch := range e.partialOutputChs { for range ch { diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index fca05420430ad..3ece0759df75c 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -735,6 +735,7 @@ func (s *testSuite1) TestIssue10608(c *C) { tk.MustExec("create table s(a int, b int)") tk.MustExec("insert into s values(100292, 508931), (120002, 508932)") tk.MustExec("insert into t values(508931), (508932)") - tk.MustQuery("select (select group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + tk.MustQuery("select (select /*+ stream_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + tk.MustQuery("select (select /*+ hash_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) } From 7a6b2734f14dcaa0a4b87ff9d0d1c000fd7fbb37 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 18:33:21 +0800 Subject: [PATCH 3/4] fix test --- executor/aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 31b0906c722ed..10075fb51ef20 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -210,7 +210,6 @@ func (e *HashAggExec) Close() error { } close(e.finalOutputCh) } - e.executed = false close(e.finishCh) for _, ch := range e.partialOutputChs { for range ch { @@ -218,6 +217,7 @@ func (e *HashAggExec) Close() error { } for range e.finalOutputCh { } + e.executed = false return e.baseExecutor.Close() } From bc2715dae2efecfaafc722d8b75f3f22675a32a3 Mon Sep 17 00:00:00 2001 From: Feng Liyuan Date: Wed, 16 Oct 2019 20:36:59 +0800 Subject: [PATCH 4/4] revert test --- executor/aggregate_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 3ece0759df75c..fca05420430ad 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -735,7 +735,6 @@ func (s *testSuite1) TestIssue10608(c *C) { tk.MustExec("create table s(a int, b int)") tk.MustExec("insert into s values(100292, 508931), (120002, 508932)") tk.MustExec("insert into t values(508931), (508932)") - tk.MustQuery("select (select /*+ stream_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) - tk.MustQuery("select (select /*+ hash_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + tk.MustQuery("select (select group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) }