executor: fix IndexMerge hang when got oom cancel (#41633) (#41673)
close #41545
ti-chi-bot authored Mar 29, 2023
1 parent 8c49f07 commit d36fcb6
Showing 2 changed files with 90 additions and 30 deletions.
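
Background for the diff below: the hang happened because worker goroutines used plain, unguarded channel sends, so once the consumer bailed out on an OOM cancel, a send on a full channel blocked forever. The commit turns each such send into a select that also watches the cancellation signals. A minimal sketch of the before/after pattern (hypothetical names, not TiDB code):

package sketch

import "context"

type task struct{}

// Before the fix: if resultCh is full and its reader has already returned
// (e.g. the query was cancelled under memory pressure), this send blocks
// forever and the worker goroutine leaks.
func sendUnguarded(resultCh chan<- *task) {
    resultCh <- &task{}
}

// After the fix: the send also watches the cancellation signals, so the
// worker exits instead of hanging once the query is cancelled.
func sendGuarded(ctx context.Context, finished <-chan struct{}, resultCh chan<- *task) {
    select {
    case <-ctx.Done():
        return
    case <-finished:
        return
    case resultCh <- &task{}:
    }
}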
65 changes: 56 additions & 9 deletions executor/index_merge_reader.go
@@ -268,7 +268,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
 func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error {
     failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
         // Wait for processWorker to close resultCh.
-        time.Sleep(2)
+        time.Sleep(time.Second * 2)
         // Should use fetchCh instead of resultCh to send error.
         syncErr(ctx, e.finished, fetchCh, errors.New("testIndexMergeResultChCloseEarly"))
     })
@@ -347,8 +347,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
     for parTblIdx, keyRange := range keyRanges {
         // check if this executor is closed
         select {
+        case <-ctx.Done():
+            return
         case <-e.finished:
-            break
+            return
         default:
         }
 
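A note on the hunk above (the same change is applied to startPartialTableWorker just below): in Go, break inside a select exits only the select statement, not the enclosing loop, so the old case <-e.finished: break never stopped the range loop. A tiny self-contained illustration of the pitfall (hypothetical code, not from this commit):

package sketch

// loopsDespiteFinished shows why "break" inside a select is a no-op for the
// enclosing loop: even after finished is closed, each iteration falls
// through to the work below the select.
func loopsDespiteFinished(finished <-chan struct{}, work []int) (processed int) {
    for range work {
        select {
        case <-finished:
            break // leaves only the select; the for loop keeps going
        default:
        }
        processed++ // still executed after finished is closed
    }
    return processed
}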
@@ -466,8 +468,10 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
     for _, tbl := range tbls {
         // check if this executor is closed
         select {
+        case <-ctx.Done():
+            return
         case <-e.finished:
-            break
+            return
         default:
         }
 
@@ -720,6 +724,12 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e
 }
 
 func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) {
+    failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) {
+        // Sleep long enough for processWorker to fill resultCh.
+        // Once the main goroutine closes `finished`, processWorker may block writing to resultCh.
+        time.Sleep(time.Second * 20)
+        failpoint.Return(nil, errors.New("failpoint testIndexMergeMainReturnEarly"))
+    })
     if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
         return e.resultCurr, nil
     }
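The testIndexMergeMainReturnEarly failpoint added above simulates the consumer side of the hang: getResultTask waits long enough for the process worker to fill resultCh, then returns an error so the executor shuts down (closing `finished`) while tasks are still in flight.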
@@ -747,6 +757,7 @@ func handleWorkerPanic(ctx context.Context, finished <-chan struct{}, ch chan<-
         defer close(ch)
     }
     if r == nil {
+        logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
         return
     }
 
@@ -800,7 +811,20 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
     failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
 
     distinctHandles := make(map[int64]*kv.HandleMap)
-    for task := range fetchCh {
+    for {
+        var ok bool
+        var task *lookupTableTask
+        select {
+        case <-ctx.Done():
+            return
+        case <-finished:
+            return
+        case task, ok = <-fetchCh:
+            if !ok {
+                return
+            }
+        }
+
         select {
         case err := <-task.doneCh:
             // If got error from partialIndexWorker/partialTableWorker, stop processing.
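The hunk above replaces the bare for task := range fetchCh with an explicit receive loop, so the process worker also notices cancellation while waiting for input, not only while sending. The receive side of the pattern, as a hypothetical sketch:

package sketch

import "context"

type item struct{}

// consume drains in until it is closed, but returns promptly if the context
// is cancelled or finished is closed while waiting for the next item.
func consume(ctx context.Context, finished <-chan struct{}, in <-chan *item) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-finished:
            return
        case it, ok := <-in:
            if !ok {
                return // channel closed: normal termination
            }
            _ = it // process the item here
        }
    }
}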
@@ -836,7 +860,7 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
         if len(fhs) == 0 {
             continue
         }
-        task := &lookupTableTask{
+        task = &lookupTableTask{
             handles: fhs,
             doneCh:  make(chan error, 1),
 
@@ -845,13 +869,27 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
         if w.stats != nil {
             w.stats.IndexMergeProcess += time.Since(start)
         }
+        failpoint.Inject("testIndexMergeProcessWorkerUnionHang", func(_ failpoint.Value) {
+            for i := 0; i < cap(resultCh); i++ {
+                select {
+                case resultCh <- &lookupTableTask{}:
+                default:
+                }
+            }
+        })
         select {
         case <-ctx.Done():
             return
         case <-finished:
             return
         case workCh <- task:
-            resultCh <- task
+            select {
+            case <-ctx.Done():
+                return
+            case <-finished:
+                return
+            case resultCh <- task:
+            }
         }
     }
 }
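The injected testIndexMergeProcessWorkerUnionHang block above fills resultCh to capacity with non-blocking sends, guaranteeing that the following resultCh <- task cannot proceed; the nested select is what lets the worker abandon that send once ctx.Done() or finished fires, instead of hanging as in #41545.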
@@ -994,12 +1032,14 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
     for {
         waitStart := time.Now()
         select {
+        case <-ctx.Done():
+            return
+        case <-w.finished:
+            return
         case *task, ok = <-w.workCh:
             if !ok {
                 return
             }
-        case <-w.finished:
-            return
         }
         // Make sure panic failpoint is after fetch task from workCh.
         // Otherwise cannot send error to task.doneCh.
@@ -1020,13 +1060,20 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
             atomic.AddInt64(&w.stats.TableTaskNum, 1)
         }
         failpoint.Inject("testIndexMergePickAndExecTaskPanic", nil)
-        (*task).doneCh <- err
+        select {
+        case <-ctx.Done():
+            return
+        case <-w.finished:
+            return
+        case (*task).doneCh <- err:
+        }
     }
 }
 
 func (w *indexMergeTableScanWorker) handleTableScanWorkerPanic(ctx context.Context, finished <-chan struct{}, task **lookupTableTask, worker string) func(r interface{}) {
     return func(r interface{}) {
         if r == nil {
+            logutil.BgLogger().Info("worker finish without panic", zap.Any("worker", worker))
             return
         }
 
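The same guarded-send treatment is applied above to the table scan worker's (*task).doneCh <- err: the consumer waiting on doneCh may itself have gone away after a cancel, so the error send must also yield to ctx.Done() and w.finished. The added Info log merely records a worker finishing without a panic.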
55 changes: 34 additions & 21 deletions executor/index_merge_reader_test.go
@@ -532,17 +532,7 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
     defer clean()
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
-
-    tk.MustExec("use test")
-    tk.MustExec("drop table if exists t1")
-    tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
-    insertStr := "insert into t1 values(0, 0, 0)"
-    for i := 1; i < 1000; i++ {
-        insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-    }
-    tk.MustExec(insertStr)
-    tk.MustExec("analyze table t1;")
-    tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+    setupPartitionTableHelper(tk)
 
     var err error
     sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
@@ -654,16 +644,7 @@ func TestIndexMergePanic(t *testing.T) {
     tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100")
     require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly"))
 
-    tk.MustExec("use test")
-    tk.MustExec("drop table if exists t1")
-    tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
-    insertStr := "insert into t1 values(0, 0, 0)"
-    for i := 1; i < 1000; i++ {
-        insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
-    }
-    tk.MustExec(insertStr)
-    tk.MustExec("analyze table t1;")
-    tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
+    setupPartitionTableHelper(tk)
 
     minV := 200
     maxV := 1000
Expand Down Expand Up @@ -718,3 +699,35 @@ func TestIndexMergePanic(t *testing.T) {
require.NoError(t, failpoint.Disable(fp))
}
}

func setupPartitionTableHelper(tk *testkit.TestKit) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
insertStr := "insert into t1 values(0, 0, 0)"
for i := 1; i < 1000; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
}

func TestIndexMergeProcessWorkerHang(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

var err error
sql := "select /*+ use_index_merge(t1) */ c1 from t1 where c1 < 900 or c2 < 1000;"
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly", "return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang", "return(true)"))
err = tk.QueryToErr(sql)
require.Contains(t, err.Error(), "testIndexMergeMainReturnEarly")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeMainReturnEarly"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerUnionHang"))
}
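The new TestIndexMergeProcessWorkerHang regression test wires the two failpoints together: testIndexMergeProcessWorkerUnionHang pre-fills resultCh so that the process worker's send must block, while testIndexMergeMainReturnEarly makes the main goroutine give up after 20 seconds and return an error. Before the fix the query would hang indefinitely; with the guarded sends it terminates, and the test asserts the query fails with the injected error instead of deadlocking.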
