Skip to content

Commit

Permalink
storage/txnwait: terminate push when pusher aborted at lower epoch
Browse files Browse the repository at this point in the history
Closes cockroachdb#40786.
Closes cockroachdb#44336.

This commit resolves a bug in distributed deadlock detection that would
allow a deadlock between transactions to go undetected, stalling the
workload indefinitely.

The issue materialized as follows:
1. two transactions would deadlock and each enter a txnwait queue
2. they would poll their pushees record along with their own
3. deadlock detection would eventually pick this up and abort one of the txns
   using the pusher's copy of the txn's proto
4. however, the aborted txn has since restarted and bumped it epoch
5. the aborted txn continued to query its record, but failed to ingest any
   updates from it because the record was at a lower epoch than its own
   copy of its txn proto. So it never noticed that it was ABORTED
6. all other txns in the system including the original contending txn
   piled up behind the aborted txn in the contention queue, waiting for
   it to notice it was aborted and exit the queue
7. deadlock!

I'm optimistically closing the two `kv/contention/nodes=4` issues both
because I hope this is the cause of their recent troubles and also because
I've been spending a lot of time with the test recently in light of cockroachdb#45482
and plan to stabilize it fully.

I plan to backport this to release-19.2. This doesn't need to go all the
way back to release-19.1 because this was introduces in aed892a.
  • Loading branch information
nvanbenschoten committed Mar 3, 2020
1 parent 5f9a71a commit 0a1f251
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 80 deletions.
11 changes: 9 additions & 2 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,15 @@ func (t *Transaction) Update(o *Transaction) {
t.IgnoredSeqNums = o.IgnoredSeqNums
}
} else /* t.Epoch > o.Epoch */ {
// Ignore epoch-specific state from previous epoch.
if o.Status == COMMITTED {
// Ignore epoch-specific state from previous epoch. However, ensure that
// the transaction status still makes sense.
switch o.Status {
case ABORTED:
// Once aborted, always aborted. The transaction coordinator might
// have incremented the txn's epoch without realizing that it was
// aborted.
t.Status = ABORTED
case COMMITTED:
log.Warningf(context.Background(), "updating txn %s with COMMITTED txn at earlier epoch %s", t.String(), o.String())
}
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,25 @@ func TestTransactionUpdateStaging(t *testing.T) {
}
}

// TestTransactionUpdateAbortedOldEpoch tests that Transaction.Update propagates
// an ABORTED status even when that status comes from a proto with an old epoch.
// Once a transaction is ABORTED, it will stay aborted, even if its coordinator
// doesn't know this at the time that it increments its epoch and retries.
func TestTransactionUpdateAbortedOldEpoch(t *testing.T) {
txn := nonZeroTxn
txn.Status = ABORTED

txnRestart := txn
txnRestart.Epoch++
txnRestart.Status = PENDING
txnRestart.Update(&txn)

expTxn := txn
expTxn.Epoch++
expTxn.Status = ABORTED
require.Equal(t, expTxn, txnRestart)
}

func TestTransactionClone(t *testing.T) {
txnPtr := nonZeroTxn.Clone()
txn := *txnPtr
Expand Down
175 changes: 97 additions & 78 deletions pkg/storage/txn_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"reflect"
"regexp"
"sync/atomic"
Expand Down Expand Up @@ -582,85 +583,103 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "txnRecordExists", func(t *testing.T, txnRecordExists bool) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
// Test with the pusher txn record below the pusher's expected epoch, at
// the pusher's expected epoch, and above the pusher's expected epoch.
// Regardless of which epoch the transaction record is written at, if
// it is marked as ABORTED, it should terminate the push.
pushEpoch := enginepb.TxnEpoch(2)
for _, c := range []struct {
name string
recordEpoch enginepb.TxnEpoch
}{
{"below", pushEpoch - 1},
{"equal", pushEpoch},
{"above", pushEpoch + 1},
} {
t.Run(fmt.Sprintf("recordEpoch=%s", c.name), func(t *testing.T) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}
pusher.Epoch = pushEpoch

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Epoch = c.recordEpoch
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

Expand Down

0 comments on commit 0a1f251

Please sign in to comment.