Skip to content

Commit

Permalink
executor: support detaching the IndexReader and IndexLookUp (#54800)
Browse files Browse the repository at this point in the history
close #54799
  • Loading branch information
YangKeao authored Jul 23, 2024
1 parent a18b3c5 commit be01e5a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
43 changes: 43 additions & 0 deletions pkg/executor/detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@ func (treCtx tableReaderExecutorContext) Detach() tableReaderExecutorContext {
return treCtx
}

func (ireCtx indexReaderExecutorContext) Detach() indexReaderExecutorContext {
newCtx := ireCtx

if ctx, ok := ireCtx.ectx.(*contextsession.SessionExprContext); ok {
staticExprCtx := ctx.IntoStatic()

newCtx.dctx = newCtx.dctx.Detach()
newCtx.rctx = newCtx.rctx.Detach(staticExprCtx)
newCtx.buildPBCtx = newCtx.buildPBCtx.Detach(staticExprCtx)
newCtx.ectx = staticExprCtx
return newCtx
}

return ireCtx
}

func (iluCtx indexLookUpExecutorContext) Detach() indexLookUpExecutorContext {
newCtx := iluCtx
newCtx.tableReaderExecutorContext = newCtx.tableReaderExecutorContext.Detach()

return iluCtx
}

// Detach detaches the current executor from the session context.
func (e *TableReaderExecutor) Detach() (exec.Executor, bool) {
newExec := new(TableReaderExecutor)
Expand All @@ -72,3 +95,23 @@ func (e *TableReaderExecutor) Detach() (exec.Executor, bool) {

return newExec, true
}

// Detach detaches the current executor from the session context.
func (e *IndexReaderExecutor) Detach() (exec.Executor, bool) {
newExec := new(IndexReaderExecutor)
*newExec = *e

newExec.indexReaderExecutorContext = newExec.indexReaderExecutorContext.Detach()

return newExec, true
}

// Detach detaches the current executor from the session context.
func (e *IndexLookUpExecutor) Detach() (exec.Executor, bool) {
newExec := new(IndexLookUpExecutor)
*newExec = *e

newExec.indexLookUpExecutorContext = newExec.indexLookUpExecutorContext.Detach()

return newExec, true
}
59 changes: 59 additions & 0 deletions pkg/executor/detach_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,62 @@ func TestDetachWithParam(t *testing.T) {
stop.Store(true)
wg.Wait()
}

func TestDetachIndexReaderAndIndexLookUp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int, b int, c int, key idx_a_b (a,b), key idx_b (b))")
for i := 0; i < 10000; i++ {
tk.MustExec("insert into t values (?, ?, ?)", i, i, i)
}

// Test detach index reader
tk.MustHavePlan("select a, b from t where a > 100 and a < 200", "IndexReader")
rs, err := tk.Exec("select a, b from t where a > ? and a < ?", 100, 200)
require.NoError(t, err)
drs, ok, err := rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

chk := drs.NewChunk(nil)
expectedSelect := 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0))
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(1))
expectedSelect++
}
}

// Test detach indexLookUp
tk.MustHavePlan("select c from t use index(idx_b) where b > 100 and b < 200", "IndexLookUp")
rs, err = tk.Exec("select c from t where b > ? and b < ?", 100, 200)
require.NoError(t, err)
drs, ok, err = rs.(sqlexec.DetachableRecordSet).TryDetach()
require.NoError(t, err)
require.True(t, ok)

chk = drs.NewChunk(nil)
expectedSelect = 101
for {
err = drs.Next(context.Background(), chk)
require.NoError(t, err)

if chk.NumRows() == 0 {
break
}
for i := 0; i < chk.NumRows(); i++ {
require.Equal(t, int64(expectedSelect), chk.GetRow(i).GetInt64(0))
expectedSelect++
}
}
}
7 changes: 5 additions & 2 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ type IndexLookUpExecutor struct {

// All fields above are immutable.

idxWorkerWg sync.WaitGroup
tblWorkerWg sync.WaitGroup
idxWorkerWg *sync.WaitGroup
tblWorkerWg *sync.WaitGroup
finished chan struct{}

resultCh chan *lookupTableTask
Expand Down Expand Up @@ -620,6 +620,9 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
return err
}
}

e.idxWorkerWg = &sync.WaitGroup{}
e.tblWorkerWg = &sync.WaitGroup{}
return nil
}

Expand Down

0 comments on commit be01e5a

Please sign in to comment.