From fdb53e97cc205149334dd3adbd1c445130912735 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Thu, 13 Jun 2024 11:12:53 -0400 Subject: [PATCH 1/2] rpc: extend TestInternalClientAdapterRunsInterceptors to mux rangefeed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adapts the test TestInternalClientAdapterRunsInterceptors to mux rangefeed so that we don’t lose test coverage when removing non-mux rangefeed code in the future commits. Part of: #125666 Release note: none --- pkg/rpc/context_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index d8c626b1d42a..d656ec18d9ee 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -510,8 +510,9 @@ func TestInternalClientAdapterRunsInterceptors(t *testing.T) { for i := 0; i < 2; i++ { serverStreamInterceptor1Called, serverStreamInterceptor2Called = false, false clientStreamInterceptor1Called, clientStreamInterceptor2Called = false, false - stream, err := lic.RangeFeed(ctx, &kvpb.RangeFeedRequest{}) + stream, err := lic.MuxRangeFeed(ctx) require.NoError(t, err) + require.NoError(t, stream.Send(&kvpb.RangeFeedRequest{})) _, err = stream.Recv() require.ErrorIs(t, err, io.EOF) require.True(t, clientStreamInterceptor1Called) From bedfa9b2a1398a4a7dc1e1d257d54a2f9fc76cba Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Thu, 13 Jun 2024 15:04:20 -0400 Subject: [PATCH 2/2] rangefeed: remove tests for non-mux rangefeed This patch removes tests that use non-mux rangefeed code (which will soonly be removed). Note that all non-mux rangefeed tests have corresponding mux rangefeed tests, so we are not losing test coverage here. Part of: #125666 Release note: none --- .../kvclient/kvcoord/dist_sender_rangefeed.go | 11 - .../dist_sender_rangefeed_mock_test.go | 229 ++++----- .../kvcoord/dist_sender_rangefeed_test.go | 472 ++++++++---------- pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 - pkg/kv/kvclient/rangefeed/rangefeed.go | 6 - pkg/rpc/context_test.go | 214 ++++---- 6 files changed, 414 insertions(+), 519 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index fe3ee4d757c9..ff78c5e1232c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -107,17 +107,6 @@ type optionFunc func(*rangeFeedConfig) func (o optionFunc) set(c *rangeFeedConfig) { o(c) } -// WithoutMuxRangeFeed configures range feed to use legacy RangeFeed RPC. -// -// TODO(erikgrinaker): this should be removed when support for the legacy -// RangeFeed protocol is no longer needed in mixed-version clusters, and we -// don't need test coverage for it. -func WithoutMuxRangeFeed() RangeFeedOption { - return optionFunc(func(c *rangeFeedConfig) { - c.disableMuxRangeFeed = true - }) -} - // WithSystemTablePriority is used for system-internal rangefeeds, it uses a // higher admission priority during catch up scans. func WithSystemTablePriority() RangeFeedOption { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index acde280c77c4..ef14c6bfa131 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -13,7 +13,6 @@ package kvcoord import ( "context" "fmt" - "io" "testing" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -44,126 +43,114 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - for _, useMuxRangeFeed := range []bool{false, true} { - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(fmt.Sprintf("mux=%t/%s", useMuxRangeFeed, spec.errorCode), - func(t *testing.T) { - clock := hlc.NewClockForTesting(nil) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } - - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) - - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } - - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - nil, grpcstatus.Error(spec.errorCode, "")) - } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return([]roachpb.RangeDescriptor{desc}, nil, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return(nil, nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. - if spec.expectRetry { - rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil) - client := kvpbmock.NewMockInternalClient(ctrl) - - if useMuxRangeFeed { - stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) - stream.EXPECT().Send(gomock.Any()).Return(nil) - stream.EXPECT().Recv().Do(func() { - cancel() - }).Return(nil, context.Canceled).AnyTimes() - client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil).AnyTimes() - } else { - stream := kvpbmock.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - } - - transport.EXPECT().IsExhausted().Return(false).AnyTimes() - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]).AnyTimes() - transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil).AnyTimes() - transport.EXPECT().Release().AnyTimes() - } - - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - Stopper: stopper, - TransportFactory: func(SendOptions, ReplicaSlice) Transport { - return transport - }, - RangeDescriptorDB: rangeDB, - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - var opts []RangeFeedOption - if !useMuxRangeFeed { - opts = append(opts, WithoutMuxRangeFeed()) - } - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil, opts...) - require.Error(t, err) + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("transport_error=%s", spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClockForTesting(nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return([]roachpb.RangeDescriptor{desc}, nil, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return(nil, nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil) + client := kvpbmock.NewMockInternalClient(ctrl) + + stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(func() { + cancel() + }).Return(nil, context.Canceled).AnyTimes() + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil).AnyTimes() + + transport.EXPECT().IsExhausted().Return(false).AnyTimes() + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]).AnyTimes() + transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil).AnyTimes() + transport.EXPECT().Release().AnyTimes() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + Stopper: stopper, + TransportFactory: func(SendOptions, ReplicaSlice) Transport { + return transport + }, + RangeDescriptorDB: rangeDB, + Settings: cluster.MakeTestingClusterSettings(), }) - } + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil) + require.Error(t, err) + }) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 8b1a81dc7a34..da0a5addfcf6 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -38,48 +38,28 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/sasha-s/go-deadlock" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) -type wrapRangeFeedClientFn func(client kvpb.Internal_RangeFeedClient) kvpb.Internal_RangeFeedClient type testRangefeedClient struct { rpc.RestrictedInternalClient - muxRangeFeedEnabled bool - count func() - wrapRangeFeedClient wrapRangeFeedClientFn + count func() } func (c *testRangefeedClient) RangeFeed( ctx context.Context, args *kvpb.RangeFeedRequest, opts ...grpc.CallOption, ) (kvpb.Internal_RangeFeedClient, error) { defer c.count() - - if c.muxRangeFeedEnabled && ctx.Value(useMuxRangeFeedCtxKey{}) != nil { - panic(errors.AssertionFailedf("unexpected call to RangeFeed")) - } - - rfClient, err := c.RestrictedInternalClient.RangeFeed(ctx, args, opts...) - if err != nil { - return nil, err - } - if c.wrapRangeFeedClient == nil { - return rfClient, nil - } - return c.wrapRangeFeedClient(rfClient), nil + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) } func (c *testRangefeedClient) MuxRangeFeed( ctx context.Context, opts ...grpc.CallOption, ) (kvpb.Internal_MuxRangeFeedClient, error) { defer c.count() - - if !c.muxRangeFeedEnabled || ctx.Value(useMuxRangeFeedCtxKey{}) == nil { - panic(errors.AssertionFailedf("unexpected call to MuxRangeFeed")) - } return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...) } @@ -99,15 +79,12 @@ func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) { type countConnectionsTransport struct { kvcoord.Transport - counts *internalClientCounts - wrapRangeFeedClient wrapRangeFeedClientFn - rfStreamEnabled bool + counts *internalClientCounts } var _ kvcoord.Transport = (*countConnectionsTransport)(nil) type testFeedCtxKey struct{} -type useMuxRangeFeedCtxKey struct{} func (c *countConnectionsTransport) NextInternalClient( ctx context.Context, @@ -124,8 +101,6 @@ func (c *countConnectionsTransport) NextInternalClient( tc := &testRangefeedClient{ RestrictedInternalClient: client, - muxRangeFeedEnabled: c.rfStreamEnabled, - wrapRangeFeedClient: c.wrapRangeFeedClient, } tc.count = func() { @@ -138,16 +113,14 @@ func (c *countConnectionsTransport) NextInternalClient( } func makeTransportFactory( - rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn, + counts *internalClientCounts, ) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) kvcoord.Transport { transport := factory(options, slice) countingTransport := &countConnectionsTransport{ - Transport: transport, - rfStreamEnabled: rfStreamEnabled, - counts: counts, - wrapRangeFeedClient: wrapFn, + Transport: transport, + counts: counts, } return countingTransport } @@ -163,7 +136,6 @@ func rangeFeed( sp roachpb.Span, startFrom hlc.Timestamp, onValue func(event kvcoord.RangeFeedMessage), - useMuxRangeFeed bool, opts ...kvcoord.RangeFeedOption, ) func() { ds := dsI.(*kvcoord.DistSender) @@ -172,11 +144,6 @@ func rangeFeed( g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) (err error) { - if useMuxRangeFeed { - ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) - } else { - opts = append(opts, kvcoord.WithoutMuxRangeFeed()) - } return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, events, opts...) }) g.GoCtx(func(ctx context.Context) error { @@ -246,7 +213,7 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ KVClient: &kvcoord.ClientTestingKnobs{ - TransportFactory: makeTransportFactory(true, connCounts, nil), + TransportFactory: makeTransportFactory(connCounts), }, }, }, @@ -282,7 +249,7 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) allSeen, onValue := observeNValues(1000) - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue) defer closeFeed() channelWaitWithTimeout(t, allSeen) closeFeed() // Explicitly shutdown the feed to make sure counters no longer change. @@ -333,7 +300,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) { const numErrsToReturn = 100 var numErrors atomic.Int32 enoughErrors := make(chan struct{}) - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true, + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, kvcoord.TestingWithOnRangefeedEvent( func(_ context.Context, _ roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) { *event = transientErrEvent @@ -375,143 +342,132 @@ func TestRangeFeedMetricsManagement(t *testing.T) { ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) - testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { - metrics := kvcoord.TestingMakeRangeFeedMetrics() - - // Number of ranges for which we'll issue transient error. - const numRangesToRetry int64 = 3 - // Number of ranges which we will block from completion. - const numCatchupToBlock int64 = 2 - - // Upon shutdown, make sure the metrics have correct values. - defer func() { - require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) - require.EqualValues(t, 0, metrics.RangefeedLocalRanges.Value()) - - // We injected numRangesToRetry transient errors during catchup scan. - // It is possible however, that we will observe key-mismatch error when restarting - // due to how we split the ranges above (i.e. there is a version of the range - // that goes from e.g. 800-Max, and then there is correct version 800-900). - // When iterating through the entire table span, we pick up correct version. - // However, if we attempt to re-resolve single range, we may get incorrect/old - // version that was cached. Thus, we occasionally see additional transient restarts. - require.GreaterOrEqual(t, metrics.Errors.RangefeedErrorCatchup.Count(), numRangesToRetry) - require.GreaterOrEqual(t, metrics.Errors.RangefeedRestartRanges.Count(), numRangesToRetry) - - // Even though numCatchupToBlock ranges were blocked in the catchup scan phase, - // the counter should be 0 once rangefeed is done. - require.EqualValues(t, 0, metrics.RangefeedCatchupRanges.Value()) - - }() - - frontier, err := span.MakeFrontier(fooSpan) - require.NoError(t, err) - frontier = span.MakeConcurrentFrontier(frontier) - - // This error causes rangefeed to restart. - transientErrEvent := kvpb.RangeFeedEvent{ - Error: &kvpb.RangeFeedError{Error: *kvpb.NewError(&kvpb.StoreNotFoundError{})}, - } + metrics := kvcoord.TestingMakeRangeFeedMetrics() - var numRetried atomic.Int64 - var numCatchupBlocked atomic.Int64 - skipSet := struct { - syncutil.Mutex - stuck roachpb.SpanGroup // Spans that are stuck in catchup scan. - retry roachpb.SpanGroup // Spans we issued retry for. - }{} - const kindRetry = true - const kindStuck = false - shouldSkip := func(k roachpb.Key, kind bool) bool { - skipSet.Lock() - defer skipSet.Unlock() - if kind == kindRetry { - return skipSet.retry.Contains(k) - } - return skipSet.stuck.Contains(k) + // Number of ranges for which we'll issue transient error. + const numRangesToRetry int64 = 3 + // Number of ranges which we will block from completion. + const numCatchupToBlock int64 = 2 + + // Upon shutdown, make sure the metrics have correct values. + defer func() { + require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) + require.EqualValues(t, 0, metrics.RangefeedLocalRanges.Value()) + + // We injected numRangesToRetry transient errors during catchup scan. + // It is possible however, that we will observe key-mismatch error when restarting + // due to how we split the ranges above (i.e. there is a version of the range + // that goes from e.g. 800-Max, and then there is correct version 800-900). + // When iterating through the entire table span, we pick up correct version. + // However, if we attempt to re-resolve single range, we may get incorrect/old + // version that was cached. Thus, we occasionally see additional transient restarts. + require.GreaterOrEqual(t, metrics.Errors.RangefeedErrorCatchup.Count(), numRangesToRetry) + require.GreaterOrEqual(t, metrics.Errors.RangefeedRestartRanges.Count(), numRangesToRetry) + + // Even though numCatchupToBlock ranges were blocked in the catchup scan phase, + // the counter should be 0 once rangefeed is done. + require.EqualValues(t, 0, metrics.RangefeedCatchupRanges.Value()) + + }() + + frontier, err := span.MakeFrontier(fooSpan) + require.NoError(t, err) + frontier = span.MakeConcurrentFrontier(frontier) + + // This error causes rangefeed to restart. + transientErrEvent := kvpb.RangeFeedEvent{ + Error: &kvpb.RangeFeedError{Error: *kvpb.NewError(&kvpb.StoreNotFoundError{})}, + } + + var numRetried atomic.Int64 + var numCatchupBlocked atomic.Int64 + skipSet := struct { + syncutil.Mutex + stuck roachpb.SpanGroup // Spans that are stuck in catchup scan. + retry roachpb.SpanGroup // Spans we issued retry for. + }{} + const kindRetry = true + const kindStuck = false + shouldSkip := func(k roachpb.Key, kind bool) bool { + skipSet.Lock() + defer skipSet.Unlock() + if kind == kindRetry { + return skipSet.retry.Contains(k) } + return skipSet.stuck.Contains(k) + } + + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, + kvcoord.TestingWithRangeFeedMetrics(&metrics), + kvcoord.TestingWithOnRangefeedEvent( + func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) { + switch t := event.GetValue().(type) { + case *kvpb.RangeFeedValue: + // If we previously arranged for the range to be skipped (stuck catchup scan), + // then skip any value that belongs to the skipped range. + // This is only needed for mux rangefeed, since regular rangefeed just blocks. + return shouldSkip(t.Key, kindStuck), nil + case *kvpb.RangeFeedCheckpoint: + if checkpoint := t; checkpoint.Span.Contains(s) { + if checkpoint.ResolvedTS.IsEmpty() { + return true, nil + } + + // Skip any subsequent checkpoint if we previously arranged for + // range to be skipped. + if shouldSkip(checkpoint.Span.Key, kindStuck) { + return true, nil + } + + if !shouldSkip(checkpoint.Span.Key, kindRetry) && numRetried.Add(1) <= numRangesToRetry { + // Return transient error for this range, but do this only once per range. + skipSet.Lock() + skipSet.retry.Add(checkpoint.Span) + skipSet.Unlock() + log.Infof(ctx, "skipping span %s", checkpoint.Span) + *event = transientErrEvent + return false, nil + } - ignoreValues := func(event kvcoord.RangeFeedMessage) {} - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux, - kvcoord.TestingWithRangeFeedMetrics(&metrics), - kvcoord.TestingWithOnRangefeedEvent( - func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) { - switch t := event.GetValue().(type) { - case *kvpb.RangeFeedValue: - // If we previously arranged for the range to be skipped (stuck catchup scan), - // then skip any value that belongs to the skipped range. - // This is only needed for mux rangefeed, since regular rangefeed just blocks. - return useMux && shouldSkip(t.Key, kindStuck), nil - case *kvpb.RangeFeedCheckpoint: - if checkpoint := t; checkpoint.Span.Contains(s) { - if checkpoint.ResolvedTS.IsEmpty() { - return true, nil - } - - // Skip any subsequent checkpoint if we previously arranged for - // range to be skipped. - if useMux && shouldSkip(checkpoint.Span.Key, kindStuck) { - return true, nil - } - - if !shouldSkip(checkpoint.Span.Key, kindRetry) && numRetried.Add(1) <= numRangesToRetry { - // Return transient error for this range, but do this only once per range. - skipSet.Lock() - skipSet.retry.Add(checkpoint.Span) - skipSet.Unlock() - log.Infof(ctx, "skipping span %s", checkpoint.Span) - *event = transientErrEvent - return false, nil - } - - _, err := frontier.Forward(checkpoint.Span, checkpoint.ResolvedTS) - if err != nil { - return false, err - } - - if numCatchupBlocked.Add(1) <= numCatchupToBlock { - if useMux { - // Mux rangefeed can't block single range, so just skip this event - // and arrange for other events belonging to this range to be skipped as well. - skipSet.Lock() - skipSet.stuck.Add(checkpoint.Span) - skipSet.Unlock() - log.Infof(ctx, "skipping stuck span %s", checkpoint.Span) - return true /* skip */, nil - } - - // Regular rangefeed can block to prevent catchup completion until rangefeed is canceled. - return false, timeutil.RunWithTimeout(ctx, "wait-rf-timeout", time.Minute, - func(ctx context.Context) error { - <-ctx.Done() - return ctx.Err() - }) - } + _, err := frontier.Forward(checkpoint.Span, checkpoint.ResolvedTS) + if err != nil { + return false, err + } + + if numCatchupBlocked.Add(1) <= numCatchupToBlock { + // Mux rangefeed can't block single range, so just skip this event + // and arrange for other events belonging to this range to be skipped as well. + skipSet.Lock() + skipSet.stuck.Add(checkpoint.Span) + skipSet.Unlock() + log.Infof(ctx, "skipping stuck span %s", checkpoint.Span) + return true /* skip */, nil } } + } - return false, nil - })) - defer closeFeed() + return false, nil + })) + defer closeFeed() - // Wait for the test frontier to advance. Once it advances, - // we know the rangefeed is started, all ranges are running (even if some of them are blocked). - testutils.SucceedsWithin(t, func() error { - if frontier.Frontier().IsEmpty() { - return errors.Newf("waiting for frontier advance: %s", frontier.String()) - } - return nil - }, 10*time.Second) + // Wait for the test frontier to advance. Once it advances, + // we know the rangefeed is started, all ranges are running (even if some of them are blocked). + testutils.SucceedsWithin(t, func() error { + if frontier.Frontier().IsEmpty() { + return errors.Newf("waiting for frontier advance: %s", frontier.String()) + } + return nil + }, 10*time.Second) - // At this point, we know the rangefeed for all ranges are running. - require.EqualValues(t, numRanges, metrics.RangefeedRanges.Value(), frontier.String()) + // At this point, we know the rangefeed for all ranges are running. + require.EqualValues(t, numRanges, metrics.RangefeedRanges.Value(), frontier.String()) - // All ranges expected to be local. - require.EqualValues(t, numRanges, metrics.RangefeedLocalRanges.Value(), frontier.String()) + // All ranges expected to be local. + require.EqualValues(t, numRanges, metrics.RangefeedLocalRanges.Value(), frontier.String()) - // We also know that we have blocked numCatchupToBlock ranges in their catchup scan. - require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value()) - }) + // We also know that we have blocked numCatchupToBlock ranges in their catchup scan. + require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value()) } // TestRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option @@ -530,99 +486,97 @@ func TestRangefeedRangeObserver(t *testing.T) { kvserver.RangefeedEnabled.Override( context.Background(), &ts.ClusterSettings().SV, true) - testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { - sqlDB.ExecMultiple(t, - `CREATE TABLE foo (key INT PRIMARY KEY)`, - `INSERT INTO foo (key) SELECT * FROM generate_series(1, 4)`, - `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 4, 1))`, - ) - defer func() { - sqlDB.Exec(t, `DROP TABLE foo`) - }() - - fooDesc := desctestutils.TestingGetPublicTableDescriptor( - ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") - fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) - - ignoreValues := func(event kvcoord.RangeFeedMessage) {} - - // Set up an observer to continuously poll for the list of ranges - // being watched. - var observedRangesMu syncutil.Mutex - observedRanges := make(map[string]struct{}) - ctx2, cancel := context.WithCancel(context.Background()) - g := ctxgroup.WithContext(ctx2) - defer func() { - cancel() - err := g.Wait() - // Ensure the observer goroutine terminates gracefully via context cancellation. - require.True(t, testutils.IsError(err, "context canceled")) - }() - observer := func(fn kvcoord.ForEachRangeFn) { - g.GoCtx(func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(200 * time.Millisecond): - } - observedRangesMu.Lock() - observedRanges = make(map[string]struct{}) - err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { - observedRanges[feed.Span.String()] = struct{}{} - return nil - }) - observedRangesMu.Unlock() - if err != nil { - return err - } - } - }) - } + sqlDB.ExecMultiple(t, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 4)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 4, 1))`, + ) + defer func() { + sqlDB.Exec(t, `DROP TABLE foo`) + }() - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, useMux, - kvcoord.WithRangeObserver(observer)) - defer closeFeed() + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) - makeSpan := func(suffix string) string { - return fmt.Sprintf("/Table/%d/%s", fooDesc.GetID(), suffix) - } + ignoreValues := func(event kvcoord.RangeFeedMessage) {} - // The initial set of ranges we expect to observe. - expectedRanges := map[string]struct{}{ - makeSpan("1{-/1}"): {}, - makeSpan("1/{1-2}"): {}, - makeSpan("1/{2-3}"): {}, - makeSpan("1/{3-4}"): {}, - makeSpan("{1/4-2}"): {}, - } - checkExpectedRanges := func() { - testutils.SucceedsWithin(t, func() error { + // Set up an observer to continuously poll for the list of ranges + // being watched. + var observedRangesMu syncutil.Mutex + observedRanges := make(map[string]struct{}) + ctx2, cancel := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx2) + defer func() { + cancel() + err := g.Wait() + // Ensure the observer goroutine terminates gracefully via context cancellation. + require.True(t, testutils.IsError(err, "context canceled")) + }() + observer := func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } observedRangesMu.Lock() - defer observedRangesMu.Unlock() - if !reflect.DeepEqual(observedRanges, expectedRanges) { - return errors.Newf("expected ranges %v, but got %v", expectedRanges, observedRanges) + observedRanges = make(map[string]struct{}) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + observedRanges[feed.Span.String()] = struct{}{} + return nil + }) + observedRangesMu.Unlock() + if err != nil { + return err } - return nil - }, 10*time.Second) - } - checkExpectedRanges() - - // Add another range and ensure we can observe it. - sqlDB.ExecMultiple(t, - `INSERT INTO FOO VALUES(5)`, - `ALTER TABLE foo SPLIT AT VALUES(5)`, - ) - expectedRanges = map[string]struct{}{ - makeSpan("1{-/1}"): {}, - makeSpan("1/{1-2}"): {}, - makeSpan("1/{2-3}"): {}, - makeSpan("1/{3-4}"): {}, - makeSpan("1/{4-5}"): {}, - makeSpan("{1/5-2}"): {}, - } - checkExpectedRanges() - }) + } + }) + } + + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, + kvcoord.WithRangeObserver(observer)) + defer closeFeed() + + makeSpan := func(suffix string) string { + return fmt.Sprintf("/Table/%d/%s", fooDesc.GetID(), suffix) + } + + // The initial set of ranges we expect to observe. + expectedRanges := map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("{1/4-2}"): {}, + } + checkExpectedRanges := func() { + testutils.SucceedsWithin(t, func() error { + observedRangesMu.Lock() + defer observedRangesMu.Unlock() + if !reflect.DeepEqual(observedRanges, expectedRanges) { + return errors.Newf("expected ranges %v, but got %v", expectedRanges, observedRanges) + } + return nil + }, 10*time.Second) + } + checkExpectedRanges() + + // Add another range and ensure we can observe it. + sqlDB.ExecMultiple(t, + `INSERT INTO FOO VALUES(5)`, + `ALTER TABLE foo SPLIT AT VALUES(5)`, + ) + expectedRanges = map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("1/{4-5}"): {}, + makeSpan("{1/5-2}"): {}, + } + checkExpectedRanges() } // TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed. @@ -676,7 +630,7 @@ func TestMuxRangeFeedCanCloseStream(t *testing.T) { ignoreValues := func(event kvcoord.RangeFeedMessage) {} var numRestartStreams atomic.Int32 - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, true, + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, kvcoord.TestingWithMuxRangeFeedRequestSenderCapture( // We expect a single mux sender since we have 1 node in this test. func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error) { @@ -805,7 +759,7 @@ func TestMuxRangeFeedDoesNotDeadlockWithLocalStreams(t *testing.T) { fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) allSeen, onValue := observeNValues(1000) - closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startFrom, onValue, true, + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startFrom, onValue, kvcoord.TestingWithBeforeSendRequest(func() { // Prior to sending rangefeed request, block for just a bit // to make deadlock more likely. diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 6d8c8a5234bc..54c320b59746 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -26,7 +26,6 @@ go_library( "//pkg/util/hlc", "//pkg/util/limit", "//pkg/util/log", - "//pkg/util/metamorphic", "//pkg/util/mon", "//pkg/util/retry", "//pkg/util/span", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index d2ab9d474781..0f203d2fee40 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -28,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -306,8 +305,6 @@ func (f *RangeFeed) Close() { // will be reset. const resetThreshold = 30 * time.Second -var useMuxRangeFeed = metamorphic.ConstantWithTestBool("use-mux-rangefeed", true) - // run will run the RangeFeed until the context is canceled or if the client // indicates that an initial scan error is non-recoverable. The // resumeWithFrontier arg enables the client to resume the rangefeed using the @@ -345,9 +342,6 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF if f.scanConfig.overSystemTable { rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority()) } - if !useMuxRangeFeed { - rangefeedOpts = append(rangefeedOpts, kvcoord.WithoutMuxRangeFeed()) - } if f.withDiff { rangefeedOpts = append(rangefeedOpts, kvcoord.WithDiff()) } diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index d656ec18d9ee..183ad07a3955 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -541,60 +541,48 @@ func TestInternalClientAdapterWithClientStreamInterceptors(t *testing.T) { _ /* server */, serverInterceptors, err := NewServerEx(ctx, serverCtx) require.NoError(t, err) + var clientInterceptors ClientInterceptorInfo + var s *testClientStream + clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, + func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + s = &testClientStream{inner: clientStream} + return s, nil + }) - testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { - var clientInterceptors ClientInterceptorInfo - var s *testClientStream - clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, - func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (grpc.ClientStream, error) { - clientStream, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - return nil, err - } - s = &testClientStream{inner: clientStream} - return s, nil - }) + internal := &internalServer{rangeFeedEvents: []kvpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer( + internal, + serverInterceptors, clientInterceptors) + ic := serverCtx.GetLocalInternalClientForAddr(1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) - internal := &internalServer{rangeFeedEvents: []kvpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer( - internal, - serverInterceptors, clientInterceptors) - ic := serverCtx.GetLocalInternalClientForAddr(1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) - - var receiveEvent func() error - if useMux { - stream, err := lic.MuxRangeFeed(ctx) - require.NoError(t, err) - require.NoError(t, stream.Send(&kvpb.RangeFeedRequest{})) - receiveEvent = func() error { - e, err := stream.Recv() - _ = e - return err - } - } else { - stream, err := lic.RangeFeed(ctx, &kvpb.RangeFeedRequest{}) - require.NoError(t, err) - receiveEvent = func() error { - _, err := stream.Recv() - return err - } - } - // Consume the stream. - for { - err := receiveEvent() - if err == io.EOF { - break - } - require.NoError(t, err) + var receiveEvent func() error + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&kvpb.RangeFeedRequest{})) + receiveEvent = func() error { + e, err := stream.Recv() + _ = e + return err + } + // Consume the stream. + for { + err := receiveEvent() + if err == io.EOF { + break } - require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) - }) + require.NoError(t, err) + } + require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) } // Test that a server stream interceptor can wrap the ServerStream when the @@ -617,81 +605,65 @@ func TestInternalClientAdapterWithServerStreamInterceptors(t *testing.T) { _ /* server */, serverInterceptors, err := NewServerEx(ctx, serverCtx) require.NoError(t, err) - testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { - const int1Name = "interceptor 1" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - serverStream := &testServerStream{name: "interceptor 1", inner: ss} - return handler(srv, serverStream) - }) - var secondInterceptorWrapped grpc.ServerStream - const int2Name = "interceptor 2" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - secondInterceptorWrapped = ss - serverStream := &testServerStream{name: int2Name, inner: ss} - return handler(srv, serverStream) - }) + const int1Name = "interceptor 1" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + serverStream := &testServerStream{name: "interceptor 1", inner: ss} + return handler(srv, serverStream) + }) + var secondInterceptorWrapped grpc.ServerStream + const int2Name = "interceptor 2" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + secondInterceptorWrapped = ss + serverStream := &testServerStream{name: int2Name, inner: ss} + return handler(srv, serverStream) + }) - internal := &internalServer{rangeFeedEvents: []kvpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer( - internal, - serverInterceptors, ClientInterceptorInfo{}) - ic := serverCtx.GetLocalInternalClientForAddr(1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) - - var receiveEvent func() error - if useMux { - stream, err := lic.MuxRangeFeed(ctx) - require.NoError(t, err) - require.NoError(t, stream.Send(&kvpb.RangeFeedRequest{})) - receiveEvent = func() error { - _, err := stream.Recv() - return err - } - } else { - stream, err := lic.RangeFeed(ctx, &kvpb.RangeFeedRequest{}) - require.NoError(t, err) - receiveEvent = func() error { - _, err := stream.Recv() - return err - } - } + internal := &internalServer{rangeFeedEvents: []kvpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer( + internal, + serverInterceptors, ClientInterceptorInfo{}) + ic := serverCtx.GetLocalInternalClientForAddr(1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) - // Consume the stream. This will synchronize with the server RPC handler - // goroutine, ensuring that the server-side interceptors run. - for { - err := receiveEvent() - if err == io.EOF { - break - } - require.NoError(t, err) + var receiveEvent func() error + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&kvpb.RangeFeedRequest{})) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + + // Consume the stream. This will synchronize with the server RPC handler + // goroutine, ensuring that the server-side interceptors run. + for { + err := receiveEvent() + if err == io.EOF { + break } + require.NoError(t, err) + } - require.IsType(t, &testServerStream{}, secondInterceptorWrapped) + require.IsType(t, &testServerStream{}, secondInterceptorWrapped) - require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) - var ss grpc.ServerStream - if useMux { - require.IsType(t, muxRangeFeedServerAdapter{}, internal.muxRfServerStream) - ss = internal.muxRfServerStream.(muxRangeFeedServerAdapter).ServerStream - } else { - require.IsType(t, rangeFeedServerAdapter{}, internal.rfServerStream) - ss = internal.rfServerStream.(rangeFeedServerAdapter).ServerStream - } - require.IsType(t, &testServerStream{}, ss) - topStream := ss.(*testServerStream) - require.Equal(t, int2Name, topStream.name) - require.IsType(t, &testServerStream{}, topStream.inner) - bottomStream := topStream.inner.(*testServerStream) - require.Equal(t, int1Name, bottomStream.name) - }) + require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) + var ss grpc.ServerStream + require.IsType(t, muxRangeFeedServerAdapter{}, internal.muxRfServerStream) + ss = internal.muxRfServerStream.(muxRangeFeedServerAdapter).ServerStream + require.IsType(t, &testServerStream{}, ss) + topStream := ss.(*testServerStream) + require.Equal(t, int2Name, topStream.name) + require.IsType(t, &testServerStream{}, topStream.inner) + bottomStream := topStream.inner.(*testServerStream) + require.Equal(t, int1Name, bottomStream.name) } type testClientStream struct {