Skip to content

Commit

Permalink
kv: support unreplicated locks, hook up SELECT FOR UPDATE
Browse files Browse the repository at this point in the history
Closes cockroachdb#40205.
Informs cockroachdb#41720.

This change teaches the KV client and the KV API about unreplicated locks.
It then adds a KeyLocking mode to ScanRequest and ReverseScanRequest, which
allows their users to select the locking strength that they would like the
scan to use. This locking strength defaults to None, which corresponds to
the current behavior. However, some users will want to acquire locks on each
row scanned, which is now possible by setting the locking strength to a
stronger level. For now, only the Exclusive strength is supported.

The change then revisits SQL's row-level locking support, which is supported
all the way down to the row fetcher for implicit (e.g. UPDATE) and explicit
(e.g. SELECT ... FOR UPDATE) upgrade locking. The change uses the new
key-locking functionality in the KV API to hook up row-level locking,
completing the integration of SELECT FOR UPDATE with the KV layer and,
in particular, the new lock-table structure.

cockroachdb#43775 described the three main
benefits of this change:
- higher throughput under contention
- lower latency and improved fairness under contention
- a reduction in transaction retries under contention

I've revisited those results a few times in the last two months and seen that
the results continue to hold, and in some cases they have improved. I intend
to update this PR with a more complete analysis of its impact on those three
areas.

Release note (sql change): SELECT FOR UPDATE now hooks into a new
leaseholder-only locking mechanism. This allows the feature to be used
to improve performance of transactions that read, modify, and write
contended to rows. Similarly, UPDATE statements now use this new
mechanism by default, meaning that their performance under contention is
improved. This is only enabled for UPDATE statements that can push their
filter all the way into their key-value scan. To determine whether an
UPDATE statement is implicitly using SELECT FOR UPDATE locking, look
for a "locking strength" field in the EXPLAIN output for the statement.
  • Loading branch information
nvanbenschoten committed Mar 6, 2020
1 parent c1d217f commit a98be1f
Show file tree
Hide file tree
Showing 34 changed files with 1,926 additions and 1,159 deletions.
88 changes: 80 additions & 8 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion c-deps/libroach/protos/roachpb/data.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *scanRequestScanner) exportSpan(
for remaining := span; ; {
start := timeutil.Now()
b := txn.NewBatch()
r := roachpb.NewScan(remaining.Key, remaining.EndKey).(*roachpb.ScanRequest)
r := roachpb.NewScan(remaining.Key, remaining.EndKey, false /* forUpdate */).(*roachpb.ScanRequest)
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
// NB: We use a raw request rather than the Scan() method because we want
Expand Down
36 changes: 30 additions & 6 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (b *Batch) Inc(key interface{}, value int64) {
b.initResult(1, 1, notRaw, nil)
}

func (b *Batch) scan(s, e interface{}, isReverse bool) {
func (b *Batch) scan(s, e interface{}, isReverse, forUpdate bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -532,22 +532,34 @@ func (b *Batch) scan(s, e interface{}, isReverse bool) {
return
}
if !isReverse {
b.appendReqs(roachpb.NewScan(begin, end))
b.appendReqs(roachpb.NewScan(begin, end, forUpdate))
} else {
b.appendReqs(roachpb.NewReverseScan(begin, end))
b.appendReqs(roachpb.NewReverseScan(begin, end, forUpdate))
}
b.initResult(1, 0, notRaw, nil)
}

// Scan retrieves the key/values between begin (inclusive) and end (exclusive) in
// ascending order.
//
// A new result will be appended to the batch which will contain "rows" (each
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) Scan(s, e interface{}) {
b.scan(s, e, false)
b.scan(s, e, false /* isReverse */, false /* forUpdate */)
}

// ScanForUpdate retrieves the key/values between begin (inclusive) and end
// (exclusive) in ascending order. Unreplicated, exclusive locks are acquired on
// each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ScanForUpdate(s, e interface{}) {
b.scan(s, e, false /* isReverse */, true /* forUpdate */)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
Expand All @@ -558,7 +570,19 @@ func (b *Batch) Scan(s, e interface{}) {
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScan(s, e interface{}) {
b.scan(s, e, true)
b.scan(s, e, true /* isReverse */, false /* forUpdate */)
}

// ReverseScanForUpdate retrieves the rows between begin (inclusive) and end
// (exclusive) in descending order. Unreplicated, exclusive locks are acquired
// on each of the returned keys.
//
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScanForUpdate(s, e interface{}) {
b.scan(s, e, true /* isReverse */, true /* forUpdate */)
}

// Del deletes one or more keys.
Expand Down
Loading

0 comments on commit a98be1f

Please sign in to comment.