concurrency: rollback ignored sequence numbers on lock acquisition
This patch adds logic to prune the list of sequence numbers tracked by
the lock table for unreplicated locks. This is done when some of the
tracked sequence numbers are considered ignored, by virtue of a
savepoint rollback.

Note that we only do so for unreplicated locks, and only when an
unreplicated lock is being reacquired. This is because the in-memory
lock table is the source of truth only for unreplicated locks; the MVCC
keyspace is the source of truth for replicated ones. As such, trying to
mimic the MVCC logic in the lock table is hard and error-prone, so we
don't attempt to do so.

Fixes #102269

Release note: None
arulajmani committed May 17, 2023
1 parent 03a7cf9 commit a778ca0
Showing 3 changed files with 248 additions and 57 deletions.
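
For context, the pruning step delegates to a removeIgnored helper that sits
outside the hunks shown below. A minimal sketch of what such a helper can
look like, assuming enginepb exposes a TxnSeqIsIgnored predicate over
IgnoredSeqNumRange and that the tracked sequence numbers are kept in sorted
order:

func removeIgnored(
	heldSeqNums []enginepb.TxnSeq, ignoredSeqNums []enginepb.IgnoredSeqNumRange,
) []enginepb.TxnSeq {
	if len(ignoredSeqNums) == 0 {
		return heldSeqNums
	}
	// Filter in place: keep only the sequence numbers that do not fall in
	// any ignored (rolled back) range.
	held := heldSeqNums[:0]
	for _, n := range heldSeqNums {
		if !enginepb.TxnSeqIsIgnored(n, ignoredSeqNums) {
			held = append(held, n)
		}
	}
	return held
}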
80 changes: 47 additions & 33 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -1850,74 +1850,88 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength
 	return false
 }
 
-// Acquires this lock. Returns the list of guards that are done actively
-// waiting at this key -- these will be requests from the same transaction
-// that is acquiring the lock.
+// Acquires this lock. Any requests that are waiting in the lock's wait queues
+// from the transaction acquiring the lock are also released.
 //
 // Acquires l.mu.
-func (l *lockState) acquireLock(
-	_ lock.Strength,
-	durability lock.Durability,
-	txn *enginepb.TxnMeta,
-	ts hlc.Timestamp,
-	clock *hlc.Clock,
-) error {
+func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock) error {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.holder.locked {
 		// Already held.
 		beforeTxn, beforeTs := l.getLockHolder()
-		if txn.ID != beforeTxn.ID {
+		if acq.Txn.ID != beforeTxn.ID {
 			return errors.AssertionFailedf("existing lock cannot be acquired by different transaction")
 		}
-		if durability == lock.Unreplicated &&
+		if acq.Durability == lock.Unreplicated &&
 			l.holder.holder[lock.Unreplicated].txn != nil &&
-			l.holder.holder[lock.Unreplicated].txn.Epoch > txn.Epoch {
+			l.holder.holder[lock.Unreplicated].txn.Epoch > acq.Txn.Epoch {
 			// If the lock is being re-acquired as an unreplicated lock, and the
 			// request trying to do so belongs to a prior epoch, we reject the
 			// request. This parallels the logic mvccPutInternal has for intents.
 			return errors.Errorf(
 				"locking request with epoch %d came after lock(unreplicated) had already been acquired at epoch %d in txn %s",
-				txn.Epoch, l.holder.holder[durability].txn.Epoch, txn.ID,
+				acq.Txn.Epoch, l.holder.holder[acq.Durability].txn.Epoch, acq.Txn.ID,
 			)
 		}
 		// TODO(arul): Once we stop storing sequence numbers/transaction protos
 		// associated with replicated locks, the following logic can be deleted.
-		if durability == lock.Replicated &&
+		if acq.Durability == lock.Replicated &&
 			l.holder.holder[lock.Replicated].txn != nil &&
-			l.holder.holder[lock.Replicated].txn.Epoch > txn.Epoch {
+			l.holder.holder[lock.Replicated].txn.Epoch > acq.Txn.Epoch {
 			// If we're dealing with a replicated lock (intent), and the transaction
 			// acquiring this lock belongs to a prior epoch, we expect mvccPutInternal
 			// to return an error. As such, the request should never call into
 			// AcquireLock and reach this point.
 			return errors.AssertionFailedf(
 				"locking request with epoch %d came after lock(replicated) had already been acquired at epoch %d in txn %s",
-				txn.Epoch, l.holder.holder[durability].txn.Epoch, txn.ID,
+				acq.Txn.Epoch, l.holder.holder[acq.Durability].txn.Epoch, acq.Txn.ID,
 			)
 		}
-		seqs := l.holder.holder[durability].seqs
-		if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch {
+		seqs := l.holder.holder[acq.Durability].seqs
+		// Lock is being re-acquired...
+		if l.holder.holder[acq.Durability].txn != nil &&
+			// ...at a higher epoch.
+			l.holder.holder[acq.Durability].txn.Epoch < acq.Txn.Epoch {
 			// Clear the sequences for the older epoch.
 			seqs = seqs[:0]
 		}
-		if len(seqs) > 0 && seqs[len(seqs)-1] >= txn.Sequence {
+		// Lock is being re-acquired with durability Unreplicated...
+		if acq.Durability == lock.Unreplicated && l.holder.holder[lock.Unreplicated].txn != nil &&
+			// ... at the same epoch.
+			l.holder.holder[lock.Unreplicated].txn.Epoch == acq.Txn.Epoch {
+			// Prune the list of sequence numbers tracked for this lock by removing
+			// any sequence numbers that are considered ignored by virtue of a
+			// savepoint rollback.
+			//
+			// Note that the in-memory lock table is the source of truth for just
+			// unreplicated locks, so we only do this pruning for unreplicated lock
+			// acquisition. On the other hand, for replicated locks, the source of
+			// truth is what's written in MVCC. We could try and mimic that logic
+			// here, but we choose not to, as doing so is error-prone/difficult to
+			// maintain.
+			seqs = removeIgnored(seqs, acq.IgnoredSeqNums)
+		}
+
+		if len(seqs) > 0 && seqs[len(seqs)-1] >= acq.Txn.Sequence {
 			// Idempotent lock acquisition. In this case, we simply ignore the lock
 			// acquisition as long as it corresponds to an existing sequence number.
 			// If the sequence number is not being tracked yet, insert it into the
 			// sequence history. The validity of such a lock re-acquisition should
 			// have already been determined at the MVCC level.
 			if i := sort.Search(len(seqs), func(i int) bool {
-				return seqs[i] >= txn.Sequence
+				return seqs[i] >= acq.Txn.Sequence
 			}); i == len(seqs) {
 				panic("lockTable bug - search value <= last element")
-			} else if seqs[i] != txn.Sequence {
+			} else if seqs[i] != acq.Txn.Sequence {
 				seqs = append(seqs, 0)
 				copy(seqs[i+1:], seqs[i:])
-				seqs[i] = txn.Sequence
-				l.holder.holder[durability].seqs = seqs
+				seqs[i] = acq.Txn.Sequence
+				l.holder.holder[acq.Durability].seqs = seqs
 			}
 			return nil
 		}
-		l.holder.holder[durability].txn = txn
+		l.holder.holder[acq.Durability].txn = &acq.Txn
 		// Forward the lock's timestamp instead of assigning to it blindly.
 		// While lock acquisition uses monotonically increasing timestamps
 		// from the perspective of the transaction's coordinator, this does
@@ -1956,8 +1970,8 @@ func (l *lockState) acquireLock(
 		// timestamp at that point, which may cause them to conflict with the
 		// lock even if they had not conflicted before. In a sense, it is no
 		// different than the first time a lock is added to the lockTable.
-		l.holder.holder[durability].ts.Forward(ts)
-		l.holder.holder[durability].seqs = append(seqs, txn.Sequence)
+		l.holder.holder[acq.Durability].ts.Forward(acq.Txn.WriteTimestamp)
+		l.holder.holder[acq.Durability].seqs = append(seqs, acq.Txn.Sequence)
 
 		_, afterTs := l.getLockHolder()
 		if beforeTs.Less(afterTs) {
@@ -1970,7 +1984,7 @@ func (l *lockState) acquireLock(
 	// of a concurrent release but that is harmless since this request is
 	// holding latches and has proceeded to evaluation.
 	if l.reservation != nil {
-		if l.reservation.txn.ID != txn.ID {
+		if l.reservation.txn.ID != acq.Txn.ID {
 			// Reservation is broken.
 			qg := &queuedGuard{
 				guard: l.reservation,
@@ -1998,13 +2012,13 @@ func (l *lockState) acquireLock(
 	}
 	l.reservation = nil
 	l.holder.locked = true
-	l.holder.holder[durability].txn = txn
-	l.holder.holder[durability].ts = ts
-	l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), txn.Sequence)
+	l.holder.holder[acq.Durability].txn = &acq.Txn
+	l.holder.holder[acq.Durability].ts = acq.Txn.WriteTimestamp
+	l.holder.holder[acq.Durability].seqs = append([]enginepb.TxnSeq(nil), acq.Txn.Sequence)
 	l.holder.startTime = clock.PhysicalTime()
 
 	// If there are waiting requests from the same txn, they no longer need to wait.
-	l.releaseWritersFromTxn(txn)
+	l.releaseWritersFromTxn(&acq.Txn)
 
 	// Inform active waiters since lock has transitioned to held.
 	l.informActiveWaiters()
@@ -2788,7 +2802,7 @@ func (t *lockTableImpl) AcquireLock(acq *roachpb.LockAcquisition) error {
 			return nil
 		}
 	}
-	err := l.acquireLock(acq.Strength, acq.Durability, &acq.Txn, acq.Txn.WriteTimestamp, t.clock)
+	err := l.acquireLock(acq, t.clock)
 	t.locks.mu.Unlock()
 
 	if checkMaxLocks {
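
Caller-side, the ignored ranges now travel on the LockAcquisition itself
rather than as extra arguments. A hypothetical sketch (txn and lt are
placeholder names, not part of this patch):

	// Build an unreplicated lock acquisition and attach the transaction's
	// ignored seqnum ranges, e.g. as populated by a savepoint rollback.
	acq := roachpb.MakeLockAcquisition(txn, roachpb.Key("k"), lock.Unreplicated)
	acq.IgnoredSeqNums = txn.IgnoredSeqNums
	if err := lt.AcquireLock(&acq); err != nil {
		return err
	}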
59 changes: 35 additions & 24 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -83,7 +83,7 @@ start-waiting: <bool>
 Calls lockTable.ScanOptimistic. The request must not have an existing guard.
 If a guard is returned, stores it for later use.
 
-acquire r=<name> k=<key> durability=r|u
+acquire r=<name> k=<key> durability=r|u [ignored-seqs=<int>[-<int>][,<int>[-<int>]]]
 ----
 <error string>
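
As an illustration, a hypothetical invocation of the extended directive
(request and key names are made up) looks like:

acquire r=req1 k=k durability=u ignored-seqs=2-3,5
----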
@@ -372,6 +372,11 @@ func TestLockTableBasic(t *testing.T) {
 				durability = lock.Replicated
 			}
 			acq := roachpb.MakeLockAcquisition(req.Txn, roachpb.Key(key), durability)
+			var ignored []enginepb.IgnoredSeqNumRange
+			if d.HasArg("ignored-seqs") {
+				ignored = scanIgnoredSeqNumbers(t, d)
+			}
+			acq.IgnoredSeqNums = ignored
 			if err := lt.AcquireLock(&acq); err != nil {
 				return err.Error()
 			}
@@ -413,29 +418,7 @@ func TestLockTableBasic(t *testing.T) {
 			span := getSpan(t, d, s)
 			var ignored []enginepb.IgnoredSeqNumRange
 			if d.HasArg("ignored-seqs") {
-				var seqsStr string
-				d.ScanArgs(t, "ignored-seqs", &seqsStr)
-				parts := strings.Split(seqsStr, ",")
-				for _, p := range parts {
-					pair := strings.Split(p, "-")
-					if len(pair) != 1 && len(pair) != 2 {
-						d.Fatalf(t, "error parsing %s", parts)
-					}
-					startNum, err := strconv.ParseInt(pair[0], 10, 32)
-					if err != nil {
-						d.Fatalf(t, "error parsing ignored seqnums: %s", err)
-					}
-					ignoredRange := enginepb.IgnoredSeqNumRange{
-						Start: enginepb.TxnSeq(startNum), End: enginepb.TxnSeq(startNum)}
-					if len(pair) == 2 {
-						endNum, err := strconv.ParseInt(pair[1], 10, 32)
-						if err != nil {
-							d.Fatalf(t, "error parsing ignored seqnums: %s", err)
-						}
-						ignoredRange.End = enginepb.TxnSeq(endNum)
-					}
-					ignored = append(ignored, ignoredRange)
-				}
+				ignored = scanIgnoredSeqNumbers(t, d)
 			}
 			// TODO(sbhola): also test STAGING.
 			intent := &roachpb.LockUpdate{
@@ -747,6 +730,34 @@ func getStrength(t *testing.T, d *datadriven.TestData, strS string) lock.Strength
 	}
 }
 
+func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange {
+	var ignored []enginepb.IgnoredSeqNumRange
+	var seqsStr string
+	d.ScanArgs(t, "ignored-seqs", &seqsStr)
+	parts := strings.Split(seqsStr, ",")
+	for _, p := range parts {
+		pair := strings.Split(p, "-")
+		if len(pair) != 1 && len(pair) != 2 {
+			d.Fatalf(t, "error parsing %s", parts)
+		}
+		startNum, err := strconv.ParseInt(pair[0], 10, 32)
+		if err != nil {
+			d.Fatalf(t, "error parsing ignored seqnums: %s", err)
+		}
+		ignoredRange := enginepb.IgnoredSeqNumRange{
+			Start: enginepb.TxnSeq(startNum), End: enginepb.TxnSeq(startNum)}
+		if len(pair) == 2 {
+			endNum, err := strconv.ParseInt(pair[1], 10, 32)
+			if err != nil {
+				d.Fatalf(t, "error parsing ignored seqnums: %s", err)
+			}
+			ignoredRange.End = enginepb.TxnSeq(endNum)
+		}
+		ignored = append(ignored, ignoredRange)
+	}
+	return ignored
+}
+
 func intentsToResolveToStr(toResolve []roachpb.LockUpdate, startOnNewLine bool) string {
 	if len(toResolve) == 0 {
 		return ""