Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69966: kvcoord: avoid concurrent rollbacks when making parallel commits explicit r=nvanbenschoten,andreimatei a=erikgrinaker

`TxnCoordSender` allows `EndTxn(commit=false)` rollback requests even if
the transaction state is finalized, since clients can send multiple
rollbacks (e.g. due to context cancellation). However, it allowed this
even when the transaction was committed. This could pass the request
through while the `txnCommitter` was asynchronously making an implicit
commit explicit, which would violate the `txnLockGatekeeper` requirement
that transaction requests are synchronous (non-concurrent) which would
return an unexpected error for the rollback.

This patch rejects additional `EndTxn(commit=false)` requests if the
finalized transaction is known to be committed, to prevent this race
condition. If rejected, the returned error is of the same type that
would be returned by `EndTxn` evaluation, although with a different
message string.

Note that even though the returned error should really have
`REASON_TXN_COMMITTED` in this case, which is also what `txn.Rollback()`
expects in order to omit logging, the current `EndTxn` code incorrectly
returns `REASON_TXN_UNKNOWN` in this case. This behavior is retained to
minimize the change, but should be corrected separately.

Resolves cockroachdb#68643.
Informs cockroachdb#69965.

Release justification: fixes for high-priority or high-severity bugs in existing functionality
Release note: None

70210: rowexec: fix a ctx NPE in couple of processors in edge cases r=yuzefovich a=yuzefovich

We recently extended the statistics that are collected during EXPLAIN
ANALYZE runs to include the "scan stats". However, in the joinReader and
the tableReader those are collected when closing the processor based on
the context argument, and in some edge cases (like when `Start` was
never called) might be nil leading to a NPE. This commit fixes the
problem by unifying all processors to collect all execution stats in
`execStatsForTrace` (which isn't called in the edge case mentioned
above).

Fixes: cockroachdb#70075.
Fixes: cockroachdb#70107.

Release note: None (no release with this bug)

Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 14, 2021
3 parents f26d7ad + 40ab220 + 26b4cd5 commit b367897
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 9 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,16 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
if ba != nil && ba.IsSingleAbortTxnRequest() {
if ba != nil && ba.IsSingleAbortTxnRequest() && tc.mu.txn.Status != roachpb.COMMITTED {
// As a special case, we allow rollbacks to be sent at any time. Any
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
// layers are free to retry rollbacks if they want (and they do, for
// example, when the context was canceled while txn.Rollback() was running).
//
// However, we reject this if we know that the transaction has been
// committed, to avoid sending the rollback concurrently with the
// txnCommitter asynchronously making the commit explicit. See:
// https://github.com/cockroachdb/cockroach/issues/68643
return nil
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -379,6 +380,83 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
}
}

// TestTxnCoordSenderCommitCanceled is a regression test for
// https://github.com/cockroachdb/cockroach/issues/68643. It makes sure that an
// EndTxn(commit=false) sent by the caller in response to a client context
// cancellation isn't passed through TxnCoordSender concurrently with an
// asynchronous EndTxn(commit=true) request sent by txnCommitter to make an
// implicitly committed transaction explicit.
func TestTxnCoordSenderCommitCanceled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// blockCommits is used to block commit responses for a given txn. The key is
// a txn ID, and the value is a ready channel (chan struct) that will be
// closed when the commit has been received and blocked.
var blockCommits sync.Map
responseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if arg, ok := ba.GetArg(roachpb.EndTxn); ok && ba.Txn != nil {
et := arg.(*roachpb.EndTxnRequest)
readyC, ok := blockCommits.Load(ba.Txn.ID)
if ok && et.Commit && len(et.InFlightWrites) == 0 {
close(readyC.(chan struct{})) // notify test that commit is received and blocked
<-ctx.Done() // wait for test to complete (NB: not the passed context)
}
}
return nil
}

s := createTestDBWithKnobs(t, &kvserver.StoreTestingKnobs{
TestingResponseFilter: responseFilter,
})
defer s.Stop()
ctx, _ = s.Stopper().WithCancelOnQuiesce(ctx)

// Set up a new txn, and write a couple of values.
txn := kv.NewTxn(ctx, s.DB, 0)
require.NoError(t, txn.Put(ctx, "a", "1"))
require.NoError(t, txn.Put(ctx, "b", "2"))

// Read back a. This is crucial to reproduce the original bug. We need
// txnPipeliner to record the lock in its lock footprint, but it doesn't do
// that if the intents are proven together with the commit EndTxn request
// (because it incorrectly assumes no further requests will be sent). If the
// lock footprint isn't updated, the TxnCoordSender will incorrectly believe
// the txn hasn't taken out any locks, and will elide the final
// EndTxn(commit=false) rollback request. For details, see:
// https://github.com/cockroachdb/cockroach/issues/68643
_, err := txn.Get(ctx, "a")
require.NoError(t, err)

// Commit the transaction, but ask the response filter to block the final
// async commit sent by txnCommitter to make the implicit commit explicit.
readyC := make(chan struct{})
blockCommits.Store(txn.ID(), readyC)
require.NoError(t, txn.Commit(ctx))
<-readyC

// From the TxnCoordSender's point of view, the txn is implicitly committed,
// and the commit response is on its way back up the stack. However, if the
// client were to disconnect before receiving the response (canceling the
// context), and something rolls back the transaction because of that, then
// txn.Rollback() would send an asynchronous rollback request using a separate
// context.
//
// However, this is hard to test since txn.Rollback() in this case sends the
// EndTxn(commit=false) async. We instead replicate what Txn.Rollback() would
// do here (i.e. send a EndTxn(commit=false)) and assert that we receive the
// expected error.
var ba roachpb.BatchRequest
ba.Add(&roachpb.EndTxnRequest{Commit: false})
_, pErr := txn.Send(ctx, ba)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.TransactionStatusError{}, pErr.GetDetail())
// TODO(erikgrinaker): This should really assert REASON_TXN_COMMITTED, but
// we return REASON_TXN_UNKNOWN to preserve existing EndTxn behavior.
}

// TestTxnCoordSenderAddLockOnError verifies that locks are tracked if the
// transaction is, even on error.
func TestTxnCoordSenderAddLockOnError(t *testing.T) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,11 +883,6 @@ func (jr *joinReader) ConsumerClosed() {
}

func (jr *joinReader) close() {
// Make sure to clone any tracing span so that stats can pick it up later.
// Stats are only collected after we finish closing the processor.
if !jr.Closed {
jr.scanStats = execinfra.GetScanStats(jr.Ctx)
}
if jr.InternalClose() {
if jr.fetcher != nil {
jr.fetcher.Close(jr.Ctx)
Expand Down Expand Up @@ -918,6 +913,7 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats {
}

// TODO(asubiotto): Add memory and disk usage to EXPLAIN ANALYZE.
jr.scanStats = execinfra.GetScanStats(jr.Ctx)
ret := &execinfrapb.ComponentStats{
Inputs: []execinfrapb.InputStats{is},
KV: execinfrapb.KVStats{
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/rowexec/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata

func (tr *tableReader) close() {
if tr.InternalClose() {
// scanStats is collected from the trace after we finish doing work for this
// join.
tr.scanStats = execinfra.GetScanStats(tr.Ctx)
if tr.fetcher != nil {
tr.fetcher.Close(tr.Ctx)
}
Expand All @@ -319,6 +316,7 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats {
if !ok {
return nil
}
tr.scanStats = execinfra.GetScanStats(tr.Ctx)
ret := &execinfrapb.ComponentStats{
KV: execinfrapb.KVStats{
BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())),
Expand Down

0 comments on commit b367897

Please sign in to comment.