kvserver: hold r.mu across leasePostApplyLocked
This is step one towards a solution to #58378. We want to update the
in-memory range state in the same critical section as the store
addressing update, and this includes calling leasePostApplyLocked.

As an interim step, this commit moves the call into a large `r.mu`
critical section that we will later combine with the `s.mu` critical
section during `applySnapshot`.

Release note: None
tbg committed Feb 3, 2021
1 parent 4adc7d0 commit df826cd
Showing 6 changed files with 49 additions and 39 deletions.
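Editor's note: the recurring refactor in this diff is Go's locking convention of pairing a plain method that acquires the mutex with a `...Locked` variant that assumes the caller already holds it (maybeWatchForMerge/maybeWatchForMergeLocked, gossipFirstRange/gossipFirstRangeLocked, leasePostApply/leasePostApplyLocked). Below is a minimal, self-contained sketch of that pattern; it is not CockroachDB's actual code, and the Replica, applyLease, and lease names are placeholders.

package main

import (
	"fmt"
	"sync"
)

// Replica is a stand-in for the real kvserver.Replica; only the locking
// pattern is illustrated here.
type Replica struct {
	mu struct {
		sync.Mutex
		lease string // placeholder for in-memory lease state
	}
}

// applyLease is the unlocked entry point: it acquires r.mu and delegates to
// the Locked variant, mirroring how leasePostApply callers now wrap
// leasePostApplyLocked.
func (r *Replica) applyLease(lease string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.applyLeaseLocked(lease)
}

// applyLeaseLocked assumes r.mu is already held, so callers that need a wider
// critical section (e.g. applySnapshot in this commit) can invoke it directly.
func (r *Replica) applyLeaseLocked(lease string) {
	r.mu.lease = lease
}

func main() {
	var r Replica
	r.applyLease("lease-1")
	fmt.Println(r.mu.lease)
}

Splitting the methods this way lets applySnapshot and the split/lease-application paths call the Locked variant from inside a larger r.mu critical section without re-entering the mutex.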
17 changes: 7 additions & 10 deletions pkg/kv/kvserver/replica.go
@@ -1407,7 +1407,13 @@ func (ec *endCmds) done(
 // except for read-only requests that are older than `freezeStart`, until the
 // merge completes.
 func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timestamp) error {
-	desc := r.Desc()
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.maybeWatchForMergeLocked(ctx, freezeStart)
+}
+
+func (r *Replica) maybeWatchForMergeLocked(ctx context.Context, freezeStart hlc.Timestamp) error {
+	desc := r.descRLocked()
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
 	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
 		storage.MVCCGetOptions{Inconsistent: true})
@@ -1430,12 +1436,10 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
 	// whether the merge succeeded or not.
 
 	mergeCompleteCh := make(chan struct{})
-	r.mu.Lock()
 	if r.mu.mergeComplete != nil {
 		// Another request already noticed the merge, installed a mergeComplete
 		// channel, and launched a goroutine to watch for the merge's completion.
 		// Nothing more to do.
-		r.mu.Unlock()
 		return nil
 	}
 	// Note that if the merge txn retries for any reason (for example, if the
@@ -1451,7 +1455,6 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
 	// range in case it managed to quiesce between when the Subsume request
 	// arrived and now, which is rare but entirely legal.
 	r.unquiesceLocked()
-	r.mu.Unlock()
 
 	taskCtx := r.AnnotateCtx(context.Background())
 	err = r.store.stopper.RunAsyncTask(taskCtx, "wait-for-merge", func(ctx context.Context) {
@@ -1578,12 +1581,6 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context, freezeStart hlc.Timest
 	return err
 }
 
-func (r *Replica) maybeTransferRaftLeadershipToLeaseholder(ctx context.Context) {
-	r.mu.Lock()
-	r.maybeTransferRaftLeadershipToLeaseholderLocked(ctx)
-	r.mu.Unlock()
-}
-
 // maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
 // leadership away from this node to the leaseholder, if this node is the
 // current raft leader but not the leaseholder. We don't attempt to transfer
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_application_result.go
@@ -257,7 +257,9 @@ func (r *Replica) handleDescResult(ctx context.Context, desc *roachpb.RangeDescr
 }
 
 func (r *Replica) handleLeaseResult(ctx context.Context, lease *roachpb.Lease) {
-	r.leasePostApply(ctx, *lease, false /* permitJump */)
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.leasePostApplyLocked(ctx, *lease, false /* permitJump */)
 }
 
 func (r *Replica) handleTruncatedStateResult(
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_gossip.go
@@ -29,6 +29,10 @@ const configGossipTTL = 0 // does not expire
 func (r *Replica) gossipFirstRange(ctx context.Context) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+	r.gossipFirstRangeLocked(ctx)
+}
+
+func (r *Replica) gossipFirstRangeLocked(ctx context.Context) {
 	// Gossip is not provided for the bootstrap store and for some tests.
 	if r.store.Gossip() == nil {
 		return
54 changes: 29 additions & 25 deletions pkg/kv/kvserver/replica_proposal.go
@@ -293,24 +293,23 @@ A file preventing this node from restarting was placed at:
 	}
 }
 
-// leasePostApply updates the Replica's internal state to reflect the
+// leasePostApplyLocked updates the Replica's internal state to reflect the
 // application of a new Range lease. The method is idempotent, so it can be
 // called repeatedly for the same lease safely. However, the method will panic
 // if passed a lease with a lower sequence number than the current lease. By
 // default, the method will also panic if passed a lease that indicates a
 // forward sequence number jump (i.e. a skipped lease). This behavior can
 // be disabled by passing permitJump as true.
-func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, permitJump bool) {
-	r.mu.RLock()
-	replicaID := r.mu.replicaID
+func (r *Replica) leasePostApplyLocked(
+	ctx context.Context, newLease roachpb.Lease, permitJump bool,
+) {
 	// Pull out the last lease known to this Replica. It's possible that this is
 	// not actually the last lease in the Range's lease sequence because the
 	// Replica may have missed the application of a lease between prevLease and
 	// newLease. However, this should only be possible if a snapshot includes a
 	// lease update. All other forms of lease updates should be continuous
 	// without jumps (see permitJump).
 	prevLease := *r.mu.state.Lease
-	r.mu.RUnlock()
 
 	// Sanity check to make sure that the lease sequence is moving in the right
 	// direction.
@@ -337,7 +336,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		}
 	}
 
-	iAmTheLeaseHolder := newLease.Replica.ReplicaID == replicaID
+	iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.mu.replicaID
 	// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
 	// get a new lease and we make sure it gets a new sequence number, thus
 	// causing the right half of the disjunction to fire so that we update the
@@ -363,7 +362,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		// TODO(aayush): In the future, if we permit co-operative lease transfers
 		// when a range is subsumed, it should be relatively straightforward to
 		// allow historical reads on the subsumed RHS after such lease transfers.
-		if err := r.maybeWatchForMerge(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
+		if err := r.maybeWatchForMergeLocked(ctx, hlc.Timestamp{} /* freezeStart */); err != nil {
 			// We were unable to determine whether a merge was in progress. We cannot
 			// safely proceed.
 			log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -381,7 +380,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		// requests, this is kosher). This means that we don't use the old
 		// lease's expiration but instead use the new lease's start to initialize
 		// the timestamp cache low water.
-		setTimestampCacheLowWaterMark(r.store.tsCache, r.Desc(), newLease.Start.ToTimestamp())
+		setTimestampCacheLowWaterMark(r.store.tsCache, r.descRLocked(), newLease.Start.ToTimestamp())
 
 		// Reset the request counts used to make lease placement decisions whenever
 		// starting a new lease.
@@ -391,30 +390,32 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 	}
 
 	// Inform the concurrency manager that the lease holder has been updated.
+	// We do this before installing the new lease in `r.mu.state` as we have
+	// an invariant that any replica with a lease has the concurrency manager
+	// enabled. (In practice, since both happen under `r.mu`, it is likely
+	// to not matter).
 	r.concMgr.OnRangeLeaseUpdated(newLease.Sequence, iAmTheLeaseHolder)
 
 	// Ordering is critical here. We only install the new lease after we've
 	// checked for an in-progress merge and updated the timestamp cache. If the
 	// ordering were reversed, it would be possible for requests to see the new
 	// lease but not the updated merge or timestamp cache state, which can result
 	// in serializability violations.
-	r.mu.Lock()
 	r.mu.state.Lease = &newLease
 	expirationBasedLease := r.requiresExpiringLeaseRLocked()
-	r.mu.Unlock()
 
 	// Gossip the first range whenever its lease is acquired. We check to make
 	// sure the lease is active so that a trailing replica won't process an old
 	// lease request and attempt to gossip the first range.
 	now := r.store.Clock().NowAsClockTimestamp()
-	if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.OwnsValidLease(ctx, now) {
-		r.gossipFirstRange(ctx)
+	if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.ownsValidLeaseRLocked(ctx, now) {
+		r.gossipFirstRangeLocked(ctx)
 	}
 
 	// Whenever we first acquire an expiration-based lease, notify the lease
 	// renewer worker that we want it to keep proactively renewing the lease
 	// before it expires.
-	if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.OwnsValidLease(ctx, now) {
+	if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
 		r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
 		select {
 		case r.store.renewableLeasesSignal <- struct{}{}:
@@ -425,7 +426,7 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 	// If we're the current raft leader, may want to transfer the leadership to
 	// the new leaseholder. Note that this condition is also checked periodically
 	// when ticking the replica.
-	r.maybeTransferRaftLeadershipToLeaseholder(ctx)
+	r.maybeTransferRaftLeadershipToLeaseholderLocked(ctx)
 
 	// Notify the store that a lease change occurred and it may need to
 	// gossip the updated store descriptor (with updated capacity).
@@ -450,18 +451,21 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 	// will be gossiped rarely because it falls on a range with an epoch-based
 	// range lease that is only reacquired extremely infrequently.
 	if iAmTheLeaseHolder {
-		if err := r.MaybeGossipSystemConfig(ctx); err != nil {
-			log.Errorf(ctx, "%v", err)
-		}
-		if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
-			log.Errorf(ctx, "%v", err)
-		}
-
-		// Emit an MLAI on the leaseholder replica, as follower will be looking
-		// for one and if we went on to quiesce, they wouldn't necessarily get
-		// one otherwise (unless they ask for it, which adds latency).
-		r.EmitMLAI()
+		// NB: run these in an async task to keep them out of the critical section
+		// (r.mu is held here).
+		_ = r.store.stopper.RunAsyncTask(ctx, "lease-triggers", func(ctx context.Context) {
+			if err := r.MaybeGossipSystemConfig(ctx); err != nil {
+				log.Errorf(ctx, "%v", err)
+			}
+			if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
+				log.Errorf(ctx, "%v", err)
+			}
+
+			// Emit an MLAI on the leaseholder replica, as follower will be looking
+			// for one and if we went on to quiesce, they wouldn't necessarily get
+			// one otherwise (unless they ask for it, which adds latency).
+			r.EmitMLAI()
+		})
 		if leaseChangingHands && log.V(1) {
 			// This logging is useful to troubleshoot incomplete drains.
 			log.Info(ctx, "is now leaseholder")
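Editor's note: one consequence of holding r.mu across leasePostApplyLocked is visible in the hunk above: the gossip and MLAI triggers now run in an async task so they no longer execute inside the critical section. A rough sketch of that shape follows, using a plain goroutine and WaitGroup instead of CockroachDB's Stopper.RunAsyncTask; the Replica and setLeaseLocked names are illustrative only.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Replica again stands in for the real type; only the "push side effects out
// of the critical section" pattern from leasePostApplyLocked is shown.
type Replica struct {
	mu    sync.Mutex
	lease string
}

// setLeaseLocked updates in-memory state while r.mu is held, then hands the
// slow follow-up work (e.g. gossip) to a separate goroutine so the mutex is
// not held across it.
func (r *Replica) setLeaseLocked(lease string, wg *sync.WaitGroup) {
	r.lease = lease // fast, in-memory update under the lock

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Simulate a slow trigger that must not run while r.mu is held.
		time.Sleep(10 * time.Millisecond)
		fmt.Println("gossiped lease", lease)
	}()
}

func main() {
	var (
		r  Replica
		wg sync.WaitGroup
	)
	r.mu.Lock()
	r.setLeaseLocked("lease-2", &wg)
	r.mu.Unlock()
	wg.Wait()
}

The design point matches the NB comment in the diff: only the in-memory update happens under the lock, and blocking work is deferred to a task that runs outside it.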
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -965,6 +965,7 @@ func (r *Replica) applySnapshot(
 
 	// Atomically swap the placeholder, if any, for the replica, and update the
 	// replica's descriptor.
+
 	r.store.mu.Lock()
 	if r.store.removePlaceholderLocked(ctx, r.RangeID) {
 		atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
@@ -975,16 +976,16 @@ func (r *Replica) applySnapshot(
 	}
 	r.store.mu.Unlock()
 
+	r.mu.Lock()
 	// Invoke the leasePostApply method to ensure we properly initialize the
 	// replica according to whether it holds the lease. We allow jumps in the
 	// lease sequence because there may be multiple lease changes accounted for
 	// in the snapshot.
-	r.leasePostApply(ctx, *s.Lease, true /* permitJump */)
+	r.leasePostApplyLocked(ctx, *s.Lease, true /* permitJump */)
 
 	// Inform the concurrency manager that this replica just applied a snapshot.
 	r.concMgr.OnReplicaSnapshotApplied()
 
-	r.mu.Lock()
 	// We set the persisted last index to the last applied index. This is
 	// not a correctness issue, but means that we may have just transferred
 	// some entries we're about to re-request from the leader and overwrite.
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/store_split.go
@@ -272,7 +272,9 @@ func prepareRightReplicaForSplit(
 	// Invoke the leasePostApply method to ensure we properly initialize
 	// the replica according to whether it holds the lease. This enables
 	// the txnWaitQueue.
-	rightRepl.leasePostApply(ctx, rightLease, false /* permitJump */)
+	rightRepl.mu.Lock()
+	defer rightRepl.mu.Unlock()
+	rightRepl.leasePostApplyLocked(ctx, rightLease, false /* permitJump */)
 	return rightRepl
 }
 
