From be01e5a2f866bd7e513338d242596577a4c2ec67 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 23 Jul 2024 19:42:11 +0800 Subject: [PATCH] executor: support detaching the `IndexReader` and `IndexLookUp` (#54800) close pingcap/tidb#54799 --- pkg/executor/detach.go | 43 ++++++++++++++++++ pkg/executor/detach_integration_test.go | 59 +++++++++++++++++++++++++ pkg/executor/distsql.go | 7 ++- 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/pkg/executor/detach.go b/pkg/executor/detach.go index b65de754f1f09..8baf207bf1f6c 100644 --- a/pkg/executor/detach.go +++ b/pkg/executor/detach.go @@ -63,6 +63,29 @@ func (treCtx tableReaderExecutorContext) Detach() tableReaderExecutorContext { return treCtx } +func (ireCtx indexReaderExecutorContext) Detach() indexReaderExecutorContext { + newCtx := ireCtx + + if ctx, ok := ireCtx.ectx.(*contextsession.SessionExprContext); ok { + staticExprCtx := ctx.IntoStatic() + + newCtx.dctx = newCtx.dctx.Detach() + newCtx.rctx = newCtx.rctx.Detach(staticExprCtx) + newCtx.buildPBCtx = newCtx.buildPBCtx.Detach(staticExprCtx) + newCtx.ectx = staticExprCtx + return newCtx + } + + return ireCtx +} + +func (iluCtx indexLookUpExecutorContext) Detach() indexLookUpExecutorContext { + newCtx := iluCtx + newCtx.tableReaderExecutorContext = newCtx.tableReaderExecutorContext.Detach() + + return iluCtx +} + // Detach detaches the current executor from the session context. func (e *TableReaderExecutor) Detach() (exec.Executor, bool) { newExec := new(TableReaderExecutor) @@ -72,3 +95,23 @@ func (e *TableReaderExecutor) Detach() (exec.Executor, bool) { return newExec, true } + +// Detach detaches the current executor from the session context. +func (e *IndexReaderExecutor) Detach() (exec.Executor, bool) { + newExec := new(IndexReaderExecutor) + *newExec = *e + + newExec.indexReaderExecutorContext = newExec.indexReaderExecutorContext.Detach() + + return newExec, true +} + +// Detach detaches the current executor from the session context. +func (e *IndexLookUpExecutor) Detach() (exec.Executor, bool) { + newExec := new(IndexLookUpExecutor) + *newExec = *e + + newExec.indexLookUpExecutorContext = newExec.indexLookUpExecutorContext.Detach() + + return newExec, true +} diff --git a/pkg/executor/detach_integration_test.go b/pkg/executor/detach_integration_test.go index df3d81bc0831d..7a8de51af47db 100644 --- a/pkg/executor/detach_integration_test.go +++ b/pkg/executor/detach_integration_test.go @@ -172,3 +172,62 @@ func TestDetachWithParam(t *testing.T) { stop.Store(true) wg.Wait() } + +func TestDetachIndexReaderAndIndexLookUp(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true) + tk.MustExec("create table t (a int, b int, c int, key idx_a_b (a,b), key idx_b (b))") + for i := 0; i < 10000; i++ { + tk.MustExec("insert into t values (?, ?, ?)", i, i, i) + } + + // Test detach index reader + tk.MustHavePlan("select a, b from t where a > 100 and a < 200", "IndexReader") + rs, err := tk.Exec("select a, b from t where a > ? and a < ?", 100, 200) + require.NoError(t, err) + drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach() + require.NoError(t, err) + require.True(t, ok) + + chk := drs.NewChunk(nil) + expectedSelect := 101 + for { + err = drs.Next(context.Background(), chk) + require.NoError(t, err) + + if chk.NumRows() == 0 { + break + } + for i := 0; i < chk.NumRows(); i++ { + require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0)) + require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(1)) + expectedSelect++ + } + } + + // Test detach indexLookUp + tk.MustHavePlan("select c from t use index(idx_b) where b > 100 and b < 200", "IndexLookUp") + rs, err = tk.Exec("select c from t where b > ? and b < ?", 100, 200) + require.NoError(t, err) + drs, ok, err = rs.(sqlexec.DetachableRecordSet).TryDetach() + require.NoError(t, err) + require.True(t, ok) + + chk = drs.NewChunk(nil) + expectedSelect = 101 + for { + err = drs.Next(context.Background(), chk) + require.NoError(t, err) + + if chk.NumRows() == 0 { + break + } + for i := 0; i < chk.NumRows(); i++ { + require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0)) + expectedSelect++ + } + } +} diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 19d1b14801725..ec6dc3abd10b4 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -467,8 +467,8 @@ type IndexLookUpExecutor struct { // All fields above are immutable. - idxWorkerWg sync.WaitGroup - tblWorkerWg sync.WaitGroup + idxWorkerWg *sync.WaitGroup + tblWorkerWg *sync.WaitGroup finished chan struct{} resultCh chan *lookupTableTask @@ -620,6 +620,9 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { return err } } + + e.idxWorkerWg = &sync.WaitGroup{} + e.tblWorkerWg = &sync.WaitGroup{} return nil }