Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: deflake TestBackupRestoreSystemJobProgress #68961

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func runBackupProcessor(

if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
if backupKnobs.RunAfterExportingSpanEntry != nil {
backupKnobs.RunAfterExportingSpanEntry(ctx)
backupKnobs.RunAfterExportingSpanEntry(ctx, res)
}
}

Expand Down
36 changes: 33 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,23 +1482,31 @@ func checkInProgressBackupRestore(
t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker,
) {
var allowResponse chan struct{}
var exportSpanCompleteCh chan struct{}
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{
RunAfterExportingSpanEntry: func(_ context.Context) {
RunAfterExportingSpanEntry: func(_ context.Context, res *roachpb.ExportResponse) {
<-allowResponse
// If ResumeSpan is set to nil, it means that we have completed
// exporting a span and the job will update its fraction progressed.
if res.ResumeSpan == nil {
<-exportSpanCompleteCh
}
},
RunAfterProcessingRestoreSpanEntry: func(_ context.Context) {
<-allowResponse
},
}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
params.ServerArgs.Knobs = knobs

const numAccounts = 1000
const numAccounts = 100

ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, InitManualReplication, params)
ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts,
InitManualReplication, params)
conn := sqlDB.DB.(*gosql.DB)
defer cleanup()

Expand All @@ -1508,6 +1516,21 @@ func checkInProgressBackupRestore(
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`)
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`)

// Ensure that each node has at least one leaseholder. (These splits were
// made in BackupRestoreTestSetup.) These are wrapped with SucceedsSoon()
// because EXPERIMENTAL_RELOCATE can fail if there are other replication
// changes happening.
for _, stmt := range []string{
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`,
`ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`,
} {
testutils.SucceedsSoon(t, func() error {
_, err := sqlDB.DB.ExecContext(ctx, stmt)
return err
})
}

var totalExpectedBackupRequests int
// mergedRangeQuery calculates the number of spans we expect PartitionSpans to
// produce. It merges contiguous ranges on the same node.
Expand Down Expand Up @@ -1543,6 +1566,7 @@ WHERE

var totalExpectedResponses int
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh = make(chan struct{})
// totalExpectedBackupRequests takes into account the merging that backup
// does of co-located ranges. It is the expected number of ExportRequests
// backup issues. DistSender will still split those requests to different
Expand Down Expand Up @@ -1570,6 +1594,12 @@ WHERE
allowResponse <- struct{}{}
}

// Due to ExportRequest pagination, in the case of backup, we want to wait
// until an entire span has been exported before checking job progress.
if strings.Contains(query, "BACKUP") {
exportSpanCompleteCh <- struct{}{}
close(exportSpanCompleteCh)
}
err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
return check(ctx, inProgressState{
DB: conn,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ type BackupRestoreTestingKnobs struct {

// RunAfterExportingSpanEntry allows blocking the BACKUP job after a single
// span has been exported.
RunAfterExportingSpanEntry func(ctx context.Context)
RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse)
}

var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}
Expand Down