Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/txnwait: terminate push when pusher aborted at lower epoch #45603

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,15 @@ func (t *Transaction) Update(o *Transaction) {
t.IgnoredSeqNums = o.IgnoredSeqNums
}
} else /* t.Epoch > o.Epoch */ {
// Ignore epoch-specific state from previous epoch.
if o.Status == COMMITTED {
// Ignore epoch-specific state from previous epoch. However, ensure that
// the transaction status still makes sense.
switch o.Status {
case ABORTED:
// Once aborted, always aborted. The transaction coordinator might
// have incremented the txn's epoch without realizing that it was
// aborted.
t.Status = ABORTED
case COMMITTED:
log.Warningf(context.Background(), "updating txn %s with COMMITTED txn at earlier epoch %s", t.String(), o.String())
}
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,25 @@ func TestTransactionUpdateStaging(t *testing.T) {
}
}

// TestTransactionUpdateAbortedOldEpoch tests that Transaction.Update propagates
// an ABORTED status even when that status comes from a proto with an old epoch.
// Once a transaction is ABORTED, it will stay aborted, even if its coordinator
// doesn't know this at the time that it increments its epoch and retries.
func TestTransactionUpdateAbortedOldEpoch(t *testing.T) {
txn := nonZeroTxn
txn.Status = ABORTED

txnRestart := txn
txnRestart.Epoch++
txnRestart.Status = PENDING
txnRestart.Update(&txn)

expTxn := txn
expTxn.Epoch++
expTxn.Status = ABORTED
require.Equal(t, expTxn, txnRestart)
}

func TestTransactionClone(t *testing.T) {
txnPtr := nonZeroTxn.Clone()
txn := *txnPtr
Expand Down
175 changes: 97 additions & 78 deletions pkg/storage/txn_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package storage
import (
"bytes"
"context"
"fmt"
"reflect"
"regexp"
"sync/atomic"
Expand Down Expand Up @@ -582,85 +583,103 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "txnRecordExists", func(t *testing.T, txnRecordExists bool) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
// Test with the pusher txn record below the pusher's expected epoch, at
// the pusher's expected epoch, and above the pusher's expected epoch.
// Regardless of which epoch the transaction record is written at, if
// it is marked as ABORTED, it should terminate the push.
pushEpoch := enginepb.TxnEpoch(2)
for _, c := range []struct {
name string
recordEpoch enginepb.TxnEpoch
}{
{"below", pushEpoch - 1},
{"equal", pushEpoch},
{"above", pushEpoch + 1},
} {
t.Run(fmt.Sprintf("recordEpoch=%s", c.name), func(t *testing.T) {
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

txn, err := createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
var pusher *roachpb.Transaction
if txnRecordExists {
pusher, err = createTxnForPushQueue(context.Background(), &tc)
if err != nil {
t.Fatal(err)
}
} else {
pusher = newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
}
pusher.Epoch = pushEpoch

req := roachpb.PushTxnRequest{
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
}

q := tc.repl.concMgr.TxnWaitQueue()
q.Enable()
q.EnqueueTxn(txn)

retCh := make(chan RespWithErr, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), &req)
retCh <- RespWithErr{resp, pErr}
}()

testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher.ID}
if deps := q.GetDependents(txn.ID); !reflect.DeepEqual(deps, expDeps) {
return errors.Errorf("expected GetDependents %+v; got %+v", expDeps, deps)
}
return nil
})

// If the record doesn't exist yet, give the push queue enough
// time to query the missing record and notice.
if !txnRecordExists {
time.Sleep(10 * time.Millisecond)
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Epoch = c.recordEpoch
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

// Update txn on disk with status ABORTED.
pusherUpdate := *pusher
pusherUpdate.Status = roachpb.ABORTED
if err := writeTxnRecord(context.Background(), &tc, &pusherUpdate); err != nil {
t.Fatal(err)
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
}

m := tc.store.txnWaitMetrics
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushers, but want %d", act, exp)
}
if act, exp := m.PusheeWaiting.Value(), int64(1); act != exp {
return errors.Errorf("%d pushees, but want %d", act, exp)
}
if act, exp := m.QueryWaiting.Value(), int64(0); act != exp {
return errors.Errorf("%d queries, but want %d", act, exp)
}
return nil
})
})
}

Expand Down