Skip to content

Commit

Permalink
storage: only write AbortSpan entries if intents are removed
Browse files Browse the repository at this point in the history
This reduces the frequency of AbortSpan entries that can be abandoned even
without a transaction coordinator failure. Specifically, it protects against
the case where intent resolution races with a transaction coordinator cleaning
up its own transaction record and intents. This can happen for both aborted and
committed transactions.

In the first case, a pusher might find a transaction's intent and then find its
record to be aborted after that transaction had cleanly rolled back its own intents.
Even though the transaction's coordinator had already cleaned up and potentially
"unpoisoned" AbortSpans, the pusher would happily re-introduce AbortSpan records when
it goes to resolve the intents that were already cleaned up. These AbortSpan entries
would be fully abandoned and have to wait out the GC.

Similarly, in the second case, the transaction might have committed. Here, the
pusher might hit an intent and the txn coordinator might clean up and auto-GC its
txn record before the pusher arrives at the txn record. Once the pusher gets there,
it would mistake the txn for aborted (by design) and proceed to write an AbortSpan
record where the intent it had once observed had been (not by design).

We can detect both of these cases by simply checking whether intent resolution actually
finds and removes an intent. If intent resolution doesn't find an intent, then we might
be in either case. That's fine, because we only ever need to poison the abort span if we
actually remove an intent that could confuse a zombie transaction.

Release note: None
  • Loading branch information
nvanbenschoten committed Nov 26, 2019
1 parent 38d49eb commit a4e578f
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ func removeDeadReplicas(
return nil, err
}
intent.Status = roachpb.ABORTED
if err := engine.MVCCResolveWriteIntent(ctx, batch, &ms, intent); err != nil {
if _, err := engine.MVCCResolveWriteIntent(ctx, batch, &ms, intent); err != nil {
return nil, err
}
// With the intent resolved, we can try again.
Expand Down
10 changes: 7 additions & 3 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,11 @@ func resolveLocalIntents(
return nil
}
resolveMS := ms
resolveAllowance--
return engine.MVCCResolveWriteIntentUsingIter(ctx, batch, iterAndBuf, resolveMS, intent)
ok, err := engine.MVCCResolveWriteIntentUsingIter(ctx, batch, iterAndBuf, resolveMS, intent)
if ok {
resolveAllowance--
}
return err
}
// For intent ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
Expand Down Expand Up @@ -505,7 +508,8 @@ func resolveLocalIntents(
}
}

if WriteAbortSpanOnResolve(txn.Status) {
removedAny := resolveAllowance != intentResolutionBatchSize
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := SetAbortSpan(ctx, evalCtx, batch, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// (committed). The sstable also has a second write at a different (older)
// timestamp, because if it were empty other than the deletion tombstone, it
// would not have any timestamp bounds and would be selected for every read.
if err := engine.MVCCResolveWriteIntent(ctx, db, nil, roachpb.Intent{
if _, err := engine.MVCCResolveWriteIntent(ctx, db, nil, roachpb.Intent{
Span: roachpb.Span{Key: k},
Txn: txn.TxnMeta,
Status: roachpb.COMMITTED,
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func declareKeysResolveIntentCombined(
status = t.Status
txnID = t.IntentTxn.ID
}
if WriteAbortSpanOnResolve(status) {
if status == roachpb.ABORTED {
spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, txnID)})
}
}
Expand All @@ -52,7 +52,7 @@ func declareKeysResolveIntent(

func resolveToMetricType(status roachpb.TransactionStatus, poison bool) *result.Metrics {
var typ result.Metrics
if WriteAbortSpanOnResolve(status) {
if status == roachpb.ABORTED {
if poison {
typ.ResolvePoison = 1
} else {
Expand Down Expand Up @@ -82,14 +82,15 @@ func ResolveIntent(
Txn: args.IntentTxn,
Status: args.Status,
}
if err := engine.MVCCResolveWriteIntent(ctx, batch, ms, intent); err != nil {
ok, err := engine.MVCCResolveWriteIntent(ctx, batch, ms, intent)
if err != nil {
return result.Result{}, err
}

var res result.Result
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status) {
if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
if err := SetAbortSpan(ctx, cArgs.EvalCtx, batch, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ResolveIntentRange(
var res result.Result
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status) {
if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
if err := SetAbortSpan(ctx, cArgs.EvalCtx, batch, ms, args.IntentTxn, args.Poison); err != nil {
return result.Result{}, err
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/storage/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,25 @@ func VerifyTransaction(
}

// WriteAbortSpanOnResolve returns true if the abort span must be written when
// the transaction with the given status is resolved.
func WriteAbortSpanOnResolve(status roachpb.TransactionStatus) bool {
return status == roachpb.ABORTED
// the transaction with the given status is resolved. It avoids instructing the
// caller to write to the abort span if the caller didn't actually remove any
// intents but intends to poison.
func WriteAbortSpanOnResolve(status roachpb.TransactionStatus, poison, removedIntents bool) bool {
if status != roachpb.ABORTED {
// Only update the AbortSpan for aborted transactions.
return false
}
if !poison {
// We can remove any entries from the AbortSpan.
return true
}
// We only need to add AbortSpan entries for transactions that we have
// invalidated by removing intents. This avoids leaking AbortSpan entries if
// a request raced with txn record GC and mistakenly interpreted a committed
// txn as aborted only to return to the intent it wanted to push and find it
// already resolved. We're only required to write an entry if we do
// something that could confuse/invalidate a zombie transaction.
return removedIntents
}

// SetAbortSpan clears any AbortSpan entry if poison is false.
Expand Down
118 changes: 99 additions & 19 deletions pkg/storage/batcheval/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package batcheval

import (
"context"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -126,6 +127,7 @@ func TestSetAbortSpan(t *testing.T) {
args := CommandArgs{
EvalCtx: rec,
Args: &req,
MaxKeys: math.MaxInt64,
}

var resp roachpb.ResolveIntentRangeResponse
Expand All @@ -143,15 +145,15 @@ func TestSetAbortSpan(t *testing.T) {
// EndTransactionRequest //
///////////////////////////////////////////////////////////////////////
{
name: "end txn, rollback, no poison, abort span missing",
name: "end txn, rollback, no poison, intent missing, abort span missing",
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, false /* poison */)
},
// Not poisoning, should not add an abort span entry.
exp: nil,
},
{
name: "end txn, rollback, no poison, abort span present",
name: "end txn, rollback, no poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, false /* poison */)
Expand All @@ -160,24 +162,101 @@ func TestSetAbortSpan(t *testing.T) {
exp: nil,
},
{
name: "end txn, rollback, poison, abort span missing",
name: "end txn, rollback, poison, intent missing, abort span missing",
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, true /* poison */)
},
// Poisoning, but no intents found, should not add an abort span entry.
exp: nil,
},
{
name: "end txn, rollback, poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, true /* poison */)
},
// Poisoning, but no intents found, don't touch abort span.
exp: &txnPrevAbortSpanEntry,
},
{
name: "end txn, commit, no poison, intent missing, abort span missing",
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, false /* poison */)
},
// Not poisoning, should not add an abort span entry.
exp: nil,
},
{
// NOTE: this request doesn't make sense, but we handle it. An abort
// span shouldn't be present if the transaction is still committable.
name: "end txn, commit, no poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, false /* poison */)
},
// Not aborted, don't touch abort span.
exp: &txnPrevAbortSpanEntry,
},
{
// NOTE: this request doesn't make sense, but we handle it. An EndTxn
// should never pass Commit = true and Poison = true.
name: "end txn, commit, poison, intent missing, abort span missing",
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, true /* poison */)
},
// Poisoning but not aborted, should not add an abort span entry.
exp: nil,
},
{
// NOTE: this request doesn't make sense, but we handle it. An EndTxn
// should never pass Commit = true and Poison = true.
name: "end txn, commit, poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, true /* poison */)
},
// Not aborted, don't touch abort span.
exp: &txnPrevAbortSpanEntry,
},
{
name: "end txn, rollback, no poison, intent present, abort span missing",
before: addIntent,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, false /* poison */)
},
// Not poisoning, should not add an abort span entry.
exp: nil,
},
{
name: "end txn, rollback, no poison, intent present, abort span present",
before: compose(addIntent, addAbortSpanEntry),
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, false /* poison */)
},
// Not poisoning, should clean up abort span entry.
exp: nil,
},
{
name: "end txn, rollback, poison, intent present, abort span missing",
before: addIntent,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, true /* poison */)
},
// Poisoning, should add an abort span entry.
exp: &txnAbortSpanEntry,
},
{
name: "end txn, rollback, poison, abort span present",
before: addAbortSpanEntry,
name: "end txn, rollback, poison, intent present, abort span present",
before: compose(addIntent, addAbortSpanEntry),
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, false /* commit */, true /* poison */)
},
// Poisoning, should update abort span entry.
exp: &txnAbortSpanEntry,
},
{
name: "end txn, commit, no poison, abort span missing",
name: "end txn, commit, no poison, intent present, abort span missing",
before: addIntent,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, false /* poison */)
},
Expand All @@ -187,8 +266,8 @@ func TestSetAbortSpan(t *testing.T) {
{
// NOTE: this request doesn't make sense, but we handle it. An abort
// span shouldn't be present if the transaction is still committable.
name: "end txn, commit, no poison, abort span present",
before: addAbortSpanEntry,
name: "end txn, commit, no poison, intent present, abort span present",
before: compose(addIntent, addAbortSpanEntry),
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, false /* poison */)
},
Expand All @@ -198,7 +277,8 @@ func TestSetAbortSpan(t *testing.T) {
{
// NOTE: this request doesn't make sense, but we handle it. An EndTxn
// should never pass Commit = true and Poison = true.
name: "end txn, commit, poison, abort span missing",
name: "end txn, commit, poison, intent present, abort span missing",
before: addIntent,
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, true /* poison */)
},
Expand All @@ -208,8 +288,8 @@ func TestSetAbortSpan(t *testing.T) {
{
// NOTE: this request doesn't make sense, but we handle it. An EndTxn
// should never pass Commit = true and Poison = true.
name: "end txn, commit, poison, abort span present",
before: addAbortSpanEntry,
name: "end txn, commit, poison, intent present, abort span present",
before: compose(addIntent, addAbortSpanEntry),
run: func(b engine.ReadWriter, rec EvalContext) error {
return endTxn(b, rec, true /* commit */, true /* poison */)
},
Expand Down Expand Up @@ -333,17 +413,17 @@ func TestSetAbortSpan(t *testing.T) {
run: func(b engine.ReadWriter, rec EvalContext) error {
return resolveIntent(b, rec, roachpb.ABORTED, true /* poison */)
},
// Poisoning, should add an abort span entry.
exp: &txnAbortSpanEntry,
// Poisoning, but no intents found, should not add an abort span entry.
exp: nil,
},
{
name: "resolve intent, txn aborted, poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return resolveIntent(b, rec, roachpb.ABORTED, true /* poison */)
},
// Poisoning, should update abort span entry.
exp: &txnAbortSpanEntry,
// Poisoning, but no intents found, don't touch abort span.
exp: &txnPrevAbortSpanEntry,
},
{
name: "resolve intent, txn aborted, poison, intent present, abort span missing",
Expand Down Expand Up @@ -560,17 +640,17 @@ func TestSetAbortSpan(t *testing.T) {
run: func(b engine.ReadWriter, rec EvalContext) error {
return resolveIntentRange(b, rec, roachpb.ABORTED, true /* poison */)
},
// Poisoning, should add an abort span entry.
exp: &txnAbortSpanEntry,
// Poisoning, but no intents found, should not add an abort span entry.
exp: nil,
},
{
name: "resolve intent range, txn aborted, poison, intent missing, abort span present",
before: addAbortSpanEntry,
run: func(b engine.ReadWriter, rec EvalContext) error {
return resolveIntentRange(b, rec, roachpb.ABORTED, true /* poison */)
},
// Poisoning, should update abort span entry.
exp: &txnAbortSpanEntry,
// Poisoning, but no intents found, don't touch abort span.
exp: &txnPrevAbortSpanEntry,
},
{
name: "resolve intent range, txn aborted, poison, intent present, abort span missing",
Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ func TestStoreSplitAbortSpan(t *testing.T) {

populateAbortSpan := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.ResolveIntentRequest {
pushee := txn(key, ts)

// First write an intent on the key...
incArgs := incrementArgs(key, 1)
_, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: pushee}, incArgs)
if pErr != nil {
t.Fatalf("while sending +%v: %s", incArgs, pErr)
}

// Then resolve the intent and poison. Without the intent write, the
// intent resolution would be a no-op and wouldn't leave an AbortSpan
// entry.
expAll = append(expAll, roachpb.AbortSpanEntry{
Key: key,
Timestamp: ts,
Expand Down Expand Up @@ -185,7 +196,7 @@ func TestStoreSplitAbortSpan(t *testing.T) {
}

for _, arg := range args {
_, pErr := client.SendWrapped(context.Background(), store.TestSender(), arg)
_, pErr := client.SendWrapped(ctx, store.TestSender(), arg)
if pErr != nil {
t.Fatalf("while sending +%v: %s", arg, pErr)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func setupMVCCData(
key := keys[idx]
txnMeta := txn.TxnMeta
txnMeta.WriteTimestamp = hlc.Timestamp{WallTime: int64(counts[idx]) * 5}
if err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.Intent{
if _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.Intent{
Span: roachpb.Span{Key: key},
Status: roachpb.COMMITTED,
Txn: txnMeta,
Expand Down
Loading

0 comments on commit a4e578f

Please sign in to comment.