diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 3de01f8005be..c46754380672 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -381,7 +381,7 @@ func runBackupProcessor( if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx) + backupKnobs.RunAfterExportingSpanEntry(ctx, res) } } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 9b3c3bfc2745..9b3aacabcb46 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1482,23 +1482,31 @@ func checkInProgressBackupRestore( t testing.TB, checkBackup inProgressChecker, checkRestore inProgressChecker, ) { var allowResponse chan struct{} + var exportSpanCompleteCh chan struct{} params := base.TestClusterArgs{} knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ BackupRestoreTestingKnobs: &sql.BackupRestoreTestingKnobs{ - RunAfterExportingSpanEntry: func(_ context.Context) { + RunAfterExportingSpanEntry: func(_ context.Context, res *roachpb.ExportResponse) { <-allowResponse + // If ResumeSpan is set to nil, it means that we have completed + // exporting a span and the job will update its fraction progressed. + if res.ResumeSpan == nil { + <-exportSpanCompleteCh + } }, RunAfterProcessingRestoreSpanEntry: func(_ context.Context) { <-allowResponse }, }}, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } params.ServerArgs.Knobs = knobs - const numAccounts = 1000 + const numAccounts = 100 - ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, InitManualReplication, params) + ctx, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, MultiNode, numAccounts, + InitManualReplication, params) conn := sqlDB.DB.(*gosql.DB) defer cleanup() @@ -1508,6 +1516,21 @@ func checkInProgressBackupRestore( sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '1'`) sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1'`) + // Ensure that each node has at least one leaseholder. (These splits were + // made in BackupRestoreTestSetup.) These are wrapped with SucceedsSoon() + // because EXPERIMENTAL_RELOCATE can fail if there are other replication + // changes happening. + for _, stmt := range []string{ + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`, + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 30)`, + `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 80)`, + } { + testutils.SucceedsSoon(t, func() error { + _, err := sqlDB.DB.ExecContext(ctx, stmt) + return err + }) + } + var totalExpectedBackupRequests int // mergedRangeQuery calculates the number of spans we expect PartitionSpans to // produce. It merges contiguous ranges on the same node. @@ -1543,6 +1566,7 @@ WHERE var totalExpectedResponses int if strings.Contains(query, "BACKUP") { + exportSpanCompleteCh = make(chan struct{}) // totalExpectedBackupRequests takes into account the merging that backup // does of co-located ranges. It is the expected number of ExportRequests // backup issues. DistSender will still split those requests to different @@ -1570,6 +1594,12 @@ WHERE allowResponse <- struct{}{} } + // Due to ExportRequest pagination, in the case of backup, we want to wait + // until an entire span has been exported before checking job progress. + if strings.Contains(query, "BACKUP") { + exportSpanCompleteCh <- struct{}{} + close(exportSpanCompleteCh) + } err := retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { return check(ctx, inProgressState{ DB: conn, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index bc0e91cda231..e45b043285de 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1217,7 +1217,7 @@ type BackupRestoreTestingKnobs struct { // RunAfterExportingSpanEntry allows blocking the BACKUP job after a single // span has been exported. - RunAfterExportingSpanEntry func(ctx context.Context) + RunAfterExportingSpanEntry func(ctx context.Context, response *roachpb.ExportResponse) } var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}