Merge #68899 #68961
68899: utilccl: add ErrBreakerOpen to bulk retryable error r=pbardea a=adityamaru

In #68787 we saw a backup job fail on retry because it attempted to
plan a flow on a node that had been shut down. This should not usually
happen, since every time we plan the flow we fetch the nodes that can
participate in the DistSQL flow.

If we do encounter a `breaker open` error, we can retry in the hope
that the next time the flow is planned it does not attempt to dial the
dead node.

Release note: None
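
For illustration, a minimal, hypothetical sketch of the retry behavior
this enables follows; it is not CockroachDB's actual job resumer.
`planAndRunFlow`, `isPermanent`, and the attempt bound are invented
stand-ins, with `isPermanent` mirroring the allowlist idea behind
`utilccl.IsPermanentBulkJobError`: a `circuit.ErrBreakerOpen` from
dialing a downed node is treated as retryable, so the next planning
pass can pick a live node.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	circuit "github.com/cockroachdb/circuitbreaker"
)

// isPermanent is a stand-in for utilccl.IsPermanentBulkJobError: it is an
// allowlist, so only errors not known to be transient fail the job for good.
func isPermanent(err error) bool {
	return !errors.Is(err, circuit.ErrBreakerOpen)
}

// runWithRetry replans and reruns the flow until it succeeds, exhausts its
// attempts, or hits a permanent error.
func runWithRetry(ctx context.Context, planAndRunFlow func(context.Context) error) error {
	const maxAttempts = 5
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := planAndRunFlow(ctx)
		if err == nil {
			return nil
		}
		if isPermanent(err) {
			return err
		}
		// Transient (e.g. breaker open): replanning should avoid the dead node.
		fmt.Printf("attempt %d failed with retryable error: %v\n", attempt, err)
	}
	return errors.New("retries exhausted")
}

func main() {
	// Fake flow: the first attempt dials a downed node and hits the open
	// breaker; the second attempt plans around it and succeeds.
	attempts := 0
	flow := func(context.Context) error {
		attempts++
		if attempts == 1 {
			return circuit.ErrBreakerOpen
		}
		return nil
	}
	fmt.Println("result:", runWithRetry(context.Background(), flow))
}
```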

68961: backupccl: deflake TestBackupRestoreSystemJobProgress r=pbardea a=adityamaru

In the case of backup, we only update the job's fraction progressed
once we have exported a complete span. This change adds logic to wait
until at least one complete span has been exported before checking for
a progress update.

Release note: None
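
The synchronization can be illustrated with a small, self-contained
sketch (hypothetical names, not the test's actual code): an unbuffered
channel send acts as a rendezvous that cannot complete until a worker
has finished a whole span, mirroring how the test sends on
`exportSpanCompleteCh` only after the knob has seen a nil `ResumeSpan`,
so the progress check never runs before at least one complete span has
been exported.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	spanComplete := make(chan struct{}) // unbuffered: a send is a rendezvous
	var fraction atomic.Value
	fraction.Store(0.0)

	// Worker: simulates a paginated span export. Only after the final chunk
	// (ResumeSpan == nil in the real knob) does it bump the fraction
	// progressed and park at the rendezvous point.
	go func() {
		time.Sleep(50 * time.Millisecond) // earlier, partial ExportResponses
		fraction.Store(0.5)               // a complete span has been exported
		<-spanComplete                    // knob-side receive
	}()

	// Checker: this send blocks until the worker is waiting at the rendezvous,
	// i.e. until a whole span is done, so the assertion below cannot observe a
	// zero fraction and flake.
	spanComplete <- struct{}{}
	close(spanComplete)
	fmt.Println("fraction progressed:", fraction.Load())
}
```

In the real test, closing `exportSpanCompleteCh` after the handshake lets
any later complete-span exports proceed without blocking, since receives
on a closed channel return immediately.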

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
craig[bot] and adityamaru committed Aug 16, 2021
3 parents becc3f9 + 9f11b43 + 4dcd54c commit dfe97af
Showing 5 changed files with 45 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -381,7 +381,7 @@ func runBackupProcessor(

if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if backupKnobs.RunAfterExportingSpanEntry != nil {
-backupKnobs.RunAfterExportingSpanEntry(ctx)
+backupKnobs.RunAfterExportingSpanEntry(ctx, res)
}
}

36 changes: 33 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
@@ -1482,23 +1482,31 @@ func checkInProgressBackupRestore(
t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker,
) {
var allowResponse chan struct{}
+var exportSpanCompleteCh chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
-RunAfterExportingSpanEntry: func(_ context.Context) {
+RunAfterExportingSpanEntry: func(_ context.Context, res *roachpb.ExportResponse) {
<-allowResponse
+// If ResumeSpan is set to nil, it means that we have completed
+// exporting a span and the job will update its fraction progressed.
+if res.ResumeSpan == nil {
+<-exportSpanCompleteCh
+}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
params.ServerArgs.Knobs = knobs

-const numAccounts = 1000
+const numAccounts = 100

-ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, InitManualReplication, params)
+ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
+InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanup()

@@ -1508,6 +1516,21 @@ func checkInProgressBackupRestore(
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`)

+// Ensure that each node has at least one leaseholder. (These splits were
+// made in BackupRestoreTestSetup.) These are wrapped with SucceedsSoon()
+// because EXPERIMENTAL_RELOCATE can fail if there are other replication
+// changes happening.
+for _, stmt := range []string{
+`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`,
+`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`,
+`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`,
+} {
+testutils.SucceedsSoon(t, func() error {
+_, err := sqlDB.DB.ExecContext(ctx, stmt)
+return err
+})
+}

var totalExpectedBackupRequests int
// mergedRangeQuery calculates the number of spans we expect PartitionSpans to
// produce. It merges contiguous ranges on the same node.
@@ -1543,6 +1566,7 @@ WHERE

var totalExpectedResponses int
if strings.Contains(query, "BACKUP") {
+exportSpanCompleteCh = make(chan struct{})
// totalExpectedBackupRequests takes into account the merging that backup
// does of co-located ranges. It is the expected number of ExportRequests
// backup issues. DistSender will still split those requests to different
@@ -1570,6 +1594,12 @@ WHERE
allowResponse <- struct{}{}
}

+// Due to ExportRequest pagination, in the case of backup, we want to wait
+// until an entire span has been exported before checking job progress.
+if strings.Contains(query, "BACKUP") {
+exportSpanCompleteCh <- struct{}{}
+close(exportSpanCompleteCh)
+}
err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
1 change: 1 addition & 0 deletions pkg/ccl/utilccl/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
10 changes: 9 additions & 1 deletion pkg/ccl/utilccl/errors.go
@@ -11,9 +11,11 @@ package utilccl
import (
"strings"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/errors"
)

// IsDistSQLRetryableError returns true if the supplied error, or any of its parent
Expand All @@ -35,6 +37,11 @@ func IsDistSQLRetryableError(err error) bool {
return strings.Contains(errStr, `rpc error`)
}

+// isBreakerOpenError returns true if err is a circuit.ErrBreakerOpen.
+func isBreakerOpenError(err error) bool {
+return errors.Is(err, circuit.ErrBreakerOpen)
+}

// IsPermanentBulkJobError returns true if the error results in a permanent
failure of a bulk job (IMPORT, BACKUP, RESTORE). This function is an allowlist
// instead of a blocklist: only known safe errors are confirmed to not be
@@ -47,5 +54,6 @@ func IsPermanentBulkJobError(err error) bool {
return !IsDistSQLRetryableError(err) &&
!grpcutil.IsClosedConnection(err) &&
!flowinfra.IsNoInboundStreamConnectionError(err) &&
-!kvcoord.IsSendError(err)
+!kvcoord.IsSendError(err) &&
+!isBreakerOpenError(err)
}
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
@@ -1217,7 +1217,7 @@ type BackupRestoreTestingKnobs struct {

// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
-RunAfterExportingSpanEntry func(ctx context.Context)
+RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
