diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 4b4529dcb5f8..548cc3de3ef9 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -1850,74 +1850,88 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt return false } -// Acquires this lock. Returns the list of guards that are done actively -// waiting at this key -- these will be requests from the same transaction -// that is acquiring the lock. +// Acquires this lock. Any requests that are waiting in the lock's wait queues +// from the transaction acquiring the lock are also released. +// // Acquires l.mu. -func (l *lockState) acquireLock( - _ lock.Strength, - durability lock.Durability, - txn *enginepb.TxnMeta, - ts hlc.Timestamp, - clock *hlc.Clock, -) error { +func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock) error { l.mu.Lock() defer l.mu.Unlock() if l.holder.locked { // Already held. beforeTxn, beforeTs := l.getLockHolder() - if txn.ID != beforeTxn.ID { + if acq.Txn.ID != beforeTxn.ID { return errors.AssertionFailedf("existing lock cannot be acquired by different transaction") } - if durability == lock.Unreplicated && + if acq.Durability == lock.Unreplicated && l.holder.holder[lock.Unreplicated].txn != nil && - l.holder.holder[lock.Unreplicated].txn.Epoch > txn.Epoch { + l.holder.holder[lock.Unreplicated].txn.Epoch > acq.Txn.Epoch { // If the lock is being re-acquired as an unreplicated lock, and the // request trying to do so belongs to a prior epoch, we reject the // request. This parallels the logic mvccPutInternal has for intents. 
return errors.Errorf( "locking request with epoch %d came after lock(unreplicated) had already been acquired at epoch %d in txn %s", - txn.Epoch, l.holder.holder[durability].txn.Epoch, txn.ID, + acq.Txn.Epoch, l.holder.holder[acq.Durability].txn.Epoch, acq.Txn.ID, ) } // TODO(arul): Once we stop storing sequence numbers/transaction protos // associated with replicated locks, the following logic can be deleted. - if durability == lock.Replicated && + if acq.Durability == lock.Replicated && l.holder.holder[lock.Replicated].txn != nil && - l.holder.holder[lock.Replicated].txn.Epoch > txn.Epoch { + l.holder.holder[lock.Replicated].txn.Epoch > acq.Txn.Epoch { // If we're dealing with a replicated lock (intent), and the transaction // acquiring this lock belongs to a prior epoch, we expect mvccPutInternal // to return an error. As such, the request should never call into // AcquireLock and reach this point. return errors.AssertionFailedf( "locking request with epoch %d came after lock(replicated) had already been acquired at epoch %d in txn %s", - txn.Epoch, l.holder.holder[durability].txn.Epoch, txn.ID, + acq.Txn.Epoch, l.holder.holder[acq.Durability].txn.Epoch, acq.Txn.ID, ) } - seqs := l.holder.holder[durability].seqs - if l.holder.holder[durability].txn != nil && l.holder.holder[durability].txn.Epoch < txn.Epoch { + seqs := l.holder.holder[acq.Durability].seqs + // Lock is being re-acquired... + if l.holder.holder[acq.Durability].txn != nil && + // ...at a higher epoch. + l.holder.holder[acq.Durability].txn.Epoch < acq.Txn.Epoch { // Clear the sequences for the older epoch. seqs = seqs[:0] } - if len(seqs) > 0 && seqs[len(seqs)-1] >= txn.Sequence { + // Lock is being re-acquired with durability Unreplicated... + if acq.Durability == lock.Unreplicated && l.holder.holder[lock.Unreplicated].txn != nil && + // ... at the same epoch. 
+ l.holder.holder[lock.Unreplicated].txn.Epoch == acq.Txn.Epoch { + // Prune the list of sequence numbers tracked for this lock by removing + // any sequence numbers that are considered ignored by virtue of a + // savepoint rollback. + // + // Note that the in-memory lock table is the source of truth for just + // unreplicated locks, so we only do this pruning for unreplicated lock + // acquisition. On the other hand, for replicated locks, the source of + // truth is what's written in MVCC. We could try and mimic that logic + // here, but we choose not to, as doing so is error-prone/difficult to + // maintain. + seqs = removeIgnored(seqs, acq.IgnoredSeqNums) + } + + if len(seqs) > 0 && seqs[len(seqs)-1] >= acq.Txn.Sequence { // Idempotent lock acquisition. In this case, we simply ignore the lock // acquisition as long as it corresponds to an existing sequence number. // If the sequence number is not being tracked yet, insert it into the // sequence history. The validity of such a lock re-acquisition should // have already been determined at the MVCC level. if i := sort.Search(len(seqs), func(i int) bool { - return seqs[i] >= txn.Sequence + return seqs[i] >= acq.Txn.Sequence }); i == len(seqs) { panic("lockTable bug - search value <= last element") - } else if seqs[i] != txn.Sequence { + } else if seqs[i] != acq.Txn.Sequence { seqs = append(seqs, 0) copy(seqs[i+1:], seqs[i:]) - seqs[i] = txn.Sequence - l.holder.holder[durability].seqs = seqs + seqs[i] = acq.Txn.Sequence + l.holder.holder[acq.Durability].seqs = seqs } return nil } - l.holder.holder[durability].txn = txn + l.holder.holder[acq.Durability].txn = &acq.Txn // Forward the lock's timestamp instead of assigning to it blindly. 
// While lock acquisition uses monotonically increasing timestamps // from the perspective of the transaction's coordinator, this does @@ -1956,8 +1970,8 @@ func (l *lockState) acquireLock( // timestamp at that point, which may cause them to conflict with the // lock even if they had not conflicted before. In a sense, it is no // different than the first time a lock is added to the lockTable. - l.holder.holder[durability].ts.Forward(ts) - l.holder.holder[durability].seqs = append(seqs, txn.Sequence) + l.holder.holder[acq.Durability].ts.Forward(acq.Txn.WriteTimestamp) + l.holder.holder[acq.Durability].seqs = append(seqs, acq.Txn.Sequence) _, afterTs := l.getLockHolder() if beforeTs.Less(afterTs) { @@ -1970,7 +1984,7 @@ func (l *lockState) acquireLock( // of a concurrent release but that is harmless since this request is // holding latches and has proceeded to evaluation. if l.reservation != nil { - if l.reservation.txn.ID != txn.ID { + if l.reservation.txn.ID != acq.Txn.ID { // Reservation is broken. qg := &queuedGuard{ guard: l.reservation, @@ -1998,13 +2012,13 @@ func (l *lockState) acquireLock( } l.reservation = nil l.holder.locked = true - l.holder.holder[durability].txn = txn - l.holder.holder[durability].ts = ts - l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), txn.Sequence) + l.holder.holder[acq.Durability].txn = &acq.Txn + l.holder.holder[acq.Durability].ts = acq.Txn.WriteTimestamp + l.holder.holder[acq.Durability].seqs = append([]enginepb.TxnSeq(nil), acq.Txn.Sequence) l.holder.startTime = clock.PhysicalTime() // If there are waiting requests from the same txn, they no longer need to wait. - l.releaseWritersFromTxn(txn) + l.releaseWritersFromTxn(&acq.Txn) // Inform active waiters since lock has transitioned to held. 
l.informActiveWaiters() @@ -2788,7 +2802,7 @@ func (t *lockTableImpl) AcquireLock(acq *roachpb.LockAcquisition) error { return nil } } - err := l.acquireLock(acq.Strength, acq.Durability, &acq.Txn, acq.Txn.WriteTimestamp, t.clock) + err := l.acquireLock(acq, t.clock) t.locks.mu.Unlock() if checkMaxLocks { diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 56be1375a3de..30b69fff8557 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -83,7 +83,7 @@ start-waiting: Calls lockTable.ScanOptimistic. The request must not have an existing guard. If a guard is returned, stores it for later use. -acquire r=<name> k=<key> durability=r|u +acquire r=<name> k=<key> durability=r|u [ignored-seqs=<int>[-<int>][,<int>[-<int>]]] ---- @@ -372,6 +372,11 @@ func TestLockTableBasic(t *testing.T) { durability = lock.Replicated } acq := roachpb.MakeLockAcquisition(req.Txn, roachpb.Key(key), durability) + var ignored []enginepb.IgnoredSeqNumRange + if d.HasArg("ignored-seqs") { + ignored = scanIgnoredSeqNumbers(t, d) + } + acq.IgnoredSeqNums = ignored if err := lt.AcquireLock(&acq); err != nil { return err.Error() } @@ -413,29 +418,7 @@ func TestLockTableBasic(t *testing.T) { span := getSpan(t, d, s) var ignored []enginepb.IgnoredSeqNumRange if d.HasArg("ignored-seqs") { - var seqsStr string - d.ScanArgs(t, "ignored-seqs", &seqsStr) - parts := strings.Split(seqsStr, ",") - for _, p := range parts { - pair := strings.Split(p, "-") - if len(pair) != 1 && len(pair) != 2 { - d.Fatalf(t, "error parsing %s", parts) - } - startNum, err := strconv.ParseInt(pair[0], 10, 32) - if err != nil { - d.Fatalf(t, "error parsing ignored seqnums: %s", err) - } - ignoredRange := enginepb.IgnoredSeqNumRange{ - Start: enginepb.TxnSeq(startNum), End: enginepb.TxnSeq(startNum)} - if len(pair) == 2 { - endNum, err := strconv.ParseInt(pair[1], 10, 32) - if err != nil { - d.Fatalf(t, "error parsing ignored seqnums: %s", err) - } - 
ignoredRange.End = enginepb.TxnSeq(endNum) - } - ignored = append(ignored, ignoredRange) - } + ignored = scanIgnoredSeqNumbers(t, d) } // TODO(sbhola): also test STAGING. intent := &roachpb.LockUpdate{ @@ -747,6 +730,37 @@ func getStrength(t *testing.T, d *datadriven.TestData, strS string) lock.Strengt } } +// scanIgnoredSeqNumbers parses the "ignored-seqs" argument of d, a +// comma-separated list of sequence numbers or start-end ranges such as +// "1,3-5", into ignored sequence number ranges; bad input calls d.Fatalf. +func scanIgnoredSeqNumbers(t *testing.T, d *datadriven.TestData) []enginepb.IgnoredSeqNumRange { + var ignored []enginepb.IgnoredSeqNumRange + var seqsStr string + d.ScanArgs(t, "ignored-seqs", &seqsStr) + parts := strings.Split(seqsStr, ",") + for _, p := range parts { + pair := strings.Split(p, "-") + if len(pair) != 1 && len(pair) != 2 { + d.Fatalf(t, "error parsing %s", p) + } + startNum, err := strconv.ParseInt(pair[0], 10, 32) + if err != nil { + d.Fatalf(t, "error parsing ignored seqnums: %s", err) + } + ignoredRange := enginepb.IgnoredSeqNumRange{ + Start: enginepb.TxnSeq(startNum), End: enginepb.TxnSeq(startNum)} + if len(pair) == 2 { + endNum, err := strconv.ParseInt(pair[1], 10, 32) + if err != nil { + d.Fatalf(t, "error parsing ignored seqnums: %s", err) + } + ignoredRange.End = enginepb.TxnSeq(endNum) + } + ignored = append(ignored, ignoredRange) + } + return ignored +} + func intentsToResolveToStr(toResolve []roachpb.LockUpdate, startOnNewLine bool) string { if len(toResolve) == 0 { return "" diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/acquire_ignored_seqs b/pkg/kv/kvserver/concurrency/testdata/lock_table/acquire_ignored_seqs new file mode 100644 index 000000000000..f0ae1f407bcd --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/acquire_ignored_seqs @@ -0,0 +1,166 @@ +new-lock-table maxlocks=10000 +---- + +# ------------------------------------------------------------------------------ +# Acquire a lock on key a at timestamp 10,1 at multiple sequence numbers. 
+# ------------------------------------------------------------------------------ + +new-txn txn=txn1 ts=10,1 epoch=0 seq=1 +---- + +new-request r=req1 txn=txn1 ts=10,1 spans=intent@a +---- + +new-txn txn=txn1 ts=10,1 epoch=0 seq=2 +---- + +new-request r=req2 txn=txn1 ts=10,1 spans=intent@a +---- + +new-txn txn=txn1 ts=10,1 epoch=0 seq=3 +---- + +new-request r=req3 txn=txn1 ts=10,1 spans=intent@a +---- + +scan r=req1 +---- +start-waiting: false + +scan r=req2 +---- +start-waiting: false + +acquire r=req1 k=a durability=u +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [1] + +acquire r=req2 k=a durability=u +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [1, 2] + +acquire r=req3 k=a durability=u +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [1, 2, 3] + + +# ------------------------------------------------------------------------------ +# Re-acquire the (unreplicated) lock at a higher sequence number. Pass in 1 and +# 3 as ignored. +# ------------------------------------------------------------------------------ + +new-txn txn=txn1 ts=10,1 epoch=0 seq=5 +---- + +new-request r=req4 txn=txn1 ts=10,1 spans=intent@a +---- + +acquire r=req4 k=a durability=u ignored-seqs=1,3 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [2, 5] + +# ------------------------------------------------------------------------------ +# Re-acquire the (unreplicated) lock at a higher sequence number. This time, +# pass in a sequence number as ignored at which the lock wasn't acquired. 
+# ------------------------------------------------------------------------------ + +new-txn txn=txn1 ts=10,1 epoch=0 seq=8 +---- + +new-request r=req5 txn=txn1 ts=10,1 spans=intent@a +---- + +acquire r=req5 k=a durability=u ignored-seqs=4 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: unrepl epoch: 0, seqs: [2, 5, 8] + +# ------------------------------------------------------------------------------ +# Ensure only sequence numbers of unreplicated locks are pruned. That is, +# replicated locks acquired at a sequence number that is considered ignored +# should not be pruned. +# ------------------------------------------------------------------------------ + +# First, add a waiting writer on this lock so that it counts as contended. + +new-txn txn=txn2 ts=12,1 epoch=0 +---- + +new-request r=req6 txn=txn2 ts=12,1 spans=intent@a +---- + +scan r=req6 +---- +start-waiting: true + +acquire r=req5 k=a durability=r +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: repl epoch: 0, seqs: [8], unrepl epoch: 0, seqs: [2, 5, 8] + queued writers: + active: true req: 3, txn: 00000000-0000-0000-0000-000000000002 + distinguished req: 3 + +new-txn txn=txn1 ts=10,1 epoch=0 seq=9 +---- + +new-request r=req7 txn=txn1 ts=10,1 spans=intent@a +---- + +# Note that the lock is acquired as both replicated and un-replicated at +# sequence number 8. But because the lock is being acquired as replicated, we +# don't prune the list of unreplicated locks either. 
+ +acquire r=req7 k=a durability=r ignored-seqs=8 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: repl epoch: 0, seqs: [8, 9], unrepl epoch: 0, seqs: [2, 5, 8] + queued writers: + active: true req: 3, txn: 00000000-0000-0000-0000-000000000002 + distinguished req: 3 + +# Similarly, acquire the lock as unreplicated and try to ignore a sequence +# number (8) at which the lock was acquired as a replicated lock; it shouldn't +# be pruned. +acquire r=req7 k=a durability=u ignored-seqs=8 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: repl epoch: 0, seqs: [8, 9], unrepl epoch: 0, seqs: [2, 5, 9] + queued writers: + active: true req: 3, txn: 00000000-0000-0000-0000-000000000002 + distinguished req: 3 + +# ------------------------------------------------------------------------------ +# Ensure ignoring a range of sequence numbers works as expected. +# ------------------------------------------------------------------------------ + + +new-txn txn=txn1 ts=10,1 epoch=0 seq=11 +---- + +new-request r=req8 txn=txn1 ts=10,1 spans=intent@a +---- + +# Note that 9 is held as both replicated and unreplicated; however it won't be +# removed from the replicated list. + +acquire r=req8 k=a durability=u ignored-seqs=2-5,9 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 10.000000000,1, info: repl epoch: 0, seqs: [8, 9], unrepl epoch: 0, seqs: [11] + queued writers: + active: true req: 3, txn: 00000000-0000-0000-0000-000000000002 + distinguished req: 3