Skip to content

Commit

Permalink
txnwait: increase TxnLivenessThreshold significantly
Browse files Browse the repository at this point in the history
This change increases the duration between transaction heartbeats
before a transaction is considered expired from 2 seconds to 5 seconds.
This has been found to dramatically reduce the frequency of transaction
aborts when a cluster is under significant load. This is not expected
to noticeably hurt cluster availability in the presence of dead nodes
because we already have availability loss on the order of 9 seconds due
to the epoch-based lease duration.

This is especially important now that the hack in #25034 is gone. That
hack was hiding some of this badness and giving transactions a bit more
room to avoid being aborted.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 15, 2019
1 parent ee59435 commit 9147c58
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 54 deletions.
97 changes: 49 additions & 48 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -4923,9 +4924,8 @@ func TestPushTxnQueryPusheeHasNewerVersion(t *testing.T) {
}
}

// TestPushTxnHeartbeatTimeout verifies that a txn which
// hasn't been heartbeat within 2x the heartbeat interval can be
// pushed/aborted.
// TestPushTxnHeartbeatTimeout verifies that a txn which hasn't been
// heartbeat within its transaction liveness threshold can be pushed/aborted.
func TestPushTxnHeartbeatTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
Expand All @@ -4937,6 +4937,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
const txnPushError = "failed to push"
const indetCommitError = "txn in indeterminate STAGING state"

m := int64(txnwait.TxnLivenessHeartbeatMultiplier)
ns := base.DefaultHeartbeatInterval.Nanoseconds()
testCases := []struct {
status roachpb.TransactionStatus // -1 for no record
Expand All @@ -4953,24 +4954,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{roachpb.PENDING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, noError},
// If the transaction record is STAGING then any case that previously
// returned a TransactionPushError will continue to return that error,
// but any case that previously succeeded in pushing the transaction
Expand All @@ -4981,24 +4982,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{roachpb.STAGING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, indetCommitError},
// Even when a transaction record doesn't exist, if the timestamp
// from the PushTxn request indicates sufficiently recent client
// activity, the push will fail.
Expand All @@ -5008,15 +5009,15 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{-1, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError},
{-1, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError},
{-1, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError},
{-1, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{-1, 0, m*ns + 1, roachpb.PUSH_ABORT, noError},
{-1, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError},
}

for i, test := range testCases {
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -2440,7 +2441,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) {
// Now, expire the transactions by moving the clock forward. This will
// result in the subsequent scan operation pushing both transactions
// in a single batch.
manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
manualClock.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

// Scan the range and verify empty result (expired txn is aborted,
// cleaning up intents).
Expand Down Expand Up @@ -2491,7 +2492,7 @@ func TestStoreScanMultipleIntents(t *testing.T) {
// Now, expire the transactions by moving the clock forward. This will
// result in the subsequent scan operation pushing both transactions
// in a single batch.
manual.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

// Query the range with a single INCONSISTENT scan, which should
// cause all intents to be resolved.
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/txn_recovery_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
}

// Pretend the transaction coordinator for the parallel commit died at this point.
// Wait for twice the TxnLivenessThreshold and then issue a read on one of the
// keys that the transaction wrote. This will result in a transaction push and
// Wait for longer than the TxnLivenessThreshold and then issue a read on one of
// the keys that the transaction wrote. This will result in a transaction push and
// eventually a full transaction recovery in order to resolve the indeterminate
// commit.
manual.Increment(2 * txnwait.TxnLivenessThreshold.Nanoseconds())
manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

gArgs := getArgs(keyA)
gReply, pErr := client.SendWrapped(ctx, store.TestSender(), &gArgs)
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/txnwait/txnqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ import (

const maxWaitForQueryTxn = 50 * time.Millisecond

// TxnLivenessHeartbeatMultiplier specifies what multiple the transaction
// liveness threshold should be of the transaction heartbeat internval.
const TxnLivenessHeartbeatMultiplier = 5

// TxnLivenessThreshold is the maximum duration between transaction heartbeats
// before the transaction is considered expired by Queue. It is exposed and
// mutable to allow tests to override it.
var TxnLivenessThreshold = 2 * base.DefaultHeartbeatInterval
var TxnLivenessThreshold = TxnLivenessHeartbeatMultiplier * base.DefaultHeartbeatInterval

// ShouldPushImmediately returns whether the PushTxn request should
// proceed without queueing. This is true for pushes which are neither
Expand Down

0 comments on commit 9147c58

Please sign in to comment.