Merge pull request #31250 from tschottdorf/backport2.1-31013
backport-2.1: kv: try next replica on RangeNotFoundError
tbg committed Oct 12, 2018
2 parents db85d17 + fdca0bd commit a7a3cc1
Showing 16 changed files with 413 additions and 211 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/election.go
@@ -64,7 +64,7 @@ SET TRACING = off;
SHOW TRACE FOR SESSION;
"`)
if err != nil {
t.Fatal(err)
t.Fatalf("%s\n\n%s", buf, err)
}
duration = timeutil.Since(start)
c.l.Printf("post-restart, query took %s\n", duration)
67 changes: 52 additions & 15 deletions pkg/kv/dist_sender.go
@@ -372,7 +372,11 @@ func (ds *DistSender) getNodeDescriptor() *roachpb.NodeDescriptor {
// The replicas are assumed to be ordered by preference, with closer
// ones (i.e. expected lowest latency) first.
func (ds *DistSender) sendRPC(
ctx context.Context, rangeID roachpb.RangeID, replicas ReplicaSlice, ba roachpb.BatchRequest,
ctx context.Context,
rangeID roachpb.RangeID,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
if len(replicas) == 0 {
return nil, roachpb.NewSendError(
@@ -384,7 +388,15 @@ func (ds *DistSender) sendRPC(
tracing.AnnotateTrace()
defer tracing.AnnotateTrace()

return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, rangeID, replicas, ba, ds.nodeDialer)
return ds.sendToReplicas(
ctx,
SendOptions{metrics: &ds.metrics},
rangeID,
replicas,
ba,
ds.nodeDialer,
cachedLeaseHolder,
)
}

// CountRanges returns the number of ranges that encompass the given key span.
@@ -435,16 +447,16 @@ func (ds *DistSender) sendSingleRange(

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
var knowLeaseholder bool
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
var cachedLeaseHolder roachpb.ReplicaDescriptor
if ba.RequiresLeaseHolder() {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
knowLeaseholder = true
cachedLeaseHolder = replicas[0].ReplicaDescriptor
}
}
}
if !knowLeaseholder {
if (cachedLeaseHolder == roachpb.ReplicaDescriptor{}) {
// Rearrange the replicas so that they're ordered in expectation of
// request latency.
var latencyFn LatencyFunc
@@ -454,7 +466,7 @@ func (ds *DistSender) sendSingleRange(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
}

br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)
br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba, cachedLeaseHolder)
if err != nil {
log.VErrEvent(ctx, 2, err.Error())
return nil, roachpb.NewError(err)
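
As context for the hunk above, here is a minimal standalone sketch of the new replica-ordering logic, using simplified stand-in types (`replica`, `orderReplicas`, and the boolean cache flag are illustrative, not the real DistSender API): if the leaseholder's store is found in the cache, that replica is moved to the front and remembered as `cachedLeaseHolder`; otherwise the zero descriptor is returned and the real code falls back to latency-based ordering.

```go
package main

import "fmt"

// Simplified stand-ins for roachpb.ReplicaDescriptor / kv.ReplicaSlice;
// the real types also carry node addresses, attributes, etc.
type replica struct {
	NodeID, StoreID int
}

// orderReplicas mirrors the shape of the new sendSingleRange logic: if the
// leaseholder's store is found in the cache, move that replica to the front
// (preserving the order of the rest, like ReplicaSlice.MoveToFront) and
// return it so the caller can pass it on as cachedLeaseHolder. Otherwise
// return the zero replica, meaning "leaseholder unknown"; the real code then
// reorders by expected latency (OptimizeReplicaOrder).
func orderReplicas(replicas []replica, cachedStoreID int, haveCache bool) ([]replica, replica) {
	ordered := append([]replica(nil), replicas...)
	if haveCache {
		for i, r := range ordered {
			if r.StoreID == cachedStoreID {
				front := ordered[i]
				copy(ordered[1:i+1], ordered[:i])
				ordered[0] = front
				return ordered, front
			}
		}
	}
	return ordered, replica{}
}

func main() {
	replicas := []replica{{1, 1}, {2, 2}, {3, 3}}
	ordered, lh := orderReplicas(replicas, 2, true)
	fmt.Println(ordered, "cached leaseholder:", lh) // [{2 2} {1 1} {3 3}] cached leaseholder: {2 2}
}
```
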
@@ -1104,12 +1116,12 @@ func (ds *DistSender) sendPartialBatch(
// row and the range descriptor hasn't changed, return the error
// to our caller.
switch tErr := pErr.GetDetail().(type) {
case *roachpb.SendError, *roachpb.RangeNotFoundError:
case *roachpb.SendError:
// We've tried all the replicas without success. Either
// they're all down, or we're using an out-of-date range
// descriptor. Invalidate the cache and try again with the new
// metadata.
log.Event(ctx, "evicting range descriptor on send error and backoff for re-lookup")
log.VEventf(ctx, 1, "evicting range descriptor on %T and backoff for re-lookup: %+v", tErr, desc)
if err := evictToken.Evict(ctx); err != nil {
return response{pErr: roachpb.NewError(err)}
}
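
For orientation, a rough sketch of the retry loop that wraps this switch, with hypothetical helpers (`lookup` and `send` stand in for the range descriptor cache and sendToReplicas): a SendError evicts the cached descriptor so the next attempt re-resolves the range, while per-replica RangeNotFoundErrors are now absorbed inside the send loop and no longer reach this point.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for roachpb.SendError: every replica was tried without success,
// so the cached range descriptor may be stale.
var errSend = errors.New("send error: all replicas failed")

// sendWithEviction sketches the sendPartialBatch retry loop: a SendError
// evicts the cached descriptor and the next attempt re-resolves the range.
func sendWithEviction(
	lookup func(evicted bool) string,
	send func(desc string) error,
	maxAttempts int,
) error {
	evicted := false
	for attempt := 0; attempt < maxAttempts; attempt++ {
		desc := lookup(evicted)
		err := send(desc)
		if !errors.Is(err, errSend) {
			return err // success, or an error that must be propagated to the caller
		}
		// All replicas failed: evict the descriptor and back off for a re-lookup.
		evicted = true
	}
	return fmt.Errorf("giving up after %d attempts", maxAttempts)
}

func main() {
	calls := 0
	err := sendWithEviction(
		func(evicted bool) string {
			if evicted {
				return "fresh descriptor"
			}
			return "stale descriptor"
		},
		func(desc string) error {
			calls++
			if desc == "stale descriptor" {
				return errSend
			}
			return nil
		},
		3,
	)
	fmt.Println(calls, err) // 2 <nil>
}
```
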
@@ -1281,6 +1293,7 @@ func (ds *DistSender) sendToReplicas(
replicas ReplicaSlice,
ba roachpb.BatchRequest,
nodeDialer *nodedialer.Dialer,
cachedLeaseHolder roachpb.ReplicaDescriptor,
) (*roachpb.BatchResponse, error) {
var ambiguousError error
var haveCommit bool
@@ -1341,25 +1354,49 @@
}
}
} else {
// NB: This section of code may have unfortunate performance implications. If we
// exit the below type switch with propagateError remaining at `false`, we'll try
// more replicas. That may succeed and future requests might do the same thing over
// and over again, adding needless round-trips to the earlier replicas.
propagateError := false
switch tErr := br.Error.GetDetail().(type) {
case nil:
// When a request that we know could only succeed on the leaseholder comes
// back as successful, make sure the leaseholder cache reflects this
// replica. In steady state, this is almost always the case, and so we
// gate the update on whether the response comes from a node that we didn't
// know held the lease.
if cachedLeaseHolder != curReplica && ba.RequiresLeaseHolder() {
ds.leaseHolderCache.Update(ctx, rangeID, curReplica.StoreID)
}
return br, nil
case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
// These errors are likely to be unique to the replica that reported
// them, so no action is required before the next retry.
case *roachpb.RangeNotFoundError:
// The store we routed to doesn't have this replica. This can happen when
// our descriptor is outright outdated, but it can also be caused by a
// replica that has just been added but needs a snapshot to be caught up.
//
// We'll try other replicas which typically gives us the leaseholder, either
// via the NotLeaseHolderError or nil error paths, both of which update the
// leaseholder cache.
case *roachpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
if lh := tErr.LeaseHolder; lh != nil {
// If the replica we contacted knows the new lease holder, update the cache.
// Update the leaseholder cache. Naively this would also happen when the
// next RPC comes back, but we don't want to wait out the additional RPC
// latency.
ds.leaseHolderCache.Update(ctx, rangeID, lh.StoreID)
// Avoid an extra update to the leaseholder cache if the next RPC succeeds.
cachedLeaseHolder = *lh

// If the implicated leaseholder is not a known replica,
// return a RangeNotFoundError to signal eviction of the
// cached RangeDescriptor and re-send.
// If the implicated leaseholder is not a known replica, return a SendError
// to signal eviction of the cached RangeDescriptor and re-send.
if replicas.FindReplica(lh.StoreID) == -1 {
// Replace NotLeaseHolderError with RangeNotFoundError.
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(rangeID))
br.Error = roachpb.NewError(roachpb.NewSendError(fmt.Sprintf(
"leaseholder s%d (via %+v) not in cached replicas %v", lh.StoreID, curReplica, replicas,
)))
propagateError = true
} else {
// Move the new lease holder to the head of the queue for the next retry.
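The core of this backport is the new per-replica error handling in sendToReplicas. Below is a compressed sketch using simplified stand-in types (`replica`, `result`, and the map-based cache are illustrative; the real loop also handles ambiguous results, commits, and transport exhaustion): a RangeNotFoundError just advances to the next replica, a NotLeaseHolderError eagerly updates the leaseholder cache, and a successful response from an unexpected replica refreshes the cache.

```go
package main

import "fmt"

// Simplified stand-ins for the real types in pkg/roachpb.
type replica struct{ StoreID int }

type result struct {
	rangeNotFound  bool     // this replica doesn't have the range (e.g. still waiting for a snapshot)
	notLeaseHolder *replica // this replica isn't the leaseholder; may say who is
}

// trySend sketches the new switch in sendToReplicas. cache stands in for the
// leaseholder cache and send for the per-replica RPC.
func trySend(
	replicas []replica, cached replica, cache map[int]int, rangeID int, send func(replica) result,
) error {
	for _, cur := range replicas {
		res := send(cur)
		switch {
		case res.rangeNotFound:
			// New behaviour: don't fail the whole range, just try the next replica.
			continue
		case res.notLeaseHolder != nil:
			lh := *res.notLeaseHolder
			cache[rangeID] = lh.StoreID // update the leaseholder cache eagerly
			cached = lh
			// The real code moves the indicated leaseholder to the front of the
			// remaining replicas, or returns a SendError if it isn't in the
			// descriptor at all (so the descriptor cache gets evicted).
			continue
		default:
			if cur != cached {
				// Success from a replica we didn't think held the lease:
				// make the cache reflect reality.
				cache[rangeID] = cur.StoreID
			}
			return nil
		}
	}
	return fmt.Errorf("sending to all %d replicas failed", len(replicas))
}

func main() {
	cache := map[int]int{}
	// s1 reports RangeNotFoundError, s2 redirects to s3, s3 succeeds.
	send := func(r replica) result {
		switch r.StoreID {
		case 1:
			return result{rangeNotFound: true}
		case 2:
			return result{notLeaseHolder: &replica{StoreID: 3}}
		default:
			return result{}
		}
	}
	err := trySend([]replica{{1}, {2}, {3}}, replica{}, cache, 7, send)
	fmt.Println(err, cache) // <nil> map[7:3]
}
```
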
108 changes: 101 additions & 7 deletions pkg/kv/dist_sender_test.go
@@ -539,14 +539,18 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if first {
reply := &roachpb.BatchResponse{}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{LeaseHolder: &leaseHolder})
first = false
return reply, nil
}
return args.CreateReply(), nil
// Return an error to avoid activating a code path that would
// populate the leaseholder cache from the successful response.
// That's not what this test wants to test.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
@@ -560,8 +564,8 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, err := client.SendWrapped(context.Background(), ds, put); err != nil {
t.Errorf("put encountered error: %s", err)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("unexpected error: %v", pErr)
}
if first {
t.Errorf("The command did not retry")
@@ -656,8 +660,10 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); !ok {
t.Fatalf("expected new leaseholder to be cached")
} else if exp := roachpb.StoreID(2); storeID != exp {
t.Fatalf("expected lease holder for r1 to be cached as s%d, but got s%d", exp, storeID)
}
}

@@ -946,7 +952,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) {
case 0, 1:
err = &roachpb.NotLeaseHolderError{LeaseHolder: &roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}}
case 2:
err = roachpb.NewRangeNotFoundError(0)
err = roachpb.NewRangeNotFoundError(0, 0)
default:
return args.CreateReply(), nil
}
@@ -1273,6 +1279,94 @@ func TestSendRPCRetry(t *testing.T) {
}
}

// This test reproduces the main problem in:
// https://github.com/cockroachdb/cockroach/issues/30613
// by verifying that if a RangeNotFoundError is returned from a Replica,
// the next Replica is tried.
func TestSendRPCRangeNotFoundError(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)
if err := g.SetNodeDescriptor(&roachpb.NodeDescriptor{NodeID: 1}); err != nil {
t.Fatal(err)
}

// Fill RangeDescriptor with three replicas.
var descriptor = roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
}
for i := 1; i <= 3; i++ {
addr := util.MakeUnresolvedAddr("tcp", fmt.Sprintf("node%d", i))
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(i)), nd, time.Hour); err != nil {
t.Fatal(err)
}

descriptor.Replicas = append(descriptor.Replicas, roachpb.ReplicaDescriptor{
NodeID: roachpb.NodeID(i),
StoreID: roachpb.StoreID(i),
ReplicaID: roachpb.ReplicaID(i),
})
}
descDB := mockRangeDescriptorDBForDescs(
testMetaRangeDescriptor,
descriptor,
)

seen := map[roachpb.ReplicaID]struct{}{}
var leaseholderStoreID roachpb.StoreID
var ds *DistSender
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
br := ba.CreateReply()
if _, ok := seen[ba.Replica.ReplicaID]; ok {
br.Error = roachpb.NewErrorf("visited replica %+v twice", ba.Replica)
return br, nil
}
seen[ba.Replica.ReplicaID] = struct{}{}
if len(seen) <= 2 {
if len(seen) == 1 {
// Add to the leaseholder cache to verify that the response evicts it.
ds.leaseHolderCache.Update(context.Background(), ba.RangeID, ba.Replica.StoreID)
}
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
return br, nil
}
leaseholderStoreID = ba.Replica.StoreID
return br, nil
}
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: descDB,
}
ds = NewDistSender(cfg, g)
get := roachpb.NewGet(roachpb.Key("b"))
_, err := client.SendWrapped(context.Background(), ds, get)
if err != nil {
t.Fatal(err)
}
if storeID, found := ds.leaseHolderCache.Lookup(context.Background(), roachpb.RangeID(1)); !found {
t.Fatal("expected a cached leaseholder")
} else if storeID != leaseholderStoreID {
t.Fatalf("unexpected cached leaseholder s%d, expected s%d", storeID, leaseholderStoreID)
}
}

// TestGetNodeDescriptor checks that the Node descriptor automatically gets
// looked up from Gossip.
func TestGetNodeDescriptor(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/kv/send_test.go
@@ -281,5 +281,5 @@ func sendBatch(
TransportFactory: transportFactory,
},
}, nil)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer)
return ds.sendToReplicas(ctx, SendOptions{metrics: &ds.metrics}, 0, makeReplicas(addrs...), roachpb.BatchRequest{}, nodeDialer, roachpb.ReplicaDescriptor{})
}
7 changes: 7 additions & 0 deletions pkg/roachpb/batch.go
@@ -101,6 +101,13 @@ func (ba *BatchRequest) IsReadOnly() bool {
return len(ba.Requests) > 0 && !ba.hasFlag(isWrite|isAdmin)
}

// RequiresLeaseHolder returns true if the request can only be served by the
// leaseholders of the ranges it addresses.
func (ba *BatchRequest) RequiresLeaseHolder() bool {
return !ba.IsReadOnly() || ba.Header.ReadConsistency.RequiresReadLease()

}

// IsReverse returns true iff the BatchRequest contains a reverse request.
func (ba *BatchRequest) IsReverse() bool {
return ba.hasFlag(isReverse)
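A tiny illustration of what the new RequiresLeaseHolder predicate implies for routing, using simplified stand-ins for the batch flags and roachpb's read-consistency levels (the real enum also includes READ_UNCOMMITTED, which is assumed here to still require a read lease): writes and consistent reads must go to the leaseholder, while inconsistent reads may be served by any replica.

```go
package main

import "fmt"

// Simplified stand-in for roachpb.ReadConsistencyType; the real method
// consults ReadConsistencyType.RequiresReadLease.
type readConsistency int

const (
	consistent readConsistency = iota
	inconsistent
)

// requiresLeaseHolder mirrors BatchRequest.RequiresLeaseHolder for a
// simplified batch: anything that writes, or reads at a consistency level
// needing a read lease, must be routed to the leaseholder.
func requiresLeaseHolder(isReadOnly bool, rc readConsistency) bool {
	return !isReadOnly || rc == consistent
}

func main() {
	fmt.Println(requiresLeaseHolder(false, consistent))  // true: writes need the leaseholder
	fmt.Println(requiresLeaseHolder(true, consistent))   // true: consistent reads need a read lease
	fmt.Println(requiresLeaseHolder(true, inconsistent)) // false: any replica may serve it
}
```
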
12 changes: 9 additions & 3 deletions pkg/roachpb/errors.go
@@ -241,10 +241,12 @@ func (s *SendError) message(_ *Error) string {

var _ ErrorDetailInterface = &SendError{}

// NewRangeNotFoundError initializes a new RangeNotFoundError.
func NewRangeNotFoundError(rangeID RangeID) *RangeNotFoundError {
// NewRangeNotFoundError initializes a new RangeNotFoundError for the given RangeID and, optionally,
// a StoreID.
func NewRangeNotFoundError(rangeID RangeID, storeID StoreID) *RangeNotFoundError {
return &RangeNotFoundError{
RangeID: rangeID,
StoreID: storeID,
}
}

@@ -253,7 +255,11 @@ func (e *RangeNotFoundError) Error() string {
}

func (e *RangeNotFoundError) message(_ *Error) string {
return fmt.Sprintf("r%d was not found", e.RangeID)
msg := fmt.Sprintf("r%d was not found", e.RangeID)
if e.StoreID != 0 {
msg += fmt.Sprintf(" on s%d", e.StoreID)
}
return msg
}

var _ ErrorDetailInterface = &RangeNotFoundError{}
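A quick self-contained check of the new error message format, using a local copy of the relevant fields rather than the protobuf-generated roachpb type: the store is only mentioned when a non-zero StoreID was supplied.

```go
package main

import "fmt"

// Local copy of the relevant RangeNotFoundError fields; the real type is
// protobuf-generated in pkg/roachpb.
type rangeNotFoundError struct {
	RangeID int64
	StoreID int64
}

// message mirrors RangeNotFoundError.message after this change: the store is
// only mentioned when it is known.
func (e *rangeNotFoundError) message() string {
	msg := fmt.Sprintf("r%d was not found", e.RangeID)
	if e.StoreID != 0 {
		msg += fmt.Sprintf(" on s%d", e.StoreID)
	}
	return msg
}

func main() {
	fmt.Println((&rangeNotFoundError{RangeID: 7}).message())             // r7 was not found
	fmt.Println((&rangeNotFoundError{RangeID: 7, StoreID: 3}).message()) // r7 was not found on s3
}
```
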