diff --git a/cdc/kv/client.go b/cdc/kv/client.go index ef242276ea5..ed9ef4de9dc 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -611,12 +611,7 @@ MainLoop: } state := newRegionFeedState(sri, requestID) - hasOld := pendingRegions.insert(requestID, state) - if hasOld { - log.Error("region is already pending for the first response while trying to send another request."+ - "region merge may have happened which is not supported yet", - zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID)) - } + pendingRegions.insert(requestID, state) stream, ok := streams[rpcCtx.Addr] // Establish the stream if it has not been connected yet. @@ -624,10 +619,12 @@ MainLoop: stream, err = s.client.newStream(ctx, rpcCtx.Addr) if err != nil { // if get stream failed, maybe the store is down permanently, we should try to relocate the active store - log.Warn("get grpc stream client failed", zap.Error(err)) + log.Warn("get grpc stream client failed", + zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Error(err)) bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.client.regionCache.OnSendFail(bo, rpcCtx, needReloadRegion(sri.failStoreIDs, rpcCtx), err) - // Retry connecting and sending the request. + // Delete the pendingRegion info from `pendingRegions` and retry connecting and sending the request. + pendingRegions.take(requestID) continue } streams[rpcCtx.Addr] = stream @@ -647,6 +644,7 @@ MainLoop: log.Error("send request to stream failed", zap.String("addr", rpcCtx.Addr), zap.Uint64("storeID", getStoreID(rpcCtx)), + zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.Error(err)) err1 := stream.CloseSend() @@ -656,6 +654,11 @@ MainLoop: // Delete the stream from the map so that the next time the store is accessed, the stream will be // re-established. delete(streams, rpcCtx.Addr) + // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is + // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all + // pending regions, the new pending regions that are requested after reconnecting won't be stopped + // incorrectly. + delete(storePendingRegions, rpcCtx.Addr) // Remove the region from pendingRegions. If it's already removed, it should be already retried by // `receiveFromStream`, so no need to retry here. @@ -968,7 +971,7 @@ func (s *eventFeedSession) receiveFromStream( zap.Uint64("regionID", event.RegionId), zap.Uint64("requestID", event.RequestId), zap.String("addr", addr)) - return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending"+ + return errors.Errorf("received event regionID %v, requestID %v from %v but neither pending "+ "region nor running region was found", event.RegionId, event.RequestId, addr) }