sql: prioritize retryable errors in synchronizeParallelStmts
At the moment, parallel statement execution works by sending batches
concurrently through a single `client.Txn`. This makes the handling of
retryable errors tricky because it's difficult to know when it's safe
to prepare the transaction state for a retry. Our approach to this is
far from optimal and relies on a mess of locking in both `client.Txn`
and `TxnCoordSender`. It works well enough to prevent anything from
going seriously wrong (cockroachdb#17197), but it can result in
confounding error behavior when statements operate in the context of
transaction epochs that they weren't expecting.

The ideal situation would be for all statements with a handle to a txn
to always work under the same txn epoch at any single point in time.
Any retryable error seen by these statements would be propagated up
through `client.Txn` without changing any state (and without yet being
converted to a `HandledRetryableTxnError`), and only after the
statements had all been synchronized would the retryable error be used
to update the txn and prepare for the retry attempt. This would
require a change like cockroachdb#22615. I've created a POC for this
approach, but it is way too invasive to cherry-pick.
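
For illustration, a minimal, self-contained sketch of that ideal flow.
The `Txn`, `isRetryable`, and `prepareForRetry` names here are
hypothetical stand-ins, not the real `client.Txn` API: workers surface
retryable errors without mutating transaction state, and the state is
updated exactly once, after all workers have been synchronized.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Txn is a stand-in for client.Txn in this sketch.
type Txn struct{ Epoch int }

// prepareForRetry is the single, synchronized state update.
func (t *Txn) prepareForRetry() { t.Epoch++ }

var errRetryable = errors.New("retryable txn error")

func isRetryable(err error) bool { return errors.Is(err, errRetryable) }

// runParallel executes statements concurrently. Workers never mutate
// txn state on error; they only report it. Retryable errors are acted
// upon once, after every statement has finished.
func runParallel(txn *Txn, stmts []func(*Txn) error) error {
	errs := make([]error, len(stmts))
	var wg sync.WaitGroup
	for i, stmt := range stmts {
		wg.Add(1)
		go func(i int, stmt func(*Txn) error) {
			defer wg.Done()
			errs[i] = stmt(txn)
		}(i, stmt)
	}
	wg.Wait() // synchronize before acting on any error

	for _, err := range errs {
		if isRetryable(err) {
			txn.prepareForRetry() // only now is the txn updated
			return err
		}
	}
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	txn := &Txn{}
	err := runParallel(txn, []func(*Txn) error{
		func(*Txn) error { return errRetryable },
		func(*Txn) error { return errors.New("execution error") },
	})
	fmt.Println(err, txn.Epoch) // retryable txn error 1
}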

So with the current state of things, we need to do a better job of
catching errors caused by concurrent retries. In the past we've tried
to carefully determine which errors could be a symptom of a concurrent
retry and ignore them. I now think this was a mistake, as the process
of inferring which errors could be caused by a txn retry is fraught
with failure. We now always return retryable errors from
synchronizeParallelStmts when they exist. The reasoning is that if an
error was a symptom of the txn retry, it will not be present during
the next txn attempt. If it was instead a legitimate query execution
error, we expect to hit it again on the next txn attempt, and the
behavior will mirror the case where the statement throwing the
execution error never ran before the parallel queue hit the retryable
error.
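
A minimal sketch of the resulting prioritization, using simplified
stand-in error types rather than the real `roachpb` types (the actual
ordering lives in the `synchronizeParallelStmts` diffs below):

package main

import (
	"errors"
	"fmt"
	"sort"
)

// Stand-ins for the real roachpb error types, for illustration only.
type retryableErr struct{ currentIncarnation bool }

func (e *retryableErr) Error() string { return "retryable txn error" }

type prevAttemptErr struct{}

func (e *prevAttemptErr) Error() string { return "txn prev attempt error" }

// priority mirrors the ordering introduced by this commit: a retryable
// error for the current txn incarnation first, then other retryable
// errors, then known symptoms of a concurrent retry, then all others.
func priority(err error) int {
	switch t := err.(type) {
	case *retryableErr:
		if t.currentIncarnation {
			return 1
		}
		return 2
	case *prevAttemptErr:
		return 3
	default:
		return 4
	}
}

func main() {
	errs := []error{
		errors.New("duplicate key value"), // may be a symptom of the retry
		&prevAttemptErr{},
		&retryableErr{currentIncarnation: true},
	}
	sort.Slice(errs, func(i, j int) bool { return priority(errs[i]) < priority(errs[j]) })
	fmt.Println(errs[0]) // the retryable error wins and drives the retry
}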

Release note: None
nvanbenschoten committed Mar 1, 2018
1 parent 34c7c69 commit 3692794
Showing 3 changed files with 82 additions and 62 deletions.
pkg/internal/client/txn.go: 0 additions & 8 deletions
@@ -1011,14 +1011,6 @@ func (txn *Txn) Send(
 			// this Txn object has already aborted and restarted the txn.
 			txn.updateStateOnRetryableErrLocked(ctx, retryErr)
 		}
-		case *roachpb.TransactionReplayError:
-			if pErr.GetTxn().ID != txn.mu.Proto.ID {
-				// It is possible that a concurrent request through this Txn
-				// object has already aborted and restarted the txn. In this
-				// case, we may see a TransactionReplayError if the abort
-				// beats the original BeginTxn request to the txn record.
-				return nil, roachpb.NewError(&roachpb.TxnPrevAttemptError{})
-			}
 		}
 		// Note that unhandled retryable txn errors are allowed from leaf
 		// transactions. We pass them up through distributed SQL flows to
pkg/sql/conn_executor.go: 41 additions & 27 deletions
@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"math"
"sort"
"time"
"unicode/utf8"

@@ -1383,39 +1384,52 @@ func (ex *connExecutor) synchronizeParallelStmts(ctx context.Context) error {
 		ex.state.mu.Lock()
 		defer ex.state.mu.Unlock()
 
-		// Check that all errors are retryable. If any are not, return the
-		// first non-retryable error.
-		var retryErr *roachpb.HandledRetryableTxnError
-		for _, err := range errs {
-			switch t := err.(type) {
-			case *roachpb.HandledRetryableTxnError:
-				// Ignore retryable errors to previous incarnations of this transaction.
-				curTxn := ex.state.mu.txn.Proto()
-				errTxn := t.Transaction
-				if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
-					retryErr = t
-				}
-			case *roachpb.TxnPrevAttemptError:
-				// Symptom of concurrent retry, ignore.
-			default:
-				return err
-			}
-		}
+		// Sort the errors according to their importance.
+		curTxn := ex.state.mu.txn.Proto()
+		sort.Slice(errs, func(i, j int) bool {
+			errPriority := func(err error) int {
+				switch t := err.(type) {
+				case *roachpb.HandledRetryableTxnError:
+					errTxn := t.Transaction
+					if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
+						// A retryable error for the current transaction
+						// incarnation is given the highest priority.
+						return 1
+					}
+					return 2
+				case *roachpb.TxnPrevAttemptError:
+					// Symptom of concurrent retry.
+					return 3
+				default:
+					// Any other error. We sort these behind retryable errors
+					// and errors we know to be their symptoms because it is
+					// impossible to conclusively determine in all cases whether
+					// one of these errors is a symptom of a concurrent retry or
+					// not. If the error is a symptom then we want to ignore it.
+					// If it is not, we expect to see the same error during a
+					// transaction retry.
+					return 4
+				}
+			}
+			return errPriority(errs[i]) < errPriority(errs[j])
+		})
 
-		if retryErr == nil {
+		// Return the "best" error.
+		bestErr := errs[0]
+		switch bestErr.(type) {
+		case *roachpb.HandledRetryableTxnError:
+			// If any of the errors are retryable, we need to bump the transaction
+			// epoch to invalidate any writes performed by any workers after the
+			// retry updated the txn's proto but before we synchronized (some of
+			// these writes might have been performed at the wrong epoch). Note
+			// that we don't need to lock the client.Txn because we're synchronized.
+			// See #17197.
+			ex.state.mu.txn.Proto().BumpEpoch()
+		case *roachpb.TxnPrevAttemptError:
 			log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
 				"not find the final retry error: %v", errs)
 		}
-
-		// If all errors are retryable, we return the one meant for the current
-		// incarnation of this transaction. Before doing so though, we need to bump
-		// the transaction epoch to invalidate any writes performed by any workers
-		// after the retry updated the txn's proto but before we synchronized (some
-		// of these writes might have been performed at the wrong epoch). Note
-		// that we don't need to lock the client.Txn because we're synchronized.
-		// See #17197.
-		ex.state.mu.txn.Proto().BumpEpoch()
-		return retryErr
+		return bestErr
 	}
 	return nil
 }
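
To make the epoch-bump reasoning in the diff above concrete, a toy
illustration. The `proto` type is a stand-in for this sketch, not
`roachpb.Transaction`; it only shows why a second bump at the
synchronization point invalidates writes that raced with a concurrent
retry's bump:

package main

import "fmt"

// proto is a toy stand-in for the transaction proto.
type proto struct{ Epoch uint32 }

func (p *proto) BumpEpoch() { p.Epoch++ }

func main() {
	p := &proto{}
	p.BumpEpoch()               // a concurrent retry already bumped the proto
	racingWriteEpoch := p.Epoch // a still-running worker writes at this epoch
	p.BumpEpoch()               // synchronizeParallelStmts bumps again
	// The racing write's epoch is now behind the proto's epoch, so it
	// is treated as stale on the retry attempt.
	fmt.Println(racingWriteEpoch < p.Epoch) // true
}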
pkg/sql/session.go: 41 additions & 27 deletions
@@ -20,6 +20,7 @@ import (
"fmt"
"net"
"regexp"
"sort"
"strings"
"sync/atomic"
"time"
@@ -760,39 +761,52 @@ func (s *Session) synchronizeParallelStmts(ctx context.Context) error {
 		s.TxnState.mu.Lock()
 		defer s.TxnState.mu.Unlock()
 
-		// Check that all errors are retryable. If any are not, return the
-		// first non-retryable error.
-		var retryErr *roachpb.HandledRetryableTxnError
-		for _, err := range errs {
-			switch t := err.(type) {
-			case *roachpb.HandledRetryableTxnError:
-				// Ignore retryable errors to previous incarnations of this transaction.
-				curTxn := s.TxnState.mu.txn.Proto()
-				errTxn := t.Transaction
-				if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
-					retryErr = t
-				}
-			case *roachpb.TxnPrevAttemptError:
-				// Symptom of concurrent retry, ignore.
-			default:
-				return err
-			}
-		}
+		// Sort the errors according to their importance.
+		curTxn := s.TxnState.mu.txn.Proto()
+		sort.Slice(errs, func(i, j int) bool {
+			errPriority := func(err error) int {
+				switch t := err.(type) {
+				case *roachpb.HandledRetryableTxnError:
+					errTxn := t.Transaction
+					if errTxn.ID == curTxn.ID && errTxn.Epoch == curTxn.Epoch {
+						// A retryable error for the current transaction
+						// incarnation is given the highest priority.
+						return 1
+					}
+					return 2
+				case *roachpb.TxnPrevAttemptError:
+					// Symptom of concurrent retry.
+					return 3
+				default:
+					// Any other error. We sort these behind retryable errors
+					// and errors we know to be their symptoms because it is
+					// impossible to conclusively determine in all cases whether
+					// one of these errors is a symptom of a concurrent retry or
+					// not. If the error is a symptom then we want to ignore it.
+					// If it is not, we expect to see the same error during a
+					// transaction retry.
+					return 4
+				}
+			}
+			return errPriority(errs[i]) < errPriority(errs[j])
+		})
 
-		if retryErr == nil {
+		// Return the "best" error.
+		bestErr := errs[0]
+		switch bestErr.(type) {
+		case *roachpb.HandledRetryableTxnError:
+			// If any of the errors are retryable, we need to bump the transaction
+			// epoch to invalidate any writes performed by any workers after the
+			// retry updated the txn's proto but before we synchronized (some of
+			// these writes might have been performed at the wrong epoch). Note
+			// that we don't need to lock the client.Txn because we're synchronized.
+			// See #17197.
+			s.TxnState.mu.txn.Proto().BumpEpoch()
+		case *roachpb.TxnPrevAttemptError:
 			log.Fatalf(ctx, "found symptoms of a concurrent retry, but did "+
 				"not find the final retry error: %v", errs)
 		}
-
-		// If all errors are retryable, we return the one meant for the current
-		// incarnation of this transaction. Before doing so though, we need to bump
-		// the transaction epoch to invalidate any writes performed by any workers
-		// after the retry updated the txn's proto but before we synchronized (some
-		// of these writes might have been performed at the wrong epoch). Note
-		// that we don't need to lock the client.Txn because we're synchronized.
-		// See #17197.
-		s.TxnState.mu.txn.Proto().BumpEpoch()
-		return retryErr
+		return bestErr
 	}
 	return nil
 }