Skip to content

Commit

Permalink
Merge #36748
Browse files Browse the repository at this point in the history
36748: txnwait: increase TxnLivenessThreshold significantly, reduce txn aborts under load r=nvanbenschoten a=nvanbenschoten

This change increases the duration between transaction heartbeats before a transaction is considered expired from 2 seconds to 5 seconds. This has been found to dramatically reduce the frequency of transaction aborts when a cluster is under significant load. This is not expected to noticeably hurt cluster availability in the presence of dead nodes because we already have availability loss on the order of 9 seconds due to the epoch-based lease duration.

This is especially important now that the hack in #25034 is gone. That hack was hiding some of this badness and giving transactions a bit more room to avoid being aborted.

Here's some testing with TPC-C 2300 on AWS with 3 `c5d.4xlarge` machines. I first ran with the the 2 second TxnLivenessThreshold and then with the 5 second:

<img width="872" alt="Screen Shot 2019-04-10 at 4 51 28 PM" src="https://user-images.githubusercontent.com/5438456/55926033-ae332c00-5bdd-11e9-8d6d-184ba1d5ffcd.png">

The top graph is transaction aborts and the bottom graph is p99 service latency. During my tests I actually saw an even bigger difference most of the time. In this trial, leases were still moving around because I didn't use a ramp period (which skews the txn aborts when it stops). I ran another 15 minute run again half an hour later once the leases had balanced (again with a 5 second TxnLivenessThreshold) and saw the latencies I was more accustomed to seeing with the change:

<img width="869" alt="Screen Shot 2019-04-10 at 5 59 19 PM" src="https://user-images.githubusercontent.com/5438456/55926104-09651e80-5bde-11e9-9540-9d7c697a9181.png">

cc. @danhhz, who picked this up back in January when monitoring the performance of an alpha release. I dug into the changes made in 8143b45 that seemed to be causing issues with transaction aborts but never developed a good theory for what could be causing them and had trouble reliably reproducing them. Little did I know that it wasn't what was added in 8143b45 that made the difference, it was what [was removed](8143b45#diff-3c31a6497771c33db423a4cead092979L130).

The first commit is from #36729 and seems like a generally good change to make. There's no reason we should be sending async aborts if we already know that the transaction was finalized. This was probably also made a little worse by 8143b45 because we now check to see whether a transaction is commitable when its record is missing when evaluating a `HeartbeatTxn` requests. Before we would simply return a `TransactionNotFoundStatusError`, which was ignored.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 16, 2019
2 parents f83aff3 + 9147c58 commit d913b9b
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 60 deletions.
15 changes: 9 additions & 6 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,25 +375,28 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {

// Clone the txn in order to put it in the heartbeat request.
txn := h.mu.txn.Clone()

if txn.Key == nil {
log.Fatalf(ctx, "attempting to heartbeat txn without anchor key: %v", txn)
}

ba := roachpb.BatchRequest{}
ba.Txn = txn

hb := &roachpb.HeartbeatTxnRequest{
ba.Add(&roachpb.HeartbeatTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
},
Now: h.clock.Now(),
}
ba.Add(hb)
})

// Send the heartbeat request directly through the gatekeeper interceptor.
// See comment on h.gatekeeper for a discussion of why.
log.VEvent(ctx, 2, "heartbeat")
br, pErr := h.gatekeeper.SendLocked(ctx, ba)

// If the txn is no longer pending, ignore the result of the heartbeat.
if h.mu.txn.Status != roachpb.PENDING {
return false
}

var respTxn *roachpb.Transaction
if pErr != nil {
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)
Expand Down
97 changes: 49 additions & 48 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -4923,9 +4924,8 @@ func TestPushTxnQueryPusheeHasNewerVersion(t *testing.T) {
}
}

// TestPushTxnHeartbeatTimeout verifies that a txn which
// hasn't been heartbeat within 2x the heartbeat interval can be
// pushed/aborted.
// TestPushTxnHeartbeatTimeout verifies that a txn which hasn't been
// heartbeat within its transaction liveness threshold can be pushed/aborted.
func TestPushTxnHeartbeatTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
Expand All @@ -4937,6 +4937,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
const txnPushError = "failed to push"
const indetCommitError = "txn in indeterminate STAGING state"

m := int64(txnwait.TxnLivenessHeartbeatMultiplier)
ns := base.DefaultHeartbeatInterval.Nanoseconds()
testCases := []struct {
status roachpb.TransactionStatus // -1 for no record
Expand All @@ -4953,24 +4954,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{roachpb.PENDING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.PENDING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, noError},
{roachpb.PENDING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, noError},
// If the transaction record is STAGING then any case that previously
// returned a TransactionPushError will continue to return that error,
// but any case that previously succeeded in pushing the transaction
Expand All @@ -4981,24 +4982,24 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{roachpb.STAGING, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, 0, ns*2 + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, ns*2 + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, ns * 3, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, ns, ns*3 + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, 0, m*ns + 1, roachpb.PUSH_TOUCH, indetCommitError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, m*ns + 1, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_ABORT, txnPushError},
{roachpb.STAGING, ns, (m + 1) * ns, roachpb.PUSH_TOUCH, txnPushError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TIMESTAMP, indetCommitError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_ABORT, indetCommitError},
{roachpb.STAGING, ns, (m+1)*ns + 1, roachpb.PUSH_TOUCH, indetCommitError},
// Even when a transaction record doesn't exist, if the timestamp
// from the PushTxn request indicates sufficiently recent client
// activity, the push will fail.
Expand All @@ -5008,15 +5009,15 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
{-1, 0, ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns*2 - 1, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, ns * 2, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, ns*2 + 1, roachpb.PUSH_TIMESTAMP, noError},
{-1, 0, ns*2 + 1, roachpb.PUSH_ABORT, noError},
{-1, 0, ns*2 + 1, roachpb.PUSH_TOUCH, noError},
{-1, 0, m*ns - 1, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, m*ns - 1, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, m*ns - 1, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_TIMESTAMP, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_ABORT, txnPushError},
{-1, 0, m * ns, roachpb.PUSH_TOUCH, txnPushError},
{-1, 0, m*ns + 1, roachpb.PUSH_TIMESTAMP, noError},
{-1, 0, m*ns + 1, roachpb.PUSH_ABORT, noError},
{-1, 0, m*ns + 1, roachpb.PUSH_TOUCH, noError},
}

for i, test := range testCases {
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -2440,7 +2441,7 @@ func TestStoreScanIntentsFromTwoTxns(t *testing.T) {
// Now, expire the transactions by moving the clock forward. This will
// result in the subsequent scan operation pushing both transactions
// in a single batch.
manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
manualClock.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

// Scan the range and verify empty result (expired txn is aborted,
// cleaning up intents).
Expand Down Expand Up @@ -2491,7 +2492,7 @@ func TestStoreScanMultipleIntents(t *testing.T) {
// Now, expire the transactions by moving the clock forward. This will
// result in the subsequent scan operation pushing both transactions
// in a single batch.
manual.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

// Query the range with a single INCONSISTENT scan, which should
// cause all intents to be resolved.
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/txn_recovery_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
}

// Pretend the transaction coordinator for the parallel commit died at this point.
// Wait for twice the TxnLivenessThreshold and then issue a read on one of the
// keys that the transaction wrote. This will result in a transaction push and
// Wait for longer than the TxnLivenessThreshold and then issue a read on one of
// the keys that the transaction wrote. This will result in a transaction push and
// eventually a full transaction recovery in order to resolve the indeterminate
// commit.
manual.Increment(2 * txnwait.TxnLivenessThreshold.Nanoseconds())
manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)

gArgs := getArgs(keyA)
gReply, pErr := client.SendWrapped(ctx, store.TestSender(), &gArgs)
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/txnwait/txnqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ import (

const maxWaitForQueryTxn = 50 * time.Millisecond

// TxnLivenessHeartbeatMultiplier specifies what multiple the transaction
// liveness threshold should be of the transaction heartbeat internval.
const TxnLivenessHeartbeatMultiplier = 5

// TxnLivenessThreshold is the maximum duration between transaction heartbeats
// before the transaction is considered expired by Queue. It is exposed and
// mutable to allow tests to override it.
var TxnLivenessThreshold = 2 * base.DefaultHeartbeatInterval
var TxnLivenessThreshold = TxnLivenessHeartbeatMultiplier * base.DefaultHeartbeatInterval

// ShouldPushImmediately returns whether the PushTxn request should
// proceed without queueing. This is true for pushes which are neither
Expand Down

0 comments on commit d913b9b

Please sign in to comment.