Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 committed Mar 17, 2023
1 parent b9d5044 commit 4ed6972
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,11 +1179,6 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
req.ReplicaRead = true
req.ReplicaReadType = options.GetTiKVReplicaReadType(kv.ReplicaReadFollower)
ops = append(ops, tikv.WithMatchStores([]uint64{*task.redirect2Replica}))
if replicaRead == kv.ReplicaReadLeader {
replicaRead = kv.ReplicaReadFollower
req.ReplicaRead = true
req.ReplicaReadType = options.GetTiKVReplicaReadType(replicaRead)
}
}
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
err = derr.ToTiDBErr(err)
Expand Down Expand Up @@ -1512,17 +1507,21 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
appendRemainTasks(t.task)
}
if regionErr := resp.GetRegionError(); regionErr != nil && regionErr.ServerIsBusy != nil &&
regionErr.ServerIsBusy.EstimatedWaitMs > 0 && len(batchResps) == 0 && len(remainTasks) != 0 {
busyThresholdFallback = true
handler := newBatchTaskBuilder(bo, worker.req, worker.store.GetRegionCache(), kv.ReplicaReadFollower)
for _, task := range remainTasks {
// do not set busy threshold again.
task.busyThreshold = 0
if err = handler.handle(task); err != nil {
return nil, err
regionErr.ServerIsBusy.EstimatedWaitMs > 0 && len(remainTasks) != 0 {
if len(batchResps) == 0 {
busyThresholdFallback = true
handler := newBatchTaskBuilder(bo, worker.req, worker.store.GetRegionCache(), kv.ReplicaReadFollower)
for _, task := range remainTasks {
// do not set busy threshold again.
task.busyThreshold = 0
if err = handler.handle(task); err != nil {
return nil, err
}
}
remainTasks = handler.build()
} else {
return nil, errors.New("store batched coprocessor with server is busy error shouldn't contain responses")
}
remainTasks = handler.build()
}
return remainTasks, nil
}
Expand Down

0 comments on commit 4ed6972

Please sign in to comment.