From a878806fe8295c0880a0e5d9100f59900c9ab437 Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Wed, 5 Feb 2020 12:23:23 +0800 Subject: [PATCH] executor: correctly handle panic for hashjoin build phase (#14056) --- executor/join.go | 19 ++++++++++++++++++- executor/seqtest/seq_executor_test.go | 15 +++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/executor/join.go b/executor/join.go index 83c2619932c1b..480f370d9b1e8 100644 --- a/executor/join.go +++ b/executor/join.go @@ -22,6 +22,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" @@ -281,6 +282,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C } chk := chunk.NewChunkWithCapacity(e.children[e.innerIdx].base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize) err = e.innerExec.Next(ctx, chk) + failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil) if err != nil { e.innerFinished <- errors.Trace(err) return @@ -551,7 +553,16 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // innerResultCh transfers inner chunk from inner fetch to build hash table. innerResultCh := make(chan *chunk.Chunk, 1) doneCh := make(chan struct{}) - go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil) + fetchBuildSideRowsOk := make(chan error, 1) + go util.WithRecovery( + func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, + func(r interface{}) { + if r != nil { + fetchBuildSideRowsOk <- errors.Errorf("%v", r) + } + close(fetchBuildSideRowsOk) + }, + ) // TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe. err := e.buildHashTableForList(innerResultCh) @@ -562,6 +573,12 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // wait fetchInnerRows be finished. for range innerResultCh { } + // Check whether err is nil to avoid sending redundant error into innerFinished. + if err == nil { + if err = <-fetchBuildSideRowsOk; err != nil { + e.innerFinished <- err + } + } } // buildHashTableForList builds hash table from `list`. diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 31407cfd433c8..921d41760efa4 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1104,6 +1104,21 @@ func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) } +func (s *seqTestSuite) TestOOMPanicInHashJoinWhenFetchBuildRows(c *C) { + fpName := "github.com/pingcap/tidb/executor/errorFetchBuildSideRowsMockOOMPanic" + c.Assert(failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(c1 int, c2 int)") + tk.MustExec("insert into t values(1,1),(2,2)") + err := tk.QueryToErr("select * from t as t2 join t as t1 where t1.c1=t2.c1") + c.Assert(err.Error(), Equals, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") +} + func (s *seqTestSuite) TestBatchDML(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test")