Skip to content

Commit

Permalink
storage: allow HeartbeatTxn and EndTxn requests to create txn records
Browse files Browse the repository at this point in the history
Informs #25437.
Informs #32971.

This is the first part of addressing #32971. Part two will update
concurrent txns to not immediately abort missing txn records and
part three will update the txn client to stop sending BeginTxn
records.

The change closely resembles what was laid out in the corresponding
RFC sections. `HeartbeatTxn` and `EndTxn` are both updated to permit
the creation of transaction records when they finds that one is missing.

To prevent replays from causing issues, they *both* check the write
timestamp cache when creating a transaction record. This is one area
where the change diverges from the RFC. Instead of having `EndTxn`
always check the write timestamp cache, it only does so if it is
creating a txn record from scratch. To make this safe, `HeartbeatTxn`
also checks the write timestamp cache if it is creating a txn record
from scratch. This prevents a long running transaction from increasingly
running the risk of being aborted by a lease transfer as its `EndTxn`
continues to be delayed. Instead, a lease transfer will only abort a
transaction if it comes before the transaction record creation, which
should be within a second of the transaction starting.

The change pulls out a new `CanCreateTxnRecord` method, which has the
potential of being useful for detecting eagerly GCed transaction records
and solving the major problem from the RFC without an extra QueryIntent.
This is what Andrei was pushing for before. More details are included
in TODOs within the PR.

_### Migration Safety

Most of the changes to the transaction state machine are straightforward
to validate. Transaction records can still never be created without
checking for replays and only an EndTxn from the client's sync path
can GC an ABORTED txn record. This means that it is still impossible for
a transaction to move between the two finalized statuses (at some point
it would be worth it to draw this state machine).

The one tricky part is ensuring that the changes in this PR are safe
when run in mixed-version clusters. The two areas of concern are:
1. lease transfers between a new and an old cluster on a range that
should/does contain a transaction record.
2. an old txn client that is not expecting HeartbeatTxn and EndTxn
requests to create txn records if they are found to be missing.
3. an old txn client may not expect a HeartbeatTxn to hit the
write timestamp cache if run concurrently with an EndTxn.

The first concern is protected by the write timestamp cache. Regardless
of which replica creates that transaction record from a BeginTxn req or
a HeartbeatTxn req (on the new replica), it will first need to check
the timestamp cache. This prevents against any kind of replay that could
create a transaction record that the old replica is not prepared to
handle.

We can break the second concern into two parts. First, an old txn client
will never send an EndTxn without having sent a successful BeginTxn, so
the new behavior there is not an issue. Second, an old txn client may
send a HeartbeatTxn concurrently with a BeginTxn. If the heartbeat wins,
it will create a txn record on a new replica. If the BeginTxn evaluates
on a new replica, it will be a no-op. If it evaluates on an old replica,
it will result in a retryable error.

The third concern is specific to the implementation of the heartbeat loop
itself. If a HeartbeatTxn loses a race with an EndTxn, it may get an
aborted error after checking the timestamp cache. This is desirable from
the replica-side, but we'd need to ensure that the client will handle it
correctly. This PR adds some protection (see `sendingEndTxn`) to the txn
client to prevent this case from causing weirdness on the client, but I
don't think this could actually cause issues even without this protection.
The reason is that the txn client might mark itself as aborted due to the
heartbeat, but this will be overwritten when the EndTxn returns and the sql
client will still hear back from the successful EndTxn. This must have
actually always been an issue because it was always possible for a committed
txn record to be GCed and then written as aborted later, at which point a
concurrent heartbeat could have observed it.

I'd like Andrei to sign off on this last hazard, as he's thought about
this kind of thing more than anybody.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 7, 2019
1 parent 7472973 commit 8143b45
Show file tree
Hide file tree
Showing 16 changed files with 1,425 additions and 553 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ func TestReverseScanWithSplitAndMerge(t *testing.T) {

func TestBadRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("TODO(andreimatei): This last assertion in this test was broken by #33150. " +
"I suspect the reason is that there is no longer a single Range " +
"that spans [KeyMin, z), so we're not hitting the error.")
s, db := startNoSplitMergeServer(t)
defer s.Stopper().Stop(context.TODO())
ctx := context.TODO()
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
// 3) Another txn runs into the intents on r2, tries to push, and succeeds
// because the txn record doesn't exist yet. It writes the txn record as
// Aborted.
// TODO(nvanbenschoten): This test will change when #25437 is fully addressed.
// 4) If the Begin were to execute now, it'd discover the Aborted txn and return
// a retryable TxnAbortedError. But it doesn't execute now; it's still delayed
// somehow.
Expand All @@ -67,6 +68,7 @@ import (
// result of this convoluted scenario.
func TestDelayedBeginRetryable(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(`Flaky, will be removed when #25437 is addressed`)

// Here's how this test is gonna go:
// - We're going to send a BeginTxn+Put(a)+Put(c). The first two will be split
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestDelayedBeginRetryable(t *testing.T) {
if _, ok := pErr.GetDetail().(*roachpb.HandledRetryableTxnError); !ok {
t.Fatalf("expected HandledRetryableTxnError, got: %v", pErr)
}
exp := "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)"
exp := "TransactionAbortedError(ABORT_REASON_ABORT_SPAN)"
if !testutils.IsPError(pErr, regexp.QuoteMeta(exp)) {
t.Fatalf("expected %s, got: %s", exp, pErr)
}
Expand Down
27 changes: 12 additions & 15 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,27 +459,30 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
// then we ignore the error. This is possible if the heartbeat loop was
// started before a BeginTxn request succeeds because of ambiguity in the
// first write request's response.
//
// TODO(nvanbenschoten): Remove this in 2.3.
if tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok &&
tse.Reason == roachpb.TransactionStatusError_REASON_TXN_NOT_FOUND {
return true
}

if pErr.GetTxn() != nil {
// It is not expected for a 2.1 node to return an error with a transaction
// in it. For one, heartbeats are not supposed to return
// TransactionAbortedErrors.
// TODO(andrei): Remove this in 2.2.
respTxn = pErr.GetTxn()
} else {
return true
}
respTxn = pErr.GetTxn()
} else {
respTxn = br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn
}

// Update our txn. In particular, we need to make sure that the client will
// notice when the txn has been aborted (in which case we'll give them an
// error on their next request).
//
// TODO(nvanbenschoten): It's possible for a HeartbeatTxn request to observe
// the result of an EndTransaction request and beat it back to the client.
// This is an issue when a COMMITTED txn record is GCed and later re-written
// as ABORTED. The coordinator's local status could flip from PENDING to
// ABORTED (after heartbeat response) to COMMITTED (after commit response).
// This appears to be benign, but it's still somewhat disconcerting. If this
// ever causes any issues, we'll need to be smarter about detecting this race
// on the client and conditionally ignoring the result of heartbeat responses.
h.mu.txn.Update(respTxn)
if h.mu.txn.Status != roachpb.PENDING {
if h.mu.txn.Status == roachpb.ABORTED {
Expand All @@ -494,12 +497,6 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
// abortTxnAsyncLocked send an EndTransaction(commmit=false) asynchronously.
// The asyncAbortCallbackLocked callback is also called.
func (h *txnHeartbeat) abortTxnAsyncLocked(ctx context.Context) {
// Stop the heartbeat loop if it is still running.
if h.mu.txnEnd != nil {
close(h.mu.txnEnd)
h.mu.txnEnd = nil
}

if h.mu.txn.Status != roachpb.ABORTED {
log.Fatalf(ctx, "abortTxnAsyncLocked called for non-aborted txn: %s", h.mu.txn)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func (*ScanRequest) flags() int { return isRead | isRange | isTxn | update
func (*ReverseScanRequest) flags() int {
return isRead | isRange | isReverse | isTxn | updatesReadTSCache | needsRefresh
}
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn | consultsTSCache }
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn }

// EndTransaction updates the write timestamp cache to prevent
// replays. Replays for the same transaction key and timestamp will
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (t *Transaction) BumpEpoch() {
func (t *Transaction) InclusiveTimeBounds() (hlc.Timestamp, hlc.Timestamp) {
min := t.OrigTimestamp
max := t.Timestamp
if t.Epoch != 0 {
if t.Epoch != 0 && t.EpochZeroTimestamp != (hlc.Timestamp{}) {
if min.Less(t.EpochZeroTimestamp) {
panic(fmt.Sprintf("orig timestamp %s less than epoch zero %s", min, t.EpochZeroTimestamp))
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,6 @@ func NewTransactionStatusError(msg string) *TransactionStatusError {
}
}

// NewTransactionNotFoundStatusError initializes a new TransactionStatusError with
// a REASON_TXN_NOT_FOUND reason.
func NewTransactionNotFoundStatusError() *TransactionStatusError {
return &TransactionStatusError{
Msg: "txn record not found",
Reason: TransactionStatusError_REASON_TXN_NOT_FOUND,
}
}

// NewTransactionCommittedStatusError initializes a new TransactionStatusError
// with a REASON_TXN_COMMITTED.
func NewTransactionCommittedStatusError() *TransactionStatusError {
Expand Down
Loading

0 comments on commit 8143b45

Please sign in to comment.