Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: try next replica on RangeNotFoundError #31013

Merged
merged 5 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ SET TRACING = off;
SHOW TRACE FOR SESSION;
"`)
if err != nil {
t.Fatal(err)
t.Fatalf("%s\n\n%s", buf, err)
}
duration = timeutil.Since(start)
c.l.Printf("post-restart, query took %s\n", duration)
Expand Down
67 changes: 52 additions & 15 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
// The replicas are assumed to be ordered by preference, with closer
// ones (i.e. expected lowest latency) first.
func (ds *DistSender) sendRPC(
ctx context.Context, rangeID roachpb.RangeID, replicas ReplicaSlice, ba roachpb.BatchRequest,
ctx context.Context,
rangeID roachpb.RangeID,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
if len(replicas) == 0 {
return nil, roachpb.NewSendError(
Expand All @@ -384,7 +388,15 @@ func (ds *DistSender) sendRPC(
tracing.AnnotateTrace()
defer tracing.AnnotateTrace()

return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer)
return ds.sendToReplicas(
ctx,
SendOptions{metrics: &ds.metrics},
rangeID,
replicas,
ba,
ds.nodeDialer,
cachedLeaseHolder,
)
}

// CountRanges returns the number of ranges that encompass the given key span.
Expand Down Expand Up @@ -435,16 +447,16 @@ func (ds *DistSender) sendSingleRange(

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
var knowLeaseholder bool
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
var cachedLeaseHolder roachpb.ReplicaDescriptor
if ba.RequiresLeaseHolder() {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
knowLeaseholder = true
cachedLeaseHolder = replicas[0].ReplicaDescriptor
}
}
}
if !knowLeaseholder {
if (cachedLeaseHolder == roachpb.ReplicaDescriptor{}) {
// Rearrange the replicas so that they're ordered in expectation of
// request latency.
var latencyFn LatencyFunc
Expand All @@ -454,7 +466,7 @@ func (ds *DistSender) sendSingleRange(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
}

br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)
br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba, cachedLeaseHolder)
if err != nil {
log.VErrEvent(ctx, 2, err.Error())
return nil, roachpb.NewError(err)
Expand Down Expand Up @@ -1109,12 +1121,12 @@ func (ds *DistSender) sendPartialBatch(
// row and the range descriptor hasn't changed, return the error
// to our caller.
switch tErr := pErr.GetDetail().(type) {
case *roachpb.SendError, *roachpb.RangeNotFoundError:
case *roachpb.SendError:
// We've tried all the replicas without success. Either
// they're all down, or we're using an out-of-date range
// descriptor. Invalidate the cache and try again with the new
// metadata.
log.Event(ctx, "evicting range descriptor on send error and backoff for re-lookup")
log.VEventf(ctx, 1, "evicting range descriptor on %T and backoff for re-lookup: %+v", tErr, desc)
if err := evictToken.Evict(ctx); err != nil {
return response{pErr: roachpb.NewError(err)}
}
Expand Down Expand Up @@ -1286,6 +1298,7 @@ func (ds *DistSender) sendToReplicas(
replicas ReplicaSlice,
ba roachpb.BatchRequest,
nodeDialer *nodedialer.Dialer,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
var ambiguousError error
var haveCommit bool
Expand Down Expand Up @@ -1346,25 +1359,49 @@ func (ds *DistSender) sendToReplicas(
}
}
} else {
// NB: This section of code may have unfortunate performance implications. If we
// exit the below type switch with propagateError remaining at `false`, we'll try
// more replicas. That may succeed and future requests might do the same thing over
// and over again, adding needless round-trips to the earlier replicas.
propagateError := false
switch tErr := br.Error.GetDetail().(type) {
case nil:
// When a request that we know could only succeed on the leaseholder comes
// back as successful, make sure the leaseholder cache reflects this
// replica. In steady state, this is almost always the case, and so we
// gate the update on whether the response comes from a node that we didn't
// know held the lease.
if cachedLeaseHolder != curReplica && ba.RequiresLeaseHolder() {
ds.leaseHolderCache.Update(ctx, rangeID, curReplica.StoreID)
}
return br, nil
case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
// These errors are likely to be unique to the replica that reported
// them, so no action is required before the next retry.
case *roachpb.RangeNotFoundError:
// The store we routed to doesn't have this replica. This can happen when
// our descriptor is outright outdated, but it can also be caused by a
// replica that has just been added but needs a snapshot to be caught up.
//
// We'll try other replicas which typically gives us the leaseholder, either
// via the NotLeaseHolderError or nil error paths, both of which update the
// leaseholder cache.
case *roachpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
if lh := tErr.LeaseHolder; lh != nil {
// If the replica we contacted knows the new lease holder, update the cache.
// Update the leaseholder cache. Naively this would also happen when the
// next RPC comes back, but we don't want to wait out the additional RPC
// latency.
ds.leaseHolderCache.Update(ctx, rangeID, lh.StoreID)
// Avoid an extra update to the leaseholder cache if the next RPC succeeds.
cachedLeaseHolder = *lh

// If the implicated leaseholder is not a known replica,
// return a RangeNotFoundError to signal eviction of the
// cached RangeDescriptor and re-send.
// If the implicated leaseholder is not a known replica, return a SendError
// to signal eviction of the cached RangeDescriptor and re-send.
if replicas.FindReplica(lh.StoreID) == -1 {
// Replace NotLeaseHolderError with RangeNotFoundError.
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(rangeID))
br.Error = roachpb.NewError(roachpb.NewSendError(fmt.Sprintf(
"leaseholder s%d (via %+v) not in cached replicas %v", lh.StoreID, curReplica, replicas,
)))
propagateError = true
} else {
// Move the new lease holder to the head of the queue for the next retry.
Expand Down
108 changes: 101 additions & 7 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,14 +541,18 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if first {
reply := &roachpb.BatchResponse{}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{LeaseHolder: &leaseHolder})
first = false
return reply, nil
}
return args.CreateReply(), nil
// Return an error to avoid activating a code path that would
// populate the leaseholder cache from the successful response.
// That's not what this test wants to test.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
Expand All @@ -563,8 +567,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, err := client.SendWrapped(context.Background(), ds, put); err != nil {
t.Errorf("put encountered error: %s", err)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Errorf("The command did not retry")
Expand Down Expand Up @@ -660,8 +664,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); !ok {
t.Fatalf("expected new leaseholder to be cached")
} else if exp := roachpb.StoreID(2); storeID != exp {
t.Fatalf("expected lease holder for r1 to be cached as s%d, but got s%d", exp, storeID)
}
}

Expand Down Expand Up @@ -953,7 +959,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) {
case 0, 1:
err = &roachpb.NotLeaseHolderError{LeaseHolder: &roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}}
case 2:
err = roachpb.NewRangeNotFoundError(0)
err = roachpb.NewRangeNotFoundError(0, 0)
default:
return args.CreateReply(), nil
}
Expand Down Expand Up @@ -1284,6 +1290,94 @@ func TestSendRPCRetry(t *testing.T) {
}
}

// This test reproduces the main problem in:
// https://github.com/cockroachdb/cockroach/issues/30613.
// by verifying that if a RangeNotFoundError is returned from a Replica,
// the next Replica is tried.
func TestSendRPCRangeNotFoundError(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)
if err := g.SetNodeDescriptor(&roachpb.NodeDescriptor{NodeID: 1}); err != nil {
t.Fatal(err)
}

// Fill RangeDescriptor with three replicas.
var descriptor = roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
}
for i := 1; i <= 3; i++ {
addr := util.MakeUnresolvedAddr("tcp", fmt.Sprintf("node%d", i))
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(i)), nd, time.Hour); err != nil {
t.Fatal(err)
}

descriptor.Replicas = append(descriptor.Replicas, roachpb.ReplicaDescriptor{
NodeID: roachpb.NodeID(i),
StoreID: roachpb.StoreID(i),
ReplicaID: roachpb.ReplicaID(i),
})
}
descDB := mockRangeDescriptorDBForDescs(
testMetaRangeDescriptor,
descriptor,
)

seen := map[roachpb.ReplicaID]struct{}{}
var leaseholderStoreID roachpb.StoreID
var ds *DistSender
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
br := ba.CreateReply()
if _, ok := seen[ba.Replica.ReplicaID]; ok {
br.Error = roachpb.NewErrorf("visited replica %+v twice", ba.Replica)
return br, nil
}
seen[ba.Replica.ReplicaID] = struct{}{}
if len(seen) <= 2 {
if len(seen) == 1 {
// Add to the leaseholder cache to verify that the response evicts it.
ds.leaseHolderCache.Update(context.Background(), ba.RangeID, ba.Replica.StoreID)
}
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
return br, nil
}
leaseholderStoreID = ba.Replica.StoreID
return br, nil
}
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: descDB,
}
ds = NewDistSender(cfg, g)
get := roachpb.NewGet(roachpb.Key("b"))
_, err := client.SendWrapped(context.Background(), ds, get)
if err != nil {
t.Fatal(err)
}
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); !found {
t.Fatal("expected a cached leaseholder")
} else if storeID != leaseholderStoreID {
t.Fatalf("unexpected cached leaseholder s%d, expected s%d", storeID, leaseholderStoreID)
}
}

// TestGetNodeDescriptor checks that the Node descriptor automatically gets
// looked up from Gossip.
func TestGetNodeDescriptor(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,5 @@ func sendBatch(
TransportFactory: transportFactory,
},
}, nil)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer, roachpb.ReplicaDescriptor{})
}
7 changes: 7 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func (ba *BatchRequest) IsReadOnly() bool {
return len(ba.Requests) > 0 && !ba.hasFlag(isWrite|isAdmin)
}

// RequiresLeaseHolder returns true if the request can only be served by the
// leaseholders of the ranges it addresses.
func (ba *BatchRequest) RequiresLeaseHolder() bool {
return !ba.IsReadOnly() || ba.Header.ReadConsistency.RequiresReadLease()

}

// IsReverse returns true iff the BatchRequest contains a reverse request.
func (ba *BatchRequest) IsReverse() bool {
return ba.hasFlag(isReverse)
Expand Down
12 changes: 9 additions & 3 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ func (s *SendError) message(_ *Error) string {

var _ ErrorDetailInterface = &SendError{}

// NewRangeNotFoundError initializes a new RangeNotFoundError.
func NewRangeNotFoundError(rangeID RangeID) *RangeNotFoundError {
// NewRangeNotFoundError initializes a new RangeNotFoundError for the given RangeID and, optionally,