64136: importccl: add more errors to transient retryable error list r=pbardea a=adityamaru

This change adds a few more errors, exposed while stressing a previously
skipped test, that can occur when a worker node running the import is shut
down. These errors should not cause the job to fail; instead they should
trigger a replan.

I ran 500+ iterations of TestImportWorkerFailure under stress.

Release note: None

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
craig[bot] and adityamaru committed May 5, 2021
2 parents 3b8ec3a + b657f9b commit 782d320
Showing 9 changed files with 115 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
@@ -472,7 +472,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
break
}

if !utilccl.IsDistSQLRetryableError(err) {
if utilccl.IsPermanentBulkJobError(err) {
return errors.Wrap(err, "failed to run backup")
}

@@ -483,7 +483,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
var reloadBackupErr error
backupManifest, reloadBackupErr = b.readManifestOnResume(ctx, p.ExecCfg(), defaultStore, details)
if reloadBackupErr != nil {
log.Warning(ctx, "could not reload backup manifest when retrying, continuing with old progress")
return errors.Wrap(reloadBackupErr, "could not reload backup manifest when retrying")
}
}
if err != nil {
6 changes: 5 additions & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -370,7 +370,11 @@ func runBackupProcessor(
return err
}
prog.ProgressDetails = *details
progCh <- prog
select {
case <-ctx.Done():
return ctx.Err()
case progCh <- prog:
}
} else {
// Update the partial progress as we still have a resumeSpan to
// process.
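The select added above is the standard Go idiom for a channel send that respects context cancellation: if the flow is torn down (for example because a worker node went away), the processor returns ctx.Err() instead of blocking forever on an unbuffered progress channel. A minimal standalone sketch of the same idiom; the worker function, channel, and payload type here are illustrative, not the actual backup processor code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendProgress forwards one progress update but gives up as soon as the
// context is canceled, instead of blocking forever on an unbuffered channel.
func sendProgress(ctx context.Context, progCh chan<- int, update int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case progCh <- update:
		return nil
	}
}

func main() {
	progCh := make(chan int) // unbuffered, and nobody ever reads from it
	ctx, cancel := context.WithCancel(context.Background())

	// Simulate the flow being torn down while the send is blocked.
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()

	if err := sendProgress(ctx, progCh, 42); err != nil {
		fmt.Println("send aborted:", err) // prints "send aborted: context canceled"
	}
}
```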
60 changes: 60 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -8150,3 +8150,63 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
sqlDB.Exec(t, `DROP DATABASE restoredb CASCADE;`)
}
}

func TestBackupWorkerFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStress(t, "under stress the test unexpectedly surfaces non-retryable errors on"+
" backup failure")

defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

allowResponse := make(chan struct{})
params := base.TestClusterArgs{}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
}

const numAccounts = 100

_, tc, _, _, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
InitManualReplication, params)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)
defer cleanup()

var expectedCount int
sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&expectedCount)
query := `BACKUP DATABASE data TO 'userfile:///worker-failure'`
errCh := make(chan error)
go func() {
_, err := conn.Exec(query)
errCh <- err
}()
select {
case allowResponse <- struct{}{}:
case err := <-errCh:
t.Fatalf("%s: query returned before expected: %s", err, query)
}
var jobID jobspb.JobID
sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

// Shut down a node.
tc.StopServer(1)

close(allowResponse)
// We expect the statement to retry since it should have encountered a
// retryable error.
if err := <-errCh; err != nil {
t.Fatal(err)
}

// But the job should be restarted and succeed eventually.
jobutils.WaitForJob(t, sqlDB, jobID)

// Drop database and restore to ensure that the backup was successful.
sqlDB.Exec(t, `DROP DATABASE data`)
sqlDB.Exec(t, `RESTORE DATABASE data FROM 'userfile:///worker-failure'`)
var actualCount int
sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&actualCount)
require.Equal(t, expectedCount, actualCount)
}
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -2136,8 +2136,9 @@ func ingestWithRetry(
MaxRetries: 5,
}

// We want to retry a restore if there are transient failures (i.e. worker nodes
// dying), so if we receive a retryable error, re-plan and retry the backup.
// We want to retry an import if there are transient failures (i.e. worker
// nodes dying), so if we receive a retryable error, re-plan and retry the
// import.
var res roachpb.BulkOpSummary
var err error
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
@@ -2146,7 +2147,7 @@
break
}

if !utilccl.IsDistSQLRetryableError(err) {
if utilccl.IsPermanentBulkJobError(err) {
return roachpb.BulkOpSummary{}, err
}

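With this change, ingestWithRetry (like the backup resumer's loop above) keeps re-planning and re-running the distributed flow while the error is an allowlisted transient failure, and gives up as soon as the classifier reports a permanent error. A simplified sketch of that control flow, using placeholder names (runFlow, isPermanent, maxRetries) rather than the actual CockroachDB identifiers:

```go
package bulkretry

import "context"

// retryTransient re-runs a distributed bulk operation until it succeeds,
// hits a permanent error, or exhausts its retry budget.
func retryTransient(
	ctx context.Context,
	runFlow func(context.Context) error, // re-plans and runs the DistSQL flow
	isPermanent func(error) bool, // allowlist-based classifier
	maxRetries int,
) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = runFlow(ctx); err == nil {
			return nil // success
		}
		if isPermanent(err) {
			return err // e.g. malformed input data: retrying cannot help
		}
		// Transient failure (node shut down, RPC connection closed, no
		// inbound stream connection, ...): loop around and re-plan on the
		// nodes that are still alive.
	}
	return err // retry budget exhausted; surface the last transient error
}
```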
17 changes: 6 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -4887,17 +4887,12 @@ func TestImportControlJobRBAC(t *testing.T) {
}
}

// TestImportWorkerFailure tests that IMPORT can restart after the failure
// of a worker node.
// TestImportWorkerFailure tests that IMPORT retries after the failure of a
// worker node.
func TestImportWorkerFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// TODO(mjibson): Although this test passes most of the time it still
// sometimes fails because not all kinds of failures caused by shutting a
// node down are detected and retried.
skip.WithIssue(t, 51793, "flaky due to undetected kinds of failures when the node is shutdown")

defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()

allowResponse := make(chan struct{})
@@ -4940,13 +4935,13 @@ func TestImportWorkerFailure(t *testing.T) {
var jobID jobspb.JobID
sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

// Shut down a node. This should force LoadCSV to fail in its current
// execution. It should detect this as a context canceled error.
// Shut down a node.
tc.StopServer(1)

close(allowResponse)
// We expect the statement to fail.
if err := <-errCh; !testutils.IsError(err, "node failure") {
// We expect the statement to retry since it should have encountered a
// retryable error.
if err := <-errCh; err != nil {
t.Fatal(err)
}

3 changes: 3 additions & 0 deletions pkg/ccl/utilccl/BUILD.bazel
@@ -12,12 +12,15 @@ go_library(
deps = [
"//pkg/base",
"//pkg/ccl/utilccl/licenseccl",
"//pkg/kv/kvclient/kvcoord",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/flowinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
"//pkg/util/grpcutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
23 changes: 22 additions & 1 deletion pkg/ccl/utilccl/errors.go
@@ -8,7 +8,13 @@

package utilccl

import "strings"
import (
"strings"

"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
)

// IsDistSQLRetryableError returns true if the supplied error, or any of its parent
// causes is an rpc error.
@@ -28,3 +34,18 @@ func IsDistSQLRetryableError(err error) bool {
// `(*DistSQLPlanner).Run`.
return strings.Contains(errStr, `rpc error`)
}

// IsPermanentBulkJobError returns true if the error results in a permanent
// failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is an allowlist
// instead of a blocklist: only known-safe errors are confirmed not to be
// permanent errors. Anything unknown is assumed to be permanent.
func IsPermanentBulkJobError(err error) bool {
if err == nil {
return false
}

return !IsDistSQLRetryableError(err) &&
!grpcutil.IsClosedConnection(err) &&
!flowinfra.IsNoInboundStreamConnectionError(err) &&
!kvcoord.IsSendError(err)
}
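Because IsPermanentBulkJobError is an allowlist, the failure mode to watch for is a transient error that is not yet recognized: it gets classified as permanent and fails the job, which is exactly what the stress runs of TestImportWorkerFailure surfaced. A table-driven test is a cheap way to pin the intended classification down; the sketch below uses a stand-in classifier and made-up errors for illustration, not the real utilccl function or its test suite:

```go
package utilccl_test

import (
	"errors"
	"testing"
)

// errTransient stands in for one allowlisted transient failure.
var errTransient = errors.New("rpc error: connection closed")

// isPermanent mirrors the allowlist shape of IsPermanentBulkJobError:
// anything not explicitly recognized as transient is treated as permanent.
func isPermanent(err error) bool {
	if err == nil {
		return false
	}
	return !errors.Is(err, errTransient)
}

func TestBulkErrorClassification(t *testing.T) {
	cases := []struct {
		name      string
		err       error
		permanent bool
	}{
		{"nil", nil, false},                                  // nothing to classify
		{"allowlisted", errTransient, false},                 // transient: the job should replan
		{"unknown", errors.New("duplicate key value"), true}, // unknown: assume permanent
	}
	for _, c := range cases {
		if got := isPermanent(c.err); got != c.permanent {
			t.Errorf("%s: isPermanent(%v) = %v, want %v", c.name, c.err, got, c.permanent)
		}
	}
}
```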
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2129,3 +2129,8 @@ func newSendError(msg string) error {
func (s sendError) Error() string {
return "failed to send RPC: " + s.message
}

// IsSendError returns true if err is a sendError.
func IsSendError(err error) bool {
return errors.HasType(err, sendError{})
}
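IsSendError relies on errors.HasType from github.com/cockroachdb/errors, which walks the whole wrap chain looking for an error of the reference value's concrete type, so a sendError that picked up extra context via errors.Wrap on its way up is still recognized. A small illustrative sketch with a hypothetical error type standing in for the unexported sendError:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// sendFailure is a stand-in for kvcoord's unexported sendError type.
type sendFailure struct{ msg string }

func (s sendFailure) Error() string { return "failed to send RPC: " + s.msg }

// isSendFailure mirrors the shape of kvcoord.IsSendError: type-based
// detection that survives wrapping.
func isSendFailure(err error) bool {
	return errors.HasType(err, sendFailure{})
}

func main() {
	err := errors.Wrap(sendFailure{msg: "no replica available"}, "while ingesting data")

	fmt.Println(isSendFailure(err))                           // true: HasType inspects the whole chain
	fmt.Println(isSendFailure(errors.New("unrelated error"))) // false
}
```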
8 changes: 8 additions & 0 deletions pkg/sql/flowinfra/flow_registry.go
@@ -28,8 +28,16 @@ import (
"github.com/cockroachdb/redact"
)

// errNoInboundStreamConnection is the error propagated through the flow when
// the timeout to set up the flow is exceeded.
var errNoInboundStreamConnection = errors.New("no inbound stream connection")

// IsNoInboundStreamConnectionError returns true if err's Cause is an
// errNoInboundStreamConnection.
func IsNoInboundStreamConnectionError(err error) bool {
return errors.Is(err, errNoInboundStreamConnection)
}

// SettingFlowStreamTimeout is a cluster setting that sets the default flow
// stream timeout.
var SettingFlowStreamTimeout = settings.RegisterDurationSetting(
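The flowinfra predicate takes the complementary approach: errNoInboundStreamConnection is a sentinel value, so IsNoInboundStreamConnectionError matches it with errors.Is, which also looks through wrapping but compares against that specific value rather than a type. A tiny sketch of the sentinel pattern, again with illustrative names rather than the real flowinfra internals:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// errNoInboundStream is a sentinel, analogous to errNoInboundStreamConnection.
var errNoInboundStream = errors.New("no inbound stream connection")

// isNoInboundStream mirrors IsNoInboundStreamConnectionError: sentinel-based
// detection that survives wrapping.
func isNoInboundStream(err error) bool {
	return errors.Is(err, errNoInboundStream)
}

func main() {
	wrapped := errors.Wrapf(errNoInboundStream, "flow %s timed out", "f1")

	fmt.Println(isNoInboundStream(wrapped))                          // true: the sentinel is in the chain
	fmt.Println(isNoInboundStream(errors.New("some other failure"))) // false
}
```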
