From ee59435b6aa05244863edea10755bc87350f753c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Apr 2019 13:52:42 -0400 Subject: [PATCH 1/2] kv: ignore result of txn heartbeat after EndTransaction The changes in #33396 made it so that a `HeartbeatTxnRequest` that finds a missing transaction record will attempt to create the record. This means that it will discover if a transaction is not "committable" and return a TransactionAbortedError. Unfortunately, after a transaction commits and GCs its transaction record, it will also be considered not "committable". There will always be cases where a heartbeat request races with an EndTransaction request and incorrectly considers the transaction aborted (which is touched upon in a TODO a few lines down). However, in many of these cases, the coordinator already knows that the transaction is committed, so it doesn't need to attempt to roll back the transaction and clean up its intents. This commit checks for these cases and avoids sending useless rollbacks. The intention is to backport this to 19.1. Release note: None --- pkg/kv/txn_interceptor_heartbeater.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 8dc20341cfcc..a1cbefc56b1a 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -375,25 +375,28 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool { // Clone the txn in order to put it in the heartbeat request. txn := h.mu.txn.Clone() - if txn.Key == nil { log.Fatalf(ctx, "attempting to heartbeat txn without anchor key: %v", txn) } - ba := roachpb.BatchRequest{} ba.Txn = txn - - hb := &roachpb.HeartbeatTxnRequest{ + ba.Add(&roachpb.HeartbeatTxnRequest{ RequestHeader: roachpb.RequestHeader{ Key: txn.Key, }, Now: h.clock.Now(), - } - ba.Add(hb) + }) + // Send the heartbeat request directly through the gatekeeper interceptor. 
+ // See comment on h.gatekeeper for a discussion of why. log.VEvent(ctx, 2, "heartbeat") br, pErr := h.gatekeeper.SendLocked(ctx, ba) + // If the txn is no longer pending, ignore the result of the heartbeat. + if h.mu.txn.Status != roachpb.PENDING { + return false + } + var respTxn *roachpb.Transaction if pErr != nil { log.VEventf(ctx, 2, "heartbeat failed: %s", pErr) From 9147c58c61ef111479c07f30c248f9c4b853e137 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Apr 2019 15:27:23 -0400 Subject: [PATCH 2/2] txnwait: increase TxnLivenessThreshold significantly This change increases the duration between transaction heartbeats before a transaction is considered expired from 2 seconds to 5 seconds. This has been found to dramatically reduce the frequency of transaction aborts when a cluster is under significant load. This is not expected to noticeably hurt cluster availability in the presence of dead nodes because we already have availability loss on the order of 9 seconds due to the epoch-based lease duration. This is especially important now that the hack in #25034 is gone. That hack was hiding some of this badness and giving transactions a bit more room to avoid being aborted. 
Release note: None --- pkg/storage/replica_test.go | 97 ++++++++++---------- pkg/storage/store_test.go | 5 +- pkg/storage/txn_recovery_integration_test.go | 6 +- pkg/storage/txnwait/txnqueue.go | 6 +- 4 files changed, 60 insertions(+), 54 deletions(-) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 10cdce0c4f75..4ce9eb952bb7 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -4923,9 +4924,8 @@ func TestPushTxnQueryPusheeHasNewerVersion(t *testing.T) { } } -// TestPushTxnHeartbeatTimeout verifies that a txn which -// hasn't been heartbeat within 2x the heartbeat interval can be -// pushed/aborted. +// TestPushTxnHeartbeatTimeout verifies that a txn which hasn't been +// heartbeat within its transaction liveness threshold can be pushed/aborted. 
func TestPushTxnHeartbeatTimeout(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} @@ -4937,6 +4937,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { const txnPushError = "failed to push" const indetCommitError = "txn in indeterminate STAGING state" + m := int64(txnwait.TxnLivenessHeartbeatMultiplier) ns := base.DefaultHeartbeatInterval.Nanoseconds() testCases := []struct { status roachpb.TransactionStatus // -1 for no record @@ -4953,24 +4954,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {roachpb.PENDING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {roachpb.PENDING, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {roachpb.PENDING, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_ABORT, noError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, noError}, + {roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + 
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_ABORT, noError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, noError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, noError}, // If the transaction record is STAGING then any case that previously // returned a TransactionPushError will continue to return that error, // but any case that previously succeeded in pushing the transaction @@ -4981,24 +4982,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {roachpb.STAGING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {roachpb.STAGING, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {roachpb.STAGING, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - 
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_ABORT, indetCommitError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, indetCommitError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_ABORT, indetCommitError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, indetCommitError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_ABORT, indetCommitError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TOUCH, indetCommitError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError}, + 
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, + {roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, indetCommitError}, + {roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, indetCommitError}, // Even when a transaction record doesn't exist, if the timestamp // from the PushTxn request indicates sufficiently recent client // activity, the push will fail. @@ -5008,15 +5009,15 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {-1, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {-1, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {-1, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError}, + {-1, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {-1, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {-1, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {-1, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {-1, 0, m*ns + 1, roachpb.PUSH_ABORT, noError}, + {-1, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError}, } for i, test := range testCases { diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b3adac9c03c5..517d5310363f 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + 
"github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -2440,7 +2441,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) { // Now, expire the transactions by moving the clock forward. This will // result in the subsequent scan operation pushing both transactions // in a single batch. - manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1) + manualClock.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) // Scan the range and verify empty result (expired txn is aborted, // cleaning up intents). @@ -2491,7 +2492,7 @@ func TestStoreScanMultipleIntents(t *testing.T) { // Now, expire the transactions by moving the clock forward. This will // result in the subsequent scan operation pushing both transactions // in a single batch. - manual.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1) + manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) // Query the range with a single INCONSISTENT scan, which should // cause all intents to be resolved. diff --git a/pkg/storage/txn_recovery_integration_test.go b/pkg/storage/txn_recovery_integration_test.go index ca870cef84b9..5d8038211d84 100644 --- a/pkg/storage/txn_recovery_integration_test.go +++ b/pkg/storage/txn_recovery_integration_test.go @@ -103,11 +103,11 @@ func TestTxnRecoveryFromStaging(t *testing.T) { } // Pretend the transaction coordinator for the parallel commit died at this point. - // Wait for twice the TxnLivenessThreshold and then issue a read on one of the - // keys that the transaction wrote. This will result in a transaction push and + // Wait for longer than the TxnLivenessThreshold and then issue a read on one of + // the keys that the transaction wrote. This will result in a transaction push and // eventually a full transaction recovery in order to resolve the indeterminate // commit. 
- manual.Increment(2 * txnwait.TxnLivenessThreshold.Nanoseconds()) + manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) gArgs := getArgs(keyA) gReply, pErr := client.SendWrapped(ctx, store.TestSender(), &gArgs) diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go index a43934f6d2ec..3b464a3a1535 100644 --- a/pkg/storage/txnwait/txnqueue.go +++ b/pkg/storage/txnwait/txnqueue.go @@ -35,10 +35,14 @@ import ( const maxWaitForQueryTxn = 50 * time.Millisecond +// TxnLivenessHeartbeatMultiplier specifies what multiple the transaction +// liveness threshold should be of the transaction heartbeat interval. +const TxnLivenessHeartbeatMultiplier = 5 + // TxnLivenessThreshold is the maximum duration between transaction heartbeats // before the transaction is considered expired by Queue. It is exposed and // mutable to allow tests to override it. -var TxnLivenessThreshold = 2 * base.DefaultHeartbeatInterval +var TxnLivenessThreshold = TxnLivenessHeartbeatMultiplier * base.DefaultHeartbeatInterval // ShouldPushImmediately returns whether the PushTxn request should // proceed without queueing. This is true for pushes which are neither