diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 22061039616a5..656fc11186ce1 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/tikv/client-go/v2/tikv" @@ -341,13 +342,18 @@ func (m *mppIterator) cancelMppTasks() { } // send cancel cmd to all stores where tasks run + wg := util.WaitGroupWrapper{} for addr := range usedStoreAddrs { - _, err := m.store.GetTiKVClient().SendRequest(context.Background(), addr, wrappedReq, tikv.ReadTimeoutShort) - logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", m.startTs), zap.String(" on addr ", addr)) - if err != nil { - logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", m.startTs), zap.String(" on addr ", addr)) - } + storeAddr := addr + wg.Run(func() { + _, err := m.store.GetTiKVClient().SendRequest(context.Background(), storeAddr, wrappedReq, tikv.ReadTimeoutShort) + logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr)) + if err != nil { + logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr)) + } + }) } + wg.Wait() } func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {