Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Feb 7, 2024
1 parent 6f0170b commit f4bfdc7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
33 changes: 32 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,9 @@ func (s *eventFeedSession) requestRegionToStore(
s.onRegionFail(ctx, errInfo)
continue
}
s.storeStreamsCache[stream.addr] = stream

s.addStream(stream)

log.Info("creating new stream to store to send request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
Expand Down Expand Up @@ -1389,6 +1391,35 @@ func (s *eventFeedSession) sendResolvedTs(
return nil
}

// addStream adds a stream to the session.streams.
// Note: It must be called with deleteStream in a same goroutine.
func (s *eventFeedSession) addStream(stream *eventFeedStream) error {
oldStream, ok := s.storeStreamsCache[stream.addr]
if ok {
failpoint.Inject("kvClientAddDuplicatedStream", func() {
log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("oldStreamID", oldStream.id),
zap.Uint64("newStreamID", stream.id))
})
// There is no need to return an error here because even if it happens,
// it does not harm the data correctness, but may only cause some lag spikes.
// Log it to help us improve the code.
log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("oldStreamID", oldStream.id),
zap.Uint64("newStreamID", stream.id))
}
s.storeStreamsCache[stream.addr] = stream
return nil
}

// deleteStream deletes a stream from the session.streams.
// If the stream is not found, it takes no effect.
func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) {
Expand Down
6 changes: 6 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,12 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream", failpointStr)
require.Nil(t, err)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream")
}()

err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", failpointStr)
require.Nil(t, err)
defer func() {
Expand Down
1 change: 0 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ const (
"check-balance-interval": 60000000000,
"add-table-batch-size": 50
},
"enable-kv-connect-backoff": false,
"cdc-v2": {
"enable": false,
"meta-store": {
Expand Down

0 comments on commit f4bfdc7

Please sign in to comment.