From ad0ed45fdb33e66a913d2b62f4301ef03b068417 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 19 Dec 2019 20:12:21 -0500 Subject: [PATCH] storage: stop returning responses directly from TxnWaitQueue This change removes a small optimization where we would return synthesized PushTxnResponses directly from the TxnWaitQueue when we felt we had enough information to do so. This was useful, but also an abstraction violation. It also opened up the opportunity for this response to get out of sync with what would be returned from properly evaluating the request under latches. This is not likely to come with a noticeable performance impact, in part because the contentionQueue already limits the number of requests that can wait in the TxnWaitQueue for a single transaction at a time. To test this, I ran YCSB A at a few concurrency levels and saw no difference in performance. TODO DURING REVIEW: I'm not 100% sold on this last commit. I'd like to gauge how people feel about it. Release note: None --- pkg/storage/replica_send.go | 3 +- pkg/storage/store_send.go | 34 +++------ pkg/storage/txn_wait_queue_test.go | 118 ++++++++++------------------- pkg/storage/txnwait/queue.go | 57 ++++++-------- pkg/storage/txnwait/queue_test.go | 4 +- 5 files changed, 77 insertions(+), 139 deletions(-) diff --git a/pkg/storage/replica_send.go b/pkg/storage/replica_send.go index dec2c956b3b9..1e385e050e72 100644 --- a/pkg/storage/replica_send.go +++ b/pkg/storage/replica_send.go @@ -182,8 +182,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( // pending updates to the target transaction for either PushTxn or // QueryTxn requests. // TODO(nvanbenschoten): Push this into the concurrency package. - br, pErr = r.maybeWaitForPushee(ctx, ba) - if br != nil || pErr != nil { + if pErr = r.maybeWaitForPushee(ctx, ba); pErr != nil { return br, pErr } diff --git a/pkg/storage/store_send.go b/pkg/storage/store_send.go index 2041fc9306ae..be79540ff366 100644 --- a/pkg/storage/store_send.go +++ b/pkg/storage/store_send.go @@ -231,34 +231,22 @@ func (s *Store) Send( // the txnwait.Queue, where it will wait for updates to the target // transaction. // TODO(nvanbenschoten): Move this method. -func (r *Replica) maybeWaitForPushee( - ctx context.Context, ba *roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - // If this is a push txn request, check the push queue first, which - // may cause this request to wait and either return a successful push - // txn response or else allow this request to proceed. - if ba.IsSinglePushTxnRequest() { +func (r *Replica) maybeWaitForPushee(ctx context.Context, ba *roachpb.BatchRequest) *roachpb.Error { + switch { + case ba.IsSinglePushTxnRequest(): + // If this is a push txn request, check the push queue first, which + // may cause this request to wait and either return a successful push + // txn response or else allow this request to proceed. if r.store.cfg.TestingKnobs.DontRetryPushTxnFailures { - return nil, nil + return nil } pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest) - pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq) - if pErr != nil { - return nil, pErr - } else if pushResp != nil { - br := &roachpb.BatchResponse{} - br.Add(pushResp) - return br, nil - } - } else if ba.IsSingleQueryTxnRequest() { + return r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq) + case ba.IsSingleQueryTxnRequest(): // For query txn requests, wait in the txn wait queue either for // transaction update or for dependent transactions to change. queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest) - pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq) - if pErr != nil { - return nil, pErr - } + return r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq) } - - return nil, nil + return nil } diff --git a/pkg/storage/txn_wait_queue_test.go b/pkg/storage/txn_wait_queue_test.go index a9ce9d9d3e46..3cace66565cc 100644 --- a/pkg/storage/txn_wait_queue_test.go +++ b/pkg/storage/txn_wait_queue_test.go @@ -46,11 +46,6 @@ func createTxnForPushQueue(ctx context.Context, tc *testContext) (*roachpb.Trans return txn, writeTxnRecord(ctx, tc, txn) } -type RespWithErr struct { - resp *roachpb.PushTxnResponse - pErr *roachpb.Error -} - func checkAllGaugesZero(tc testContext) error { m := tc.store.GetTxnWaitMetrics() if act := m.PusheeWaiting.Value(); act != 0 { @@ -103,10 +98,9 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { PusheeTxn: txn.TxnMeta, } - retCh := make(chan RespWithErr, 1) + retCh := make(chan *roachpb.Error, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req) }() testutils.SucceedsSoon(t, func() error { @@ -133,12 +127,9 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { t.Fatal(err.Error()) } - respWithErr := <-retCh - if respWithErr.resp != nil { - t.Errorf("expected nil response; got %+v", respWithErr.resp) - } - if respWithErr.pErr != nil { - t.Errorf("expected nil err; got %+v", respWithErr.pErr) + respErr := <-retCh + if respErr != nil { + t.Errorf("expected nil err; got %+v", respErr) } if deps := q.GetDependents(txn.ID); deps != nil { @@ -158,8 +149,8 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { t.Fatalf("expected update to silently fail since queue is disabled") } - if resp, pErr := q.MaybeWaitForPush(context.TODO(), tc.repl, &req); resp != nil || pErr != nil { - t.Errorf("expected nil resp and err as queue is disabled; got %+v, %s", resp, pErr) + if pErr := q.MaybeWaitForPush(context.TODO(), tc.repl, &req); pErr != nil { + t.Errorf("expected nil err as queue is disabled; got %s", pErr) } if err := checkAllGaugesZero(tc); err != nil { t.Fatal(err.Error()) @@ -195,10 +186,9 @@ func TestTxnWaitQueueCancel(t *testing.T) { assert.EqualValues(tc, 0, m.PusherWaiting.Value()) ctx, cancel := context.WithCancel(context.Background()) - retCh := make(chan RespWithErr, 1) + retCh := make(chan *roachpb.Error, 1) go func() { - resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, &req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(ctx, tc.repl, &req) }() testutils.SucceedsSoon(t, func() error { @@ -218,12 +208,9 @@ func TestTxnWaitQueueCancel(t *testing.T) { }) cancel() - respWithErr := <-retCh - if respWithErr.resp != nil { - t.Errorf("expected nil response; got %+v", respWithErr.resp) - } - if !testutils.IsPError(respWithErr.pErr, context.Canceled.Error()) { - t.Errorf("expected context canceled error; got %v", respWithErr.pErr) + respErr := <-retCh + if !testutils.IsPError(respErr, context.Canceled.Error()) { + t.Errorf("expected context canceled error; got %v", respErr) } } @@ -256,10 +243,9 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 1, m.PusheeWaiting.Value()) - retCh := make(chan RespWithErr, 2) + retCh := make(chan *roachpb.Error, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req1) }() testutils.SucceedsSoon(t, func() error { expDeps := []uuid.UUID{pusher1.ID} @@ -282,8 +268,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { }) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req2) }() testutils.SucceedsSoon(t, func() error { expDeps := []uuid.UUID{pusher1.ID, pusher2.ID} @@ -310,10 +295,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { }) for i := 0; i < 2; i++ { - respWithErr := <-retCh - if respWithErr.resp == nil || respWithErr.resp.PusheeTxn.Status != roachpb.COMMITTED { - t.Errorf("expected committed txn response; got %+v, err=%v", respWithErr.resp, respWithErr.pErr) - } + require.Nil(t, <-retCh) } } @@ -365,10 +347,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) { q.Enable() q.Enqueue(txn) - retCh := make(chan RespWithErr, 2) + retCh := make(chan *roachpb.Error, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, req) }() m := tc.store.GetTxnWaitMetrics() @@ -397,9 +378,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) { // Skip calling q.UpdateTxn to test that the wait queue periodically polls // txn's record and notices when it is no longer pending. - respWithErr := <-retCh - if respWithErr.resp == nil || respWithErr.resp.PusheeTxn.Status != roachpb.COMMITTED { - t.Errorf("expected committed txn response; got %+v, err=%v", respWithErr.resp, respWithErr.pErr) + respErr := <-retCh + if respErr != nil { + t.Errorf("expected nil err; got %+v", respErr) } testutils.SucceedsSoon(t, func() error { if act, exp := m.PusherWaiting.Value(), int64(1); act != exp { @@ -440,10 +421,9 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) { q.Enable() q.Enqueue(txn) - retCh := make(chan RespWithErr, 1) + retCh := make(chan *roachpb.Error, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req) }() testutils.SucceedsSoon(t, func() error { @@ -458,12 +438,9 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) { updatedTxn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0) q.UpdateTxn(context.Background(), &updatedTxn) - respWithErr := <-retCh - if respWithErr.resp != nil { - t.Errorf("on non-committed txn update, expected nil response; got %+v", respWithErr.resp) - } - if respWithErr.pErr != nil { - t.Errorf("expected nil error; got %s", respWithErr.pErr) + respErr := <-retCh + if respErr != nil { + t.Errorf("expected nil err; got %+v", respErr) } testutils.SucceedsSoon(tc.TB, func() error { return checkAllGaugesZero(tc) @@ -515,10 +492,9 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { q.Enable() q.Enqueue(txn) - retCh := make(chan RespWithErr, 2) + retCh := make(chan *roachpb.Error, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req1) }() testutils.SucceedsSoon(t, func() error { expDeps := []uuid.UUID{pusher1.ID} @@ -529,8 +505,7 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { }) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req2) }() testutils.SucceedsSoon(t, func() error { expDeps := []uuid.UUID{pusher1.ID, pusher2.ID} @@ -541,12 +516,9 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { }) for i := 0; i < 2; i++ { - respWithErr := <-retCh - if respWithErr.resp != nil { - t.Errorf("expected nil txn response; got %+v", respWithErr.resp) - } - if respWithErr.pErr != nil { - t.Errorf("expected nil error; got %s", respWithErr.pErr) + respErr := <-retCh + if respErr != nil { + t.Errorf("expected nil error; got %s", respErr) } } @@ -603,10 +575,9 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) { q.Enable() q.Enqueue(txn) - retCh := make(chan RespWithErr, 1) + retCh := make(chan *roachpb.Error, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req) }() testutils.SucceedsSoon(t, func() error { @@ -631,13 +602,10 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) { } q.UpdateTxn(context.Background(), &pusherUpdate) - respWithErr := <-retCh - if respWithErr.resp != nil { - t.Errorf("expected nil response; got %+v", respWithErr.resp) - } + respErr := <-retCh expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)" - if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) { - t.Errorf("expected %s; got %v", expErr, respWithErr.pErr) + if !testutils.IsPError(respErr, regexp.QuoteMeta(expErr)) { + t.Errorf("expected %s; got %v", expErr, respErr) } m := tc.store.GetTxnWaitMetrics() @@ -714,19 +682,17 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 0, m.DeadlocksTotal.Count()) - retCh := make(chan RespWithErr, 3) + retCh := make(chan *roachpb.Error, 3) for _, req := range []*roachpb.PushTxnRequest{reqA, reqB, reqC} { go func(req *roachpb.PushTxnRequest) { - resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, req) - retCh <- RespWithErr{resp, pErr} + retCh <- q.MaybeWaitForPush(ctx, tc.repl, req) }(req) } // Wait for first request to finish, which should break the // dependency cycle by performing a force push abort. - respWithErr := <-retCh - require.Nil(t, respWithErr.pErr) - require.Nil(t, respWithErr.resp) + respErr := <-retCh + require.Nil(t, respErr) testutils.SucceedsSoon(t, func() error { if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp { @@ -804,7 +770,7 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { retCh := make(chan ReqWithErr, 2) for _, req := range []*roachpb.PushTxnRequest{reqA, reqB} { go func(req *roachpb.PushTxnRequest) { - _, pErr := q.MaybeWaitForPush(ctx, tc.repl, req) + pErr := q.MaybeWaitForPush(ctx, tc.repl, req) retCh <- ReqWithErr{req, pErr} }(req) } diff --git a/pkg/storage/txnwait/queue.go b/pkg/storage/txnwait/queue.go index e2908c99a9f8..eecf677984ca 100644 --- a/pkg/storage/txnwait/queue.go +++ b/pkg/storage/txnwait/queue.go @@ -92,13 +92,6 @@ func IsExpired(now hlc.Timestamp, txn *roachpb.Transaction) bool { return TxnExpiration(txn).Less(now) } -// createPushTxnResponse returns a PushTxnResponse struct with a -// copy of the supplied transaction. It is necessary to fully copy -// each field in the transaction to avoid race conditions. -func createPushTxnResponse(txn *roachpb.Transaction) *roachpb.PushTxnResponse { - return &roachpb.PushTxnResponse{PusheeTxn: *txn} -} - // A waitingPush represents a PushTxn command that is waiting on the // pushee transaction to commit or abort. It maintains a transitive // set of all txns which are waiting on this txn in order to detect @@ -401,9 +394,9 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID // the first return value is a non-nil PushTxnResponse object. func (q *Queue) MaybeWaitForPush( ctx context.Context, repl ReplicaInterface, req *roachpb.PushTxnRequest, -) (*roachpb.PushTxnResponse, *roachpb.Error) { +) *roachpb.Error { if ShouldPushImmediately(req) { - return nil, nil + return nil } q.mu.Lock() @@ -414,19 +407,19 @@ func (q *Queue) MaybeWaitForPush( // ensure that it's not cleared before an incorrect insertion happens. if q.mu.txns == nil || !repl.ContainsKey(req.Key) { q.mu.Unlock() - return nil, nil + return nil } // If there's no pending queue for this txn, return not pushed. If - // already pushed, return push success. + // already pushed, let the push proceed. pending, ok := q.mu.txns[req.PusheeTxn.ID] if !ok { q.mu.Unlock() - return nil, nil + return nil } if txn := pending.getTxn(); isPushed(req, txn) { q.mu.Unlock() - return createPushTxnResponse(txn), nil + return nil } push := &waitingPush{ @@ -522,28 +515,26 @@ func (q *Queue) MaybeWaitForPush( case <-ctx.Done(): // Caller has given up. log.VEvent(ctx, 2, "pusher giving up due to context cancellation") - return nil, roachpb.NewError(ctx.Err()) + return roachpb.NewError(ctx.Err()) case <-q.store.Stopper().ShouldQuiesce(): // Let the push out so that they can be sent looking elsewhere. - return nil, nil + return nil case txn := <-push.pending: log.VEventf(ctx, 2, "result of pending push: %v", txn) // If txn is nil, the queue was cleared, presumably because the // replica lost the range lease. Return not pushed so request // proceeds and is redirected to the new range lease holder. if txn == nil { - return nil, nil + return nil } // Transaction was committed, aborted or had its timestamp - // pushed. If this PushTxn request is satisfied, return - // successful PushTxn response. + // pushed. Let the push proceed to evaluation. if isPushed(req, txn) { log.VEvent(ctx, 2, "push request is satisfied") - return createPushTxnResponse(txn), nil + } else { + log.VEvent(ctx, 2, "not pushed; returning to caller") } - // If not successfully pushed, return not pushed so request proceeds. - log.VEvent(ctx, 2, "not pushed; returning to caller") - return nil, nil + return nil case <-pusheeTxnTimer.C: log.VEvent(ctx, 2, "querying pushee") @@ -553,11 +544,11 @@ func (q *Queue) MaybeWaitForPush( ctx, req.PusheeTxn, false, nil, q.store.Clock().Now(), ) if pErr != nil { - return nil, pErr + return pErr } else if updatedPushee == nil { // Continue with push. log.VEvent(ctx, 2, "pushee not found, push should now succeed") - return nil, nil + return nil } pusheePriority = updatedPushee.Priority pending.txn.Store(updatedPushee) @@ -586,11 +577,11 @@ func (q *Queue) MaybeWaitForPush( // is now uncommittable. q.UpdateTxn(ctx, updatedPushee) } - return createPushTxnResponse(updatedPushee), nil + return nil } if IsExpired(q.store.Clock().Now(), updatedPushee) { log.VEventf(ctx, 1, "pushing expired txn %s", req.PusheeTxn.ID.Short()) - return nil, nil + return nil } // Set the timer to check for the pushee txn's expiration. expiration := TxnExpiration(updatedPushee).GoTime() @@ -601,10 +592,11 @@ func (q *Queue) MaybeWaitForPush( switch updatedPusher.Status { case roachpb.COMMITTED: log.VEventf(ctx, 1, "pusher committed: %v", updatedPusher) - return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionCommittedStatusError(), updatedPusher) + return roachpb.NewErrorWithTxn( + roachpb.NewTransactionCommittedStatusError(), updatedPusher) case roachpb.ABORTED: log.VEventf(ctx, 1, "pusher aborted: %v", updatedPusher) - return nil, roachpb.NewErrorWithTxn( + return roachpb.NewErrorWithTxn( roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED), updatedPusher) } log.VEventf(ctx, 2, "pusher was updated: %v", updatedPusher) @@ -650,12 +642,7 @@ func (q *Queue) MaybeWaitForPush( dependents, ) metrics.DeadlocksTotal.Inc(1) - // TODO(nvanbenschoten): As it, it would make sense to return - // the force push's PushTxnResponse, but the following commit - // is planning on removing the response path here and letting - // all pushes fall through to standard evaluation to create - // their response. - return nil, q.forcePushAbort(ctx, req) + return q.forcePushAbort(ctx, req) } } // Signal the pusher query txn loop to continue. @@ -663,7 +650,7 @@ func (q *Queue) MaybeWaitForPush( case pErr := <-queryPusherErrCh: queryPusherErrCh = nil - return nil, pErr + return pErr } } } diff --git a/pkg/storage/txnwait/queue_test.go b/pkg/storage/txnwait/queue_test.go index a4c3292bd56c..96f4c804d4dc 100644 --- a/pkg/storage/txnwait/queue_test.go +++ b/pkg/storage/txnwait/queue_test.go @@ -286,10 +286,8 @@ func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) { defer wg.Done() ctx := context.Background() req := roachpb.PushTxnRequest{PusheeTxn: txn.TxnMeta, PushType: roachpb.PUSH_ABORT} - res, err := q.MaybeWaitForPush(ctx, mockRepl{}, &req) + err := q.MaybeWaitForPush(ctx, mockRepl{}, &req) require.Nil(t, err) - require.NotNil(t, res) - require.Equal(t, roachpb.ABORTED, res.PusheeTxn.Status) }() } wg.Wait()