Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124682: kv: consolidate range lease construction r=nvanbenschoten a=nvanbenschoten

This commit consolidates the creation of lease objects into one place, a new `kv/kvserver/leases` package. This avoids the diffuse construction of leases that was previously spread between `replica_range_lease.go`, `cmd_lease_request.go`, `cmd_lease_transfer.go`, and `cmd_lease.go`.

This makes the logic easier to understand and easier to test. It will also make the logic easier to adjust when we add support for leader leases.

First part of cockroachdb#123498. The next PR will consolidate lease validation logic (before latches, after latches, before proposing) into one place.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jun 14, 2024
2 parents d55eec4 + 06d666e commit a24c4f3
Show file tree
Hide file tree
Showing 19 changed files with 1,314 additions and 364 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@
/pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs
/pkg/kv/kvserver/leases/ @cockroachdb/kv-prs
/pkg/kv/kvserver/liveness/ @cockroachdb/kv-prs
/pkg/kv/kvserver/load/ @cockroachdb/kv-prs
/pkg/kv/kvserver/lockspanset/ @cockroachdb/kv-prs
Expand Down
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/leases:leases_test",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/lockspanset:lockspanset_test",
"//pkg/kv/kvserver/logstore:logstore_test",
Expand Down Expand Up @@ -1439,6 +1440,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/kvstorage:kvstorage",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/leases:leases",
"//pkg/kv/kvserver/leases:leases_test",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
"//pkg/kv/kvserver/liveness:liveness",
"//pkg/kv/kvserver/liveness:liveness_test",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,9 @@ func TestOracle(t *testing.T) {
{NodeID: 3, Address: util.MakeUnresolvedAddr("tcp", "3"), Locality: region("c")},
}
replicas := []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1},
{NodeID: 2, StoreID: 2},
{NodeID: 3, StoreID: 3},
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
}
desc := &roachpb.RangeDescriptor{
InternalReplicas: replicas,
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1649,11 +1649,7 @@ message RequestLeaseRequest {
// The previous lease is specified by the caller to verify
// it has not changed when executing this command.
Lease prev_lease = 3 [(gogoproto.nullable) = false];
// The MinLeaseProposedTS of the proposing replica to make sure that leases
// issued after a node restart receive a new sequence number (instead of
// counting as a lease extension). See #23204.
util.hlc.Timestamp min_proposed_ts = 4 [(gogoproto.customname) = "MinProposedTS",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
reserved 4;
}

// A TransferLeaseRequest represents the arguments to the TransferLease()
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ go_library(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/kvstorage",
"//pkg/kv/kvserver/leases",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/load",
Expand Down
16 changes: 10 additions & 6 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,18 +2109,20 @@ func TestAllocatorTransferLeaseTargetIOOverloadCheck(t *testing.T) {
defer stopper.Stop(ctx)
n := len(tc.leaseCounts)
stores := make([]*roachpb.StoreDescriptor, n)
existing := make([]roachpb.ReplicaDescriptor, 0, n)
storeIDs := make([]roachpb.StoreID, n)
for i := range tc.leaseCounts {
existing = append(existing, replicas(roachpb.StoreID(i+1))...)
storeID := roachpb.StoreID(i + 1)
stores[i] = &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i + 1),
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)},
Capacity: roachpb.StoreCapacity{
LeaseCount: int32(tc.leaseCounts[i]),
IOThresholdMax: TestingIOThresholdWithScore(tc.IOScores[i]),
},
}
storeIDs[i] = storeID
}
existing := replicas(storeIDs...)

sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)
Expand Down Expand Up @@ -2940,18 +2942,20 @@ func TestAllocatorShouldTransferLeaseIOOverload(t *testing.T) {
defer stopper.Stop(ctx)
n := len(tc.leaseCounts)
stores := make([]*roachpb.StoreDescriptor, n)
existing := make([]roachpb.ReplicaDescriptor, 0, n)
storeIDs := make([]roachpb.StoreID, n)
for i := range tc.leaseCounts {
existing = append(existing, replicas(roachpb.StoreID(i+1))...)
storeID := roachpb.StoreID(i + 1)
stores[i] = &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i + 1),
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)},
Capacity: roachpb.StoreCapacity{
LeaseCount: int32(tc.leaseCounts[i]),
IOThresholdMax: TestingIOThresholdWithScore(tc.IOScores[i]),
},
}
storeIDs[i] = storeID
}
existing := replicas(storeIDs...)

sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/build",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
Expand Down
69 changes: 2 additions & 67 deletions pkg/kv/kvserver/batcheval/cmd_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ package batcheval

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
Expand Down Expand Up @@ -56,69 +53,14 @@ func evalNewLease(
ms *enginepb.MVCCStats,
lease roachpb.Lease,
prevLease roachpb.Lease,
isExtension bool,
isTransfer bool,
) (result.Result, error) {
// When returning an error from this method, must always return
// a newFailedLeaseTrigger() to satisfy stats.

// Ensure either an Epoch is set or Start < Expiration.
if (lease.Type() == roachpb.LeaseExpiration && lease.GetExpiration().LessEq(lease.Start.ToTimestamp())) ||
(lease.Type() == roachpb.LeaseEpoch && lease.Expiration != nil) {
// This amounts to a bug.
return newFailedLeaseTrigger(isTransfer),
&kvpb.LeaseRejectedError{
Existing: prevLease,
Requested: lease,
Message: fmt.Sprintf("illegal lease: epoch=%d, interval=[%s, %s)",
lease.Epoch, lease.Start, lease.Expiration),
}
}

// Verify that requesting replica is part of the current replica set.
desc := rec.Desc()
if _, ok := desc.GetReplicaDescriptor(lease.Replica.StoreID); !ok {
return newFailedLeaseTrigger(isTransfer),
&kvpb.LeaseRejectedError{
Existing: prevLease,
Requested: lease,
Message: "replica not found",
}
}

// Requests should not set the sequence number themselves. Set the sequence
// number here based on whether the lease is equivalent to the one it's
// succeeding.
if lease.Sequence != 0 {
return newFailedLeaseTrigger(isTransfer),
&kvpb.LeaseRejectedError{
Existing: prevLease,
Requested: lease,
Message: "sequence number should not be set",
}
}
isV24_1 := rec.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1Start)
// Construct the prior read summary if the lease sequence is changing.
var priorReadSum *rspb.ReadSummary
if prevLease.Equivalent(lease, isV24_1 /* expToEpochEquiv */) {
// If the proposed lease is equivalent to the previous lease, it is
// given the same sequence number. This is subtle, but is important
// to ensure that leases which are meant to be considered the same
// lease for the purpose of matching leases during command execution
// (see Lease.Equivalent) will be considered so. For example, an
// extension to an expiration-based lease will result in a new lease
// with the same sequence number.
lease.Sequence = prevLease.Sequence
} else {
// We set the new lease sequence to one more than the previous lease
// sequence. This is safe and will never result in repeated lease
// sequences because the sequence check beneath Raft acts as an atomic
// compare-and-swap of sorts. If two lease requests are proposed in
// parallel, both with the same previous lease, only one will be
// accepted and the other will get a LeaseRejectedError and need to
// retry with a different sequence number. This is actually exactly what
// the sequence number is used to enforce!
lease.Sequence = prevLease.Sequence + 1

if prevLease.Sequence != lease.Sequence {
// If the new lease is not equivalent to the old lease, construct a read
// summary to instruct the new leaseholder on how to update its timestamp
// cache to respect prior reads served on the range.
Expand All @@ -144,13 +86,6 @@ func evalNewLease(
}
}

// Record information about the type of event that resulted in this new lease.
if isTransfer {
lease.AcquisitionType = roachpb.LeaseAcquisitionType_Transfer
} else {
lease.AcquisitionType = roachpb.LeaseAcquisitionType_Request
}

// Store the lease to disk & in-memory.
if err := MakeStateLoader(rec).SetLeaseBlind(ctx, readWriter, ms, lease, prevLease); err != nil {
return newFailedLeaseTrigger(isTransfer), err
Expand Down
82 changes: 6 additions & 76 deletions pkg/kv/kvserver/batcheval/cmd_lease_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,91 +59,21 @@ func RequestLease(
prevLease := args.PrevLease
newLease := args.Lease

rErr := &kvpb.LeaseRejectedError{
Existing: prevLease,
Requested: newLease,
}

// MIGRATION(tschottdorf): needed to apply Raft commands which got proposed
// before the StartStasis field was introduced.
// TODO(nvanbenschoten): remove in #124057 when clusterversion.MinSupported
// is v24.1 or greater.
if newLease.DeprecatedStartStasis == nil {
newLease.DeprecatedStartStasis = newLease.Expiration
}

isExtension := prevLease.Replica.StoreID == newLease.Replica.StoreID
effectiveStart := newLease.Start

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
wasLastLeaseholder := isExtension
wasLastLeaseholder := prevLease.Replica.StoreID == newLease.Replica.StoreID
if err := roachpb.CheckCanReceiveLease(
newLease.Replica, cArgs.EvalCtx.Desc().Replicas(), wasLastLeaseholder,
); err != nil {
rErr.Message = err.Error()
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}

// Wind the start timestamp back as far towards the previous lease as we
// can. That'll make sure that when multiple leases are requested out of
// order at the same replica (after all, they use the request timestamp,
// which isn't straight out of our local clock), they all succeed unless
// they have a "real" issue with a previous lease. Example: Assuming no
// previous lease, one request for [5, 15) followed by one for [0, 15)
// would fail without this optimization. With it, the first request
// effectively gets the lease for [0, 15), which the second one can commit
// again (even extending your own lease is possible; see below).
//
// If this is our lease (or no prior lease exists), we effectively absorb
// the old lease. This allows multiple requests from the same replica to
// merge without ticking away from the minimal common start timestamp. It
// also has the positive side-effect of fixing #3561, which was caused by
// the absence of replay protection.
if prevLease.Replica.StoreID == 0 || isExtension {
effectiveStart.Backward(prevLease.Start)
// If the lease holder promised to not propose any commands below
// MinProposedTS, it must also not be allowed to extend a lease before that
// timestamp. We make sure that when a node restarts, its earlier in-flight
// commands (which are not tracked by the spanlatch manager post restart)
// receive an error under the new lease by making sure the sequence number
// of that lease is higher. This in turn is achieved by forwarding its start
// time here, which makes it not Equivalent() to the preceding lease for the
// same store.
//
// Note also that leasePostApply makes sure to update the timestamp cache in
// this case: even though the lease holder does not change, the sequence
// number does and this triggers a low water mark bump.
//
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
effectiveStart.Forward(*ts)
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
effectiveStart.BackwardWithTimestamp(prevLease.Expiration.Next())
}

if isExtension {
if effectiveStart.Less(prevLease.Start) {
rErr.Message = "extension moved start timestamp backwards"
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}
if newLease.Type() == roachpb.LeaseExpiration {
// NB: Avoid mutating pointers in the argument which might be shared with
// the caller.
t := *newLease.Expiration
newLease.Expiration = &t
newLease.Expiration.Forward(prevLease.GetExpiration())
rErr := &kvpb.LeaseRejectedError{
Existing: prevLease,
Requested: newLease,
Message: err.Error(),
}
} else if prevLease.Type() == roachpb.LeaseExpiration && effectiveStart.ToTimestamp().Less(prevLease.GetExpiration()) {
rErr.Message = "requested lease overlaps previous lease"
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}
newLease.Start = effectiveStart

log.VEventf(ctx, 2, "lease request: prev lease: %+v, new lease: %+v", prevLease, newLease)
return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
newLease, prevLease, isExtension, false /* isTransfer */)
newLease, prevLease, false /* isTransfer */)
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
Replica: replicas[1],
ProposedTS: now,
Start: now,
Sequence: prevLease.Sequence + 1,
}
if epoch {
nextLease.Epoch = 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ func TransferLease(

log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
newLease, prevLease, false /* isExtension */, true /* isTransfer */)
newLease, prevLease, true /* isTransfer */)
}
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2057,8 +2057,8 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) {
if val := injectLeaseTransferError.Load(); val != nil && val.(bool) {
// Note that we can't just return an error here as we only
// end up counting failures in the metrics if the command
// makes it through to being executed. So use a fake store ID.
args.Lease.Replica.StoreID = roachpb.StoreID(1000)
// makes it through to being executed. So use a fake replica ID.
args.Lease.Replica.ReplicaID = 1000
}
}
return nil
Expand Down Expand Up @@ -2337,6 +2337,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) {
t.Fatal(err)
}
leaseReq.PrevLease = leaseInfo.CurrentOrProspective()
leaseReq.Lease.Sequence = leaseReq.PrevLease.Sequence + 1

_, pErr := kv.SendWrapped(ctx, s.DB().NonTransactionalSender(), &leaseReq)
if _, ok := pErr.GetDetail().(*kvpb.AmbiguousResultError); ok {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/kvserverpb/lease_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ func (st LeaseStatus) IsValid() bool {
return st.State == LeaseState_VALID
}

// IsExpired returns whether the lease was expired at the time that the
// lease status was computed.
func (st LeaseStatus) IsExpired() bool {
return st.State == LeaseState_EXPIRED
}

// OwnedBy returns whether the lease is owned by the given store.
func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool {
return st.Lease.OwnedBy(storeID)
Expand Down
30 changes: 30 additions & 0 deletions pkg/kv/kvserver/leases/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "leases",
srcs = ["build.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/leases",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "leases_test",
srcs = ["build_test.go"],
embed = [":leases"],
deps = [
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/util/hlc",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit a24c4f3

Please sign in to comment.