101050: kvserver: move eager expiration lease extension to Raft scheduler r=erikgrinaker a=erikgrinaker

**kvserver: never quiesce expiration leases**

Previously, this was contingent on `kv.expiration_leases_only.enabled`. In practice, this patch therefore disables quiescence for the meta/liveness ranges, and for any range that briefly holds an expiration lease following a lease transfer, before upgrading it to an epoch lease. Quiescing these ranges tends to result in more work rather than less, because we would shortly have to wake them up again, which incurs an additional Raft append.

Epic: none
Release note: None
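
As a sketch of the rule this establishes (a hypothetical helper under assumed names; the real check lives in the replica quiescence logic):

```go
package kvserver

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// canQuiesceLease sketches the new invariant: only epoch-based leases may
// quiesce. An expiration-based lease has to keep ticking so it can be
// eagerly extended (or upgraded to an epoch lease) before it expires, so
// quiescing it only buys an extra Raft append when the range is woken up
// again moments later.
func canQuiesceLease(lease roachpb.Lease) bool {
	return lease.Type() == roachpb.LeaseEpoch
}
```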
  
**kvserver: move eager expiration lease extension to Raft scheduler**

This patch makes the Raft scheduler responsible for eagerly extending all expiration-based leases, replacing the store lease renewer. Previously, this was contingent on `kv.expiration_leases_only.enabled`, and by default only the meta/liveness ranges were eagerly extended by the store lease renewer.

Touches cockroachdb#98433.

Epic: none
Release note: None
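
A minimal sketch of the decision the scheduler now makes on each tick; `shouldExtendOnTick` and the `renewalWindow` parameter are hypothetical names, not the actual implementation:

```go
package kvserver

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// shouldExtendOnTick sketches the tick-side check: an expiration-based
// lease owned by this store is renewed once it is within renewalWindow of
// expiring, regardless of whether any requests are hitting the range.
func shouldExtendOnTick(
	st kvserverpb.LeaseStatus,
	storeID roachpb.StoreID,
	now hlc.Timestamp,
	renewalWindow time.Duration,
) bool {
	if st.Lease.Type() != roachpb.LeaseExpiration || !st.Lease.OwnedBy(storeID) {
		return false
	}
	// Renew eagerly once less than renewalWindow of validity remains.
	return st.Expiration().WallTime-now.WallTime < renewalWindow.Nanoseconds()
}
```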
  
**kvserver: don't extend leases during request processing**

This patch removes expiration lease extension during request processing, since the Raft scheduler now always eagerly extends expiration leases. Previously, this was contingent on `kv.expiration_leases_only.enabled`.

Epic: none
Release note: None
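
Taken together, the request path reduces to a plain lease check. A simplified, hypothetical sketch of the resulting shape (`checkLease` is an invented name; the actual change is in the `replica.go` hunks below):

```go
package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
)

// checkLease sketches request processing after this change: it validates
// the lease under a read lock and never schedules a renewal, since the
// Raft scheduler keeps expiration leases fresh in the background. The
// deferred postRUnlock closure that previously smuggled an async extension
// past the read lock is gone.
func (r *Replica) checkLease(
	ctx context.Context, ba *kvpb.BatchRequest,
) (kvserverpb.LeaseStatus, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.checkLeaseRLocked(ctx, ba)
}
```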

Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
craig[bot] and erikgrinaker committed Apr 27, 2023
2 parents 5608a14 + 6536527 commit 5b44c4f
Showing 11 changed files with 53 additions and 385 deletions.
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/client_lease_test.go
@@ -1458,11 +1458,10 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 					WallClock: manualClock,
 				},
 				Store: &kvserver.StoreTestingKnobs{
-					// Outlandishly high to disable proactive renewal of
-					// expiration based leases. Lease upgrades happen
-					// immediately after applying without needing active
+					// Disable proactive renewal of expiration based leases. Lease
+					// upgrades happen immediately after applying without needing active
 					// renewal.
-					LeaseRenewalDurationOverride: 100 * time.Hour,
+					DisableAutomaticLeaseRenewal: true,
 					LeaseUpgradeInterceptor: func(lease *roachpb.Lease) {
 						mu.Lock()
 						defer mu.Unlock()
41 changes: 9 additions & 32 deletions pkg/kv/kvserver/replica.go
@@ -1614,13 +1614,8 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
 		return kvserverpb.LeaseStatus{}, err
 	}
 
-	var shouldExtend bool
-	postRUnlock := func() {}
 	r.mu.RLock()
-	defer func() {
-		r.mu.RUnlock()
-		postRUnlock()
-	}()
+	defer r.mu.RUnlock()
 
 	// Has the replica been initialized?
 	// NB: this should have already been checked in Store.Send, so we don't need
@@ -1645,7 +1640,7 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
 		return kvserverpb.LeaseStatus{}, err
 	}
 
-	st, shouldExtend, err := r.checkLeaseRLocked(ctx, ba)
+	st, err := r.checkLeaseRLocked(ctx, ba)
 	if err != nil {
 		return kvserverpb.LeaseStatus{}, err
 	}
@@ -1669,19 +1664,6 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
 		}
 	}
 
-	if shouldExtend && !ExpirationLeasesOnly.Get(&r.ClusterSettings().SV) {
-		// If we're asked to extend the lease, trigger (async) lease renewal. We
-		// don't do this if kv.expiration_leases_only.enabled is true, since we in
-		// that case eagerly extend expiration leases during Raft ticks instead.
-		//
-		// TODO(erikgrinaker): Remove this when we always eagerly extend
-		// expiration leases.
-		//
-		// Kicking this off requires an exclusive lock, and we hold a read-only lock
-		// already, so we jump through a hoop to run it in a suitably positioned
-		// defer.
-		postRUnlock = func() { r.maybeExtendLeaseAsync(ctx, st) }
-	}
 	return st, nil
 }
 
@@ -1745,12 +1727,10 @@ func (r *Replica) checkExecutionCanProceedRWOrAdmin(
 // checkLeaseRLocked checks the provided batch against the GC
 // threshold and lease. A nil error indicates to go ahead with the batch, and
 // is accompanied either by a valid or zero lease status, the latter case
-// indicating that the request was permitted to bypass the lease check. The
-// returned bool indicates whether the lease should be extended (only on nil
-// error).
+// indicating that the request was permitted to bypass the lease check.
 func (r *Replica) checkLeaseRLocked(
 	ctx context.Context, ba *kvpb.BatchRequest,
-) (kvserverpb.LeaseStatus, bool, error) {
+) (kvserverpb.LeaseStatus, error) {
 	now := r.Clock().NowAsClockTimestamp()
 	// If the request is a write or a consistent read, it requires the
 	// replica serving it to hold the range lease. We pass the write
@@ -1762,7 +1742,6 @@ func (r *Replica) checkLeaseRLocked(
 	reqTS := ba.WriteTimestamp()
 	st := r.leaseStatusForRequestRLocked(ctx, now, reqTS)
 
-	var shouldExtend bool
 	// Write commands that skip the lease check in practice are exactly
 	// RequestLease and TransferLease. Both use the provided previous lease for
 	// verification below raft. We return a zero lease status from this method and
@@ -1773,26 +1752,24 @@ func (r *Replica) checkLeaseRLocked(
 	// doesn't check the lease.
 	if !ba.IsSingleSkipsLeaseCheckRequest() && ba.ReadConsistency != kvpb.INCONSISTENT {
 		// Check the lease.
-		var err error
-		shouldExtend, err = r.leaseGoodToGoForStatusRLocked(ctx, now, reqTS, st)
+		err := r.leaseGoodToGoForStatusRLocked(ctx, now, reqTS, st)
 		if err != nil {
 			// No valid lease, but if we can serve this request via follower reads,
 			// we may continue.
 			if !r.canServeFollowerReadRLocked(ctx, ba) {
 				// If not, return the error.
-				return kvserverpb.LeaseStatus{}, false, err
+				return kvserverpb.LeaseStatus{}, err
 			}
 			// Otherwise, suppress the error. Also, remember that we're not serving
 			// this under the lease by zeroing out the status. We also intentionally
 			// do not pass the original status to checkTSAboveGCThreshold as
 			// this method assumes that a valid status indicates that this replica
-			// holds the lease (see #73123). `shouldExtend` is already false in this
-			// branch, but for completeness we zero it out as well.
-			st, shouldExtend, err = kvserverpb.LeaseStatus{}, false, nil
+			// holds the lease (see #73123).
+			st, err = kvserverpb.LeaseStatus{}, nil
 		}
 	}
 
-	return st, shouldExtend, nil
+	return st, nil
 }
 
 // checkExecutionCanProceedForRangeFeed returns an error if a rangefeed request
161 changes: 4 additions & 157 deletions pkg/kv/kvserver/replica_lease_renewal_test.go
@@ -12,7 +12,6 @@ package kvserver
 
 import (
 	"context"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -25,7 +24,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -41,9 +39,9 @@ func TestLeaseRenewer(t *testing.T) {
 	skip.UnderStressRace(t)
 	skip.UnderDeadlock(t)
 
-	// When kv.expiration_leases_only.enabled is true, the Raft scheduler is
-	// responsible for extensions, but we still track expiration leases for system
-	// ranges in the store lease renewer in case the setting changes.
+	// We test with kv.expiration_leases_only.enabled both enabled and disabled,
+	// to ensure meta/liveness ranges are still eagerly extended, while regular
+	// ranges are upgraded to epoch leases.
 	testutils.RunTrueAndFalse(t, "kv.expiration_leases_only.enabled", func(t *testing.T, expOnly bool) {
 		ctx := context.Background()
 		st := cluster.MakeTestingClusterSettings()
@@ -59,11 +57,6 @@ func TestLeaseRenewer(t *testing.T) {
 					RaftElectionTimeoutTicks:   20,
 					RaftReproposalTimeoutTicks: 30,
 				},
-				Knobs: base.TestingKnobs{
-					Store: &StoreTestingKnobs{
-						LeaseRenewalDurationOverride: 100 * time.Millisecond,
-					},
-				},
 			},
 		})
 		defer tc.Stopper().Stop(ctx)
@@ -93,16 +86,6 @@ func TestLeaseRenewer(t *testing.T) {
 			return repl
 		}
 
-		getLeaseRenewers := func(rangeID roachpb.RangeID) []roachpb.NodeID {
-			var renewers []roachpb.NodeID
-			for _, nodeID := range tc.NodeIDs() {
-				if _, ok := getNodeStore(nodeID).renewableLeases.Load(int64(rangeID)); ok {
-					renewers = append(renewers, nodeID)
-				}
-			}
-			return renewers
-		}
-
 		// assertLeaseExtension asserts that the given range has an expiration-based
 		// lease that is eagerly extended.
 		assertLeaseExtension := func(rangeID roachpb.RangeID) {
@@ -133,56 +116,13 @@ func TestLeaseRenewer(t *testing.T) {
 			}, 20*time.Second, 100*time.Millisecond)
 		}
 
-		// assertStoreLeaseRenewer asserts that the range is tracked by the store lease
-		// renewer on the leaseholder.
-		assertStoreLeaseRenewer := func(rangeID roachpb.RangeID) {
-			repl := getNodeReplica(1, rangeID)
-			require.Eventually(t, func() bool {
-				lease, _ := repl.GetLease()
-				renewers := getLeaseRenewers(rangeID)
-				renewedByLeaseholder := len(renewers) == 1 && renewers[0] == lease.Replica.NodeID
-				// If kv.expiration_leases_only.enabled is true, then the store lease
-				// renewer is disabled -- it will still track the ranges in case the
-				// setting changes, but won't detect the new lease and untrack them as
-				// long as it has a replica. We therefore allow multiple nodes to track
-				// it, as long as the leaseholder is one of them.
-				if expOnly {
-					for _, renewer := range renewers {
-						if renewer == lease.Replica.NodeID {
-							renewedByLeaseholder = true
-							break
-						}
-					}
-				}
-				if !renewedByLeaseholder {
-					t.Logf("r%d renewers: %v", rangeID, renewers)
-				}
-				return renewedByLeaseholder
-			}, 20*time.Second, 100*time.Millisecond)
-		}
-
-		// assertNoStoreLeaseRenewer asserts that the range is not tracked by any
-		// lease renewer.
-		assertNoStoreLeaseRenewer := func(rangeID roachpb.RangeID) {
-			require.Eventually(t, func() bool {
-				renewers := getLeaseRenewers(rangeID)
-				if len(renewers) > 0 {
-					t.Logf("r%d renewers: %v", rangeID, renewers)
-					return false
-				}
-				return true
-			}, 20*time.Second, 100*time.Millisecond)
-		}
-
 		// The meta range should always be eagerly renewed.
 		firstRangeID := tc.LookupRangeOrFatal(t, keys.MinKey).RangeID
 		assertLeaseExtension(firstRangeID)
-		assertStoreLeaseRenewer(firstRangeID)
 
 		// Split off an expiration-based range, and assert that the lease is extended.
 		desc := tc.LookupRangeOrFatal(t, tc.ScratchRangeWithExpirationLease(t))
 		assertLeaseExtension(desc.RangeID)
-		assertStoreLeaseRenewer(desc.RangeID)
 
 		// Transfer the lease to a different leaseholder, and assert that the lease is
 		// still extended. Wait for the split to apply on all nodes first.
@@ -191,107 +131,14 @@ func TestLeaseRenewer(t *testing.T) {
 		target := tc.Target(lookupNode(lease.Replica.NodeID%3 + 1))
 		tc.TransferRangeLeaseOrFatal(t, desc, target)
 		assertLeaseExtension(desc.RangeID)
-		assertStoreLeaseRenewer(desc.RangeID)
 
-		// Merge the range back. This should unregister it from the lease renewer.
-		require.NoError(t, tc.Server(0).DB().AdminMerge(ctx, desc.StartKey.AsRawKey().Prevish(16)))
-		assertNoStoreLeaseRenewer(desc.RangeID)
-
 		// Split off a regular non-system range. This should only be eagerly
-		// extended if kv.expiration_leases_only.enabled is true, and should never
-		// be tracked by the store lease renewer (which only handles system ranges).
+		// extended if kv.expiration_leases_only.enabled is true.
 		desc = tc.LookupRangeOrFatal(t, tc.ScratchRange(t))
 		if expOnly {
 			assertLeaseExtension(desc.RangeID)
 		} else {
 			assertLeaseUpgrade(desc.RangeID)
 		}
-		assertNoStoreLeaseRenewer(desc.RangeID)
 	})
 }
-
-func setupLeaseRenewerTest(
-	ctx context.Context, t *testing.T, init func(*base.TestClusterArgs),
-) (
-	cycles *int32, /* atomic */
-	_ serverutils.TestClusterInterface,
-) {
-	st := cluster.MakeTestingClusterSettings()
-	ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism
-
-	cycles = new(int32)
-	var args base.TestClusterArgs
-	args.ServerArgs.Settings = st
-	args.ServerArgs.Knobs.Store = &StoreTestingKnobs{
-		LeaseRenewalOnPostCycle: func() {
-			atomic.AddInt32(cycles, 1)
-		},
-	}
-	init(&args)
-	tc := serverutils.StartNewTestCluster(t, 1, args)
-	t.Cleanup(func() { tc.Stopper().Stop(ctx) })
-
-	desc := tc.LookupRangeOrFatal(t, tc.ScratchRangeWithExpirationLease(t))
-	srv := tc.Server(0)
-	s, err := srv.GetStores().(*Stores).GetStore(srv.GetFirstStoreID())
-	require.NoError(t, err)
-
-	_, err = s.DB().Get(ctx, desc.StartKey)
-	require.NoError(t, err)
-
-	repl, err := s.GetReplica(desc.RangeID)
-	require.NoError(t, err)
-
-	// There's a lease since we just split off the range.
-	lease, _ := repl.GetLease()
-	require.Equal(t, s.NodeID(), lease.Replica.NodeID)
-
-	return cycles, tc
-}
-
-func TestLeaseRenewerExtendsExpirationBasedLeases(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-
-	t.Run("triggered", func(t *testing.T) {
-		renewCh := make(chan struct{})
-		cycles, tc := setupLeaseRenewerTest(ctx, t, func(args *base.TestClusterArgs) {
-			args.ServerArgs.Knobs.Store.(*StoreTestingKnobs).LeaseRenewalSignalChan = renewCh
-		})
-		defer tc.Stopper().Stop(ctx)
-
-		trigger := func() {
-			// Need to signal the chan twice to make sure we see the effect
-			// of at least the first iteration.
-			for i := 0; i < 2; i++ {
-				select {
-				case renewCh <- struct{}{}:
-				case <-time.After(10 * time.Second):
-					t.Fatal("unable to send on renewal chan for 10s")
-				}
-			}
-		}
-
-		for i := 0; i < 5; i++ {
-			trigger()
-			n := atomic.LoadInt32(cycles)
-			require.NotZero(t, n, "during cycle #%v", i+1)
-			atomic.AddInt32(cycles, -n)
-		}
-	})
-
-	t.Run("periodic", func(t *testing.T) {
-		cycles, tc := setupLeaseRenewerTest(ctx, t, func(args *base.TestClusterArgs) {
-			args.ServerArgs.Knobs.Store.(*StoreTestingKnobs).LeaseRenewalDurationOverride = 10 * time.Millisecond
-		})
-		defer tc.Stopper().Stop(ctx)
-
-		testutils.SucceedsSoon(t, func() error {
-			if n := atomic.LoadInt32(cycles); n < 5 {
-				return errors.Errorf("saw only %d renewal cycles", n)
-			}
-			return nil
-		})
-	})
-}