storage/concurrency: split read and write timestamp when scanning lockTable

This resolves an infinite loop that was possible and somewhat common in
`TestMonotonicInserts`. The loop occurred because read requests were discovering
intents at timestamps above their read timestamp but within their uncertainty
interval. Discovering such intents is essential to avoid stale reads, but it
confused the lockTable, which compared locks only against the read timestamp and
so did not treat the discovered locks as conflicting. This commit addresses the
issue by considering a transaction's uncertainty interval when scanning the
lockTable.

A new assertion was added to detect this infinite loop condition.
nvanbenschoten committed Feb 28, 2020
1 parent 292a8f5 commit 7b0c15a
Showing 4 changed files with 80 additions and 41 deletions.
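Before the file diffs, here is a minimal standalone sketch of the conflict check the commit message describes. It is illustrative only, not the package's actual implementation: Timestamp is a simplified stand-in for hlc.Timestamp (only Less and Forward are modeled). The point is that a reader's effective read timestamp is forwarded to its uncertainty limit, so a lock inside the uncertainty interval is treated as a conflict rather than silently ignored.

package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp; only the ordering
// operations used below are modeled.
type Timestamp struct{ WallTime int64 }

func (t Timestamp) Less(o Timestamp) bool { return t.WallTime < o.WallTime }

// Forward ratchets t up to o if o is larger.
func (t *Timestamp) Forward(o Timestamp) {
	if t.Less(o) {
		*t = o
	}
}

// readConflictsWith reports whether a reader must wait on a lock held at
// lockTS. Mirroring the change in this commit, the reader's effective read
// timestamp is its read timestamp forwarded by its uncertainty limit
// (MaxTimestamp), so a lock inside the uncertainty interval now conflicts.
func readConflictsWith(readTS, maxTS, lockTS Timestamp) bool {
	effective := readTS
	effective.Forward(maxTS)
	return !effective.Less(lockTS)
}

func main() {
	read := Timestamp{WallTime: 10}
	max := Timestamp{WallTime: 15}

	// A lock at ts=12 is above the read timestamp but inside the uncertainty
	// interval: previously the lockTable did not treat it as conflicting,
	// which allowed the same lock to be re-discovered in a loop.
	fmt.Println(readConflictsWith(read, max, Timestamp{WallTime: 12})) // true
	fmt.Println(readConflictsWith(read, max, Timestamp{WallTime: 20})) // false
}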
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/declare.go
@@ -53,7 +53,7 @@ func DefaultDeclareIsolatedKeys(
access = spanset.SpanReadWrite
}
latchSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
lockSpans.AddMVCC(access, req.Header().Span(), header.Timestamp)
lockSpans.AddNonMVCC(access, req.Header().Span())
}

// DeclareKeysForBatch adds all keys that the batch with the provided header
8 changes: 8 additions & 0 deletions pkg/storage/concurrency/concurrency_control.go
@@ -332,6 +332,14 @@ type Request struct {
// The maximal set of spans within which the request expects to have
// isolation from conflicting transactions. Conflicting locks within
// these spans will be queued on and conditionally pushed.
//
// Note that unlike LatchSpans, the timestamps that these spans are
// declared at are NOT consulted. All read spans are considered to take
// place at the transaction's read timestamp (Txn.ReadTimestamp) and all
// write spans are considered to take place at the transaction's write
// timestamp (Txn.WriteTimestamp). If the request is non-transactional
// (Txn == nil), all reads and writes are considered to take place at
// Timestamp.
LockSpans *spanset.SpanSet
}

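The comment added above pins down which timestamps the lockTable actually consults for lock spans. As a hedged illustration, a hypothetical helper (not part of the package, with simplified stand-ins for hlc.Timestamp and roachpb.Transaction) that mirrors the guard construction in the ScanAndEnqueue hunk further below might look like this:

package main

import "fmt"

// Timestamp and Txn are simplified stand-ins for hlc.Timestamp and
// roachpb.Transaction; only the fields referenced by the comment above
// are modeled.
type Timestamp struct{ WallTime int64 }

func (t Timestamp) Less(o Timestamp) bool { return t.WallTime < o.WallTime }

func (t *Timestamp) Forward(o Timestamp) {
	if t.Less(o) {
		*t = o
	}
}

type Txn struct {
	ReadTimestamp  Timestamp
	WriteTimestamp Timestamp
	MaxTimestamp   Timestamp // upper bound of the uncertainty interval
}

// effectiveTimestamps is a hypothetical helper: non-transactional requests
// read and write at the request timestamp, while transactional requests read
// at ReadTimestamp forwarded by MaxTimestamp and write at WriteTimestamp.
// The timestamps declared on the lock spans themselves are never consulted.
func effectiveTimestamps(txn *Txn, reqTS Timestamp) (readTS, writeTS Timestamp) {
	readTS, writeTS = reqTS, reqTS
	if txn != nil {
		readTS = txn.ReadTimestamp
		readTS.Forward(txn.MaxTimestamp)
		writeTS = txn.WriteTimestamp
	}
	return readTS, writeTS
}

func main() {
	txn := &Txn{
		ReadTimestamp:  Timestamp{WallTime: 10},
		MaxTimestamp:   Timestamp{WallTime: 15},
		WriteTimestamp: Timestamp{WallTime: 12},
	}
	r, w := effectiveTimestamps(txn, Timestamp{WallTime: 10})
	fmt.Println(r, w) // {15} {12}
}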
94 changes: 57 additions & 37 deletions pkg/storage/concurrency/lock_table.go
@@ -209,9 +209,10 @@ type lockTableGuardImpl struct {
seqNum uint64

// Information about this request.
txn *enginepb.TxnMeta
spans *spanset.SpanSet
ts hlc.Timestamp
txn *enginepb.TxnMeta
spans *spanset.SpanSet
readTS hlc.Timestamp
writeTS hlc.Timestamp

// Snapshots of the trees for which this request has some spans. Note that
// the lockStates in these snapshots may have been removed from
@@ -663,7 +664,7 @@ func (l *lockState) informActiveWaiters() {
if waitForTxn == nil {
checkForWaitSelf = true
waitForTxn = l.reservation.txn
waitForTs = l.reservation.ts
waitForTs = l.reservation.writeTS
if !findDistinguished && l.distinguishedWaiter.isTxn(waitForTxn) {
findDistinguished = true
l.distinguishedWaiter = nil
@@ -826,7 +827,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
return false
}
waitForTxn = l.reservation.txn
waitForTs = l.reservation.ts
waitForTs = l.reservation.writeTS
reservedBySelfTxn = g.isTxn(waitForTxn)
// A non-transactional write request never makes or breaks reservations,
// and only waits for a reservation if the reservation has a lower seqNum.
@@ -845,7 +846,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
return false
}
// Locked by some other txn.
if g.ts.Less(waitForTs) {
if g.readTS.Less(waitForTs) {
return false
}
g.mu.Lock()
@@ -1117,33 +1118,48 @@ func (l *lockState) discoveredLock(
informWaiters = false
}

g.mu.Lock()
_, presentHere := g.mu.locks[l]
addToQueue := !presentHere && sa == spanset.SpanReadWrite
if addToQueue {
// Since g will place itself in queue as inactive waiter below.
g.mu.locks[l] = struct{}{}
}
g.mu.Unlock()
switch sa {
case spanset.SpanReadOnly:
// Don't enter the lock's queuedReaders list, because all queued readers
// are expected to be active. Instead, wait until the next scan.

if addToQueue {
// Put self in queue as inactive waiter.
qg := &queuedGuard{
guard: g,
active: false,
// Confirm that the guard will wait on the lock the next time it scans
// the lock table. If not then it shouldn't have discovered the lock in
// the first place. Bugs here would cause infinite loops where the same
// lock is repeatedly re-discovered.
if g.readTS.Less(ts) {
return errors.Errorf("discovered non-conflicting lock")
}
// g is not necessarily first in the queue in the (rare) case (a) above.
var e *list.Element
for e = l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
break
}

case spanset.SpanReadWrite:
// Immediately enter the lock's queuedWriters list.
g.mu.Lock()
_, presentHere := g.mu.locks[l]
if !presentHere {
// Since g will place itself in queue as inactive waiter below.
g.mu.locks[l] = struct{}{}
}
if e == nil {
l.queuedWriters.PushBack(qg)
} else {
l.queuedWriters.InsertBefore(qg, e)
g.mu.Unlock()

if !presentHere {
// Put self in queue as inactive waiter.
qg := &queuedGuard{
guard: g,
active: false,
}
// g is not necessarily first in the queue in the (rare) case (a) above.
var e *list.Element
for e = l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
break
}
}
if e == nil {
l.queuedWriters.PushBack(qg)
} else {
l.queuedWriters.InsertBefore(qg, e)
}
}
}

@@ -1325,7 +1341,7 @@ func (l *lockState) increasedLockTs(newTs hlc.Timestamp) {
g := e.Value.(*lockTableGuardImpl)
curr := e
e = e.Next()
if g.ts.Less(newTs) {
if g.readTS.Less(newTs) {
// Stop waiting.
l.waitingReaders.Remove(curr)
if g == l.distinguishedWaiter {
@@ -1468,14 +1484,18 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
if guard == nil {
seqNum := atomic.AddUint64(&t.seqNum, 1)
g = &lockTableGuardImpl{
seqNum: seqNum,
spans: req.LockSpans,
ts: req.Timestamp,
sa: spanset.NumSpanAccess - 1,
index: -1,
seqNum: seqNum,
spans: req.LockSpans,
readTS: req.Timestamp,
writeTS: req.Timestamp,
sa: spanset.NumSpanAccess - 1,
index: -1,
}
if req.Txn != nil {
g.txn = &req.Txn.TxnMeta
g.readTS = req.Txn.ReadTimestamp
g.readTS.Forward(req.Txn.MaxTimestamp)
g.writeTS = req.Txn.WriteTimestamp
}
g.mu.signal = make(chan struct{}, 1)
g.mu.locks = make(map[*lockState]struct{})
@@ -1777,7 +1797,7 @@ func (t *lockTableImpl) String() string {
txn, ts, _ := l.getLockerInfo()
if txn == nil {
fmt.Fprintf(&buf, " res: req: %d, %s\n",
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.ts))
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.writeTS))
} else {
fmt.Fprintf(&buf, " holder: %s\n", waitingOnStr(txn, ts))
}
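One detail worth isolating from the discoveredLock hunk above is how a writer that discovers a lock enqueues itself as an inactive waiter: queuedWriters is kept ordered by request sequence number. The following standalone sketch uses a simplified queuedGuard (the real one wraps a *lockTableGuardImpl) to show the same insertion pattern with container/list.

package main

import (
	"container/list"
	"fmt"
)

// queuedGuard is simplified: the real struct holds a *lockTableGuardImpl,
// from which the sequence number is read.
type queuedGuard struct {
	seqNum uint64
	active bool
}

// insertBySeqNum mirrors the loop in discoveredLock above: walk forward to
// the first queued writer with a larger seqNum and insert before it, or
// append at the back if every queued writer has a smaller seqNum.
func insertBySeqNum(queuedWriters *list.List, qg *queuedGuard) {
	var e *list.Element
	for e = queuedWriters.Front(); e != nil; e = e.Next() {
		if e.Value.(*queuedGuard).seqNum > qg.seqNum {
			break
		}
	}
	if e == nil {
		queuedWriters.PushBack(qg)
	} else {
		queuedWriters.InsertBefore(qg, e)
	}
}

func main() {
	q := list.New()
	for _, seq := range []uint64{3, 9, 12} {
		q.PushBack(&queuedGuard{seqNum: seq, active: true})
	}
	// A request with seqNum 7 discovers the lock and queues as an inactive
	// waiter; it lands between 3 and 9.
	insertBySeqNum(q, &queuedGuard{seqNum: 7, active: false})
	for e := q.Front(); e != nil; e = e.Next() {
		fmt.Print(e.Value.(*queuedGuard).seqNum, " ") // 3 7 9 12
	}
	fmt.Println()
}

Ordering by seqNum keeps waiters in request-arrival order even when a request joins the queue only after discovering the lock.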
17 changes: 14 additions & 3 deletions pkg/storage/concurrency/lock_table_test.go
@@ -193,7 +193,10 @@ func TestLockTableBasic(t *testing.T) {
// Update the transaction's timestamp, if necessary. The transaction
// may have needed to move its timestamp for any number of reasons.
txnMeta.WriteTimestamp = ts
req.Txn = &roachpb.Transaction{TxnMeta: *txnMeta}
req.Txn = &roachpb.Transaction{
TxnMeta: *txnMeta,
ReadTimestamp: ts,
}
}
requestsByName[reqName] = req
return ""
@@ -854,6 +857,7 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
ID: nextUUID(&txnCounter),
WriteTimestamp: ts,
},
ReadTimestamp: ts,
}
}
request := &Request{
@@ -939,7 +943,10 @@ func TestLockTableConcurrentRequests(t *testing.T) {
LockSpans: spans,
}
if txnMeta != nil {
request.Txn = &roachpb.Transaction{TxnMeta: *txnMeta}
request.Txn = &roachpb.Transaction{
TxnMeta: *txnMeta,
ReadTimestamp: ts,
}
}
wi := workloadItem{request: request}
for i := 0; i < numKeys; i++ {
@@ -1087,7 +1094,11 @@ func createRequests(index int, numOutstanding int, numKeys int, numReadKeys int)
for i := 0; i < numOutstanding; i++ {
wiCopy := wi
wiCopy.Request.Txn = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{ID: nextUUID(&txnCounter), WriteTimestamp: ts},
TxnMeta: enginepb.TxnMeta{
ID: nextUUID(&txnCounter),
WriteTimestamp: ts,
},
ReadTimestamp: ts,
}
result = append(result, wiCopy)
}
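The test changes above all plumb a ReadTimestamp into the roachpb.Transaction protos they build. As a sketch of the kind of fixture this enables, here is a hypothetical helper (not in the diff; field names and import paths are assumed to match the tree as of this commit) that also sets MaxTimestamp ahead of the read timestamp so the uncertainty interval is non-empty.

package example

import (
	// Import paths as of this commit; they are assumptions, not taken from
	// the diff, and may have moved in later versions of the tree.
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// makeTxn is a hypothetical test helper. It builds a transaction the way the
// updated tests do, but with MaxTimestamp ahead of ReadTimestamp so that the
// lockTable's uncertainty-aware scan has an interval to consider.
func makeTxn(readTS, maxTS, writeTS hlc.Timestamp) *roachpb.Transaction {
	return &roachpb.Transaction{
		TxnMeta: enginepb.TxnMeta{
			ID:             uuid.MakeV4(),
			WriteTimestamp: writeTS,
		},
		ReadTimestamp: readTS,
		MaxTimestamp:  maxTS, // upper bound of the uncertainty interval
	}
}

ScanAndEnqueue then derives the guard's readTS by forwarding ReadTimestamp to MaxTimestamp and its writeTS from WriteTimestamp, as shown in the lock_table.go diff above.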
