From 439b1e36beca1836cece0d183d9a511a1c40f74e Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 14 Mar 2023 15:01:35 -0400 Subject: [PATCH 1/2] kvcoord: Replace `withDiff` argument with `WithDiff` option Refactor RangeFeed call to take `kvcoord.WithDiff` option instead of stray boolean. Epic: None Release note: None --- pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go | 1 - pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go | 6 ++++-- pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go | 13 +++++++++---- .../kvcoord/dist_sender_rangefeed_mock_test.go | 3 +-- pkg/kv/kvclient/rangefeed/db_adapter.go | 3 +-- pkg/kv/kvclient/rangefeed/mocks_generated_test.go | 10 +++++----- pkg/kv/kvclient/rangefeed/rangefeed.go | 6 ++++-- pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go | 10 +++------- pkg/kv/kvnemesis/watcher.go | 2 +- pkg/kv/kvserver/client_rangefeed_test.go | 8 +++----- pkg/kv/kvserver/replica_rangefeed_test.go | 6 +++--- 11 files changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index 1e73b666881c..aab10254453f 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -398,7 +398,6 @@ type rawEventFeed []kvpb.RangeFeedEvent func (f rawEventFeed) run( ctx context.Context, spans []kvcoord.SpanTimePair, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 190a5b7eecff..1eb61891c5fe 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -37,7 +37,6 @@ type rangeFeedConfig struct { type rangefeedFactory func( ctx context.Context, spans []kvcoord.SpanTimePair, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error @@ -79,9 +78,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang if cfg.UseMux { rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed()) } + if cfg.WithDiff { + rfOpts = append(rfOpts, kvcoord.WithDiff()) + } g.GoCtx(func(ctx context.Context) error { - return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...) + return p(ctx, cfg.Spans, feed.eventC, rfOpts...) }) return g.Wait() } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 2c75ef7c72ab..5018bf203c9d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -114,6 +114,13 @@ func WithSystemTablePriority() RangeFeedOption { }) } +// WithDiff turns on "diff" option for the rangefeed. +func WithDiff() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.withDiff = true + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -134,7 +141,6 @@ func (ds *DistSender) RangeFeed( ctx context.Context, spans []roachpb.Span, startAfter hlc.Timestamp, // exclusive - withDiff bool, eventCh chan<- RangeFeedMessage, opts ...RangeFeedOption, ) error { @@ -145,7 +151,7 @@ func (ds *DistSender) RangeFeed( StartAfter: startAfter, }) } - return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...) + return ds.RangeFeedSpans(ctx, timedSpans, eventCh, opts...) 
} // SpanTimePair is a pair of span along with its starting time. The starting @@ -161,7 +167,6 @@ type SpanTimePair struct { func (ds *DistSender) RangeFeedSpans( ctx context.Context, spans []SpanTimePair, - withDiff bool, eventCh chan<- RangeFeedMessage, opts ...RangeFeedOption, ) error { @@ -178,7 +183,7 @@ func (ds *DistSender) RangeFeedSpans( ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() - rr := newRangeFeedRegistry(ctx, withDiff) + rr := newRangeFeedRegistry(ctx, cfg.withDiff) ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index 480b8992ea28..e0736a7e3aa8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -172,8 +172,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { if useMuxRangeFeed { opts = append(opts, WithMuxRangeFeed()) } - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, - false, nil, opts...) + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil, opts...) require.Error(t, err) }) } diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go index 0a73d507b058..a2b0cb9dff0f 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -74,11 +74,10 @@ func (dbc *dbAdapter) RangeFeed( ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error { - return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...) + return dbc.distSender.RangeFeed(ctx, spans, startFrom, eventC, opts...) } // concurrentBoundAccount is a thread safe bound account. diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go index 8c211dcb108d..2e6298b320b8 100644 --- a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go +++ b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go @@ -38,10 +38,10 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { } // RangeFeed mocks base method. -func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage, arg5 ...kvcoord.RangeFeedOption) error { +func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 chan<- kvcoord.RangeFeedMessage, arg4 ...kvcoord.RangeFeedOption) error { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1, arg2, arg3, arg4} - for _, a := range arg5 { + varargs := []interface{}{arg0, arg1, arg2, arg3} + for _, a := range arg4 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "RangeFeed", varargs...) @@ -50,9 +50,9 @@ func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.T } // RangeFeed indicates an expected call of RangeFeed. -func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}, arg5 ...interface{}) *gomock.Call { +func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3 interface{}, arg4 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4}, arg5...) + varargs := append([]interface{}{arg0, arg1, arg2, arg3}, arg4...) 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), varargs...) } diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 9253ee38d9b6..74d47558e30a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -55,7 +55,6 @@ type DB interface { ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error @@ -299,6 +298,9 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { if useMuxRangeFeed { rangefeedOpts = append(rangefeedOpts, kvcoord.WithMuxRangeFeed()) } + if f.withDiff { + rangefeedOpts = append(rangefeedOpts, kvcoord.WithDiff()) + } for i := 0; r.Next(); i++ { ts := frontier.Frontier() @@ -309,7 +311,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { start := timeutil.Now() rangeFeedTask := func(ctx context.Context) error { - return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...) + return f.client.RangeFeed(ctx, f.spans, ts, eventCh, rangefeedOpts...) } processEventsTask := func(ctx context.Context) error { return f.processEvents(ctx, frontier, eventCh) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 54ae877fdd83..906fa4c005b1 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -36,7 +36,6 @@ type mockClient struct { ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, ) error @@ -53,11 +52,10 @@ func (m *mockClient) RangeFeed( ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error { - return m.rangefeed(ctx, spans, startFrom, withDiff, eventC) + return m.rangefeed(ctx, spans, startFrom, eventC) } func (m *mockClient) Scan( @@ -165,9 +163,8 @@ func TestRangeFeedMock(t *testing.T) { return nil }, rangefeed: func( - ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage, ) error { - assert.False(t, withDiff) // it was not set sendEvent := func(ts hlc.Timestamp) { eventC <- kvcoord.RangeFeedMessage{ RangeFeedEvent: &kvpb.RangeFeedEvent{ @@ -269,9 +266,8 @@ func TestRangeFeedMock(t *testing.T) { return nil }, rangefeed: func( - ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage, ) error { - assert.True(t, withDiff) eventC <- kvcoord.RangeFeedMessage{ RangeFeedEvent: &kvpb.RangeFeedEvent{ Val: &kvpb.RangeFeedValue{ diff --git a/pkg/kv/kvnemesis/watcher.go b/pkg/kv/kvnemesis/watcher.go index d301b3a30868..c3ac5d532d4d 100644 --- a/pkg/kv/kvnemesis/watcher.go +++ b/pkg/kv/kvnemesis/watcher.go @@ -87,7 +87,7 @@ func Watch(ctx context.Context, env *Env, dbs []*kv.DB, dataSpan roachpb.Span) ( w.mu.Unlock() ds := dss[i] - err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, true /* withDiff */, eventC) + err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, eventC, kvcoord.WithDiff()) if 
isRetryableRangeFeedErr(err) { log.Infof(ctx, "got retryable RangeFeed error: %+v", err) continue diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go index fcd8efa455f2..b4e068695a0c 100644 --- a/pkg/kv/kvserver/client_rangefeed_test.go +++ b/pkg/kv/kvserver/client_rangefeed_test.go @@ -77,7 +77,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { rangefeedErrChan := make(chan error, 1) ctxToCancel, cancel := context.WithCancel(ctx) go func() { - rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, false /* withDiff */, evChan) + rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, evChan) }() // Note: 42 is a system descriptor. @@ -137,7 +137,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { }) evChan := make(chan kvcoord.RangeFeedMessage) require.Regexp(t, `rangefeeds require the kv\.rangefeed.enabled setting`, - ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, false /* withDiff */, evChan)) + ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, evChan)) }) } @@ -191,7 +191,6 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) { rangefeedErrChan <- ds.RangeFeed(rangefeedCtx, []roachpb.Span{lhsRepl.Desc().RSpan().AsRawSpanWithNoLocals()}, start, - false, /* withDiff */ eventCh) }() @@ -256,7 +255,6 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) { rangefeedCtx, []roachpb.Span{desc.RSpan().AsRawSpanWithNoLocals()}, startTS, - false, /* withDiff */ eventCh, ) }() @@ -310,7 +308,7 @@ func TestRangefeedWorksOnLivenessRange(t *testing.T) { eventC := make(chan kvcoord.RangeFeedMessage) errC := make(chan error, 1) go func() { - errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, false /* withDiff */, eventC) + errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, eventC) }() // Wait for a liveness update. diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 316f6c950181..123c55eac771 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -1160,7 +1160,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { span := roachpb.Span{ Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), } - rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh) + rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh) }() } @@ -1309,7 +1309,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) { span := roachpb.Span{ Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), } - rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh) + rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh) }() // Wait for a checkpoint above ts. @@ -1490,7 +1490,7 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) { } startRangefeed := func() { span := rangefeedSpan - rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh) + rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh) } // Wait for a checkpoint above ts. 
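To illustrate the API change in the patch above: the diff flag is now passed as a kvcoord.RangeFeedOption rather than a positional boolean. A minimal sketch of a caller under the new signature follows; the runFeed wrapper, the useMux flag, and the caller-supplied eventCh are illustrative assumptions, not part of the patch.

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// runFeed shows how a caller assembles RangeFeedOptions instead of threading
// a bare withDiff bool through the call.
func runFeed(
	ctx context.Context,
	ds *kvcoord.DistSender,
	spans []roachpb.Span,
	startAfter hlc.Timestamp,
	useMux bool,
	eventCh chan<- kvcoord.RangeFeedMessage,
) error {
	opts := []kvcoord.RangeFeedOption{kvcoord.WithDiff()}
	if useMux {
		opts = append(opts, kvcoord.WithMuxRangeFeed())
	}
	// Before this patch the equivalent call was:
	//   ds.RangeFeed(ctx, spans, startAfter, true /* withDiff */, eventCh)
	return ds.RangeFeed(ctx, spans, startAfter, eventCh, opts...)
}

The second patch below keeps this call signature and only reworks the implementation behind WithMuxRangeFeed.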
From 4fbdabbaafb3156ac4a0874547df2c9cb4722304 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 2 Mar 2023 18:50:07 -0500 Subject: [PATCH 2/2] kvcoord: MuxRangeFeed client uses 1 goroutine per node Rewrite the MuxRangeFeed client to use one goroutine per node instead of one goroutine per range. Prior to this change, the MuxRangeFeed client was structured to be entirely compatible with the execution model of the regular range feed. As a result, one goroutine was used per range. This rewrite replaces the old implementation with an almost clean-slate implementation that uses one goroutine per node. Where possible, relatively small and targeted modifications to the rangefeed library were made to extract common methods (such as range splitting). The reduction in the number of goroutines created by rangefeed has a direct impact on cluster performance and, most importantly, on SQL latency. This is mostly because, with this PR, the number of goroutines started by MuxRangeFeed is down to 2 per range (on the rangefeed server side) vs 5 for the regular rangefeed. When running changefeeds against tables with tens to hundreds of thousands of ranges, this significant difference in goroutine count has a direct impact on Go scheduler latency, the number of runnable goroutines, and ultimately on SQL latency. Epic: none Release note (enterprise change): MuxRangeFeed client (enabled via the `changefeed.mux_rangefeed.enabled` setting) is more efficient when running against large-scale workloads. --- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + .../kvcoord/dist_sender_mux_rangefeed.go | 629 +++++++++++------- .../kvclient/kvcoord/dist_sender_rangefeed.go | 409 +++++++----- .../dist_sender_rangefeed_mock_test.go | 16 +- .../kvcoord/dist_sender_rangefeed_test.go | 4 +- .../kvclient/rangefeed/rangefeed_mock_test.go | 6 +- 6 files changed, 659 insertions(+), 406 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index ced73d0b9909..f85bb1e61855 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -69,6 +69,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/envutil", "//pkg/util/errorutil/unimplemented", + "//pkg/util/future", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/iterutil", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 38086a99c1ff..2792f71bc941 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -12,16 +12,24 @@ package kvcoord import ( "context" - "sync" + "io" + "net" "sync/atomic" + "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/future" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -32,24 +40,15 @@ import ( // rangefeeds.
rangefeedMuxer caches MuxRangeFeed stream per node, and executes // each range feed request on an appropriate node. type rangefeedMuxer struct { - // eventCh receives events from all active muxStreams. - eventCh chan *kvpb.MuxRangeFeedEvent - // Context group controlling execution of MuxRangeFeed calls. When this group - // cancels, the entire muxer shuts down. The goroutines started in `g` will - // always return `nil` errors except when they detect that the mux is shutting - // down. + // cancels, the entire muxer shuts down. g ctxgroup.Group - // When g cancels, demuxLoopDone gets closed. - demuxLoopDone chan struct{} - - mu struct { - syncutil.Mutex - - // map of active MuxRangeFeed clients. - clients map[roachpb.NodeID]*muxClientState - } + ds *DistSender + cfg rangeFeedConfig + registry *rangeFeedRegistry + catchupSem *limit.ConcurrentRequestLimiter + eventCh chan<- RangeFeedMessage // Each call to start new range feed gets a unique ID which is echoed back // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure @@ -58,161 +57,233 @@ type rangefeedMuxer struct { // Accessed atomically. seqID int64 - // producers is a map of all rangefeeds running across all nodes. - // streamID -> *channelRangeFeedEventProducer. - producers syncutil.IntMap + // muxClient is a nodeID -> *muxStreamOrError + muxClients syncutil.IntMap } -// muxClientState is the state maintained for each MuxRangeFeed rpc. -type muxClientState struct { - initCtx termCtx // signaled when client ready to be used. - doneCtx terminationContext // signaled when client shuts down. - - // RPC state. Valid only after initCtx.Done(). - client kvpb.Internal_MuxRangeFeedClient - cancel context.CancelFunc +// muxRangeFeed is an entry point to establish MuxRangeFeed +// RPC for the specified spans. Waits for rangefeed to complete. +func muxRangeFeed( + ctx context.Context, + cfg rangeFeedConfig, + spans []SpanTimePair, + ds *DistSender, + rr *rangeFeedRegistry, + catchupSem *limit.ConcurrentRequestLimiter, + eventCh chan<- RangeFeedMessage, +) (retErr error) { + if log.V(1) { + log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans)) + start := timeutil.Now() + defer func() { + log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr) + }() + } - // Number of consumers (ranges) running on this node; accessed under rangefeedMuxer lock. - numStreams int + m := &rangefeedMuxer{ + g: ctxgroup.WithContext(ctx), + registry: rr, + ds: ds, + cfg: cfg, + catchupSem: catchupSem, + eventCh: eventCh, + } + divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) + return errors.CombineErrors(m.g.Wait(), ctx.Err()) } -func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { - m := &rangefeedMuxer{ - eventCh: make(chan *kvpb.MuxRangeFeedEvent), - demuxLoopDone: make(chan struct{}), - g: g, +// muxStream represents MuxRangeFeed RPC established with a node. +// +// MuxRangeFeed is a bidirectional RPC: the muxStream.sender is the client -> +// server portion of the stream, and muxStream.receiver is the server -> client +// portion. Any number of RangeFeedRequests may be initiated with the server +// (sender.Send). The server will send MuxRangeFeed for all the range feeds, and +// those events are received via receiver.Recv. If an error occurs with one of +// the logical range feeds, a MuxRangeFeedEvent describing the error will be +// emitted. This error can be handled appropriately, and rangefeed may be +// restarted. 
The sender and receiver may continue to be used to handle other +// requests and events. However, if either sender or receiver return an error, +// the entire stream must be torn down, and all active range feeds should be +// restarted. +type muxStream struct { + nodeID roachpb.NodeID + streams syncutil.IntMap // streamID -> *activeMuxRangeFeed. + + // mu must be held when starting rangefeed. + mu struct { + syncutil.Mutex + sender rangeFeedRequestSender + closed bool } +} - m.mu.clients = make(map[roachpb.NodeID]*muxClientState) - m.g.GoCtx(m.demuxLoop) +// muxStreamOrError is a tuple of mux stream connection or an error that +// occurred while connecting to the node. +type muxStreamOrError struct { + stream *muxStream + err error +} - return m +// activeMuxRangeFeed augments activeRangeFeed with additional state. +type activeMuxRangeFeed struct { + *activeRangeFeed + token rangecache.EvictionToken + startAfter hlc.Timestamp + catchupRes limit.Reservation } -// channelRangeFeedEventProducer is a rangeFeedEventProducer which receives -// events on input channel, and returns events when Recv is called. -type channelRangeFeedEventProducer struct { - // Event producer utilizes two contexts: - // - // - callerCtx connected to singleRangeFeed, i.e. a context that will cancel - // if a single-range rangefeed fails (range stuck, parent ctx cancels). - // - muxClientCtx connected to receiveEventsFromNode, i.e. a streaming RPC to - // a node serving multiple rangefeeds. This cancels if, for example, the - // remote node goes down or there are networking issues. - // - // When singleRangeFeed blocks in Recv(), we have to respect cancellations in - // both contexts. The implementation of Recv() on this type does this. - callerCtx context.Context - muxClientCtx terminationContext - - streamID int64 // stream ID for this producer. - eventCh chan *kvpb.RangeFeedEvent // consumer event channel. +func (a *activeMuxRangeFeed) release() { + a.activeRangeFeed.release() + if a.catchupRes != nil { + a.catchupRes.Release() + } } -var _ kvpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) +// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient +type rangeFeedRequestSender interface { + Send(req *kvpb.RangeFeedRequest) error +} -// Recv implements rangeFeedEventProducer interface. -func (c *channelRangeFeedEventProducer) Recv() (*kvpb.RangeFeedEvent, error) { - select { - case <-c.callerCtx.Done(): - return nil, c.callerCtx.Err() - case <-c.muxClientCtx.Done(): - return nil, c.muxClientCtx.Err() - case e := <-c.eventCh: - return e, nil - } +// the "Recv" portion of the kvpb.Internal_MuxRangeFeedClient. +type muxRangeFeedEventReceiver interface { + Recv() (*kvpb.MuxRangeFeedEvent, error) } -// startMuxRangeFeed begins the execution of rangefeed for the specified -// RangeFeedRequest. -// The passed in client is only needed to establish MuxRangeFeed RPC. -func (m *rangefeedMuxer) startMuxRangeFeed( - ctx context.Context, client rpc.RestrictedInternalClient, req *kvpb.RangeFeedRequest, -) (kvpb.RangeFeedEventProducer, func(), error) { - ms, err := m.establishMuxConnection(ctx, client, req.Replica.NodeID) - if err != nil { - return nil, nil, err +// startSingleRangeFeed looks up routing information for the +// span, and begins execution of rangefeed. +func (m *rangefeedMuxer) startSingleRangeFeed( + ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, +) error { + // Bound the partial rangefeed to the partial span. 
+ span := rs.AsRawSpanWithNoLocals() + + var releaseTransport func() + maybeReleaseTransport := func() { + if releaseTransport != nil { + releaseTransport() + releaseTransport = nil + } } + defer maybeReleaseTransport() - req.StreamID = atomic.AddInt64(&m.seqID, 1) - streamCtx := logtags.AddTag(ctx, "stream", req.StreamID) - producer := &channelRangeFeedEventProducer{ - callerCtx: streamCtx, - muxClientCtx: ms.doneCtx, - streamID: req.StreamID, - eventCh: make(chan *kvpb.RangeFeedEvent), + // Before starting single rangefeed, acquire catchup scan quota. + catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem) + if err != nil { + return err } - m.producers.Store(req.StreamID, unsafe.Pointer(producer)) - if log.V(1) { - log.Info(streamCtx, "starting rangefeed") + // Register active mux range feed. + stream := &activeMuxRangeFeed{ + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges), + startAfter: startAfter, + catchupRes: catchupRes, + token: token, } + streamID := atomic.AddInt64(&m.seqID, 1) + + // stream ownership gets transferred (indicated by setting stream to nil) when + // we successfully send request. If we fail to do so, cleanup. + defer func() { + if stream != nil { + stream.release() + } + }() + + // Start a retry loop for sending the batch to the range. + for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); { + maybeReleaseTransport() + + // If we've cleared the descriptor on failure, re-lookup. + if !token.Valid() { + var err error + ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false) + if err != nil { + log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) + if !rangecache.IsRangeLookupErrorRetryable(err) { + return err + } + continue + } + token = ri + } - cleanup := func() { - m.producers.Delete(req.StreamID) + // Establish a RangeFeed for a single Range. + log.VEventf(ctx, 1, "MuxRangeFeed starting for range %s@%s (rangeID %d)", + span, startAfter, token.Desc().RangeID) + transport, err := newTransportForRange(ctx, token.Desc(), m.ds) + if err != nil { + log.VErrEventf(ctx, 1, "Failed to create transport for %s ", token.String()) + continue + } + releaseTransport = transport.Release - m.mu.Lock() - defer m.mu.Unlock() + for !transport.IsExhausted() { + args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff) + args.Replica = transport.NextReplica() + args.StreamID = streamID - ms.numStreams-- - if ms.numStreams == 0 { - delete(m.mu.clients, req.Replica.NodeID) - if log.V(1) { - log.InfofDepth(streamCtx, 1, "shut down inactive mux for node %d", req.Replica.NodeID) + rpcClient, err := transport.NextInternalClient(ctx) + if err != nil { + log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err) + continue + } + + conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID) + if err != nil { + return err + } + + if err := conn.startRangeFeed(streamID, stream, &args); err != nil { + log.VErrEventf(ctx, 1, + "RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err) + continue } - ms.cancel() + // We successfully established rangefeed, so the responsibility + // for releasing the stream is transferred to the mux go routine. + stream = nil + return nil } - } - if err := ms.client.Send(req); err != nil { - cleanup() - return nil, nil, err + // If the transport is exhausted, we evict routing token and retry range + // resolution. 
+ token.Evict(ctx) + token = rangecache.EvictionToken{} } - return producer, cleanup, nil + + return ctx.Err() } // establishMuxConnection establishes MuxRangeFeed RPC with the node. func (m *rangefeedMuxer) establishMuxConnection( ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, -) (*muxClientState, error) { - // NB: the `ctx` in scope here belongs to a client for a single range feed, and must - // not influence the lifetime of the mux connection. At the time of writing, the caller - // is `singleRangeFeed` which calls into this method through its streamProducerFactory - // argument. - m.mu.Lock() - ms, found := m.mu.clients[nodeID] - if !found { - // Initialize muxClientState. - // Only initCtx is initialized here since we need to block on it. - // The rest of the initialization happens in startNodeMuxRangeFeed. - ms = &muxClientState{initCtx: makeTerminationContext()} - // Kick off client initialization on another Go routine. - // It is important that we start MuxRangeFeed RPC using long-lived - // context available in the main context group used for this muxer. +) (*muxStream, error) { + ptr, exists := m.muxClients.LoadOrStore(int64(nodeID), unsafe.Pointer(future.Make[muxStreamOrError]())) + muxClient := (*future.Future[muxStreamOrError])(ptr) + if !exists { + // Start mux rangefeed go routine responsible for receiving MuxRangeFeedEvents. m.g.GoCtx(func(ctx context.Context) error { - return m.startNodeMuxRangeFeed(ctx, ms, client, nodeID) + return m.startNodeMuxRangeFeed(ctx, client, nodeID, muxClient) }) - m.mu.clients[nodeID] = ms } - ms.numStreams++ - m.mu.Unlock() // Ensure mux client is ready. + init := future.MakeAwaitableFuture(muxClient) select { case <-ctx.Done(): return nil, ctx.Err() - case <-ms.initCtx.Done(): - return ms, ms.initCtx.Err() + case <-init.Done(): + c := init.Get() + return c.stream, c.err } } // startNodeMuxRangeFeedLocked establishes MuxRangeFeed RPC with the node. func (m *rangefeedMuxer) startNodeMuxRangeFeed( ctx context.Context, - ms *muxClientState, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, -) error { + stream *future.Future[muxStreamOrError], +) (retErr error) { ctx = logtags.AddTag(ctx, "mux_n", nodeID) // Add "generation" number to the context so that log messages and stacks can // differentiate between multiple instances of mux rangefeed Go routine @@ -222,141 +293,249 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( defer restore() if log.V(1) { - log.Info(ctx, "Establishing MuxRangeFeed") + log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID) start := timeutil.Now() defer func() { - log.Infof(ctx, "MuxRangeFeed terminating after %s", timeutil.Since(start)) + log.Infof(ctx, "MuxRangeFeed to node %d terminating after %s with err=%v", + nodeID, timeutil.Since(start), retErr) }() } - doneCtx := makeTerminationContext() - ms.doneCtx = &doneCtx ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mux, err := client.MuxRangeFeed(ctx) + if err != nil { + return future.MustSet(stream, muxStreamOrError{err: err}) + } - ms.cancel = func() { - cancel() - doneCtx.close(context.Canceled) + ms := muxStream{nodeID: nodeID} + ms.mu.sender = mux + if err := future.MustSet(stream, muxStreamOrError{stream: &ms}); err != nil { + return err } - defer ms.cancel() - - // NB: it is important that this Go routine never returns an error. Errors - // should be propagated to the caller either via initCtx.err, or doneCtx.err. - // We do this to make sure that this error does not kill entire context group. 
- // We want the caller (singleRangeFeed) to decide if this error is retry-able. - var err error - ms.client, err = client.MuxRangeFeed(ctx) - ms.initCtx.close(err) - - if err == nil { - err = m.receiveEventsFromNode(ctx, ms) + + stuckWatcher := newStuckRangeFeedCanceler(cancel, defaultStuckRangeThreshold(m.ds.st)) + defer stuckWatcher.stop() + + if recvErr := m.receiveEventsFromNode(ctx, mux, stuckWatcher, &ms); recvErr != nil { + // Clear out this client, and restart all streams on this node. + // Note: there is a race here where we may delete this muxClient, while + // another go routine loaded it. That's fine, since we would not + // be able to send new request on this stream anymore, and we'll retry + // against another node. + m.muxClients.Delete(int64(nodeID)) + + if recvErr == io.EOF { + recvErr = nil + } + + return ms.closeWithRestart(ctx, recvErr, func(_ int64, a *activeMuxRangeFeed) error { + return m.restartActiveRangeFeed(ctx, a, recvErr) + }) } - doneCtx.close(err) - // We propagated error to the caller via init/done context. - return nil //nolint:returnerrcheck + return nil } -// demuxLoop de-multiplexes events and sends them to appropriate rangefeed event -// consumer. -func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) { - defer close(m.demuxLoopDone) +// receiveEventsFromNode receives mux rangefeed events from a node. +func (m *rangefeedMuxer) receiveEventsFromNode( + ctx context.Context, + receiver muxRangeFeedEventReceiver, + stuckWatcher *stuckRangeFeedCanceler, + ms *muxStream, +) error { + stuckThreshold := defaultStuckRangeThreshold(m.ds.st) + stuckCheckFreq := func() time.Duration { + if threshold := stuckThreshold(); threshold > 0 { + return threshold + } + return time.Minute + } + nextStuckCheck := timeutil.Now().Add(stuckCheckFreq()) + var event *kvpb.MuxRangeFeedEvent for { - select { - case <-ctx.Done(): - return ctx.Err() - case e := <-m.eventCh: - var producer *channelRangeFeedEventProducer - if v, found := m.producers.Load(e.StreamID); found { - producer = (*channelRangeFeedEventProducer)(v) + if err := stuckWatcher.do(func() (err error) { + event, err = receiver.Recv() + return err + }); err != nil { + return err + } + + active := ms.lookupStream(event.StreamID) + + // The stream may already have terminated. That's fine -- we may have + // encountered range split or similar rangefeed error, causing the caller to + // exit (and terminate this stream), but the server side stream termination + // is async and probabilistic (rangefeed registration output loop may have a + // checkpoint event available, *and* it may have context cancellation, but + // which one executes is a coin flip) and so it is possible that we may see + // additional event(s) arriving for a stream that is no longer active. + if active == nil { + if log.V(1) { + log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event) } + continue + } - // The stream may already have terminated (either producer is nil, or - // producer.muxClientCtx.Done()). That's fine -- we may have encountered range - // split or similar rangefeed error, causing the caller to exit (and - // terminate this stream), but the server side stream termination is async - // and probabilistic (rangefeed registration output loop may have a - // checkpoint event available, *and* it may have context cancellation, but - // which one executes is a coin flip) and so it is possible that we may - // see additional event(s) arriving for a stream that is no longer active. 
- if producer == nil { - if log.V(1) { - log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + switch t := event.GetValue().(type) { + case *kvpb.RangeFeedCheckpoint: + if t.Span.Contains(active.Span) { + // If we see the first non-empty checkpoint, we know we're done with the catchup scan. + if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { + active.catchupRes.Release() + active.catchupRes = nil } - continue + // Note that this timestamp means that all rows in the span with + // writes at or before the timestamp have now been seen. The + // Timestamp field in the request is exclusive, meaning if we send + // the request with exactly the ResolveTS, we'll see only rows after + // that timestamp. + active.startAfter.Forward(t.ResolvedTS) + } + case *kvpb.RangeFeedError: + log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) + if active.catchupRes != nil { + m.ds.metrics.RangefeedErrorCatchup.Inc(1) + } + ms.streams.Delete(event.StreamID) + if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil { + return err } + continue + } - select { - case <-ctx.Done(): - return ctx.Err() - case producer.eventCh <- &e.RangeFeedEvent: - case <-producer.callerCtx.Done(): - if log.V(1) { - log.Infof(ctx, "received stray event, but caller exited: stream=%d: e=%v", e.StreamID, e) - } - case <-producer.muxClientCtx.Done(): - if log.V(1) { - log.Infof(ctx, "received stray event, but node mux exited: stream=%d: e=%v", e.StreamID, e) + active.onRangeEvent(ms.nodeID, event.RangeID, &event.RangeFeedEvent) + msg := RangeFeedMessage{RangeFeedEvent: &event.RangeFeedEvent, RegisteredSpan: active.Span} + select { + case <-ctx.Done(): + return ctx.Err() + case m.eventCh <- msg: + } + + // Piggyback on this loop to check if any of the active ranges + // on this node appear to be stuck. + // NB: this does not notify the server in any way. We may have to add + // a more complex protocol -- or better yet, figure out why ranges + // get stuck in the first place. + if timeutil.Now().Before(nextStuckCheck) { + if threshold := stuckThreshold(); threshold > 0 { + if _, err := ms.eachStream(func(id int64, a *activeMuxRangeFeed) error { + if !a.startAfter.IsEmpty() && timeutil.Since(a.startAfter.GoTime()) > stuckThreshold() { + ms.streams.Delete(id) + return m.restartActiveRangeFeed(ctx, a, errRestartStuckRange) + } + return nil + }); err != nil { + return err } } + nextStuckCheck = timeutil.Now().Add(stuckCheckFreq()) } } } -// terminationContext (inspired by context.Context) describes -// termination information. -type terminationContext interface { - Done() <-chan struct{} - Err() error -} +// restartActiveRangeFeed restarts rangefeed after it encountered "reason" error. +func (m *rangefeedMuxer) restartActiveRangeFeed( + ctx context.Context, active *activeMuxRangeFeed, reason error, +) error { + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", + active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason) + } + active.setLastError(reason) + active.release() -// termCtx implements terminationContext, and allows error to be set. -type termCtx struct { - sync.Once - done chan struct{} - err error -} + errInfo, err := handleRangefeedError(ctx, reason) + if err != nil { + // If this is an error we cannot recover from, terminate the rangefeed. 
+ return err + } -func makeTerminationContext() termCtx { - return termCtx{done: make(chan struct{})} -} + if errInfo.evict && active.token.Valid() { + active.token.Evict(ctx) + active.token = rangecache.EvictionToken{} + } -func (tc *termCtx) Done() <-chan struct{} { - return tc.done -} -func (tc *termCtx) Err() error { - return tc.err + rs, err := keys.SpanAddr(active.Span) + if err != nil { + return err + } + + if errInfo.resolveSpan { + return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed) + } + return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token) } -// close closes this context with specified error. -func (tc *termCtx) close(err error) { - tc.Do(func() { - tc.err = err - close(tc.done) - }) +// startRangeFeed initiates rangefeed for the specified request running +// on this node connection. If no error returned, registers stream +// with this connection. Otherwise, stream is not registered. +func (c *muxStream) startRangeFeed( + streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest, +) error { + // NB: lock must be held for the duration of this method. + c.mu.Lock() + defer c.mu.Unlock() + + if c.mu.closed { + return net.ErrClosed + } + + // Concurrent Send calls are not thread safe; thus Send calls must be + // synchronized. + if err := c.mu.sender.Send(req); err != nil { + return err + } + + // mu must be held while marking this stream in flight (streams.Store) to + // synchronize with mux termination. When node mux terminates, it invokes + // c.closeWithRestart(), which marks this mux stream connection closed and + // restarts all active streams. Thus, we must make sure that this streamID + // gets properly recorded even if mux go routine terminates right after the + // above sender.Send() succeeded. + c.streams.Store(streamID, unsafe.Pointer(stream)) + return nil } -// receiveEventsFromNode receives mux rangefeed events, and forwards them to the -// demuxLoop. -// Passed in context must be the context used to create ms.client. -func (m *rangefeedMuxer) receiveEventsFromNode(ctx context.Context, ms *muxClientState) error { - for { - event, streamErr := ms.client.Recv() +func (c *muxStream) lookupStream(streamID int64) *activeMuxRangeFeed { + v, ok := c.streams.Load(streamID) + if ok { + return (*activeMuxRangeFeed)(v) + } + return nil +} - if streamErr != nil { - return streamErr - } +func (c *muxStream) closeWithRestart( + ctx context.Context, reason error, restartFn func(streamID int64, a *activeMuxRangeFeed) error, +) error { + c.mu.Lock() + c.mu.closed = true + c.mu.Unlock() + + // make sure that the underlying error is not fatal. If it is, there is no + // reason to restart each rangefeed, so just bail out. + if _, err := handleRangefeedError(ctx, reason); err != nil { + return err + } - select { - case <-ctx.Done(): - // Normally, when ctx is done, we would receive streamErr above. - // But it's possible that the context was canceled right after the last Recv(), - // and in that case we must exit. - return ctx.Err() - case <-m.demuxLoopDone: - // demuxLoop exited, and so should we (happens when main context group completes) - return errors.Wrapf(context.Canceled, "demux loop terminated") - case m.eventCh <- event: - } + n, err := c.eachStream(restartFn) + if log.V(1) { + log.Infof(ctx, "mux to node %d restarted %d streams: err=%v", c.nodeID, n, err) } + return err +} + +// eachStream invokes provided function for each stream. If the function +// returns an error, iteration stops. 
Returns number of streams processed. +func (c *muxStream) eachStream( + fn func(streamID int64, a *activeMuxRangeFeed) error, +) (n int, err error) { + c.streams.Range(func(key int64, value unsafe.Pointer) bool { + err = fn(key, (*activeMuxRangeFeed)(value)) + n++ + return err == nil + }) + return n, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 5018bf203c9d..3f2fb1d92e1d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -88,6 +90,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { type rangeFeedConfig struct { useMuxRangeFeed bool overSystemTable bool + withDiff bool } // RangeFeedOption configures a RangeFeed. @@ -162,6 +165,10 @@ type SpanTimePair struct { StartAfter hlc.Timestamp // exclusive } +func (p SpanTimePair) String() string { + return fmt.Sprintf("%s@%s", p.Span, p.StartAfter) +} + // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. func (ds *DistSender) RangeFeedSpans( @@ -190,19 +197,14 @@ func (ds *DistSender) RangeFeedSpans( catchupSem := limit.MakeConcurrentRequestLimiter( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) - g := ctxgroup.WithContext(ctx) - - var eventProducer rangeFeedEventProducerFactory if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) && enableMuxRangeFeed && cfg.useMuxRangeFeed { - m := newRangefeedMuxer(g) - eventProducer = m.startMuxRangeFeed - } else { - eventProducer = legacyRangeFeedEventProducer + return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh) } // Goroutine that processes subdivided ranges and creates a rangefeed for // each. + g := ctxgroup.WithContext(ctx) rangeCh := make(chan singleRangeInfo, 16) g.GoCtx(func(ctx context.Context) error { for { @@ -210,8 +212,8 @@ func (ds *DistSender) RangeFeedSpans( case sri := <-rangeCh: // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, - sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg) + return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, + sri.token, &catchupSem, rangeCh, eventCh, cfg) }) case <-ctx.Done(): return ctx.Err() @@ -220,6 +222,17 @@ func (ds *DistSender) RangeFeedSpans( }) // Kick off the initial set of ranges. + divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g) + + return g.Wait() +} + +// divideAllSpansOnRangeBoundaries divides all spans on range boundaries and invokes +// provided onRange function for each range. 
Resolution happens concurrently using provided +// context group. +func divideAllSpansOnRangeBoundaries( + spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, +) { for _, s := range spans { func(stp SpanTimePair) { g.GoCtx(func(ctx context.Context) error { @@ -227,12 +240,10 @@ func (ds *DistSender) RangeFeedSpans( if err != nil { return err } - return ds.divideAndSendRangeFeedToRanges(ctx, rs, stp.StartAfter, rangeCh) + return divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange) }) }(s) } - - return g.Wait() } // RangeFeedContext is the structure containing arguments passed to @@ -292,6 +303,7 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. type activeRangeFeed struct { + release func() syncutil.Mutex PartialRangeFeed } @@ -339,8 +351,31 @@ func newRangeFeedRegistry(ctx context.Context, withDiff bool) *rangeFeedRegistry return rr } -func (ds *DistSender) divideAndSendRangeFeedToRanges( - ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, rangeCh chan<- singleRangeInfo, +func sendSingleRangeInfo(rangeCh chan<- singleRangeInfo) onRangeFn { + return func(ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken) error { + select { + case rangeCh <- singleRangeInfo{ + rs: rs, + startAfter: startAfter, + token: token, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +type onRangeFn func( + ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, +) error + +func divideSpanOnRangeBoundaries( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + startAfter hlc.Timestamp, + onRange onRangeFn, ) error { // As RangeIterator iterates, it can return overlapping descriptors (and // during splits, this happens frequently), but divideAndSendRangeFeedToRanges @@ -356,14 +391,8 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return err } nextRS.Key = partialRS.EndKey - select { - case rangeCh <- singleRangeInfo{ - rs: partialRS, - startAfter: startAfter, - token: ri.Token(), - }: - case <-ctx.Done(): - return ctx.Err() + if err := onRange(ctx, partialRS, startAfter, ri.Token()); err != nil { + return err } if !ri.NeedAnother(nextRS) { break @@ -372,6 +401,29 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return ri.Error() } +// newActiveRangeFeed registers active rangefeed with rangefeedRegistry. +// The caller must call active.release() in order to cleanup. +func newActiveRangeFeed( + span roachpb.Span, startAfter hlc.Timestamp, rr *rangeFeedRegistry, c *metric.Gauge, +) *activeRangeFeed { + // Register partial range feed with registry. + active := &activeRangeFeed{ + PartialRangeFeed: PartialRangeFeed{ + Span: span, + StartAfter: startAfter, + CreatedTime: timeutil.Now(), + }, + release: func() { + rr.ranges.Delete(active) + c.Dec(1) + }, + } + rr.ranges.Store(active, nil) + c.Inc(1) + + return active +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. 
It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry @@ -379,11 +431,9 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( func (ds *DistSender) partialRangeFeed( ctx context.Context, rr *rangeFeedRegistry, - streamProducerFactory rangeFeedEventProducerFactory, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, - withDiff bool, catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, @@ -393,17 +443,8 @@ func (ds *DistSender) partialRangeFeed( span := rs.AsRawSpanWithNoLocals() // Register partial range feed with registry. - active := &activeRangeFeed{ - PartialRangeFeed: PartialRangeFeed{ - Span: span, - StartAfter: startAfter, - CreatedTime: timeutil.Now(), - }, - } - rr.ranges.Store(active, nil) - ds.metrics.RangefeedRanges.Inc(1) - defer rr.ranges.Delete(active) - defer ds.metrics.RangefeedRanges.Dec(1) + active := newActiveRangeFeed(span, startAfter, rr, ds.metrics.RangefeedRanges) + defer active.release() // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { @@ -427,79 +468,172 @@ func (ds *DistSender) partialRangeFeed( } maxTS, err := ds.singleRangeFeed( - ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg) + ctx, span, startAfter, token.Desc(), + catchupSem, eventCh, active.onRangeEvent, cfg) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) - if err != nil { - active.setLastError(err) + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", + active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), err) + } + active.setLastError(err) - if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - span, startAfter, timeutil.Since(startAfter.GoTime()), err) - } - switch { - case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): - // These errors are likely to be unique to the replica that - // reported them, so no action is required before the next - // retry. - case errors.Is(err, errRestartStuckRange): - // Stuck ranges indicate a bug somewhere in the system. We are being - // defensive and attempt to restart this rangefeed. Usually, any - // stuck-ness is cleared out if we just attempt to re-resolve range - // descriptor and retry. - // - // The error contains the replica which we were waiting for. - log.Warningf(ctx, "restarting stuck rangefeed: %s", err) - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): - // Evict the descriptor from the cache and reload on next attempt. - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)): - // Evict the descriptor from the cache. 
- token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh) - case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)): - var t *kvpb.RangeFeedRetryError - if ok := errors.As(err, &t); !ok { - return errors.AssertionFailedf("wrong error type: %T", err) - } - switch t.Reason { - case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, - kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, - kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: - // Try again with same descriptor. These are transient - // errors that should not show up again. - continue - case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, - kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, - kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the descriptor from the cache. - token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh) - default: - return errors.AssertionFailedf("unrecognized retryable error type: %T", err) - } - default: - return err - } + errInfo, err := handleRangefeedError(ctx, err) + if err != nil { + return err + } + if errInfo.evict { + token.Evict(ctx) + token = rangecache.EvictionToken{} + } + if errInfo.resolveSpan { + return divideSpanOnRangeBoundaries(ctx, ds, rs, active.StartAfter, sendSingleRangeInfo(rangeCh)) } } return ctx.Err() } +type rangefeedErrorInfo struct { + resolveSpan bool // true if the span resolution needs to be performed, and rangefeed restarted. + evict bool // true if routing info needs to be updated prior to retry. +} + +// handleRangefeedError handles an error that occurred while running rangefeed. +// Returns rangefeedErrorInfo describing how the error should be handled for the +// range. Returns an error if the entire rangefeed should terminate. +func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) { + if err == nil { + return rangefeedErrorInfo{}, nil + } + + switch { + case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) || + errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): + // These errors are likely to be unique to the replica that + // reported them, so no action is required before the next + // retry. + return rangefeedErrorInfo{}, nil + case errors.Is(err, errRestartStuckRange): + // Stuck ranges indicate a bug somewhere in the system. We are being + // defensive and attempt to restart this rangefeed. Usually, any + // stuck-ness is cleared out if we just attempt to re-resolve range + // descriptor and retry. + // + // The error contains the replica which we were waiting for. + log.Warningf(ctx, "restarting stuck rangefeed: %s", err) + return rangefeedErrorInfo{evict: true}, nil + case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): + return rangefeedErrorInfo{evict: true}, nil + case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)): + return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil + case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)): + var t *kvpb.RangeFeedRetryError + if ok := errors.As(err, &t); !ok { + return rangefeedErrorInfo{}, errors.AssertionFailedf("wrong error type: %T", err) + } + switch t.Reason { + case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, + kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, + kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, + kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: + // Try again with same descriptor. These are transient + // errors that should not show up again. 
+ return rangefeedErrorInfo{}, nil + case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, + kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, + kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: + return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil + default: + return rangefeedErrorInfo{}, errors.AssertionFailedf("unrecognized retryable error type: %T", err) + } + default: + return rangefeedErrorInfo{}, err + } +} + +func acquireCatchupScanQuota( + ctx context.Context, ds *DistSender, catchupSem *limit.ConcurrentRequestLimiter, +) (limit.Reservation, error) { + // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take + // opportunity to update semaphore limit. + ds.metrics.RangefeedCatchupRanges.Inc(1) + catchupSem.SetLimit(maxConcurrentCatchupScans(&ds.st.SV)) + return catchupSem.Begin(ctx) +} + +// nweTransportForRange returns Transport for the specified range descriptor. +func newTransportForRange( + ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender, +) (Transport, error) { + var latencyFn LatencyFunc + if ds.rpcContext != nil { + latencyFn = ds.rpcContext.RemoteClocks.Latency + } + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) + if err != nil { + return nil, err + } + replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) + opts := SendOptions{class: connectionClass(&ds.st.SV)} + return ds.transportFactory(opts, ds.nodeDialer, replicas) +} + // onRangeEventCb is invoked for each non-error range event. // nodeID identifies the node ID which generated the event. type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *kvpb.RangeFeedEvent) +// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and +// rangeID. Request is constructed to watch event after specified timestamp, and +// with optional diff. If the request corresponds to a system range, request +// receives higher admission priority. +func makeRangeFeedRequest( + span roachpb.Span, + rangeID roachpb.RangeID, + isSystemRange bool, + startAfter hlc.Timestamp, + withDiff bool, +) kvpb.RangeFeedRequest { + admissionPri := admissionpb.BulkNormalPri + if isSystemRange { + admissionPri = admissionpb.NormalPri + } + return kvpb.RangeFeedRequest{ + Span: span, + Header: kvpb.Header{ + Timestamp: startAfter, + RangeID: rangeID, + }, + WithDiff: withDiff, + AdmissionHeader: kvpb.AdmissionHeader{ + // NB: AdmissionHeader is used only at the start of the range feed + // stream since the initial catch-up scan is expensive. + Priority: int32(admissionPri), + CreateTime: timeutil.Now().UnixNano(), + Source: kvpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + }, + } +} + +func defaultStuckRangeThreshold(st *cluster.Settings) func() time.Duration { + return func() time.Duration { + // Before the introduction of kv.rangefeed.range_stuck_threshold = 1m, + // clusters may already have kv.closed_timestamp.side_transport_interval set + // to >1m. This would cause rangefeeds to continually restart. We therefore + // conservatively use the highest value. + threshold := rangefeedRangeStuckThreshold.Get(&st.SV) + if threshold > 0 { + if t := time.Duration(math.Round( + 1.2 * float64(closedts.SideTransportCloseInterval.Get(&st.SV)))); t > threshold { + threshold = t + } + } + return threshold + } +} + // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed // RPC call. Results will be sent on the provided channel. 
Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish @@ -510,11 +644,9 @@ func (ds *DistSender) singleRangeFeed( ctx context.Context, span roachpb.Span, startAfter hlc.Timestamp, - withDiff bool, desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, - streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, cfg rangeFeedConfig, ) (_ hlc.Timestamp, retErr error) { @@ -527,38 +659,8 @@ func (ds *DistSender) singleRangeFeed( cancelFeed() }() - admissionPri := admissionpb.BulkNormalPri - if cfg.overSystemTable { - admissionPri = admissionpb.NormalPri - } - args := kvpb.RangeFeedRequest{ - Span: span, - Header: kvpb.Header{ - Timestamp: startAfter, - RangeID: desc.RangeID, - }, - WithDiff: withDiff, - AdmissionHeader: kvpb.AdmissionHeader{ - // NB: AdmissionHeader is used only at the start of the range feed - // stream since the initial catch-up scan is expensive. - Priority: int32(admissionPri), - CreateTime: timeutil.Now().UnixNano(), - Source: kvpb.AdmissionHeader_FROM_SQL, - NoMemoryReservedAtSource: true, - }, - } - - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) - if err != nil { - return args.Timestamp, err - } - replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) - opts := SendOptions{class: connectionClass(&ds.st.SV)} - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + args := makeRangeFeedRequest(span, desc.RangeID, cfg.overSystemTable, startAfter, cfg.withDiff) + transport, err := newTransportForRange(ctx, desc, ds) if err != nil { return args.Timestamp, err } @@ -566,12 +668,11 @@ func (ds *DistSender) singleRangeFeed( // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take // opportunity to update semaphore limit. - ds.metrics.RangefeedCatchupRanges.Inc(1) - catchupSem.SetLimit(maxConcurrentCatchupScans(&ds.st.SV)) - catchupRes, err := catchupSem.Begin(ctx) + catchupRes, err := acquireCatchupScanQuota(ctx, ds, catchupSem) if err != nil { return hlc.Timestamp{}, err } + finishCatchupScan := func() { if catchupRes != nil { catchupRes.Release() @@ -582,20 +683,7 @@ func (ds *DistSender) singleRangeFeed( // cleanup catchup reservation in case of early termination. defer finishCatchupScan() - stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, func() time.Duration { - // Before the introduction of kv.rangefeed.range_stuck_threshold = 1m, - // clusters may already have kv.closed_timestamp.side_transport_interval set - // to >1m. This would cause rangefeeds to continually restart. We therefore - // conservatively use the highest value. 
- threshold := rangefeedRangeStuckThreshold.Get(&ds.st.SV) - if threshold > 0 { - if t := time.Duration(math.Round( - 1.2 * float64(closedts.SideTransportCloseInterval.Get(&ds.st.SV)))); t > threshold { - threshold = t - } - } - return threshold - }) + stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, defaultStuckRangeThreshold(ds.st)) defer stuckWatcher.stop() var streamCleanup func() @@ -610,8 +698,7 @@ func (ds *DistSender) singleRangeFeed( for { stuckWatcher.stop() // if timer is running from previous iteration, stop it now if transport.IsExhausted() { - return args.Timestamp, newSendError( - fmt.Sprintf("sending to all %d replicas failed", len(replicas))) + return args.Timestamp, newSendError("sending to all replicas failed") } maybeCleanupStream() @@ -629,9 +716,9 @@ func (ds *DistSender) singleRangeFeed( ctx = logtags.AddTag(ctx, "dest_s", args.Replica.StoreID) ctx = logtags.AddTag(ctx, "dest_r", args.RangeID) ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx) + streamCleanup = restore - var stream kvpb.RangeFeedEventProducer - stream, streamCleanup, err = streamProducerFactory(ctx, client, &args) + stream, err := client.RangeFeed(ctx, &args) if err != nil { restore() log.VErrEventf(ctx, 2, "RPC error: %s", err) @@ -641,13 +728,6 @@ func (ds *DistSender) singleRangeFeed( } continue } - { - origStreamCleanup := streamCleanup - streamCleanup = func() { - origStreamCleanup() - restore() - } - } var event *kvpb.RangeFeedEvent for { @@ -681,7 +761,6 @@ func (ds *DistSender) singleRangeFeed( // that timestamp. args.Timestamp.Forward(t.ResolvedTS) } - case *kvpb.RangeFeedSSTable: case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if catchupRes != nil { @@ -714,22 +793,6 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { return rpc.DefaultClass } -type rangeFeedEventProducerFactory func( - ctx context.Context, - client rpc.RestrictedInternalClient, - req *kvpb.RangeFeedRequest, -) (kvpb.RangeFeedEventProducer, func(), error) - -// legacyRangeFeedEventProducer is a rangeFeedEventProducerFactory using -// legacy RangeFeed RPC. -func legacyRangeFeedEventProducer( - ctx context.Context, client rpc.RestrictedInternalClient, req *kvpb.RangeFeedRequest, -) (producer kvpb.RangeFeedEventProducer, cleanup func(), err error) { - cleanup = func() {} - producer, err = client.RangeFeed(ctx, req) - return producer, cleanup, err -} - func (ds *DistSender) handleStuckEvent( args *kvpb.RangeFeedRequest, afterCatchupScan bool, threshold time.Duration, ) error { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index e0736a7e3aa8..f11cbf6c29d9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb/kvpbmock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -128,13 +129,22 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { // returning a range descriptor and a client that immediately // cancels the context and closes the range feed stream. 
if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) + rangeDB.EXPECT().FirstRange().MinTimes(1).Return(&desc, nil) client := kvpbmock.NewMockInternalClient(ctrl) if useMuxRangeFeed { + recvCalled := make(chan struct{}) + sendCalled := make(chan struct{}) stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) - stream.EXPECT().Send(gomock.Any()).Return(nil) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + stream.EXPECT().Send(gomock.Any()).Do(func(r *kvpb.RangeFeedRequest) { + close(sendCalled) + <-recvCalled + cancel() + }).Return(nil) + stream.EXPECT().Recv().Do(func() { + close(recvCalled) + <-sendCalled + }).Return(nil, io.EOF) client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) } else { stream := kvpbmock.NewMockInternal_RangeFeedClient(ctrl) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 3b6ecaf1369c..44954854ce0c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -204,7 +204,7 @@ func rangeFeed( opts = append(opts, kvcoord.WithMuxRangeFeed()) ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) } - return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events, opts...) + return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, events, opts...) }) g.GoCtx(func(ctx context.Context) error { for { @@ -552,7 +552,7 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) { g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { defer close(events) - err := ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events) + err := ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, events) t.Logf("from RangeFeed: %v", err) return err }) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 906fa4c005b1..1f1e18b8d28f 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -358,11 +358,11 @@ func TestBackoffOnRangefeedFailure(t *testing.T) { // Make sure rangefeed is retried even after 3 failures, then succeed and cancel context // (which signals the rangefeed to shut down gracefully). - db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Times(3). Return(errors.New("rangefeed failed")) - db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) { + db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(context.Context, []roachpb.Span, hlc.Timestamp, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) { cancel() }). Return(nil)