Skip to content

Commit

Permalink
roachpb: cleanup speculative leases returned by NotLeaseHolderErrors
Browse files Browse the repository at this point in the history
Speculative leases are returned as part of NLHEs if the committed lease
is not known, but there is a guess as to who the leaseholder may be.
Previously, there were two ways to return these -- either by populating
just the LeaseHolder field on the NLHE or by populating the Lease field
with an unset sequence number. This patch unifies this behavior, and
going forward, we expect the latter to be the canonical form to
represent speculative leases. To that effect, the LeaseHolder field in
the NLHE proto is prefixed as "deprecated". We should be able to remove
the thing entirely in v23.1.

Release note: None
Release justification: low risk change.
  • Loading branch information
arulajmani committed Aug 24, 2022
1 parent aed288d commit 0402f47
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 51 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2188,16 +2188,18 @@ func (ds *DistSender) sendToReplicas(
ds.metrics.NotLeaseHolderErrCount.Inc(1)
// If we got some lease information, we use it. If not, we loop around
// and try the next replica.
if tErr.Lease != nil || tErr.LeaseHolder != nil {
if tErr.Lease != nil || tErr.DeprecatedLeaseHolder != nil {
// Update the leaseholder in the range cache. Naively this would also
// happen when the next RPC comes back, but we don't want to wait out
// the additional RPC latency.

var updatedLeaseholder bool
if tErr.Lease != nil {
updatedLeaseholder = routing.SyncTokenAndMaybeUpdateCache(ctx, tErr.Lease, &tErr.RangeDesc)
} else if tErr.LeaseHolder != nil {
updatedLeaseholder = routing.SyncTokenAndMaybeUpdateCacheWithSpeculativeLease(ctx, *tErr.LeaseHolder, &tErr.RangeDesc)
} else if tErr.DeprecatedLeaseHolder != nil {
updatedLeaseholder = routing.SyncTokenAndMaybeUpdateCacheWithSpeculativeLease(
ctx, *tErr.DeprecatedLeaseHolder, &tErr.RangeDesc,
)
}
// Move the new leaseholder to the head of the queue for the next
// retry. Note that the leaseholder might not be the one indicated by
Expand Down
37 changes: 22 additions & 15 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,13 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
expLease: true,
},
{
// TODO(arul): This is only possible in 22.{1,2} mixed version clusters;
// remove once we get rid of the LeaseHolder field in 23.1.
name: "leaseholder in desc, no lease",
nlhe: roachpb.NotLeaseHolderError{
RangeID: testUserRangeDescriptor3Replicas.RangeID,
LeaseHolder: &recognizedLeaseHolder,
RangeDesc: testUserRangeDescriptor3Replicas,
RangeID: testUserRangeDescriptor3Replicas.RangeID,
DeprecatedLeaseHolder: &recognizedLeaseHolder,
RangeDesc: testUserRangeDescriptor3Replicas,
},
expLeaseholder: &recognizedLeaseHolder,
expLease: false,
Expand Down Expand Up @@ -757,9 +759,8 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{
Replica: repls[int(seq)%2],
LeaseHolder: &repls[(int(seq)+1)%2],
Lease: lease,
Replica: repls[int(seq)%2],
Lease: lease,
})
return reply, nil
}
Expand Down Expand Up @@ -840,9 +841,8 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) {
br := ba.CreateReply()
if ba.Replica != lease.Replica {
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
Replica: ba.Replica,
LeaseHolder: &lease.Replica,
Lease: &lease,
Replica: ba.Replica,
Lease: &lease,
})
}
return br, nil
Expand Down Expand Up @@ -1123,8 +1123,11 @@ func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
}
} else {
// Speculative lease -- the NLHE only carries LeaseHolder information.
nlhe.LeaseHolder = &roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4}
// Speculative lease -- the NLHE only carries LeaseHolder information
// and the sequence number is unset.
nlhe.Lease = &roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
}
}

cachedLease := roachpb.Lease{
Expand Down Expand Up @@ -1703,7 +1706,10 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) {
var err error
switch count {
case 0, 1:
err = &roachpb.NotLeaseHolderError{LeaseHolder: &roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}}
err = &roachpb.NotLeaseHolderError{
Lease: &roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}},
}
case 2:
err = roachpb.NewRangeNotFoundError(0, 0)
default:
Expand Down Expand Up @@ -5173,8 +5179,9 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t
// will include a speculative lease that points to a replica that isn't
// part of the client's range descriptor. This is only possible in
// versions <= 22.1 as NLHE errors from uninitialized replicas no longer
// return speculative leases. Effectively, this acts as a mixed
// (22.1, 22.2) version test.
// return speculative leases by populating the (Deprecated)LeaseHolder
// field. Effectively, this acts as a mixed (22.1, 22.2) version test.
// TODO(arul): remove the speculative lease version of this test in 23.1.

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
Expand Down Expand Up @@ -5217,7 +5224,7 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t
RangeDesc: roachpb.RangeDescriptor{},
}
if returnSpeculativeLease {
nlhe.LeaseHolder = &roachpb.ReplicaDescriptor{NodeID: 5, StoreID: 5, ReplicaID: 5}
nlhe.DeprecatedLeaseHolder = &roachpb.ReplicaDescriptor{NodeID: 5, StoreID: 5, ReplicaID: 5}
}
br.Error = roachpb.NewError(nlhe)
case 1:
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,8 +1282,9 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
require.NotNil(t, pErr, "unexpected success")
nlhe := pErr.GetDetail().(*roachpb.NotLeaseHolderError)
require.NotNil(t, nlhe, "expected NotLeaseholderError, got: %s", pErr)
require.NotNil(t, nlhe.LeaseHolder, "expected NotLeaseholderError with a known leaseholder, got: %s", pErr)
require.Equal(t, leaderReplicaID, nlhe.LeaseHolder.ReplicaID)
require.False(t, nlhe.Lease.Empty())
require.NotNil(t, nlhe.Lease.Replica, "expected NotLeaseholderError with a known leaseholder, got: %s", pErr)
require.Equal(t, leaderReplicaID, nlhe.Lease.Replica.ReplicaID)
}

type fakeSnapshotStream struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,9 +1617,9 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
if !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
if !nlhe.Lease.Replica.Equal(&l.replica1Desc) {
t.Fatalf("expected lease holder %+v, got %+v",
l.replica1Desc, nlhe.LeaseHolder)
l.replica1Desc, nlhe.Lease.Replica)
}

// Check that replica1 now has the lease.
Expand Down Expand Up @@ -1717,9 +1717,9 @@ func TestLeaseExpirationBasedDrainTransfer(t *testing.T) {
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
if nlhe.LeaseHolder == nil || !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
if nlhe.Lease.Empty() || !nlhe.Lease.Replica.Equal(&l.replica1Desc) {
t.Fatalf("expected lease holder %+v, got %+v",
l.replica1Desc, nlhe.LeaseHolder)
l.replica1Desc, nlhe.Lease.Replica)
}

// Check that replica1 now has the lease.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (r *Replica) getLeaseForGossip(ctx context.Context) (bool, *roachpb.Error)
switch e := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError:
// NotLeaseHolderError means there is an active lease, but only if
// the lease holder is set; otherwise, it's likely a timeout.
if e.LeaseHolder != nil {
// the lease is non-empty; otherwise, it's likely a timeout.
if !e.Lease.Empty() {
pErr = nil
}
default:
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1235,12 +1235,14 @@ func (rp *replicaProposer) rejectProposalWithRedirectLocked(
storeID := r.store.StoreID()
r.store.metrics.LeaseRequestErrorCount.Inc(1)
redirectRep, _ /* ok */ := rangeDesc.GetReplicaDescriptorByID(redirectTo)
speculativeLease := roachpb.Lease{
Replica: redirectRep,
}
log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", redirectRep.NodeID, prop.Request)
rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(newNotLeaseHolderError(
speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower")))
rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(
newNotLeaseHolderErrorWithSpeculativeLease(
redirectRep,
storeID,
rangeDesc,
"refusing to acquire lease on follower"),
))
}

func (rp *replicaProposer) rejectProposalWithLeaseTransferRejectedLocked(
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,12 +1023,28 @@ func newNotLeaseHolderError(
if stillMember {
err.Lease = new(roachpb.Lease)
*err.Lease = l
err.LeaseHolder = &err.Lease.Replica
}
}
return err
}

// newNotLeaseHolderErrorWithSpeculativeLease builds a NotLeaseHolderError
// around a speculative lease, i.e. a lease whose only populated field is
// Replica (set to leaseHolder); every other lease field is left at its zero
// value. Use it when the actual lease is not known but the caller has a guess
// as to which replica holds it.
//
// The remaining arguments (proposerStoreID, rangeDesc, msg) are forwarded
// unchanged to newNotLeaseHolderError, which performs the construction.
func newNotLeaseHolderErrorWithSpeculativeLease(
	leaseHolder roachpb.ReplicaDescriptor,
	proposerStoreID roachpb.StoreID,
	rangeDesc *roachpb.RangeDescriptor,
	msg string,
) *roachpb.NotLeaseHolderError {
	return newNotLeaseHolderError(
		roachpb.Lease{Replica: leaseHolder},
		proposerStoreID,
		rangeDesc,
		msg,
	)
}

// newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError return an error
// indicating that a lease transfer failed because the current leaseholder could
// not prove that the lease transfer target did not need a Raft snapshot.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
require.Error(t, err)
var lErr *roachpb.NotLeaseHolderError
require.True(t, errors.As(err, &lErr))
require.Equal(t, secondReplica.StoreID, lErr.LeaseHolder.StoreID)
require.Equal(t, secondReplica.StoreID, lErr.Lease.Replica.StoreID)
} else {
// Check that the replica doesn't use its lease, even though there's
// no longer a transfer in progress. This is because, even though
Expand Down
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,13 @@ func (p *PinnedLeasesKnob) rejectLeaseIfPinnedElsewhere(r *Replica) *roachpb.Err
if err != nil {
return roachpb.NewError(err)
}
var pinned *roachpb.ReplicaDescriptor
if pinnedRep, ok := r.descRLocked().GetReplicaDescriptor(pinnedStore); ok {
pinned = &pinnedRep
}
pinned, _ := r.descRLocked().GetReplicaDescriptor(pinnedStore)
return roachpb.NewError(&roachpb.NotLeaseHolderError{
Replica: repDesc,
LeaseHolder: pinned,
RangeID: r.RangeID,
CustomMsg: "injected: lease pinned to another store",
Replica: repDesc,
Lease: &roachpb.Lease{
Replica: pinned,
},
RangeID: r.RangeID,
CustomMsg: "injected: lease pinned to another store",
})
}
4 changes: 2 additions & 2 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,12 @@ func (e *NotLeaseHolderError) message(_ *Error) string {
} else {
fmt.Fprint(&buf, "replica not lease holder; ")
}
if e.LeaseHolder == nil {
if e.DeprecatedLeaseHolder == nil {
fmt.Fprint(&buf, "lease holder unknown")
} else if e.Lease != nil {
fmt.Fprintf(&buf, "current lease is %s", e.Lease)
} else {
fmt.Fprintf(&buf, "replica %s is", *e.LeaseHolder)
fmt.Fprintf(&buf, "replica %s is", *e.DeprecatedLeaseHolder)
}
return buf.String()
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ message NotLeaseHolderError {
// representation, if known.
optional roachpb.ReplicaDescriptor replica = 1 [(gogoproto.nullable) = false];
// The lease holder, if known.
optional roachpb.ReplicaDescriptor lease_holder = 2;
// The current lease, if known. This might be nil even when lease_holder is
// set, as sometimes one can create this error without actually knowing the
// current lease, but having a guess about who the leader is.
//
// This field was only ever meaningful if the full lease was not known, but
// when constructing this error there was a guess about who the leaseholder
// may be. The same idea applied to speculative leases (which have unset
// sequence numbers). In a bid to unify these two cases, from v22.2, we stop
// making use of this field.
// TODO(arul): remove this field in 23.1.
optional roachpb.ReplicaDescriptor deprecated_lease_holder = 2;
// The current lease, if known.
//
// It's possible for leases returned here to represent speculative leases, not
// actual committed leases. In this case, the lease will not have its Sequence
Expand Down Expand Up @@ -196,7 +201,7 @@ enum TransactionAbortedReason {
// TODO(andrei): We should be able to identify the range merge case by saving
// a bit more info in the timestamp cache.
ABORT_REASON_TIMESTAMP_CACHE_REJECTED = 7;

reserved 2;
}

Expand Down Expand Up @@ -720,15 +725,15 @@ message InsufficientSpaceError {
optional int64 store_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "StoreID", (gogoproto.casttype) = "StoreID"];

// Op is the operation that was unable to be performed.
// Op is the operation that was unable to be performed.
optional string op = 2 [(gogoproto.nullable) = false];

// Available is remaining capacity.
optional int64 available = 3 [(gogoproto.nullable) = false];

// Capacity is total capacity.
// Capacity is total capacity.
optional int64 capacity = 4 [(gogoproto.nullable) = false];

// RequiredFraction is the required remaining capacity fraction.
optional double required = 5 [(gogoproto.nullable) = false];
}
}

0 comments on commit 0402f47

Please sign in to comment.