From 6cf8776242fbfe0b922488357867ca5244e7b8fa Mon Sep 17 00:00:00 2001
From: crazycs
Date: Thu, 13 Jun 2024 19:04:52 +0800
Subject: [PATCH] executor: try to optimize index lookup queries that may send
 too many cop tasks (#53855)

close pingcap/tidb#53871
---
 pkg/executor/distsql.go | 40 ++++++++++++++++++++++++++++++++++------
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go
index ce1eaf7a2b96f..50eb917cf225a 100644
--- a/pkg/executor/distsql.go
+++ b/pkg/executor/distsql.go
@@ -676,7 +676,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 		SetStartTS(e.startTS).
 		SetDesc(e.desc).
 		SetKeepOrder(e.keepOrder).
-		SetPaging(e.indexPaging).
 		SetTxnScope(e.txnScope).
 		SetReadReplicaScope(e.readReplicaScope).
 		SetIsStaleness(e.isStaleness).
@@ -686,12 +685,13 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 		SetMemTracker(tracker).
 		SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)

-	if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(initBatchSize) {
+	worker.batchSize = e.calculateBatchSize(initBatchSize, worker.maxBatchSize)
+	if builder.Request.Paging.Enable && builder.Request.Paging.MinPagingSize < uint64(worker.batchSize) {
 		// when paging enabled and Paging.MinPagingSize less than initBatchSize, change Paging.MinPagingSize to
 		// initBatchSize to avoid redundant paging RPC, see more detail in https://github.com/pingcap/tidb/issues/53827
-		builder.Request.Paging.MinPagingSize = uint64(initBatchSize)
-		if builder.Request.Paging.MaxPagingSize < uint64(initBatchSize) {
-			builder.Request.Paging.MaxPagingSize = uint64(initBatchSize)
+		builder.Request.Paging.MinPagingSize = uint64(worker.batchSize)
+		if builder.Request.Paging.MaxPagingSize < uint64(worker.batchSize) {
+			builder.Request.Paging.MaxPagingSize = uint64(worker.batchSize)
 		}
 	}
 	results := make([]distsql.SelectResult, 0, len(kvRanges))
@@ -724,7 +724,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 		}
 		results = append(results, result)
 	}
-	worker.batchSize = min(initBatchSize, worker.maxBatchSize)
 	if len(results) > 1 && len(e.byItems) != 0 {
 		// e.Schema() not the output schema for indexReader, and we put byItems related column at first in `buildIndexReq`, so use nil here.
 		ssr := distsql.NewSortedSelectResults(e.Ctx().GetExprCtx().GetEvalCtx(), results, nil, e.byItems, e.memTracker)
@@ -746,6 +745,35 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
 	return nil
 }

+// calculateBatchSize calculates a suitable initial batch size.
+func (e *IndexLookUpExecutor) calculateBatchSize(initBatchSize, maxBatchSize int) int {
+	var estRows int
+	if len(e.idxPlans) > 0 {
+		estRows = int(e.idxPlans[0].StatsCount())
+	}
+	return CalculateBatchSize(e.indexPaging, estRows, initBatchSize, maxBatchSize)
+}
+
+// CalculateBatchSize calculates a suitable initial batch size. It is exported for testing.
+func CalculateBatchSize(indexPaging bool, estRows, initBatchSize, maxBatchSize int) int {
+	batchSize := min(initBatchSize, maxBatchSize)
+	if indexPaging {
+		// indexPaging means this query has a limit, so keep initBatchSize to avoid scanning unnecessary rows.
+		return batchSize
+	}
+	if estRows >= maxBatchSize {
+		return maxBatchSize
+	}
+	for batchSize < estRows {
+		// Double batchSize while it is less than estRows to avoid unnecessary paging RPCs.
+		batchSize = batchSize * 2
+		if batchSize >= maxBatchSize {
+			return maxBatchSize
+		}
+	}
+	return batchSize
+}
+
 // startTableWorker launches some background goroutines which pick tasks from workCh and execute the task.
 func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
 	lookupConcurrencyLimit := e.Ctx().GetSessionVars().IndexLookupConcurrency()
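
Not part of the patch: a minimal sketch of how the newly exported CalculateBatchSize behaves, assuming illustrative inputs of initBatchSize=32 and maxBatchSize=1024 (the function name and import path come from the diff above; the concrete values and the Example test are assumptions, not code from the PR). The expected outputs follow directly from the doubling loop shown in the patch.

package executor_test

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/executor"
)

// ExampleCalculateBatchSize walks the three branches of CalculateBatchSize.
func ExampleCalculateBatchSize() {
	// indexPaging is true (the query has a limit): keep the small initial batch size.
	fmt.Println(executor.CalculateBatchSize(true, 10000, 32, 1024))
	// No paging, small row estimate: 32 doubles to 64, then 128, the first value >= estRows.
	fmt.Println(executor.CalculateBatchSize(false, 100, 32, 1024))
	// No paging, large row estimate: capped at maxBatchSize.
	fmt.Println(executor.CalculateBatchSize(false, 1000000, 32, 1024))
	// Output:
	// 32
	// 128
	// 1024
}

The point of the change, visible in these values: without a pushed-down limit, the index worker no longer starts from the tiny initBatchSize (which caused many small cop/paging tasks, see #53871); it sizes the first batch from the index plan's estimated row count, bounded by maxBatchSize.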