From 9147c58c61ef111479c07f30c248f9c4b853e137 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 10 Apr 2019 15:27:23 -0400 Subject: [PATCH] txnwait: increase TxnLivenessThreshold significantly This change increases the duration between transaction heartbeats before a transaction is considered expired from 2 seconds to 5 seconds. This has been found to dramatically reduce the frequency of transaction aborts when a cluster is under significant load. This is not expected to noticeably hurt cluster availability in the presence of dead nodes because we already have availability loss on the order of 9 seconds due to the epoch-based lease duration. This is especially important now that the hack in #25034 is gone. That hack was hiding some of this badness and giving transactions a bit more room to avoid being aborted. Release note: None --- pkg/storage/replica_test.go | 97 ++++++++++---------- pkg/storage/store_test.go | 5 +- pkg/storage/txn_recovery_integration_test.go | 6 +- pkg/storage/txnwait/txnqueue.go | 6 +- 4 files changed, 60 insertions(+), 54 deletions(-) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 10cdce0c4f75..4ce9eb952bb7 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -4923,9 +4924,8 @@ func TestPushTxnQueryPusheeHasNewerVersion(t *testing.T) { } } -// TestPushTxnHeartbeatTimeout verifies that a txn which -// hasn't been heartbeat within 2x the heartbeat interval can be -// pushed/aborted. +// TestPushTxnHeartbeatTimeout verifies that a txn which hasn't been +// heartbeat within its transaction liveness threshold can be pushed/aborted. func TestPushTxnHeartbeatTimeout(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} @@ -4937,6 +4937,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { const txnPushError = "failed to push" const indetCommitError = "txn in indeterminate STAGING state" + m := int64(txnwait.TxnLivenessHeartbeatMultiplier) ns := base.DefaultHeartbeatInterval.Nanoseconds() testCases := []struct { status roachpb.TransactionStatus // -1 for no record @@ -4953,24 +4954,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {roachpb.PENDING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {roachpb.PENDING, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {roachpb.PENDING, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError}, - {roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_ABORT, noError}, - {roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, noError}, + {roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_ABORT, noError}, + {roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, noError}, + {roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, noError}, // If the transaction record is STAGING then any case that previously // returned a TransactionPushError will continue to return that error, // but any case that previously succeeded in pushing the transaction @@ -4981,24 +4982,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {roachpb.STAGING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {roachpb.STAGING, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {roachpb.STAGING, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_ABORT, indetCommitError}, - {roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, indetCommitError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError}, - {roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_ABORT, indetCommitError}, - {roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, indetCommitError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_ABORT, indetCommitError}, + {roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TOUCH, indetCommitError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError}, + {roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError}, + {roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError}, + {roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, indetCommitError}, + {roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, indetCommitError}, // Even when a transaction record doesn't exist, if the timestamp // from the PushTxn request indicates sufficiently recent client // activity, the push will fail. @@ -5008,15 +5009,15 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) { {-1, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError}, {-1, 0, ns, roachpb.PUSH_ABORT, txnPushError}, {-1, 0, ns, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError}, - {-1, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError}, - {-1, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError}, - {-1, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError}, + {-1, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError}, + {-1, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError}, + {-1, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_ABORT, txnPushError}, + {-1, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError}, + {-1, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError}, + {-1, 0, m*ns + 1, roachpb.PUSH_ABORT, noError}, + {-1, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError}, } for i, test := range testCases { diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b3adac9c03c5..517d5310363f 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -2440,7 +2441,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) { // Now, expire the transactions by moving the clock forward. This will // result in the subsequent scan operation pushing both transactions // in a single batch. - manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1) + manualClock.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) // Scan the range and verify empty result (expired txn is aborted, // cleaning up intents). @@ -2491,7 +2492,7 @@ func TestStoreScanMultipleIntents(t *testing.T) { // Now, expire the transactions by moving the clock forward. This will // result in the subsequent scan operation pushing both transactions // in a single batch. - manual.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1) + manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) // Query the range with a single INCONSISTENT scan, which should // cause all intents to be resolved. diff --git a/pkg/storage/txn_recovery_integration_test.go b/pkg/storage/txn_recovery_integration_test.go index ca870cef84b9..5d8038211d84 100644 --- a/pkg/storage/txn_recovery_integration_test.go +++ b/pkg/storage/txn_recovery_integration_test.go @@ -103,11 +103,11 @@ func TestTxnRecoveryFromStaging(t *testing.T) { } // Pretend the transaction coordinator for the parallel commit died at this point. - // Wait for twice the TxnLivenessThreshold and then issue a read on one of the - // keys that the transaction wrote. This will result in a transaction push and + // Wait for longer than the TxnLivenessThreshold and then issue a read on one of + // the keys that the transaction wrote. This will result in a transaction push and // eventually a full transaction recovery in order to resolve the indeterminate // commit. - manual.Increment(2 * txnwait.TxnLivenessThreshold.Nanoseconds()) + manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) gArgs := getArgs(keyA) gReply, pErr := client.SendWrapped(ctx, store.TestSender(), &gArgs) diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go index a43934f6d2ec..3b464a3a1535 100644 --- a/pkg/storage/txnwait/txnqueue.go +++ b/pkg/storage/txnwait/txnqueue.go @@ -35,10 +35,14 @@ import ( const maxWaitForQueryTxn = 50 * time.Millisecond +// TxnLivenessHeartbeatMultiplier specifies what multiple the transaction +// liveness threshold should be of the transaction heartbeat internval. +const TxnLivenessHeartbeatMultiplier = 5 + // TxnLivenessThreshold is the maximum duration between transaction heartbeats // before the transaction is considered expired by Queue. It is exposed and // mutable to allow tests to override it. -var TxnLivenessThreshold = 2 * base.DefaultHeartbeatInterval +var TxnLivenessThreshold = TxnLivenessHeartbeatMultiplier * base.DefaultHeartbeatInterval // ShouldPushImmediately returns whether the PushTxn request should // proceed without queueing. This is true for pushes which are neither