From ad0d6ffcefe0517543adc87b28f90fd0c83bb5e6 Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Fri, 23 Apr 2021 16:52:25 +0100
Subject: [PATCH 1/2] importccl: add more errors to transient retryable error
 list

This change adds a few more errors that can occur when a worker node
running the import is shut down; they were exposed when stressing a
previously skipped test. These errors should not cause the job to fail,
but should instead trigger a replan.

I ran 500+ iterations of TestImportWorkerFailure under stress.

Release note: None
---
 pkg/ccl/importccl/import_stmt.go      |  7 ++++---
 pkg/ccl/importccl/import_stmt_test.go | 17 ++++++-----------
 pkg/ccl/utilccl/errors.go             | 21 ++++++++++++++++++++-
 pkg/sql/flowinfra/flow_registry.go    |  8 ++++++++
 4 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index b5ec45e6bc19..b1a521c45c72 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -2136,8 +2136,9 @@ func ingestWithRetry(
 		MaxRetries: 5,
 	}
 
-	// We want to retry a restore if there are transient failures (i.e. worker nodes
-	// dying), so if we receive a retryable error, re-plan and retry the backup.
+	// We want to retry an import if there are transient failures (e.g. worker
+	// nodes dying), so if we receive a retryable error, re-plan and retry the
+	// import.
 	var res roachpb.BulkOpSummary
 	var err error
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
@@ -2146,7 +2147,7 @@ func ingestWithRetry(
 			break
 		}
 
-		if !utilccl.IsDistSQLRetryableError(err) {
+		if utilccl.IsPermanentBulkJobError(err) {
 			return roachpb.BulkOpSummary{}, err
 		}
 
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index 640905c5171a..00f651aa5d35 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -4887,17 +4887,12 @@ func TestImportControlJobRBAC(t *testing.T) {
 	}
 }
 
-// TestImportWorkerFailure tests that IMPORT can restart after the failure
-// of a worker node.
+// TestImportWorkerFailure tests that IMPORT retries after the failure of a
+// worker node.
 func TestImportWorkerFailure(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	// TODO(mjibson): Although this test passes most of the time it still
-	// sometimes fails because not all kinds of failures caused by shutting a
-	// node down are detected and retried.
-	skip.WithIssue(t, 51793, "flaky due to undetected kinds of failures when the node is shutdown")
-
 	defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()
 
 	allowResponse := make(chan struct{})
@@ -4940,13 +4935,13 @@ func TestImportWorkerFailure(t *testing.T) {
 	var jobID jobspb.JobID
 	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
 
-	// Shut down a node. This should force LoadCSV to fail in its current
-	// execution. It should detect this as a context canceled error.
+	// Shut down a node.
 	tc.StopServer(1)
 
 	close(allowResponse)
-	// We expect the statement to fail.
-	if err := <-errCh; !testutils.IsError(err, "node failure") {
+	// We expect the statement to eventually succeed, since the error caused
+	// by the node shutdown should be retryable.
+	if err := <-errCh; err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go
index 46ad5d387991..132819cc253a 100644
--- a/pkg/ccl/utilccl/errors.go
+++ b/pkg/ccl/utilccl/errors.go
@@ -8,7 +8,12 @@
 
 package utilccl
 
-import "strings"
+import (
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
+)
 
 // IsDistSQLRetryableError returns true if the supplied error, or any of its parent
 // causes is an rpc error.
@@ -28,3 +33,17 @@ func IsDistSQLRetryableError(err error) bool {
 	// `(*DistSQLPlanner).Run`.
 	return strings.Contains(errStr, `rpc error`)
 }
+
+// IsPermanentBulkJobError returns true if the error results in a permanent
+// failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is an
+// allowlist instead of a blocklist: only known-safe errors are confirmed
+// not to be permanent. Anything unknown is assumed to be permanent.
+func IsPermanentBulkJobError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	return !IsDistSQLRetryableError(err) &&
+		!grpcutil.IsClosedConnection(err) &&
+		!flowinfra.IsNoInboundStreamConnectionError(err)
+}
diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go
index 5096dda017ea..c706d1f41feb 100644
--- a/pkg/sql/flowinfra/flow_registry.go
+++ b/pkg/sql/flowinfra/flow_registry.go
@@ -28,8 +28,16 @@ import (
 	"github.com/cockroachdb/redact"
 )
 
+// errNoInboundStreamConnection is the error propagated through the flow when
+// the timeout to set up the flow is exceeded.
 var errNoInboundStreamConnection = errors.New("no inbound stream connection")
 
+// IsNoInboundStreamConnectionError returns true if err's cause is
+// errNoInboundStreamConnection.
+func IsNoInboundStreamConnectionError(err error) bool {
+	return errors.Is(err, errNoInboundStreamConnection)
+}
+
 // SettingFlowStreamTimeout is a cluster setting that sets the default flow
 // stream timeout.
 var SettingFlowStreamTimeout = settings.RegisterDurationSetting(

From b657f9b207323f88dce543c03e6fce80cf94d9b5 Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Fri, 23 Apr 2021 17:08:14 +0100
Subject: [PATCH 2/2] backupccl: add test that shuts down worker during BACKUP

This change adds a unit test that shuts down a worker node to verify
that the backup job retries when it encounters certain retryable errors.

This test uncovered a situation in which a backup data processor could
be moved to a state other than StatusRunning, which caused it to exit
early from its Next() method. The goroutines responsible for exporting
the spans would then get stuck attempting to push to progCh, which is
only drained after the StatusRunning check above. This change drains
progCh at the beginning of the Next() method, as sketched below.
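To make the hang concrete, here is a minimal, self-contained sketch. It
is an editorial illustration with stand-in names, not the processor
code itself: a bare send on an unbuffered channel blocks forever once
the receiver has exited, while a context-aware select, like the one the
processor change in this patch adopts, unblocks on cancellation.

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	progCh := make(chan int) // unbuffered, like the processor's progress channel

    	done := make(chan error)
    	go func() {
    		// A bare `progCh <- 1` here would block forever once the
    		// receiver has exited. The select also watches ctx.Done(),
    		// so the goroutine unblocks when the flow is canceled.
    		select {
    		case <-ctx.Done():
    			done <- ctx.Err()
    		case progCh <- 1:
    			done <- nil
    		}
    	}()

    	// Simulate the receiver exiting early without draining progCh.
    	time.Sleep(10 * time.Millisecond)
    	cancel()

    	fmt.Println("export goroutine exited with:", <-done)
    }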
Release note: None
---
 pkg/ccl/backupccl/backup_job.go        |  4 +-
 pkg/ccl/backupccl/backup_processor.go  |  6 ++-
 pkg/ccl/backupccl/backup_test.go       | 60 ++++++++++++++++++++++++++
 pkg/ccl/utilccl/BUILD.bazel            |  3 ++
 pkg/ccl/utilccl/errors.go              |  4 +-
 pkg/kv/kvclient/kvcoord/dist_sender.go |  5 +++
 6 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 78da731e2f4d..c644886e2706 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -472,7 +472,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 			break
 		}
 
-		if !utilccl.IsDistSQLRetryableError(err) {
+		if utilccl.IsPermanentBulkJobError(err) {
 			return errors.Wrap(err, "failed to run backup")
 		}
 
@@ -483,7 +483,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 		var reloadBackupErr error
 		backupManifest, reloadBackupErr = b.readManifestOnResume(ctx, p.ExecCfg(), defaultStore, details)
 		if reloadBackupErr != nil {
-			log.Warning(ctx, "could not reload backup manifest when retrying, continuing with old progress")
+			return errors.Wrap(reloadBackupErr, "could not reload backup manifest when retrying")
 		}
 	}
 	if err != nil {
diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 66c53d68f0cd..b4ecda9a6a7f 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -370,7 +370,11 @@ func runBackupProcessor(
 						return err
 					}
 					prog.ProgressDetails = *details
-					progCh <- prog
+					select {
+					case <-ctx.Done():
+						return ctx.Err()
+					case progCh <- prog:
+					}
 				} else {
 					// Update the partial progress as we still have a resumeSpan to
 					// process.
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index a28c044716f8..057b31a0b178 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -8150,3 +8150,63 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
 		sqlDB.Exec(t, `DROP DATABASE restoredb CASCADE;`)
 	}
 }
+
+func TestBackupWorkerFailure(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderStress(t, "under stress the test unexpectedly surfaces non-retryable errors on"+
+		" backup failure")
+
+	defer jobs.TestingSetAdoptAndCancelIntervals(10*time.Millisecond, 10*time.Millisecond)()
+
+	allowResponse := make(chan struct{})
+	params := base.TestClusterArgs{}
+	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	const numAccounts = 100
+
+	_, tc, _, _, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
+		InitManualReplication, params)
+	conn := tc.Conns[0]
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+	defer cleanup()
+
+	var expectedCount int
+	sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&expectedCount)
+	query := `BACKUP DATABASE data TO 'userfile:///worker-failure'`
+	errCh := make(chan error)
+	go func() {
+		_, err := conn.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	var jobID jobspb.JobID
+	sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)
+
+	// Shut down a node.
+	tc.StopServer(1)
+
+	close(allowResponse)
+	// We expect the statement to eventually succeed, since the error caused
+	// by the node shutdown should be retryable.
+ if err := <-errCh; err != nil { + t.Fatal(err) + } + + // But the job should be restarted and succeed eventually. + jobutils.WaitForJob(t, sqlDB, jobID) + + // Drop database and restore to ensure that the backup was successful. + sqlDB.Exec(t, `DROP DATABASE data`) + sqlDB.Exec(t, `RESTORE DATABASE data FROM 'userfile:///worker-failure'`) + var actualCount int + sqlDB.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&actualCount) + require.Equal(t, expectedCount, actualCount) +} diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index f977d1ef5d3f..d6623227dacd 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -12,12 +12,15 @@ go_library( deps = [ "//pkg/base", "//pkg/ccl/utilccl/licenseccl", + "//pkg/kv/kvclient/kvcoord", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog/colinfo", + "//pkg/sql/flowinfra", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/types", + "//pkg/util/grpcutil", "//pkg/util/timeutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go index 132819cc253a..c4ab54eb23f1 100644 --- a/pkg/ccl/utilccl/errors.go +++ b/pkg/ccl/utilccl/errors.go @@ -11,6 +11,7 @@ package utilccl import ( "strings" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" ) @@ -45,5 +46,6 @@ func IsPermanentBulkJobError(err error) bool { return !IsDistSQLRetryableError(err) && !grpcutil.IsClosedConnection(err) && - !flowinfra.IsNoInboundStreamConnectionError(err) + !flowinfra.IsNoInboundStreamConnectionError(err) && + !kvcoord.IsSendError(err) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index b9c0286ab617..7344ed787bdd 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2129,3 +2129,8 @@ func newSendError(msg string) error { func (s sendError) Error() string { return "failed to send RPC: " + s.message } + +// IsSendError returns true if err is a sendError. +func IsSendError(err error) bool { + return errors.HasType(err, sendError{}) +}
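
For reviewers who want to see how the pieces of these two patches compose,
below is a minimal, self-contained sketch of the allowlist-plus-replan
pattern that IMPORT and BACKUP now share. The names mirror the patch, but
the bodies are simplified stand-ins: the classifier matches error strings
instead of calling the real grpcutil, flowinfra, and kvcoord helpers, and
the loop omits the backoff that retry.StartWithCtx provides.

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Stand-in for flowinfra's errNoInboundStreamConnection.
var errNoInboundStreamConnection = errors.New("no inbound stream connection")

// Stand-in for utilccl.IsDistSQLRetryableError.
func isDistSQLRetryableError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "rpc error")
}

// isPermanentBulkJobError mirrors the allowlist shape of
// utilccl.IsPermanentBulkJobError: only errors known to be transient are
// retried; anything unrecognized is treated as permanent so that genuine
// failures surface instead of retrying forever.
func isPermanentBulkJobError(err error) bool {
	if err == nil {
		return false
	}
	return !isDistSQLRetryableError(err) &&
		!errors.Is(err, errNoInboundStreamConnection)
}

// ingestWithRetry shows the re-plan loop shape: run the distributed flow,
// and on a transient error plan and run it again from scratch.
func ingestWithRetry(ctx context.Context, run func(context.Context) error) error {
	const maxRetries = 5
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = run(ctx); err == nil {
			return nil
		}
		if isPermanentBulkJobError(err) {
			return err
		}
		fmt.Printf("attempt %d failed with retryable error: %v\n", attempt+1, err)
	}
	return err
}

func main() {
	remainingFailures := 2
	err := ingestWithRetry(context.Background(), func(context.Context) error {
		if remainingFailures > 0 {
			remainingFailures--
			return fmt.Errorf("rpc error: worker node shut down")
		}
		return nil
	})
	fmt.Println("final result:", err) // final result: <nil>
}

The allowlist orientation is the important design point: an unrecognized
error fails the job rather than being retried indefinitely.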