Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: integrate Concurrency Manager into Replica request path #45482

Merged
merged 10 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ func init() {
}

func declareKeysExport(
desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
desc *roachpb.RangeDescriptor,
header roachpb.Header,
req roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
batcheval.DefaultDeclareKeys(desc, header, req, spans)
spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
batcheval.DefaultDeclareIsolatedKeys(desc, header, req, latchSpans, lockSpans)
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
}

// evalExport dumps the requested keys into files of non-overlapping key ranges
Expand Down
10 changes: 0 additions & 10 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,6 @@ func (ba *BatchRequest) IsSinglePushTxnRequest() bool {
return false
}

// IsSingleQueryTxnRequest returns true iff the batch contains a single
// request, and that request is for a QueryTxn.
func (ba *BatchRequest) IsSingleQueryTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*QueryTxnRequest)
return ok
}
return false
}

// IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
// request, and that request is a HeartbeatTxn.
func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/tests/monotonic_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -107,6 +108,10 @@ func testMonotonicInserts(t *testing.T, distSQLMode sessiondata.DistSQLExecMode)
st := server.ClusterSettings()
st.Manual.Store(true)
sql.DistSQLClusterExecMode.Override(&st.SV, int64(distSQLMode))
// Let transactions push immediately to detect deadlocks. The test creates a
// large amount of contention and dependency cycles, and could take a long
// time to complete without this.
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0)
}

var clients []mtClient
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ func init() {
}

func declareKeysClearRange(
desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
desc *roachpb.RangeDescriptor,
header roachpb.Header,
req roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
DefaultDeclareKeys(desc, header, req, spans)
DefaultDeclareKeys(desc, header, req, latchSpans, lockSpans)
// We look up the range descriptor key to check whether the span
// is equal to the entire range for fast stats updating.
spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

// ClearRange wipes all MVCC versions of keys covered by the specified
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_compute_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func init() {
}

func declareKeysComputeChecksum(
*roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet,
_ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet,
) {
// Intentionally declare no keys, as ComputeChecksum does not need to be
// serialized with any other commands. It simply needs to be committed into
Expand Down
15 changes: 12 additions & 3 deletions pkg/storage/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func init() {
RegisterReadWriteCommand(roachpb.ConditionalPut, DefaultDeclareKeys, ConditionalPut)
RegisterReadWriteCommand(roachpb.ConditionalPut, DefaultDeclareIsolatedKeys, ConditionalPut)
}

// ConditionalPut sets the value for a specified key only if
Expand All @@ -41,8 +41,17 @@ func ConditionalPut(
}
}
handleMissing := engine.CPutMissingBehavior(args.AllowIfDoesNotExist)
var err error
if args.Blind {
return result.Result{}, engine.MVCCBlindConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
err = engine.MVCCBlindConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
} else {
err = engine.MVCCConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
}
return result.Result{}, engine.MVCCConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
// without re-evaluating the batch. This behavior isn't particularly
// desirable, but while it remains, we need to assume that an intent could
// have been written even when an error is returned. This is harmless if the
// error is not consumed by the caller because the result will be discarded.
return result.FromWrittenIntents(h.Txn, args.Key), err
}
11 changes: 9 additions & 2 deletions pkg/storage/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func init() {
RegisterReadWriteCommand(roachpb.Delete, DefaultDeclareKeys, Delete)
RegisterReadWriteCommand(roachpb.Delete, DefaultDeclareIsolatedKeys, Delete)
}

// Delete deletes the key and value specified by key.
Expand All @@ -29,5 +29,12 @@ func Delete(
args := cArgs.Args.(*roachpb.DeleteRequest)
h := cArgs.Header

return result.Result{}, engine.MVCCDelete(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, h.Txn)
err := engine.MVCCDelete(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, h.Txn)
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
// without re-evaluating the batch. This behavior isn't particularly
// desirable, but while it remains, we need to assume that an intent could
// have been written even when an error is returned. This is harmless if the
// error is not consumed by the caller because the result will be discarded.
return result.FromWrittenIntents(h.Txn, args.Key), err
}
27 changes: 19 additions & 8 deletions pkg/storage/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ func init() {
}

func declareKeysDeleteRange(
_ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
desc *roachpb.RangeDescriptor,
header roachpb.Header,
req roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
args := req.(*roachpb.DeleteRangeRequest)
access := spanset.SpanReadWrite

if args.Inline {
spans.AddNonMVCC(access, req.Header().Span())
DefaultDeclareKeys(desc, header, req, latchSpans, lockSpans)
} else {
spans.AddMVCC(access, req.Header().Span(), header.Timestamp)
DefaultDeclareIsolatedKeys(desc, header, req, latchSpans, lockSpans)
}
}

Expand All @@ -50,16 +51,26 @@ func DeleteRange(
if !args.Inline {
timestamp = h.Timestamp
}
// NB: Even if args.ReturnKeys is false, we want to know which intents were
// written if we're evaluating the DeleteRange for a transaction so that we
// can update the Result's WrittenIntents field.
returnKeys := args.ReturnKeys || h.Txn != nil
deleted, resumeSpan, num, err := engine.MVCCDeleteRange(
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.MaxSpanRequestKeys, timestamp, h.Txn, args.ReturnKeys,
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.MaxSpanRequestKeys, timestamp, h.Txn, returnKeys,
)
if err == nil {
if err == nil && args.ReturnKeys {
reply.Keys = deleted
}
reply.NumKeys = num
if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}
return result.Result{}, err
// NB: even if MVCC returns an error, it may still have written an intent
// into the batch. This allows callers to consume errors like WriteTooOld
// without re-evaluating the batch. This behavior isn't particularly
// desirable, but while it remains, we need to assume that an intent could
// have been written even when an error is returned. This is harmless if the
// error is not consumed by the caller because the result will be discarded.
return result.FromWrittenIntents(h.Txn, deleted...), err
}
Loading