diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 78da731e2f4d..c644886e2706 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -472,7 +472,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 			break
 		}
 
-		if !utilccl.IsDistSQLRetryableError(err) {
+		if utilccl.IsPermanentBulkJobError(err) {
 			return errors.Wrap(err, "failed to run backup")
 		}
 
@@ -483,7 +483,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 		var reloadBackupErr error
 		backupManifest, reloadBackupErr = b.readManifestOnResume(ctx, p.ExecCfg(), defaultStore, details)
 		if reloadBackupErr != nil {
-			log.Warning(ctx, "could not reload backup manifest when retrying, continuing with old progress")
+			return errors.Wrap(reloadBackupErr, "could not reload backup manifest when retrying")
 		}
 	}
 	if err != nil {
diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 66c53d68f0cd..b4ecda9a6a7f 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -370,7 +370,11 @@ func runBackupProcessor(
 					return err
 				}
 				prog.ProgressDetails = *details
-				progCh <- prog
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case progCh <- prog:
+				}
 			} else {
 				// Update the partial progress as we still have a resumeSpan to
 				// process.
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index a28c044716f8..057b31a0b178 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -8150,3 +8150,63 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
 		sqlDB.Exec(t, `DROP DATABASE restoredb CASCADE;`)
 	}
 }
+
+func TestBackupWorkerFailure(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderStress(t, "under stress the test unexpectedly surfaces non-retryable errors on"+
+		" backup failure")
+
+	defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()
+
+	allowResponse := make(chan struct{})
+	params := base.TestClusterArgs{}
+	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	const numAccounts = 100
+
+	_, tc, _, _, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
+		InitManualReplication, params)
+	conn := tc.Conns[0]
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+	defer cleanup()
+
+	var expectedCount int
+	sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&expectedCount)
+	query := `BACKUP DATABASE data TO 'userfile:///worker-failure'`
+	errCh := make(chan error)
+	go func() {
+		_, err := conn.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	var jobID jobspb.JobID
+	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
+
+	// Shut down a node.
+	tc.StopServer(1)
+
+	close(allowResponse)
+	// We expect the statement to retry since it should have encountered a
+	// retryable error.
+	if err := <-errCh; err != nil {
+		t.Fatal(err)
+	}
+
+	// But the job should be restarted and succeed eventually.
+	jobutils.WaitForJob(t, sqlDB, jobID)
+
+	// Drop database and restore to ensure that the backup was successful.
+	sqlDB.Exec(t, `DROP DATABASE data`)
+	sqlDB.Exec(t, `RESTORE DATABASE data FROM 'userfile:///worker-failure'`)
+	var actualCount int
+	sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&actualCount)
+	require.Equal(t, expectedCount, actualCount)
+}
diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index b5ec45e6bc19..b1a521c45c72 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -2136,8 +2136,9 @@ func ingestWithRetry(
 		MaxRetries: 5,
 	}
 
-	// We want to retry a restore if there are transient failures (i.e. worker nodes
-	// dying), so if we receive a retryable error, re-plan and retry the backup.
+	// We want to retry an import if there are transient failures (i.e. worker
+	// nodes dying), so if we receive a retryable error, re-plan and retry the
+	// import.
 	var res roachpb.BulkOpSummary
 	var err error
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
@@ -2146,7 +2147,7 @@
 			break
 		}
 
-		if !utilccl.IsDistSQLRetryableError(err) {
+		if utilccl.IsPermanentBulkJobError(err) {
 			return roachpb.BulkOpSummary{}, err
 		}
 
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 640905c5171a..00f651aa5d35 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -4887,17 +4887,12 @@ func TestImportControlJobRBAC(t *testing.T) {
 	}
 }
 
-// TestImportWorkerFailure tests that IMPORT can restart after the failure
-// of a worker node.
+// TestImportWorkerFailure tests that IMPORT retries after the failure of a
+// worker node.
 func TestImportWorkerFailure(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	// TODO(mjibson): Although this test passes most of the time it still
-	// sometimes fails because not all kinds of failures caused by shutting a
-	// node down are detected and retried.
-	skip.WithIssue(t, 51793, "flaky due to undetected kinds of failures when the node is shutdown")
-
 	defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()
 
 	allowResponse := make(chan struct{})
@@ -4940,13 +4935,13 @@ func TestImportWorkerFailure(t *testing.T) {
 	var jobID jobspb.JobID
 	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
 
-	// Shut down a node. This should force LoadCSV to fail in its current
-	// execution. It should detect this as a context canceled error.
+	// Shut down a node.
 	tc.StopServer(1)
 
 	close(allowResponse)
-	// We expect the statement to fail.
-	if err := <-errCh; !testutils.IsError(err, "node failure") {
+	// We expect the statement to retry since it should have encountered a
+	// retryable error.
+	if err := <-errCh; err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel
index f977d1ef5d3f..d6623227dacd 100644
--- a/pkg/ccl/utilccl/BUILD.bazel
+++ b/pkg/ccl/utilccl/BUILD.bazel
@@ -12,12 +12,15 @@ go_library(
     deps = [
         "//pkg/base",
         "//pkg/ccl/utilccl/licenseccl",
+        "//pkg/kv/kvclient/kvcoord",
         "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/sql/catalog/colinfo",
+        "//pkg/sql/flowinfra",
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
        "//pkg/sql/types",
+        "//pkg/util/grpcutil",
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go
index 46ad5d387991..c4ab54eb23f1 100644
--- a/pkg/ccl/utilccl/errors.go
+++ b/pkg/ccl/utilccl/errors.go
@@ -8,7 +8,13 @@
 
 package utilccl
 
-import "strings"
+import (
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
+)
 
 // IsDistSQLRetryableError returns true if the supplied error, or any of its parent
 // causes is an rpc error.
@@ -28,3 +34,18 @@ func IsDistSQLRetryableError(err error) bool {
 	// `(*DistSQLPlanner).Run`.
 	return strings.Contains(errStr, `rpc error`)
 }
+
+// IsPermanentBulkJobError returns true if the error results in a permanent
+// failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is an
+// allowlist instead of a blocklist: only known safe errors are confirmed not
+// to be permanent errors. Anything unknown is assumed to be permanent.
+func IsPermanentBulkJobError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	return !IsDistSQLRetryableError(err) &&
+		!grpcutil.IsClosedConnection(err) &&
+		!flowinfra.IsNoInboundStreamConnectionError(err) &&
+		!kvcoord.IsSendError(err)
+}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index b9c0286ab617..7344ed787bdd 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2129,3 +2129,8 @@ func newSendError(msg string) error {
 func (s sendError) Error() string {
 	return "failed to send RPC: " + s.message
 }
+
+// IsSendError returns true if err is a sendError.
+func IsSendError(err error) bool {
+	return errors.HasType(err, sendError{})
+}
diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go
index 5096dda017ea..c706d1f41feb 100644
--- a/pkg/sql/flowinfra/flow_registry.go
+++ b/pkg/sql/flowinfra/flow_registry.go
@@ -28,8 +28,16 @@ import (
 	"github.com/cockroachdb/redact"
 )
 
+// errNoInboundStreamConnection is the error propagated through the flow when
+// the timeout to set up the flow is exceeded.
 var errNoInboundStreamConnection = errors.New("no inbound stream connection")
 
+// IsNoInboundStreamConnectionError returns true if err's cause is an
+// errNoInboundStreamConnection.
+func IsNoInboundStreamConnectionError(err error) bool {
+	return errors.Is(err, errNoInboundStreamConnection)
+}
+
 // SettingFlowStreamTimeout is a cluster setting that sets the default flow
 // stream timeout.
 var SettingFlowStreamTimeout = settings.RegisterDurationSetting(
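
The retry logic this patch adds to `ingestWithRetry` and `backupResumer.Resume` treats retryability as an allowlist decision: only errors known to be transient (DistSQL retryable errors, closed gRPC connections, missing inbound stream connections, KV send errors) are retried, and anything unrecognized permanently fails the job. The minimal sketch below illustrates that shape in isolation; the sentinel errors, `isPermanentBulkJobError`, and the fixed backoff are illustrative stand-ins, not the real `utilccl`/`grpcutil`/`flowinfra`/`kvcoord` predicates or the `retry` package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stub sentinels standing in for the real transient-failure detectors
// (IsDistSQLRetryableError, grpcutil.IsClosedConnection, etc.).
var (
	errRPC             = errors.New("rpc error")
	errNoInboundStream = errors.New("no inbound stream connection")
)

// isPermanentBulkJobError mirrors the allowlist idea: anything not on the
// known-transient list is treated as permanent.
func isPermanentBulkJobError(err error) bool {
	if err == nil {
		return false
	}
	return !errors.Is(err, errRPC) && !errors.Is(err, errNoInboundStream)
}

// ingestWithRetry re-plans and retries the operation on transient errors,
// giving up after maxRetries attempts or on a permanent error.
func ingestWithRetry(ctx context.Context, run func(context.Context) error) error {
	const maxRetries = 5
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = run(ctx); err == nil {
			return nil
		}
		if isPermanentBulkJobError(err) {
			return fmt.Errorf("permanent failure: %w", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond): // fixed backoff for the sketch
		}
	}
	return err
}

func main() {
	attempts := 0
	err := ingestWithRetry(context.Background(), func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errRPC // transient: retried
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```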