concurrency: split lock re-acquisition into its own function
I think this should help once we introduce multiple locks on a single
key.

Epic: none

Release note: None
arulajmani committed Aug 21, 2023
1 parent eda35b5 commit 20906a0
Showing 1 changed file with 135 additions and 125 deletions.
pkg/kv/kvserver/concurrency/lock_table.go
@@ -2456,131 +2456,8 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
defer l.mu.Unlock()
if l.isLocked() {
// Already held.
beforeTxn, beforeTs := l.getLockHolder()
if acq.Txn.ID != beforeTxn.ID {
return errors.AssertionFailedf("existing lock cannot be acquired by different transaction")
}
// An unreplicated lock is being re-acquired...
if acq.Durability == lock.Unreplicated && l.isHeldUnreplicated() {
switch {
case l.holder.txn.Epoch < acq.Txn.Epoch: // at a higher epoch
l.holder.unreplicatedInfo.epochBumped()
case l.holder.txn.Epoch == acq.Txn.Epoch: // at the same epoch
// Prune the list of sequence numbers tracked for this lock by removing
// any sequence numbers that are considered ignored by virtue of a
// savepoint rollback.
//
// Note that the in-memory lock table is the source of truth for just
// unreplicated locks, and as such, sequence numbers are only tracked
// for them.
l.holder.unreplicatedInfo.rollbackIgnoredSeqNumbers(acq.IgnoredSeqNums)
case l.holder.txn.Epoch > acq.Txn.Epoch: // at a prior epoch
// Reject the request; the logic here parallels how mvccPutInternal
// handles this case for intents.
return errors.Errorf(
"locking request with epoch %d came after lock(unreplicated) had already been acquired at epoch %d in txn %s",
acq.Txn.Epoch, l.holder.txn.Epoch, acq.Txn.ID,
)
default:
panic("unreachable")
}
}
if l.isIdempotentLockAcquisition(acq) {
return nil // nothing more to do here.
}
// Forward the lock's timestamp instead of assigning to it blindly.
// While lock acquisition uses monotonically increasing timestamps
// from the perspective of the transaction's coordinator, this does
// not guarantee that a lock will never be acquired at a higher
// epoch and/or sequence number but with a lower timestamp when in
// the presence of transaction pushes. Consider the following
// sequence of events:
//
// - txn A acquires lock at sequence 1, ts 10
// - txn B pushes txn A to ts 20
// - txn B updates lock to ts 20
// - txn A's coordinator does not immediately learn of the push
// - txn A re-acquires lock at sequence 2, ts 15
//
// A lock's timestamp at a given durability level is not allowed to
// regress, so by forwarding its timestamp during the second acquisition
// instead of assigning to it blindly, it remains at 20.
//
// However, a lock's timestamp as reported by getLockHolder can regress
// if it is acquired at a lower timestamp and a different durability
// than it was previously held with. This must be supported because of
// the hard constraint we must uphold here: the lockHolderInfo for a
// replicated lock cannot diverge from the
// replicated state machine in such a way that its timestamp in the
// lockTable exceeds that in the replicated keyspace. If this invariant
// were to be violated, we'd risk infinite lock-discovery loops for
// requests that conflict with the lock as is written in the replicated
// state machine but not as is reflected in the lockTable.
//
// Lock timestamp regressions are safe from the perspective of other
// transactions because the request which re-acquired the lock at the
// lower timestamp must have been holding a write latch at or below the
// new lock's timestamp. This means that no conflicting requests could
// be evaluating concurrently. Instead, all will need to re-scan the
// lockTable once they acquire latches and will notice the reduced
// timestamp at that point, which may cause them to conflict with the
// lock even if they had not conflicted before. In a sense, it is no
// different than the first time a lock is added to the lockTable.
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
if err := l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence); err != nil {
return err
}
case lock.Replicated:
l.holder.replicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
default:
panic(fmt.Sprintf("unknown lock durability: %s", acq.Durability))
}

// Selectively update the txn meta.
switch {
case l.holder.txn.Epoch > acq.Txn.Epoch: // lock is being acquired at a prior epoch
// We do not update the txn meta here -- the epoch is not allowed to
// regress.
//
// NB: We can get here if the lock acquisition corresponds to an
// operation from a prior epoch. We've already handled the case for
// unreplicated lock acquisition above, so this can only happen if the
// lock acquisition corresponds to a replicated lock.
//
// If mvccPutInternal is aware of the newer epoch, it'll simply reject
// this operation and we'll never get here. However, it's not guaranteed
// that mvccPutInternal knows about this newer epoch. The in-memory lock
// table may know about the newer epoch though, if there's been a
// different unreplicated lock acquisition on this key by the transaction.
// So if we were to blindly update the TxnMeta here, we'd be regressing
// the epoch, which messes with our sequence number tracking inside of
// unreplicatedLockInfo.
assert(acq.Durability == lock.Replicated, "the unreplicated case should have been handled above")
case l.holder.txn.Epoch == acq.Txn.Epoch: // lock is being acquired at the same epoch
l.holder.txn = &acq.Txn
case l.holder.txn.Epoch < acq.Txn.Epoch: // lock is being acquired at a newer epoch
l.holder.txn = &acq.Txn
// The txn meta tracked here corresponds to unreplicated locks. When we
// learn about a newer epoch during lock acquisition of a replicated lock,
// we clear out the unreplicatedLockInfo state being tracked from the
// older epoch.
//
// Note that we don't clear out replicatedLockInfo when an unreplicated
// lock is being acquired at a newer epoch. This is because replicated
// locks are held across epochs, and there is no per-epoch tracking in the
// lock table for them.
if acq.Durability == lock.Replicated {
l.holder.unreplicatedInfo.clear()
}
}

_, afterTs := l.getLockHolder()
if beforeTs.Less(afterTs) {
l.increasedLockTs(afterTs)
}
return nil
assert(l.isLockedBy(acq.Txn.ID), "multiple lock holders on a single key aren't supported yet")
return l.reacquireLock(acq)
}

// NB: The lock isn't held, so the request trying to acquire the lock must be
@@ -2633,6 +2510,139 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
return nil
}

// reacquireLock is called in response to a lock re-acquisition. In such cases,
// a lock is already held on the receiver's key by the transaction referenced in
// the supplied lock acquisition.
//
// REQUIRES: l.mu to be locked.
func (l *lockState) reacquireLock(acq *roachpb.LockAcquisition) error {
beforeTxn, beforeTs := l.getLockHolder()
if acq.Txn.ID != beforeTxn.ID {
return errors.AssertionFailedf("existing lock cannot be acquired by different transaction")
}
// An unreplicated lock is being re-acquired...
if acq.Durability == lock.Unreplicated && l.isHeldUnreplicated() {
switch {
case l.holder.txn.Epoch < acq.Txn.Epoch: // at a higher epoch
l.holder.unreplicatedInfo.epochBumped()
case l.holder.txn.Epoch == acq.Txn.Epoch: // at the same epoch
// Prune the list of sequence numbers tracked for this lock by removing
// any sequence numbers that are considered ignored by virtue of a
// savepoint rollback.
//
// Note that the in-memory lock table is the source of truth for just
// unreplicated locks, and as such, sequence numbers are only tracked
// for them.
l.holder.unreplicatedInfo.rollbackIgnoredSeqNumbers(acq.IgnoredSeqNums)
case l.holder.txn.Epoch > acq.Txn.Epoch: // at a prior epoch
// Reject the request; the logic here parallels how mvccPutInternal
// handles this case for intents.
return errors.Errorf(
"locking request with epoch %d came after lock(unreplicated) had already been acquired at epoch %d in txn %s",
acq.Txn.Epoch, l.holder.txn.Epoch, acq.Txn.ID,
)
default:
panic("unreachable")
}
}
if l.isIdempotentLockAcquisition(acq) {
return nil // nothing more to do here.
}
// Forward the lock's timestamp instead of assigning to it blindly.
// While lock acquisition uses monotonically increasing timestamps
// from the perspective of the transaction's coordinator, this does
// not guarantee that a lock will never be acquired at a higher
// epoch and/or sequence number but with a lower timestamp when in
// the presence of transaction pushes. Consider the following
// sequence of events:
//
// - txn A acquires lock at sequence 1, ts 10
// - txn B pushes txn A to ts 20
// - txn B updates lock to ts 20
// - txn A's coordinator does not immediately learn of the push
// - txn A re-acquires lock at sequence 2, ts 15
//
// A lock's timestamp at a given durability level is not allowed to
// regress, so by forwarding its timestamp during the second acquisition
// instead of assigning to it blindly, it remains at 20.
//
// However, a lock's timestamp as reported by getLockHolder can regress
// if it is acquired at a lower timestamp and a different durability
// than it was previously held with. This must be supported because of
// the hard constraint we must uphold here: the lockHolderInfo for a
// replicated lock cannot diverge from the
// replicated state machine in such a way that its timestamp in the
// lockTable exceeds that in the replicated keyspace. If this invariant
// were to be violated, we'd risk infinite lock-discovery loops for
// requests that conflict with the lock as is written in the replicated
// state machine but not as is reflected in the lockTable.
//
// Lock timestamp regressions are safe from the perspective of other
// transactions because the request which re-acquired the lock at the
// lower timestamp must have been holding a write latch at or below the
// new lock's timestamp. This means that no conflicting requests could
// be evaluating concurrently. Instead, all will need to re-scan the
// lockTable once they acquire latches and will notice the reduced
// timestamp at that point, which may cause them to conflict with the
// lock even if they had not conflicted before. In a sense, it is no
// different than the first time a lock is added to the lockTable.
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
if err := l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence); err != nil {
return err
}
case lock.Replicated:
l.holder.replicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
default:
panic(fmt.Sprintf("unknown lock durability: %s", acq.Durability))
}

// Selectively update the txn meta.
switch {
case l.holder.txn.Epoch > acq.Txn.Epoch: // lock is being acquired at a prior epoch
// We do not update the txn meta here -- the epoch is not allowed to
// regress.
//
// NB: We can get here if the lock acquisition corresponds to an
// operation from a prior epoch. We've already handled the case for
// unreplicated lock acquisition above, so this can only happen if the
// lock acquisition corresponds to a replicated lock.
//
// If mvccPutInternal is aware of the newer epoch, it'll simply reject
// this operation and we'll never get here. However, it's not guaranteed
// that mvccPutInternal knows about this newer epoch. The in-memory lock
// table may know about the newer epoch though, if there's been a
// different unreplicated lock acquisition on this key by the transaction.
// So if we were to blindly update the TxnMeta here, we'd be regressing
// the epoch, which messes with our sequence number tracking inside of
// unreplicatedLockInfo.
assert(acq.Durability == lock.Replicated, "the unreplicated case should have been handled above")
case l.holder.txn.Epoch == acq.Txn.Epoch: // lock is being acquired at the same epoch
l.holder.txn = &acq.Txn
case l.holder.txn.Epoch < acq.Txn.Epoch: // lock is being acquired at a newer epoch
l.holder.txn = &acq.Txn
// The txn meta tracked here corresponds to unreplicated locks. When we
// learn about a newer epoch during lock acquisition of a replicated lock,
// we clear out the unreplicatedLockInfo state being tracked from the
// older epoch.
//
// Note that we don't clear out replicatedLockInfo when an unreplicated
// lock is being acquired at a newer epoch. This is because replicated
// locks are held across epochs, and there is no per-epoch tracking in the
// lock table for them.
if acq.Durability == lock.Replicated {
l.holder.unreplicatedInfo.clear()
}
}

_, afterTs := l.getLockHolder()
if beforeTs.Less(afterTs) {
l.increasedLockTs(afterTs)
}
return nil
}

// isIdempotentLockAcquisition returns true if the lock acquisition is
// idempotent. Idempotent lock acquisitions do not require any changes to what
// is being tracked in the lock's state.

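The pruning described in reacquireLock's same-epoch case (dropping sequence numbers made ignorable by a savepoint rollback) can be illustrated with a small standalone sketch. The seqRange type and pruneIgnored helper below are hypothetical stand-ins, not CockroachDB's unreplicatedLockInfo or roachpb APIs; the sketch assumes ignored ranges are inclusive on both ends and only mirrors the intent of rollbackIgnoredSeqNumbers.

```go
package main

import "fmt"

// seqRange is a hypothetical stand-in for an ignored sequence number range,
// assumed here to be inclusive on both ends.
type seqRange struct {
	start, end int
}

// pruneIgnored returns the tracked sequence numbers that do not fall inside
// any ignored range, mirroring the intent of rollbackIgnoredSeqNumbers.
func pruneIgnored(tracked []int, ignored []seqRange) []int {
	var kept []int
	for _, seq := range tracked {
		drop := false
		for _, r := range ignored {
			if seq >= r.start && seq <= r.end {
				drop = true
				break
			}
		}
		if !drop {
			kept = append(kept, seq)
		}
	}
	return kept
}

func main() {
	// Sequence numbers 1, 2, and 3 were tracked for an unreplicated lock; a
	// savepoint rollback marks [2, 3] as ignored, leaving only 1 tracked.
	fmt.Println(pruneIgnored([]int{1, 2, 3}, []seqRange{{start: 2, end: 3}})) // [1]
}
```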
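Likewise, the timestamp-forwarding rationale in reacquireLock (forward the lock's timestamp rather than assign to it, so a re-acquisition at a lower timestamp after a push does not regress it) can be sketched in isolation. The Timestamp type below is a hypothetical stand-in whose Forward method only mirrors the semantics the code relies on from hlc.Timestamp.Forward; it is not the real type.

```go
package main

import "fmt"

// Timestamp is a hypothetical stand-in for hlc.Timestamp with only a
// wall-clock component.
type Timestamp struct {
	WallTime int64
}

// Less reports whether t is ordered before s.
func (t Timestamp) Less(s Timestamp) bool { return t.WallTime < s.WallTime }

// Forward advances t to s if s is larger, mirroring the semantics relied on
// by reacquireLock; it reports whether t was advanced.
func (t *Timestamp) Forward(s Timestamp) bool {
	if t.Less(s) {
		*t = s
		return true
	}
	return false
}

func main() {
	// txn A acquires the lock at sequence 1, ts 10.
	lockTs := Timestamp{WallTime: 10}

	// txn B pushes txn A and updates the lock to ts 20.
	lockTs.Forward(Timestamp{WallTime: 20})

	// txn A, not yet aware of the push, re-acquires at sequence 2, ts 15.
	// Because the timestamp is forwarded rather than assigned, it stays at 20.
	advanced := lockTs.Forward(Timestamp{WallTime: 15})
	fmt.Printf("advanced=%t, lockTs=%d\n", advanced, lockTs.WallTime) // advanced=false, lockTs=20
}
```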