Skip to content

Commit

Permalink
[DNM] storage/concurrency: introduce concurrency control package, pro…
Browse files Browse the repository at this point in the history
…totype SFU

Informs #41720.

This commit creates a new concurrency package that provides a concurrency
manager structure that encapsulates the details of concurrency control and
contention handling for serializable key-value transactions. Any reader of this
commit should start at `concurrency_control.go` and move out from there.

The new package has a few primary objectives:
1. centralize the handling of request synchronization and transaction contention
   handling in a single location, allowing for the topic to be documented and
   understood in isolation.
2. rework contention handling to react to intent state transitions directly. This
   simplifies the transaction queueing story, reduces the frequency of transaction
   push RPCs, and allows waiters to proceed after intent resolution as soon as possible.
3. create a framework that naturally permits "update" locking, which is required for
   kv-level SELECT FOR UPDATE support (#6583).
4. provide stronger guarantees around fairness when transactions conflict, in order
   to reduce tail latencies under contended scenarios.
5. create a structure that can extend to address the long-term goals of a fully
   centralized lock-table laid out in #41720.

WARNING: this is still a WIP. Notably, the lockTableImpl is mocked out with a
working but incomplete implementation. See #43740 for a more complete strawman.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 28, 2020
1 parent 1cbcb78 commit 7ada06b
Show file tree
Hide file tree
Showing 26 changed files with 1,305 additions and 329 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_query_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ func QueryTxn(
}

// Get the list of txns waiting on this txn.
reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID)
reply.WaitingTxns = cArgs.EvalCtx.GetConcurrencyManager().GetDependents(args.Txn.ID)
return result.Result{}, nil
}
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -71,7 +71,7 @@ func (m *mockEvalCtx) GetLimiters() *Limiters {
func (m *mockEvalCtx) AbortSpan() *abortspan.AbortSpan {
return m.abortSpan
}
func (m *mockEvalCtx) GetTxnWaitQueue() *txnwait.Queue {
func (m *mockEvalCtx) GetConcurrencyManager() concurrency.Manager {
panic("unimplemented")
}
func (m *mockEvalCtx) NodeID() roachpb.NodeID {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -53,7 +53,7 @@ type EvalContext interface {
Clock() *hlc.Clock
DB() *client.DB
AbortSpan() *abortspan.AbortSpan
GetTxnWaitQueue() *txnwait.Queue
GetConcurrencyManager() concurrency.Manager
GetLimiters() *Limiters

NodeID() roachpb.NodeID
Expand Down
355 changes: 355 additions & 0 deletions pkg/storage/concurrency/concurrency_control.go

Large diffs are not rendered by default.

Loading

0 comments on commit 7ada06b

Please sign in to comment.