Skip to content

Commit

Permalink
storage: stop returning responses directly from TxnWaitQueue
Browse files Browse the repository at this point in the history
This change removes a small optimization where we would return synthesized
PushTxnResponses directly from the TxnWaitQueue when we felt we had enough
information to do so. This was useful, but also an abstraction violation.
It also opened up the opportunity for this response to get out of sync with
what would be returned from properly evaluating the request under latches.

This is not likely to come with a noticeable performance impact, in
part because the contentionQueue already limits the number of requests
that can wait in the TxnWaitQueue for a single transaction at a time.
To test this, I ran YCSB A at a few concurrency levels and saw no
difference in performance.

TODO DURING REVIEW: I'm not 100% sold on this last commit. I'd like to
gauge how people feel about it.

Release note: None
  • Loading branch information
nvanbenschoten committed Dec 20, 2019
1 parent e50605d commit ad0ed45
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 139 deletions.
3 changes: 1 addition & 2 deletions pkg/storage/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
// pending updates to the target transaction for either PushTxn or
// QueryTxn requests.
// TODO(nvanbenschoten): Push this into the concurrency package.
br, pErr = r.maybeWaitForPushee(ctx, ba)
if br != nil || pErr != nil {
if pErr = r.maybeWaitForPushee(ctx, ba); pErr != nil {
return br, pErr
}

Expand Down
34 changes: 11 additions & 23 deletions pkg/storage/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,34 +231,22 @@ func (s *Store) Send(
// the txnwait.Queue, where it will wait for updates to the target
// transaction.
// TODO(nvanbenschoten): Move this method.
func (r *Replica) maybeWaitForPushee(
ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// If this is a push txn request, check the push queue first, which
// may cause this request to wait and either return a successful push
// txn response or else allow this request to proceed.
if ba.IsSinglePushTxnRequest() {
func (r *Replica) maybeWaitForPushee(ctx context.Context, ba *roachpb.BatchRequest) *roachpb.Error {
switch {
case ba.IsSinglePushTxnRequest():
// If this is a push txn request, check the push queue first, which
// may cause this request to wait and either return a successful push
// txn response or else allow this request to proceed.
if r.store.cfg.TestingKnobs.DontRetryPushTxnFailures {
return nil, nil
return nil
}
pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq)
if pErr != nil {
return nil, pErr
} else if pushResp != nil {
br := &roachpb.BatchResponse{}
br.Add(pushResp)
return br, nil
}
} else if ba.IsSingleQueryTxnRequest() {
return r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq)
case ba.IsSingleQueryTxnRequest():
// For query txn requests, wait in the txn wait queue either for
// transaction update or for dependent transactions to change.
queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest)
pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq)
if pErr != nil {
return nil, pErr
}
return r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq)
}

return nil, nil
return nil
}
118 changes: 42 additions & 76 deletions pkg/storage/txn_wait_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func createTxnForPushQueue(ctx context.Context, tc *testContext) (*roachpb.Trans
return txn, writeTxnRecord(ctx, tc, txn)
}

type RespWithErr struct {
resp *roachpb.PushTxnResponse
pErr *roachpb.Error
}

func checkAllGaugesZero(tc testContext) error {
m := tc.store.GetTxnWaitMetrics()
if act := m.PusheeWaiting.Value(); act != 0 {
Expand Down Expand Up @@ -103,10 +98,9 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) {
PusheeTxn: txn.TxnMeta,
}

retCh := make(chan RespWithErr, 1)
retCh := make(chan *roachpb.Error, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req)
}()

testutils.SucceedsSoon(t, func() error {
Expand All @@ -133,12 +127,9 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) {
t.Fatal(err.Error())
}

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
if respWithErr.pErr != nil {
t.Errorf("expected nil err; got %+v", respWithErr.pErr)
respErr := <-retCh
if respErr != nil {
t.Errorf("expected nil err; got %+v", respErr)
}

if deps := q.GetDependents(txn.ID); deps != nil {
Expand All @@ -158,8 +149,8 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) {
t.Fatalf("expected update to silently fail since queue is disabled")
}

if resp, pErr := q.MaybeWaitForPush(context.TODO(), tc.repl, &req); resp != nil || pErr != nil {
t.Errorf("expected nil resp and err as queue is disabled; got %+v, %s", resp, pErr)
if pErr := q.MaybeWaitForPush(context.TODO(), tc.repl, &req); pErr != nil {
t.Errorf("expected nil err as queue is disabled; got %s", pErr)
}
if err := checkAllGaugesZero(tc); err != nil {
t.Fatal(err.Error())
Expand Down Expand Up @@ -195,10 +186,9 @@ func TestTxnWaitQueueCancel(t *testing.T) {
assert.EqualValues(tc, 0, m.PusherWaiting.Value())

ctx, cancel := context.WithCancel(context.Background())
retCh := make(chan RespWithErr, 1)
retCh := make(chan *roachpb.Error, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, &req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(ctx, tc.repl, &req)
}()

testutils.SucceedsSoon(t, func() error {
Expand All @@ -218,12 +208,9 @@ func TestTxnWaitQueueCancel(t *testing.T) {
})
cancel()

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
if !testutils.IsPError(respWithErr.pErr, context.Canceled.Error()) {
t.Errorf("expected context canceled error; got %v", respWithErr.pErr)
respErr := <-retCh
if !testutils.IsPError(respErr, context.Canceled.Error()) {
t.Errorf("expected context canceled error; got %v", respErr)
}
}

Expand Down Expand Up @@ -256,10 +243,9 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) {
m := tc.store.GetTxnWaitMetrics()
assert.EqualValues(tc, 1, m.PusheeWaiting.Value())

retCh := make(chan RespWithErr, 2)
retCh := make(chan *roachpb.Error, 2)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req1)
}()
testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher1.ID}
Expand All @@ -282,8 +268,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) {
})

go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req2)
}()
testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher1.ID, pusher2.ID}
Expand All @@ -310,10 +295,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) {
})

for i := 0; i < 2; i++ {
respWithErr := <-retCh
if respWithErr.resp == nil || respWithErr.resp.PusheeTxn.Status != roachpb.COMMITTED {
t.Errorf("expected committed txn response; got %+v, err=%v", respWithErr.resp, respWithErr.pErr)
}
require.Nil(t, <-retCh)
}
}

Expand Down Expand Up @@ -365,10 +347,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) {
q.Enable()
q.Enqueue(txn)

retCh := make(chan RespWithErr, 2)
retCh := make(chan *roachpb.Error, 2)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, req)
}()

m := tc.store.GetTxnWaitMetrics()
Expand Down Expand Up @@ -397,9 +378,9 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) {
// Skip calling q.UpdateTxn to test that the wait queue periodically polls
// txn's record and notices when it is no longer pending.

respWithErr := <-retCh
if respWithErr.resp == nil || respWithErr.resp.PusheeTxn.Status != roachpb.COMMITTED {
t.Errorf("expected committed txn response; got %+v, err=%v", respWithErr.resp, respWithErr.pErr)
respErr := <-retCh
if respErr != nil {
t.Errorf("expected nil err; got %+v", respErr)
}
testutils.SucceedsSoon(t, func() error {
if act, exp := m.PusherWaiting.Value(), int64(1); act != exp {
Expand Down Expand Up @@ -440,10 +421,9 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) {
q.Enable()
q.Enqueue(txn)

retCh := make(chan RespWithErr, 1)
retCh := make(chan *roachpb.Error, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req)
}()

testutils.SucceedsSoon(t, func() error {
Expand All @@ -458,12 +438,9 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) {
updatedTxn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0)
q.UpdateTxn(context.Background(), &updatedTxn)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("on non-committed txn update, expected nil response; got %+v", respWithErr.resp)
}
if respWithErr.pErr != nil {
t.Errorf("expected nil error; got %s", respWithErr.pErr)
respErr := <-retCh
if respErr != nil {
t.Errorf("expected nil err; got %+v", respErr)
}
testutils.SucceedsSoon(tc.TB, func() error {
return checkAllGaugesZero(tc)
Expand Down Expand Up @@ -515,10 +492,9 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) {
q.Enable()
q.Enqueue(txn)

retCh := make(chan RespWithErr, 2)
retCh := make(chan *roachpb.Error, 2)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req1)
}()
testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher1.ID}
Expand All @@ -529,8 +505,7 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) {
})

go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req2)
}()
testutils.SucceedsSoon(t, func() error {
expDeps := []uuid.UUID{pusher1.ID, pusher2.ID}
Expand All @@ -541,12 +516,9 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) {
})

for i := 0; i < 2; i++ {
respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil txn response; got %+v", respWithErr.resp)
}
if respWithErr.pErr != nil {
t.Errorf("expected nil error; got %s", respWithErr.pErr)
respErr := <-retCh
if respErr != nil {
t.Errorf("expected nil error; got %s", respErr)
}
}

Expand Down Expand Up @@ -603,10 +575,9 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
q.Enable()
q.Enqueue(txn)

retCh := make(chan RespWithErr, 1)
retCh := make(chan *roachpb.Error, 1)
go func() {
resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(context.Background(), tc.repl, &req)
}()

testutils.SucceedsSoon(t, func() error {
Expand All @@ -631,13 +602,10 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) {
}
q.UpdateTxn(context.Background(), &pusherUpdate)

respWithErr := <-retCh
if respWithErr.resp != nil {
t.Errorf("expected nil response; got %+v", respWithErr.resp)
}
respErr := <-retCh
expErr := "TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED)"
if !testutils.IsPError(respWithErr.pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respWithErr.pErr)
if !testutils.IsPError(respErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, respErr)
}

m := tc.store.GetTxnWaitMetrics()
Expand Down Expand Up @@ -714,19 +682,17 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) {
m := tc.store.GetTxnWaitMetrics()
assert.EqualValues(tc, 0, m.DeadlocksTotal.Count())

retCh := make(chan RespWithErr, 3)
retCh := make(chan *roachpb.Error, 3)
for _, req := range []*roachpb.PushTxnRequest{reqA, reqB, reqC} {
go func(req *roachpb.PushTxnRequest) {
resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, req)
retCh <- RespWithErr{resp, pErr}
retCh <- q.MaybeWaitForPush(ctx, tc.repl, req)
}(req)
}

// Wait for first request to finish, which should break the
// dependency cycle by performing a force push abort.
respWithErr := <-retCh
require.Nil(t, respWithErr.pErr)
require.Nil(t, respWithErr.resp)
respErr := <-retCh
require.Nil(t, respErr)

testutils.SucceedsSoon(t, func() error {
if act, exp := m.DeadlocksTotal.Count(), int64(0); act <= exp {
Expand Down Expand Up @@ -804,7 +770,7 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) {
retCh := make(chan ReqWithErr, 2)
for _, req := range []*roachpb.PushTxnRequest{reqA, reqB} {
go func(req *roachpb.PushTxnRequest) {
_, pErr := q.MaybeWaitForPush(ctx, tc.repl, req)
pErr := q.MaybeWaitForPush(ctx, tc.repl, req)
retCh <- ReqWithErr{req, pErr}
}(req)
}
Expand Down
Loading

0 comments on commit ad0ed45

Please sign in to comment.