125665: rangefeed: remove tests for non-mux rangefeed r=nvanbenschoten a=wenyihu6

**rpc: extend TestInternalClientAdapterRunsInterceptors to mux rangefeed**

This patch adapts the test TestInternalClientAdapterRunsInterceptors to mux
rangefeed so that we don’t lose test coverage when removing non-mux rangefeed
code in future commits.

Part of: cockroachdb#125666
Release note: none

---

**rangefeed: remove tests for non-mux rangefeed**

This patch removes tests that use non-mux rangefeed code (which will soon be
removed). Note that all non-mux rangefeed tests have corresponding mux
rangefeed tests, so we are not losing test coverage here.

Part of: cockroachdb#125666
Release note: none

Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
craig[bot] and wenyihu6 committed Jun 14, 2024
2 parents a24c4f3 + bedfa9b commit 7426c8e
Showing 6 changed files with 416 additions and 520 deletions.
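
For readers unfamiliar with the distinction the commit message draws: the legacy RangeFeed RPC opens one server stream per range, whereas MuxRangeFeed registers many logical feeds over a single bidirectional stream and tags each event with a stream ID. The following is a minimal, self-contained sketch of that multiplexed shape; the types here (rangeFeedRequest, muxRangeFeedEvent, muxStream, fakeMux) are illustrative stand-ins, not the real kvpb definitions.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Simplified stand-ins for the real kvpb types; names and fields are
// illustrative only.
type rangeFeedRequest struct {
	RangeID  int64
	StreamID int64 // identifies one logical feed on the shared mux stream
}

type muxRangeFeedEvent struct {
	StreamID int64
	Value    string
}

// muxStream sketches the bidirectional stream shape used by MuxRangeFeed:
// one Send per range to register a feed, one shared Recv loop for all events.
type muxStream interface {
	Send(*rangeFeedRequest) error
	Recv() (*muxRangeFeedEvent, error)
}

// fakeMux is a toy in-memory implementation that echoes one event per
// registered feed and reports end-of-stream once drained.
type fakeMux struct{ events chan *muxRangeFeedEvent }

func (f *fakeMux) Send(req *rangeFeedRequest) error {
	f.events <- &muxRangeFeedEvent{
		StreamID: req.StreamID,
		Value:    fmt.Sprintf("event for r%d", req.RangeID),
	}
	return nil
}

func (f *fakeMux) Recv() (*muxRangeFeedEvent, error) {
	ev, ok := <-f.events
	if !ok {
		return nil, io.EOF
	}
	return ev, nil
}

// consumeMux registers several ranges on a single stream, then demultiplexes
// the incoming events by StreamID.
func consumeMux(ctx context.Context, s muxStream, rangeIDs []int64) error {
	for i, r := range rangeIDs {
		if err := s.Send(&rangeFeedRequest{RangeID: r, StreamID: int64(i)}); err != nil {
			return err
		}
	}
	for range rangeIDs {
		ev, err := s.Recv()
		if err != nil {
			return err
		}
		fmt.Printf("stream %d: %s\n", ev.StreamID, ev.Value)
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	mux := &fakeMux{events: make(chan *muxRangeFeedEvent, 4)}
	if err := consumeMux(context.Background(), mux, []int64{1, 2}); err != nil {
		fmt.Println("rangefeed ended:", err)
	}
}
```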
11 changes: 0 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -107,17 +107,6 @@ type optionFunc func(*rangeFeedConfig)

func (o optionFunc) set(c *rangeFeedConfig) { o(c) }

// WithoutMuxRangeFeed configures range feed to use legacy RangeFeed RPC.
//
// TODO(erikgrinaker): this should be removed when support for the legacy
// RangeFeed protocol is no longer needed in mixed-version clusters, and we
// don't need test coverage for it.
func WithoutMuxRangeFeed() RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.disableMuxRangeFeed = true
})
}

// WithSystemTablePriority is used for system-internal rangefeeds, it uses a
// higher admission priority during catch up scans.
func WithSystemTablePriority() RangeFeedOption {
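
The hunk above deletes one functional option; the surviving optionFunc and WithSystemTablePriority code shows that RangeFeedOption follows Go's functional-options pattern. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical rangeFeedConfig with a single field; only the shape mirrors the kvcoord code.

```go
package main

import "fmt"

// rangeFeedConfig is a hypothetical stand-in used to illustrate the pattern.
type rangeFeedConfig struct {
	withSystemTablePriority bool
}

// RangeFeedOption configures a rangeFeedConfig.
type RangeFeedOption interface {
	set(*rangeFeedConfig)
}

// optionFunc adapts a plain function into a RangeFeedOption.
type optionFunc func(*rangeFeedConfig)

func (o optionFunc) set(c *rangeFeedConfig) { o(c) }

// WithSystemTablePriority mirrors the surviving option above: when applied,
// it flips a single field on the config.
func WithSystemTablePriority() RangeFeedOption {
	return optionFunc(func(c *rangeFeedConfig) {
		c.withSystemTablePriority = true
	})
}

// newConfig applies the supplied options in order over a zero-value config.
func newConfig(opts ...RangeFeedOption) rangeFeedConfig {
	var cfg rangeFeedConfig
	for _, o := range opts {
		o.set(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(WithSystemTablePriority())
	fmt.Printf("system table priority: %t\n", cfg.withSystemTablePriority)
}
```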
229 changes: 108 additions & 121 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
Expand Up @@ -13,7 +13,6 @@ package kvcoord
import (
"context"
"fmt"
"io"
"testing"

"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -44,126 +43,114 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, useMuxRangeFeed := range []bool{false, true} {
for _, spec := range []struct {
errorCode codes.Code
expectRetry bool
}{
{codes.FailedPrecondition, true}, // target node is decommissioned; retry
{codes.PermissionDenied, false}, // this node is decommissioned; abort
{codes.Unauthenticated, false}, // this node is not part of cluster; abort
} {
t.Run(fmt.Sprintf("mux=%t/%s", useMuxRangeFeed, spec.errorCode),
func(t *testing.T) {
clock := hlc.NewClockForTesting(nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
for _, repl := range desc.InternalReplicas {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(repl.NodeID),
newNodeDesc(repl.NodeID),
gossip.NodeDescriptorTTL,
))
}

ctrl := gomock.NewController(t)
transport := NewMockTransport(ctrl)
rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl)

// We start off with a cached lease on r1.
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
Sequence: 1,
}

// All nodes return the specified error code. We expect the range feed to
// keep trying all replicas in sequence regardless of error.
for _, repl := range desc.InternalReplicas {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// Once all replicas have failed, it should try to refresh the lease using
// the range cache. We let this succeed once.
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return([]roachpb.RangeDescriptor{desc}, nil, nil)

// It then tries the replicas again. This time we just report the
// transport as exhausted immediately.
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// This invalidates the cache yet again. This time we error.
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return(nil, nil, grpcstatus.Error(spec.errorCode, ""))

// If we expect a range lookup retry, allow the retry to succeed by
// returning a range descriptor and a client that immediately
// cancels the context and closes the range feed stream.
if spec.expectRetry {
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil)
client := kvpbmock.NewMockInternalClient(ctrl)

if useMuxRangeFeed {
stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl)
stream.EXPECT().Send(gomock.Any()).Return(nil)
stream.EXPECT().Recv().Do(func() {
cancel()
}).Return(nil, context.Canceled).AnyTimes()
client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil).AnyTimes()
} else {
stream := kvpbmock.NewMockInternal_RangeFeedClient(ctrl)
stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF)
client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil)
}

transport.EXPECT().IsExhausted().Return(false).AnyTimes()
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]).AnyTimes()
transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil).AnyTimes()
transport.EXPECT().Release().AnyTimes()
}

ds := NewDistSender(DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
Stopper: stopper,
TransportFactory: func(SendOptions, ReplicaSlice) Transport {
return transport
},
RangeDescriptorDB: rangeDB,
Settings: cluster.MakeTestingClusterSettings(),
})
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

var opts []RangeFeedOption
if !useMuxRangeFeed {
opts = append(opts, WithoutMuxRangeFeed())
}
err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil, opts...)
require.Error(t, err)
for _, spec := range []struct {
errorCode codes.Code
expectRetry bool
}{
{codes.FailedPrecondition, true}, // target node is decommissioned; retry
{codes.PermissionDenied, false}, // this node is decommissioned; abort
{codes.Unauthenticated, false}, // this node is not part of cluster; abort
} {
t.Run(fmt.Sprintf("transport_error=%s", spec.errorCode),
func(t *testing.T) {
clock := hlc.NewClockForTesting(nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
for _, repl := range desc.InternalReplicas {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(repl.NodeID),
newNodeDesc(repl.NodeID),
gossip.NodeDescriptorTTL,
))
}

ctrl := gomock.NewController(t)
transport := NewMockTransport(ctrl)
rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl)

// We start off with a cached lease on r1.
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
Sequence: 1,
}

// All nodes return the specified error code. We expect the range feed to
// keep trying all replicas in sequence regardless of error.
for _, repl := range desc.InternalReplicas {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// Once all replicas have failed, it should try to refresh the lease using
// the range cache. We let this succeed once.
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return([]roachpb.RangeDescriptor{desc}, nil, nil)

// It then tries the replicas again. This time we just report the
// transport as exhausted immediately.
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()

// This invalidates the cache yet again. This time we error.
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).Return(nil, nil, grpcstatus.Error(spec.errorCode, ""))

// If we expect a range lookup retry, allow the retry to succeed by
// returning a range descriptor and a client that immediately
// cancels the context and closes the range feed stream.
if spec.expectRetry {
rangeDB.EXPECT().RangeLookup(gomock.Any(), roachpb.RKeyMin, kvpb.INCONSISTENT, false).MinTimes(1).Return([]roachpb.RangeDescriptor{desc}, nil, nil) //.FirstRange().Return(&desc, nil)
client := kvpbmock.NewMockInternalClient(ctrl)

stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl)
stream.EXPECT().Send(gomock.Any()).Return(nil)
stream.EXPECT().Recv().Do(func() {
cancel()
}).Return(nil, context.Canceled).AnyTimes()
client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil).AnyTimes()

transport.EXPECT().IsExhausted().Return(false).AnyTimes()
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]).AnyTimes()
transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil).AnyTimes()
transport.EXPECT().Release().AnyTimes()
}

ds := NewDistSender(DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: g,
RPCRetryOptions: &retry.Options{MaxRetries: 10},
Stopper: stopper,
TransportFactory: func(SendOptions, ReplicaSlice) Transport {
return transport
},
RangeDescriptorDB: rangeDB,
Settings: cluster.MakeTestingClusterSettings(),
})
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil)
require.Error(t, err)
})
}
}
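
The mocked MuxRangeFeed stream in the test above terminates the rangefeed by having Recv cancel the test context and return context.Canceled. Here is a minimal, self-contained sketch of that cancel-on-Recv pattern using a toy hand-written stream instead of the generated kvpbmock mocks; the interface and names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// eventStream is a toy stand-in for the mocked MuxRangeFeed client stream.
type eventStream interface {
	Recv() (string, error)
}

// cancelOnRecv mimics the gomock expectation in the test: the first Recv call
// cancels the surrounding context and reports context.Canceled, so the
// consumer terminates promptly instead of blocking forever.
type cancelOnRecv struct {
	cancel context.CancelFunc
}

func (s *cancelOnRecv) Recv() (string, error) {
	s.cancel()
	return "", context.Canceled
}

// consume drains the stream until it fails, returning the terminal error.
func consume(ctx context.Context, s eventStream) error {
	for {
		if _, err := s.Recv(); err != nil {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err := consume(ctx, &cancelOnRecv{cancel: cancel})
	fmt.Println("stream ended:", err, "ctx canceled:", errors.Is(ctx.Err(), context.Canceled))
}
```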
