Skip to content

Commit

Permalink
storage: send force abort PushTxn req directly from TxnWaitQueue
Browse files Browse the repository at this point in the history
This commit removes the interaction between the TxnWaitQueue and the
Replica-level retry loop when transaction deadlocks are detected. Instead
of instructing a waiting PushTxnRequest to change to a force push request,
the queue now launches push abort directly. This is cleaner and avoids
the need to mutate the input batch in Replica.maybeWaitForPushee.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 2, 2020
1 parent 8306f3a commit a311978
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 95 deletions.
13 changes: 1 addition & 12 deletions pkg/storage/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -244,23 +243,13 @@ func (r *Replica) maybeWaitForPushee(
}
pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq)
// Copy the request in anticipation of setting the force arg and
// updating the Now timestamp (see below).
pushReqCopy := *pushReq
if pErr == txnwait.ErrDeadlock {
// We've experienced a deadlock; set Force=true on push request,
// and set the push type to ABORT.
pushReqCopy.Force = true
pushReqCopy.PushType = roachpb.PUSH_ABORT
} else if pErr != nil {
if pErr != nil {
return nil, pErr
} else if pushResp != nil {
br := &roachpb.BatchResponse{}
br.Add(pushResp)
return br, nil
}
ba.Requests = nil
ba.Add(&pushReqCopy)
} else if ba.IsSingleQueryTxnRequest() {
// For query txn requests, wait in the txn wait queue either for
// transaction update or for dependent transactions to change.
Expand Down
119 changes: 71 additions & 48 deletions pkg/storage/txn_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func writeTxnRecord(ctx context.Context, tc *testContext, txn *roachpb.Transaction) error {
Expand Down Expand Up @@ -351,6 +352,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) {
}
pusher := newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock())
req := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *pusher,
PusheeTxn: txn.TxnMeta,
Expand Down Expand Up @@ -651,6 +655,12 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
})
}

type ReqWithRespAndErr struct {
req *roachpb.PushTxnRequest
resp *roachpb.PushTxnResponse
pErr *roachpb.Error
}

// TestTxnWaitQueueDependencyCycle verifies that if txn A pushes txn B
// pushes txn C which in turn is pushing txn A, the cycle will be
// detected and broken by a higher priority pusher.
Expand All @@ -675,16 +685,25 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) {
}

reqA := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnB.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *txnA,
PusheeTxn: txnB.TxnMeta,
}
reqB := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnC.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *txnB,
PusheeTxn: txnC.TxnMeta,
}
reqC := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnA.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *txnC,
PusheeTxn: txnA.TxnMeta,
Expand All @@ -694,45 +713,45 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) {
q.Enable()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, txn := range []*roachpb.Transaction{txnA, txnB, txnC} {
q.Enqueue(txn)
}
m := tc.store.GetTxnWaitMetrics()
assert.EqualValues(tc, 0, m.DeadlocksTotal.Count())

retCh := make(chan RespWithErr, 3)
for _, req := range []*roachpb.PushTxnRequest{reqA, reqB, reqC} {
reqs := []*roachpb.PushTxnRequest{reqA, reqB, reqC}
retCh := make(chan ReqWithRespAndErr, len(reqs))
for _, req := range reqs {
go func(req *roachpb.PushTxnRequest) {
resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, req)
retCh <- RespWithErr{resp, pErr}
retCh <- ReqWithRespAndErr{req, resp, pErr}
}(req)
}

// Wait for first request to finish, which should break the
// dependency cycle by returning an ErrDeadlock error.
respWithErr := <-retCh
if respWithErr.pErr != txnwait.ErrDeadlock {
t.Errorf("expected ErrDeadlock; got %v", respWithErr.pErr)
}
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
// Wait for first request to finish, which should break the dependency cycle
// by performing a force push abort. This will allow all other requests to
// proceed. At least one txn will be aborted by another txn, although it's
// possible that up to two are in the case that the deadlock is detected by
// multiple txns concurrently.
var pushed bool
for i := 0; i < len(reqs); i++ {
ret := <-retCh
if ret.pErr != nil {
if !testutils.IsPError(ret.pErr, context.Canceled.Error()) {
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_PUSHER_ABORTED\)`, ret.pErr)
}
} else {
pushed = true
require.NotNil(t, ret.resp)
require.Equal(t, roachpb.ABORTED, ret.resp.PusheeTxn.Status)

testutils.SucceedsSoon(t, func() error {
if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp {
return errors.Errorf("%d deadlocks, but want more than %d", act, exp)
// Cancel the pushers' context after the deadlock is initially broken.
cancel()
}
return nil
})
cancel()
for i := 0; i < 2; i++ {
<-retCh
}
}

type ReqWithErr struct {
req *roachpb.PushTxnRequest
pErr *roachpb.Error
require.True(t, pushed)
require.True(t, m.DeadlocksTotal.Count() >= 1)
}

// TestTxnWaitQueueDependencyCycleWithPriorityInversion verifies that
Expand Down Expand Up @@ -765,11 +784,17 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) {
}

reqA := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnB.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *txnA,
PusheeTxn: txnB.TxnMeta,
}
reqB := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txnA.Key,
},
PushType: roachpb.PUSH_ABORT,
PusherTxn: *txnB,
PusheeTxn: updatedTxnA.TxnMeta,
Expand All @@ -778,38 +803,36 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) {
q := tc.repl.txnWaitQueue
q.Enable()

ctx, cancel := context.WithCancel(context.Background())
for _, txn := range []*roachpb.Transaction{txnA, txnB} {
q.Enqueue(txn)
}
m := tc.store.GetTxnWaitMetrics()
assert.EqualValues(tc, 0, m.DeadlocksTotal.Count())

retCh := make(chan ReqWithErr, 2)
for _, req := range []*roachpb.PushTxnRequest{reqA, reqB} {
reqs := []*roachpb.PushTxnRequest{reqA, reqB}
retCh := make(chan ReqWithRespAndErr, len(reqs))
for _, req := range reqs {
go func(req *roachpb.PushTxnRequest) {
_, pErr := q.MaybeWaitForPush(ctx, tc.repl, req)
retCh <- ReqWithErr{req, pErr}
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req)
retCh <- ReqWithRespAndErr{req, resp, pErr}
}(req)
}

// Wait for first request to finish, which should break the
// dependency cycle by returning an ErrDeadlock error. The
// returned request should be reqA.
reqWithErr := <-retCh
if !reflect.DeepEqual(reqA, reqWithErr.req) {
t.Errorf("expected request %+v; got %+v", reqA, reqWithErr.req)
}
if reqWithErr.pErr != txnwait.ErrDeadlock {
t.Errorf("expected errDeadlock; got %v", reqWithErr.pErr)
}

testutils.SucceedsSoon(t, func() error {
if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp {
return errors.Errorf("%d deadlocks, but want more than %d", act, exp)
// Wait for the requests to finish. reqA should break the dependency
// cycle by force pushing. reqB should notice that it was aborted.
for i := 0; i < len(reqs); i++ {
ret := <-retCh
switch ret.req {
case reqA:
require.Nil(t, ret.pErr)
require.NotNil(t, ret.resp)
require.Equal(t, txnB.ID, ret.resp.PusheeTxn.ID)
require.Equal(t, roachpb.ABORTED, ret.resp.PusheeTxn.Status)
case reqB:
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_PUSHER_ABORTED\)`, ret.pErr)
default:
t.Fatal("unexpected")
}
return nil
})
cancel()
<-retCh
}
require.EqualValues(t, 1, m.DeadlocksTotal.Count())
}
32 changes: 24 additions & 8 deletions pkg/storage/txnwait/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
// the pushee has min priority or pusher has max priority.
func ShouldPushImmediately(req *roachpb.PushTxnRequest) bool {
if req.Force {
return true
}
if !(req.PushType == roachpb.PUSH_ABORT || req.PushType == roachpb.PUSH_TIMESTAMP) {
return true
}
Expand Down Expand Up @@ -387,10 +390,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
}
}

// ErrDeadlock is a sentinel error returned when a cyclic dependency between
// waiting transactions is detected.
var ErrDeadlock = roachpb.NewErrorf("deadlock detected")

// MaybeWaitForPush checks whether there is a queue already
// established for pushing the transaction. If not, or if the PushTxn
// request isn't queueable, return immediately. If there is a queue,
Expand All @@ -399,9 +398,6 @@ var ErrDeadlock = roachpb.NewErrorf("deadlock detected")
//
// If the transaction is successfully pushed while this method is waiting,
// the first return value is a non-nil PushTxnResponse object.
//
// In the event of a dependency cycle of pushers leading to deadlock,
// this method will return an ErrDeadlock error.
func (q *Queue) MaybeWaitForPush(
ctx context.Context, repl ReplicaInterface, req *roachpb.PushTxnRequest,
) (*roachpb.PushTxnResponse, *roachpb.Error) {
Expand Down Expand Up @@ -653,7 +649,7 @@ func (q *Queue) MaybeWaitForPush(
dependents,
)
metrics.DeadlocksTotal.Inc(1)
return nil, ErrDeadlock
return q.forcePushAbort(ctx, req)
}
}
// Signal the pusher query txn loop to continue.
Expand Down Expand Up @@ -901,6 +897,26 @@ func (q *Queue) queryTxnStatus(
return nil, nil, nil
}

// forcePushAbort upgrades the PushTxn request to a "forced" push abort, which
// overrides the normal expiration and priority checks to ensure that it aborts
// the pushee. This mechanism can be used to break deadlocks between conflicting
// transactions.
func (q *Queue) forcePushAbort(
ctx context.Context, req *roachpb.PushTxnRequest,
) (*roachpb.PushTxnResponse, *roachpb.Error) {
log.VEventf(ctx, 1, "force pushing %v to break deadlock", req.PusheeTxn.ID)
forcePush := *req
forcePush.Force = true
forcePush.PushType = roachpb.PUSH_ABORT
b := &client.Batch{}
b.Header.Timestamp = q.store.Clock().Now()
b.AddRawRequest(&forcePush)
if err := q.store.DB().Run(ctx, b); err != nil {
return nil, b.MustPErr()
}
return b.RawResponse().Responses[0].GetPushTxn(), nil
}

// TrackedTxns returns a (newly minted) set containing the transaction IDs which
// are being tracked (i.e. waited on).
//
Expand Down
Loading

0 comments on commit a311978

Please sign in to comment.