diff --git a/pkg/storage/store_send.go b/pkg/storage/store_send.go index d112c1afbfef..2041fc9306ae 100644 --- a/pkg/storage/store_send.go +++ b/pkg/storage/store_send.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -244,23 +243,13 @@ func (r *Replica) maybeWaitForPushee( } pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest) pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq) - // Copy the request in anticipation of setting the force arg and - // updating the Now timestamp (see below). - pushReqCopy := *pushReq - if pErr == txnwait.ErrDeadlock { - // We've experienced a deadlock; set Force=true on push request, - // and set the push type to ABORT. - pushReqCopy.Force = true - pushReqCopy.PushType = roachpb.PUSH_ABORT - } else if pErr != nil { + if pErr != nil { return nil, pErr } else if pushResp != nil { br := &roachpb.BatchResponse{} br.Add(pushResp) return br, nil } - ba.Requests = nil - ba.Add(&pushReqCopy) } else if ba.IsSingleQueryTxnRequest() { // For query txn requests, wait in the txn wait queue either for // transaction update or for dependent transactions to change. diff --git a/pkg/storage/txn_wait_queue_test.go b/pkg/storage/txn_wait_queue_test.go index 41eb29d3f424..89e22169c7fe 100644 --- a/pkg/storage/txn_wait_queue_test.go +++ b/pkg/storage/txn_wait_queue_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func writeTxnRecord(ctx context.Context, tc *testContext, txn *roachpb.Transaction) error { @@ -351,6 +352,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) { } pusher := newTransaction("pusher", roachpb.Key("a"), 1, tc.Clock()) req := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *pusher, PusheeTxn: txn.TxnMeta, @@ -651,6 +655,12 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) { }) } +type ReqWithRespAndErr struct { + req *roachpb.PushTxnRequest + resp *roachpb.PushTxnResponse + pErr *roachpb.Error +} + // TestTxnWaitQueueDependencyCycle verifies that if txn A pushes txn B // pushes txn C which in turn is pushing txn A, the cycle will be // detected and broken by a higher priority pusher. @@ -675,16 +685,25 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { } reqA := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnB.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *txnA, PusheeTxn: txnB.TxnMeta, } reqB := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnC.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *txnB, PusheeTxn: txnC.TxnMeta, } reqC := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnA.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *txnC, PusheeTxn: txnA.TxnMeta, @@ -694,45 +713,45 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { q.Enable() ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for _, txn := range []*roachpb.Transaction{txnA, txnB, txnC} { q.Enqueue(txn) } m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 0, m.DeadlocksTotal.Count()) - retCh := make(chan RespWithErr, 3) - for _, req := range []*roachpb.PushTxnRequest{reqA, reqB, reqC} { + reqs := []*roachpb.PushTxnRequest{reqA, reqB, reqC} + retCh := make(chan ReqWithRespAndErr, len(reqs)) + for _, req := range reqs { go func(req *roachpb.PushTxnRequest) { resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, req) - retCh <- RespWithErr{resp, pErr} + retCh <- ReqWithRespAndErr{req, resp, pErr} }(req) } - // Wait for first request to finish, which should break the - // dependency cycle by returning an ErrDeadlock error. - respWithErr := <-retCh - if respWithErr.pErr != txnwait.ErrDeadlock { - t.Errorf("expected ErrDeadlock; got %v", respWithErr.pErr) - } - if respWithErr.resp != nil { - t.Errorf("expected nil response; got %+v", respWithErr.resp) - } + // Wait for first request to finish, which should break the dependency cycle + // by performing a force push abort. This will allow all other requests to + // proceed. At least one txn will be aborted by another txn, although it's + // possible that up to two are in the case that the deadlock is detected by + // multiple txns concurrently. + var pushed bool + for i := 0; i < len(reqs); i++ { + ret := <-retCh + if ret.pErr != nil { + if !testutils.IsPError(ret.pErr, context.Canceled.Error()) { + require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_PUSHER_ABORTED\)`, ret.pErr) + } + } else { + pushed = true + require.NotNil(t, ret.resp) + require.Equal(t, roachpb.ABORTED, ret.resp.PusheeTxn.Status) - testutils.SucceedsSoon(t, func() error { - if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp { - return errors.Errorf("%d deadlocks, but want more than %d", act, exp) + // Cancel the pushers' context after the deadlock is initially broken. + cancel() } - return nil - }) - cancel() - for i := 0; i < 2; i++ { - <-retCh } -} - -type ReqWithErr struct { - req *roachpb.PushTxnRequest - pErr *roachpb.Error + require.True(t, pushed) + require.True(t, m.DeadlocksTotal.Count() >= 1) } // TestTxnWaitQueueDependencyCycleWithPriorityInversion verifies that @@ -765,11 +784,17 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { } reqA := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnB.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *txnA, PusheeTxn: txnB.TxnMeta, } reqB := &roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnA.Key, + }, PushType: roachpb.PUSH_ABORT, PusherTxn: *txnB, PusheeTxn: updatedTxnA.TxnMeta, @@ -778,38 +803,36 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - ctx, cancel := context.WithCancel(context.Background()) for _, txn := range []*roachpb.Transaction{txnA, txnB} { q.Enqueue(txn) } m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 0, m.DeadlocksTotal.Count()) - retCh := make(chan ReqWithErr, 2) - for _, req := range []*roachpb.PushTxnRequest{reqA, reqB} { + reqs := []*roachpb.PushTxnRequest{reqA, reqB} + retCh := make(chan ReqWithRespAndErr, len(reqs)) + for _, req := range reqs { go func(req *roachpb.PushTxnRequest) { - _, pErr := q.MaybeWaitForPush(ctx, tc.repl, req) - retCh <- ReqWithErr{req, pErr} + resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req) + retCh <- ReqWithRespAndErr{req, resp, pErr} }(req) } - // Wait for first request to finish, which should break the - // dependency cycle by returning an ErrDeadlock error. The - // returned request should be reqA. - reqWithErr := <-retCh - if !reflect.DeepEqual(reqA, reqWithErr.req) { - t.Errorf("expected request %+v; got %+v", reqA, reqWithErr.req) - } - if reqWithErr.pErr != txnwait.ErrDeadlock { - t.Errorf("expected errDeadlock; got %v", reqWithErr.pErr) - } - - testutils.SucceedsSoon(t, func() error { - if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp { - return errors.Errorf("%d deadlocks, but want more than %d", act, exp) + // Wait for the requests to finish. reqA should break the dependency + // cycle by force pushing. reqB should notice that it was aborted. + for i := 0; i < len(reqs); i++ { + ret := <-retCh + switch ret.req { + case reqA: + require.Nil(t, ret.pErr) + require.NotNil(t, ret.resp) + require.Equal(t, txnB.ID, ret.resp.PusheeTxn.ID) + require.Equal(t, roachpb.ABORTED, ret.resp.PusheeTxn.Status) + case reqB: + require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_PUSHER_ABORTED\)`, ret.pErr) + default: + t.Fatal("unexpected") } - return nil - }) - cancel() - <-retCh + } + require.EqualValues(t, 1, m.DeadlocksTotal.Count()) } diff --git a/pkg/storage/txnwait/queue.go b/pkg/storage/txnwait/queue.go index 2bb326346e39..8489f3866c5f 100644 --- a/pkg/storage/txnwait/queue.go +++ b/pkg/storage/txnwait/queue.go @@ -60,6 +60,9 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() { // ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where // the pushee has min priority or pusher has max priority. func ShouldPushImmediately(req *roachpb.PushTxnRequest) bool { + if req.Force { + return true + } if !(req.PushType == roachpb.PUSH_ABORT || req.PushType == roachpb.PUSH_TIMESTAMP) { return true } @@ -387,10 +390,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID } } -// ErrDeadlock is a sentinel error returned when a cyclic dependency between -// waiting transactions is detected. -var ErrDeadlock = roachpb.NewErrorf("deadlock detected") - // MaybeWaitForPush checks whether there is a queue already // established for pushing the transaction. If not, or if the PushTxn // request isn't queueable, return immediately. If there is a queue, @@ -399,9 +398,6 @@ var ErrDeadlock = roachpb.NewErrorf("deadlock detected") // // If the transaction is successfully pushed while this method is waiting, // the first return value is a non-nil PushTxnResponse object. -// -// In the event of a dependency cycle of pushers leading to deadlock, -// this method will return an ErrDeadlock error. func (q *Queue) MaybeWaitForPush( ctx context.Context, repl ReplicaInterface, req *roachpb.PushTxnRequest, ) (*roachpb.PushTxnResponse, *roachpb.Error) { @@ -653,7 +649,7 @@ func (q *Queue) MaybeWaitForPush( dependents, ) metrics.DeadlocksTotal.Inc(1) - return nil, ErrDeadlock + return q.forcePushAbort(ctx, req) } } // Signal the pusher query txn loop to continue. @@ -901,6 +897,26 @@ func (q *Queue) queryTxnStatus( return nil, nil, nil } +// forcePushAbort upgrades the PushTxn request to a "forced" push abort, which +// overrides the normal expiration and priority checks to ensure that it aborts +// the pushee. This mechanism can be used to break deadlocks between conflicting +// transactions. +func (q *Queue) forcePushAbort( + ctx context.Context, req *roachpb.PushTxnRequest, +) (*roachpb.PushTxnResponse, *roachpb.Error) { + log.VEventf(ctx, 1, "force pushing %v to break deadlock", req.PusheeTxn.ID) + forcePush := *req + forcePush.Force = true + forcePush.PushType = roachpb.PUSH_ABORT + b := &client.Batch{} + b.Header.Timestamp = q.store.Clock().Now() + b.AddRawRequest(&forcePush) + if err := q.store.DB().Run(ctx, b); err != nil { + return nil, b.MustPErr() + } + return b.RawResponse().Responses[0].GetPushTxn(), nil +} + // TrackedTxns returns a (newly minted) set containing the transaction IDs which // are being tracked (i.e. waited on). // diff --git a/pkg/storage/txnwait/queue_test.go b/pkg/storage/txnwait/queue_test.go index 5f7745646655..040d75481305 100644 --- a/pkg/storage/txnwait/queue_test.go +++ b/pkg/storage/txnwait/queue_test.go @@ -29,43 +29,77 @@ import ( func TestShouldPushImmediately(t *testing.T) { defer leaktest.AfterTest(t)() + + min := enginepb.MinTxnPriority + max := enginepb.MaxTxnPriority + mid := enginepb.TxnPriority(1) testCases := []struct { + force bool typ roachpb.PushTxnType pusherPri enginepb.TxnPriority pusheePri enginepb.TxnPriority shouldPush bool }{ - {roachpb.PUSH_ABORT, enginepb.MinTxnPriority, enginepb.MinTxnPriority, false}, - {roachpb.PUSH_ABORT, enginepb.MinTxnPriority, 1, false}, - {roachpb.PUSH_ABORT, enginepb.MinTxnPriority, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_ABORT, 1, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_ABORT, 1, 1, false}, - {roachpb.PUSH_ABORT, 1, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_ABORT, enginepb.MaxTxnPriority, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_ABORT, enginepb.MaxTxnPriority, 1, true}, - {roachpb.PUSH_ABORT, enginepb.MaxTxnPriority, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_TIMESTAMP, enginepb.MinTxnPriority, enginepb.MinTxnPriority, false}, - {roachpb.PUSH_TIMESTAMP, enginepb.MinTxnPriority, 1, false}, - {roachpb.PUSH_TIMESTAMP, enginepb.MinTxnPriority, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_TIMESTAMP, 1, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_TIMESTAMP, 1, 1, false}, - {roachpb.PUSH_TIMESTAMP, 1, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_TIMESTAMP, enginepb.MaxTxnPriority, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_TIMESTAMP, enginepb.MaxTxnPriority, 1, true}, - {roachpb.PUSH_TIMESTAMP, enginepb.MaxTxnPriority, enginepb.MaxTxnPriority, false}, - {roachpb.PUSH_TOUCH, enginepb.MinTxnPriority, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_TOUCH, enginepb.MinTxnPriority, 1, true}, - {roachpb.PUSH_TOUCH, enginepb.MinTxnPriority, enginepb.MaxTxnPriority, true}, - {roachpb.PUSH_TOUCH, 1, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_TOUCH, 1, 1, true}, - {roachpb.PUSH_TOUCH, 1, enginepb.MaxTxnPriority, true}, - {roachpb.PUSH_TOUCH, enginepb.MaxTxnPriority, enginepb.MinTxnPriority, true}, - {roachpb.PUSH_TOUCH, enginepb.MaxTxnPriority, 1, true}, - {roachpb.PUSH_TOUCH, enginepb.MaxTxnPriority, enginepb.MaxTxnPriority, true}, + {false, roachpb.PUSH_ABORT, min, min, false}, + {false, roachpb.PUSH_ABORT, min, mid, false}, + {false, roachpb.PUSH_ABORT, min, max, false}, + {false, roachpb.PUSH_ABORT, mid, min, true}, + {false, roachpb.PUSH_ABORT, mid, mid, false}, + {false, roachpb.PUSH_ABORT, mid, max, false}, + {false, roachpb.PUSH_ABORT, max, min, true}, + {false, roachpb.PUSH_ABORT, max, mid, true}, + {false, roachpb.PUSH_ABORT, max, max, false}, + {false, roachpb.PUSH_TIMESTAMP, min, min, false}, + {false, roachpb.PUSH_TIMESTAMP, min, mid, false}, + {false, roachpb.PUSH_TIMESTAMP, min, max, false}, + {false, roachpb.PUSH_TIMESTAMP, mid, min, true}, + {false, roachpb.PUSH_TIMESTAMP, mid, mid, false}, + {false, roachpb.PUSH_TIMESTAMP, mid, max, false}, + {false, roachpb.PUSH_TIMESTAMP, max, min, true}, + {false, roachpb.PUSH_TIMESTAMP, max, mid, true}, + {false, roachpb.PUSH_TIMESTAMP, max, max, false}, + {false, roachpb.PUSH_TOUCH, min, min, true}, + {false, roachpb.PUSH_TOUCH, min, mid, true}, + {false, roachpb.PUSH_TOUCH, min, max, true}, + {false, roachpb.PUSH_TOUCH, mid, min, true}, + {false, roachpb.PUSH_TOUCH, mid, mid, true}, + {false, roachpb.PUSH_TOUCH, mid, max, true}, + {false, roachpb.PUSH_TOUCH, max, min, true}, + {false, roachpb.PUSH_TOUCH, max, mid, true}, + {false, roachpb.PUSH_TOUCH, max, max, true}, + // Force pushes always push immediately. + {true, roachpb.PUSH_ABORT, min, min, true}, + {true, roachpb.PUSH_ABORT, min, mid, true}, + {true, roachpb.PUSH_ABORT, min, max, true}, + {true, roachpb.PUSH_ABORT, mid, min, true}, + {true, roachpb.PUSH_ABORT, mid, mid, true}, + {true, roachpb.PUSH_ABORT, mid, max, true}, + {true, roachpb.PUSH_ABORT, max, min, true}, + {true, roachpb.PUSH_ABORT, max, mid, true}, + {true, roachpb.PUSH_ABORT, max, max, true}, + {true, roachpb.PUSH_TIMESTAMP, min, min, true}, + {true, roachpb.PUSH_TIMESTAMP, min, mid, true}, + {true, roachpb.PUSH_TIMESTAMP, min, max, true}, + {true, roachpb.PUSH_TIMESTAMP, mid, min, true}, + {true, roachpb.PUSH_TIMESTAMP, mid, mid, true}, + {true, roachpb.PUSH_TIMESTAMP, mid, max, true}, + {true, roachpb.PUSH_TIMESTAMP, max, min, true}, + {true, roachpb.PUSH_TIMESTAMP, max, mid, true}, + {true, roachpb.PUSH_TIMESTAMP, max, max, true}, + {true, roachpb.PUSH_TOUCH, min, min, true}, + {true, roachpb.PUSH_TOUCH, min, mid, true}, + {true, roachpb.PUSH_TOUCH, min, max, true}, + {true, roachpb.PUSH_TOUCH, mid, min, true}, + {true, roachpb.PUSH_TOUCH, mid, mid, true}, + {true, roachpb.PUSH_TOUCH, mid, max, true}, + {true, roachpb.PUSH_TOUCH, max, min, true}, + {true, roachpb.PUSH_TOUCH, max, mid, true}, + {true, roachpb.PUSH_TOUCH, max, max, true}, } for _, test := range testCases { t.Run("", func(t *testing.T) { req := roachpb.PushTxnRequest{ + Force: test.force, PushType: test.typ, PusherTxn: roachpb.Transaction{ TxnMeta: enginepb.TxnMeta{