diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go
index a20798dd47..d19a43683d 100644
--- a/jobsdb/integration_test.go
+++ b/jobsdb/integration_test.go
@@ -713,6 +713,7 @@ func TestJobsDB(t *testing.T) {
 			// #numFailedJobs+#numSucceededJobs - #numTotalJobs jobs are unprocessed - should be migrated
 		)
 		require.NoError(t, jobDB.Store(context.Background(), jobs))
+
 		require.NoError(
 			t,
 			jobDB.UpdateJobStatus(
@@ -723,6 +724,11 @@ func TestJobsDB(t *testing.T) {
 			),
 		)
 
+		unprocessedBeforeMigration, err := jobDB.GetUnprocessed(context.Background(), GetQueryParamsT{JobsLimit: 100})
+		require.NoError(t, err)
+		failedBeforeMigration, err := jobDB.GetToRetry(context.Background(), GetQueryParamsT{JobsLimit: 100})
+		require.NoError(t, err)
+
 		require.EqualValues(t, 1, jobDB.GetMaxDSIndex())
 		time.Sleep(time.Second * 2)   // wait for some time to pass
 		triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
@@ -748,11 +754,22 @@ func TestJobsDB(t *testing.T) {
 		require.NoError(
 			t,
 			jobDB.dbHandle.QueryRow(
-				fmt.Sprintf(`SELECT COUNT(*) FROM %s`, tablePrefix+`_jobs_`+dsIndicesList[0]),
+				fmt.Sprintf(`SELECT COUNT(*) FROM %s`, tablePrefix+`_jobs_1_1`),
 			).Scan(&numJobs),
 		)
 		require.Equal(t, numFailedJobs+numUnprocessedJobs, int(numJobs))
 
+		// verify job statuses
+		var numJobstatuses, maxJobStatusID, nextSeqVal int64
+		require.NoError(
+			t,
+			jobDB.dbHandle.QueryRow(
+				fmt.Sprintf(`SELECT COUNT(*), MAX(id), nextval('%[1]s_id_seq') FROM %[1]s`, tablePrefix+`_job_status_1_1`),
+			).Scan(&numJobstatuses, &maxJobStatusID, &nextSeqVal),
+		)
+		require.Equal(t, numFailedJobs, int(numJobstatuses))
+		require.Greater(t, nextSeqVal, maxJobStatusID)
+
 		// verify that unprocessed jobs are migrated to new DS
 		unprocessedResult, err := jobDB.GetUnprocessed(context.Background(), GetQueryParamsT{
 			CustomValFilters: []string{customVal},
@@ -760,16 +777,8 @@ func TestJobsDB(t *testing.T) {
 			ParameterFilters: []ParameterFilterT{},
 		})
 		require.NoError(t, err, "GetUnprocessed failed")
-		require.Equal(t, numUnprocessedJobs, len(unprocessedResult.Jobs))
-		expectedUnprocessedJobIDs := make([]int64, 0)
-		for _, job := range jobs[numFailedJobs+numSucceededJobs:] {
-			expectedUnprocessedJobIDs = append(expectedUnprocessedJobIDs, job.JobID)
-		}
-		actualUnprocessedJobIDs := make([]int64, 0)
-		for _, job := range unprocessedResult.Jobs {
-			actualUnprocessedJobIDs = append(actualUnprocessedJobIDs, job.JobID)
-		}
-		require.Equal(t, expectedUnprocessedJobIDs, actualUnprocessedJobIDs)
+		require.Len(t, unprocessedResult.Jobs, numUnprocessedJobs)
+		require.EqualValues(t, unprocessedBeforeMigration.Jobs, unprocessedResult.Jobs)
 
 		// verifying that failed jobs are migrated to new DS
 		failedResult, err := jobDB.GetToRetry(context.Background(), GetQueryParamsT{
@@ -778,16 +787,8 @@ func TestJobsDB(t *testing.T) {
 			ParameterFilters: []ParameterFilterT{},
 		})
 		require.NoError(t, err, "GetToRetry failed")
-		expectedFailedJobIDs := make([]int64, 0)
-		for _, job := range jobs[:numFailedJobs] {
-			expectedFailedJobIDs = append(expectedFailedJobIDs, job.JobID)
-		}
-		actualFailedJobIDs := make([]int64, 0)
-		for _, job := range failedResult.Jobs {
-			actualFailedJobIDs = append(actualFailedJobIDs, job.JobID)
-		}
-		require.Equal(t, numFailedJobs, len(failedResult.Jobs))
-		require.Equal(t, expectedFailedJobIDs, actualFailedJobIDs)
+		require.Len(t, failedResult.Jobs, numFailedJobs)
+		require.EqualValues(t, failedBeforeMigration.Jobs, failedResult.Jobs)
 	})
 }
 
diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 06fc66b5a3..d297ddf575 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -1775,12 +1775,14 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
 				),
 				inserted_jobs as
 				(
-					insert into %[3]q (select j.* from %[2]q j left join last_status js on js.job_id = j.job_id
+					insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
+					(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
 					where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
 				),
 				insertedStatuses as
 				(
-					insert into %[4]q (select * from last_status where job_state = ANY('{%[5]s}'))
+					insert into %[4]q (job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters)
+					(select job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters from last_status where job_state = ANY('{%[5]s}'))
 				)
 				select count(*) from inserted_jobs;`,
 		srcDS.JobStatusTable,
@@ -1791,11 +1793,13 @@ func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS da
 	)
 
 	var numJobsMigrated int64
-	err := tx.QueryRowContext(
+	if err := tx.QueryRowContext(
 		ctx,
 		compactDSQuery,
-	).Scan(&numJobsMigrated)
-	if err != nil {
+	).Scan(&numJobsMigrated); err != nil {
+		return 0, err
+	}
+	if _, err := tx.Exec(fmt.Sprintf(`ANALYZE %q, %q`, destDS.JobTable, destDS.JobStatusTable)); err != nil {
 		return 0, err
 	}
 	return int(numJobsMigrated), nil
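
For reference, below is a minimal standalone sketch of the pattern the jobsdb.go hunks apply, assuming a plain database/sql connection to Postgres: naming the destination columns explicitly in INSERT ... SELECT leaves a serial "id" column to be filled from the destination table's own sequence (which is why the test can assert nextval('..._id_seq') > MAX(id)), and an ANALYZE afterwards refreshes planner statistics for the freshly populated table. The migrateStatuses helper and the table names are illustrative assumptions, not part of this patch.

package migrate

import (
	"context"
	"database/sql"
	"fmt"
)

// migrateStatuses is a hypothetical helper (not the repository's code): it copies
// rows between two job-status-shaped tables with an explicit column list, omitting
// "id" so the destination sequence assigns new values, then ANALYZEs the
// destination so the planner sees up-to-date statistics.
func migrateStatuses(ctx context.Context, tx *sql.Tx, srcTable, destTable string) error {
	insert := fmt.Sprintf(
		`INSERT INTO %[2]q (job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters)
		 SELECT job_id, job_state, attempt, exec_time, retry_time, error_code, error_response, parameters FROM %[1]q`,
		srcTable, destTable,
	)
	if _, err := tx.ExecContext(ctx, insert); err != nil {
		return err
	}
	// ANALYZE (unlike VACUUM) may run inside a transaction; this mirrors the
	// tx.Exec(`ANALYZE ...`) call added to migrateJobsInTx above.
	_, err := tx.ExecContext(ctx, fmt.Sprintf(`ANALYZE %q`, destTable))
	return err
}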