diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 22cdde57496..c38ed3cc280 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -711,7 +711,9 @@ func (s *eventFeedSession) requestRegionToStore( s.onRegionFail(ctx, errInfo) continue } - s.storeStreamsCache[stream.addr] = stream + + s.addStream(stream) + log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1389,6 +1391,35 @@ func (s *eventFeedSession) sendResolvedTs( return nil } +// addStream adds a stream to the session.streams. +// Note: It must be called with deleteStream in a same goroutine. +func (s *eventFeedSession) addStream(stream *eventFeedStream) error { + oldStream, ok := s.storeStreamsCache[stream.addr] + if ok { + failpoint.Inject("kvClientAddDuplicatedStream", func() { + log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("oldStreamID", oldStream.id), + zap.Uint64("newStreamID", stream.id)) + }) + // There is no need to return an error here because even if it happens, + // it does not harm the data correctness, but may only cause some lag spikes. + // Log it to help us improve the code. + log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("oldStreamID", oldStream.id), + zap.Uint64("newStreamID", stream.id)) + } + s.storeStreamsCache[stream.addr] = stream + return nil +} + // deleteStream deletes a stream from the session.streams. // If the stream is not found, it takes no effect. func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 5f048e2621f..399438984d1 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -1382,6 +1382,12 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream", failpointStr) + require.Nil(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream") + }() + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", failpointStr) require.Nil(t, err) defer func() { diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 3e26479f1d8..376444979e6 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -176,7 +176,6 @@ const ( "check-balance-interval": 60000000000, "add-table-batch-size": 50 }, - "enable-kv-connect-backoff": false, "cdc-v2": { "enable": false, "meta-store": {