executor: try to optimize index lookup query may send too many cop task (#53855)

close #53871
crazycs520 authored Jun 13, 2024
1 parent fc342db commit 6cf8776
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions pkg/executor/distsql.go
@@ -676,7 +676,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPaging(e.indexPaging).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
@@ -686,12 +685,13 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetMemTracker(tracker).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)

if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(initBatchSize) {
worker.batchSize = e.calculateBatchSize(initBatchSize, worker.maxBatchSize)
if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(worker.batchSize) {
// When paging is enabled and Paging.MinPagingSize is less than worker.batchSize, raise Paging.MinPagingSize to
// worker.batchSize to avoid redundant paging RPCs, see more detail in https://github.com/pingcap/tidb/issues/53827
builder.Request.Paging.MinPagingSize = uint64(initBatchSize)
if builder.Request.Paging.MaxPagingSize < uint64(initBatchSize) {
builder.Request.Paging.MaxPagingSize = uint64(initBatchSize)
builder.Request.Paging.MinPagingSize = uint64(worker.batchSize)
if builder.Request.Paging.MaxPagingSize < uint64(worker.batchSize) {
builder.Request.Paging.MaxPagingSize = uint64(worker.batchSize)
}
}
results := make([]distsql.SelectResult, 0, len(kvRanges))
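A worked example of the adjustment above, with illustrative numbers that are not taken from the commit: suppose initBatchSize is 32, worker.maxBatchSize is 256, and the index plan's stats estimate is about 100 rows. calculateBatchSize (added further down in this diff) grows worker.batchSize to 128, so a paging request whose MinPagingSize is below 128 has both MinPagingSize and, if necessary, MaxPagingSize raised to 128. The first paging RPC then covers the whole initial batch instead of being split across several smaller paging rounds, which is the redundant-RPC pattern described in https://github.com/pingcap/tidb/issues/53827.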
@@ -724,7 +724,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
}
results = append(results, result)
}
worker.batchSize = min(initBatchSize, worker.maxBatchSize)
if len(results) > 1 && len(e.byItems) != 0 {
// e.Schema() is not the output schema for indexReader, and we put the byItems-related columns first in `buildIndexReq`, so use nil here.
ssr := distsql.NewSortedSelectResults(e.Ctx().GetExprCtx().GetEvalCtx(), results, nil, e.byItems, e.memTracker)
@@ -746,6 +745,35 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
return nil
}

// calculateBatchSize calculates a suitable initial batch size.
func (e *IndexLookUpExecutor) calculateBatchSize(initBatchSize, maxBatchSize int) int {
var estRows int
if len(e.idxPlans) > 0 {
estRows = int(e.idxPlans[0].StatsCount())
}
return CalculateBatchSize(e.indexPaging, estRows, initBatchSize, maxBatchSize)
}

// CalculateBatchSize calculates a suitable initial batch size. It is exported for testing.
func CalculateBatchSize(indexPaging bool, estRows, initBatchSize, maxBatchSize int) int {
batchSize := min(initBatchSize, maxBatchSize)
if indexPaging {
// indexPaging being true means this query has a limit, so use the initial batch size to avoid scanning unnecessary data.
return batchSize
}
if estRows >= maxBatchSize {
return maxBatchSize
}
for batchSize < estRows {
// While batchSize is less than estRows, double it to avoid unnecessary RPCs.
batchSize = batchSize * 2
if batchSize >= maxBatchSize {
return maxBatchSize
}
}
return batchSize
}
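
A minimal usage sketch for the exported helper above, written as a hypothetical Go example test (the test file and function name are assumed, not part of this commit); it shows the doubling behaviour for a few illustrative inputs:

package executor_test

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/executor"
)

func ExampleCalculateBatchSize() {
	// Paging enabled (the query has a limit): keep the small initial batch size.
	fmt.Println(executor.CalculateBatchSize(true, 100, 32, 256))
	// No paging, ~100 estimated rows: 32 doubles to 64 and then 128, which covers the estimate.
	fmt.Println(executor.CalculateBatchSize(false, 100, 32, 256))
	// No paging, a large estimate: capped at maxBatchSize.
	fmt.Println(executor.CalculateBatchSize(false, 100000, 32, 256))
	// Output:
	// 32
	// 128
	// 256
}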

// startTableWorker launches some background goroutines that pick tasks from workCh and execute them.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.Ctx().GetSessionVars().IndexLookupConcurrency()
