diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go
index 93af98a228488..b118bc0cb40f8 100644
--- a/executor/index_merge_reader.go
+++ b/executor/index_merge_reader.go
@@ -268,7 +268,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
 func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error {
 	failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
 		// Wait for processWorker to close resultCh.
-		time.Sleep(2)
+		time.Sleep(time.Second * 2)
 		// Should use fetchCh instead of resultCh to send error.
 		syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly"))
 	})
@@ -347,8 +347,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 		for parTblIdx, keyRange := range keyRanges {
 			// check if this executor is closed
 			select {
+			case <-ctx.Done():
+				return
 			case <-e.finished:
-				break
+				return
 			default:
 			}
 
@@ -466,8 +468,10 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
 		for _, tbl := range tbls {
 			// check if this executor is closed
 			select {
+			case <-ctx.Done():
+				return
 			case <-e.finished:
-				break
+				return
 			default:
 			}
 
@@ -720,6 +724,12 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e
 }
 
 func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) {
+	failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
+		// Sleep long enough for processWorker to fill resultCh.
+		// Once the main goroutine closes finished, processWorker may get stuck writing to resultCh.
+		time.Sleep(time.Second * 20)
+		failpoint.Return(nil, errors.New("failpoint testIndexMergeMainReturnEarly"))
+	})
 	if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
 		return e.resultCurr, nil
 	}
@@ -747,6 +757,7 @@ func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<-
 			defer close(ch)
 		}
 		if r == nil {
+			logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
 			return
 		}
 
@@ -800,7 +811,20 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
 	failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
 
 	distinctHandles := make(map[int64]*kv.HandleMap)
-	for task := range fetchCh {
+	for {
+		var ok bool
+		var task *lookupTableTask
+		select {
+		case <-ctx.Done():
+			return
+		case <-finished:
+			return
+		case task, ok = <-fetchCh:
+			if !ok {
+				return
+			}
+		}
+
 		select {
 		case err := <-task.doneCh:
 			// If got error from partialIndexWorker/partialTableWorker, stop processing.
@@ -836,7 +860,7 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
 		if len(fhs) == 0 {
 			continue
 		}
-		task := &lookupTableTask{
+		task = &lookupTableTask{
 			handles: fhs,
 			doneCh:  make(chan error, 1),
 
@@ -845,13 +869,27 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
 		if w.stats != nil {
 			w.stats.IndexMergeProcess += time.Since(start)
 		}
+		failpoint.Inject("testIndexMergeProcessWorkerUnionHang", func(_ failpoint.Value) {
+			for i := 0; i < cap(resultCh); i++ {
+				select {
+				case resultCh <- &lookupTableTask{}:
+				default:
+				}
+			}
+		})
 		select {
 		case <-ctx.Done():
 			return
 		case <-finished:
 			return
 		case workCh <- task:
-			resultCh <- task
+			select {
+			case <-ctx.Done():
+				return
+			case <-finished:
+				return
+			case resultCh <- task:
+			}
 		}
 	}
 }
@@ -994,12 +1032,14 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
 	for {
 		waitStart := time.Now()
 		select {
+		case <-ctx.Done():
+			return
+		case <-w.finished:
+			return
 		case *task, ok = <-w.workCh:
 			if !ok {
 				return
 			}
-		case <-w.finished:
-			return
 		}
 		// Make sure panic failpoint is after fetch task from workCh.
 		// Otherwise cannot send error to task.doneCh.
@@ -1020,13 +1060,20 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
 			atomic.AddInt64(&w.stats.TableTaskNum, 1)
 		}
 		failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil)
-		(*task).doneCh <- err
+		select {
+		case <-ctx.Done():
+			return
+		case <-w.finished:
+			return
+		case (*task).doneCh <- err:
+		}
 	}
 }
 
 func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **lookupTableTask, worker string) func(r interface{}) {
 	return func(r interface{}) {
 		if r == nil {
+			logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
 			return
 		}
 
diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go
index 3cf974acff70d..a5055f9c64e18 100644
--- a/executor/index_merge_reader_test.go
+++ b/executor/index_merge_reader_test.go
@@ -532,17 +532,7 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
 	defer clean()
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
-
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists t1")
-	tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
-	insertStr := "insert into t1 values(0, 0, 0)"
-	for i := 1; i < 1000; i++ {
-		insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-	}
-	tk.MustExec(insertStr)
-	tk.MustExec("analyze table t1;")
-	tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+	setupPartitionTableHelper(tk)
 
 	var err error
 	sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
@@ -654,16 +644,7 @@ func TestIndexMergePanic(t *testing.T) {
 	tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100")
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly"))
 
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists t1")
-	tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
-	insertStr := "insert into t1 values(0, 0, 0)"
-	for i := 1; i < 1000; i++ {
-		insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-	}
-	tk.MustExec(insertStr)
-	tk.MustExec("analyze table t1;")
-	tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+	setupPartitionTableHelper(tk)
 
 	minV := 200
 	maxV := 1000
@@ -718,3 +699,35 @@ func TestIndexMergePanic(t *testing.T) {
 		require.NoError(t, failpoint.Disable(fp))
 	}
 }
+
+func setupPartitionTableHelper(tk *testkit.TestKit) {
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1")
+	tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
+	insertStr := "insert into t1 values(0, 0, 0)"
+	for i := 1; i < 1000; i++ {
+		insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
+	}
+	tk.MustExec(insertStr)
+	tk.MustExec("analyze table t1;")
+	tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+}
+
+func TestIndexMergeProcessWorkerHang(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	setupPartitionTableHelper(tk)
+
+	var err error
+	sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
+	res := tk.MustQuery("explain " + sql).Rows()
+	require.Contains(t, res[1][0], "IndexMerge")
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly", "return()"))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang", "return(true)"))
+	err = tk.QueryToErr(sql)
+	require.Contains(t, err.Error(), "testIndexMergeMainReturnEarly")
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"))
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang"))
+}
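
Not part of the patch: the common thread in these hunks is that a worker goroutine must never perform a bare channel send or blocking receive once the main goroutine may have returned; every send is wrapped in a select that also watches ctx.Done() and the executor's finished channel. The standalone Go sketch below illustrates that guarded-send pattern under local assumptions (the names task, worker, resultCh and finished are illustrative stand-ins, not the executor's real types):

// Illustration only: the guarded-send pattern applied in fetchLoop and pickAndExecTask.
package main

import (
	"context"
	"fmt"
	"time"
)

type task struct{ id int }

// worker never does a bare `resultCh <- t`; every send also watches ctx.Done()
// and finished, so the goroutine exits instead of blocking forever once the
// reader (the main goroutine in the executor) has gone away.
func worker(ctx context.Context, finished <-chan struct{}, resultCh chan<- *task) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case <-finished:
			return
		case resultCh <- &task{id: i}:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	finished := make(chan struct{})
	resultCh := make(chan *task, 2) // small buffer, like the executor's resultCh

	go worker(ctx, finished, resultCh)

	// Consume one task, then simulate the main goroutine giving up early.
	fmt.Println("got task", (<-resultCh).id)
	close(finished)

	// Even if resultCh is now full, the worker exits via the finished case
	// instead of hanging on the send.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("reader returned early; worker did not leak")
}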