From a6038603810d332de11a7fe45f057ae394e85b2e Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 6 Jul 2021 12:21:53 -0500 Subject: [PATCH 1/9] WIP: Job history purging --- .../persistence/DefaultJobPersistence.java | 47 ++++++ .../scheduler/persistence/JobPersistence.java | 8 + .../DefaultJobPersistenceTest.java | 148 ++++++++++++++++++ 3 files changed, 203 insertions(+) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 8c1085a6011c..9309219d0cd2 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -82,6 +82,10 @@ public class DefaultJobPersistence implements JobPersistence { + public static final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS = 10; + public static final int JOB_HISTORY_MINIMUM_RECENCY = 10; + public static final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = 100; + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class); private static final JSONFormat DB_JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); protected static final String DEFAULT_SCHEMA = "public"; @@ -477,6 +481,49 @@ private List listTables(final String schema) throws IOException { } } + @Override + public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { + // JENNY TODO: Figure out how to accommodate timezone for asOfDate. + final String JOB_HISTORY_PURGE_SQL = "delete from jobs where jobs.id in (" + + "select jobs.id \n" + + "from jobs \n" + + "left join (\n" + + " select scope, count(jobs.id) as jobCount from jobs group by scope\n" + + ") counts on jobs.scope = counts.scope \n" + + "where \n" + + " -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS \n" + + " (jobs.created_at < (CAST(? as TIMESTAMP) - interval '" + JOB_HISTORY_MINIMUM_AGE_IN_DAYS + "' day) or counts.jobCount > ?) \n" + + "and jobs.id not in (\n" + + " -- cannot be the last job with saved state \n" + + " select max(job_id) as latest_job_id_with_state from (\n" + + " select jobs.scope, \n" + + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" + + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists\n" + + " from jobs left join attempts on jobs.id = attempts.job_id \n" + + " group by scope, jobs.id\n" + + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" + + " order by scope, jobs.created_at desc, jobs.id desc\n" + + " ) jobs_with_state group by scope \n" + + ") and jobs.id not in (\n" + + " -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope\n" + + " select job_id from (\n" + + " select jobs.scope, \n" + + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency,\n" + + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status\n" + + " from jobs \n" + + " group by scope, jobs.id\n" + + " order by scope, jobs.created_at desc, jobs.id desc\n" + + " ) jobs_by_recency \n" + + " where recency <= ? \n" + + "))"; + + // JENNY TODO: figure out error handling around this, messaging, etc. Plus test cases. + database.transaction(ctx -> ctx.execute(JOB_HISTORY_PURGE_SQL, asOfDate, + JOB_HISTORY_MINIMUM_AGE_IN_DAYS, + JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, + JOB_HISTORY_MINIMUM_RECENCY)); + } + private Stream exportTable(final String schema, final String tableName) throws IOException { final Table tableSql = getTable(schema, tableName); try (final Stream records = database.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) { diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 5ed04b1ed4cb..e9050eab4d6c 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -31,6 +31,7 @@ import io.airbyte.scheduler.models.JobStatus; import java.io.IOException; import java.nio.file.Path; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; @@ -182,4 +183,11 @@ public interface JobPersistence { */ void importDatabase(String airbyteVersion, Map> data) throws IOException; + /** + * Purges job history while ensuring that the latest saved-state information is maintained. + * + * @throws IOException + */ + void purgeJobHistory(LocalDateTime asOfDate) throws IOException; + } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index df1637de96de..b614de32c042 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -38,6 +38,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.text.Sqls; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobGetSpecConfig; @@ -58,6 +59,8 @@ import java.nio.file.Path; import java.sql.SQLException; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -1002,6 +1005,151 @@ void testResetJobCancelled() throws IOException { assertEquals(JobStatus.CANCELLED, updated.getStatus()); } + private Job persistJobForTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate) throws IOException, SQLException { + Optional id = database.query( + ctx -> ctx.fetch( + "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + + "SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) " + + "RETURNING id ", + Sqls.toSqlName(jobConfig.getConfigType()), + scope, + runDate, + runDate, + Sqls.toSqlName(status), + Jsons.serialize(jobConfig))) + .stream() + .findFirst() + .map(r -> r.getValue("id", Long.class)); + return jobPersistence.getJob(id.get()); + } + + private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDateTime runDate, boolean shouldHaveState) + throws IOException, SQLException { + String attemptOutputWithState = "{\n" + + " \"sync\": {\n" + + " \"state\": {\n" + + " \"state\": {\n" + + " \"bookmarks\": {" + + "}}}}}"; + String attemptOutputWithoutState = "{\n" + + " \"sync\": {\n" + + " \"output_catalog\": {" + + "}}}"; + Integer attemptNumber = database.query(ctx -> ctx.fetch( + "INSERT INTO attempts(job_id, attempt_number, log_path, status, created_at, updated_at, output) " + + "VALUES(?, ?, ?, CAST(? AS ATTEMPT_STATUS), ?, ?, CAST(? as JSONB)) RETURNING attempt_number", + job.getId(), + job.getAttemptsCount(), + logPath, + Sqls.toSqlName(AttemptStatus.FAILED), + runDate, + runDate, + shouldHaveState ? attemptOutputWithState : attemptOutputWithoutState) + .stream() + .findFirst() + .map(r -> r.get("attempt_number", Integer.class)) + .orElseThrow(() -> new RuntimeException("This should not happen"))); + return attemptNumber; + } + + @Test + @DisplayName("Should purge older job history but maintain certain more recent ones") + void testPurgeJobHistory() throws IOException, SQLException { + // a couple of connectors for testing against + final String SCOPE1 = UUID.randomUUID().toString(); + final String SCOPE2 = UUID.randomUUID().toString(); + + // Business Rules for purging a job: + // Job must be older than X days or its conn has excessive number of jobs + // Job cannot be one of the last 10 jobs on that conn (last 10 jobs are always kept). + // Job cannot be holding the most recent saved state (most recent saved state is always kept). + + // Test Scenarios + // 1. Goal: Purge jobs older than X days from connections with jobs X days old, except keep the most + // recent 10 and keep the latest with state. + // Against one connection/scope, + // - Setup: create a history of jobs that goes back X + 12 days (but produces no more than one job a + // day) + // - Setup: the most recent job with state in it should be at least X + 5 jobs back + // - Assert: ensure that after purging, there are X+1 jobs left (and at least 10), including the one + // with the most recent state. + // - Assert: ensure that after purging, there are X+1 jobs left (and at least 10), including the X + // most recent + // - Assert: ensure that after purging, all other job history has been deleted. + + // Scenario 1 setup + LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); + List allJobs = new ArrayList<>(); + for (int i = 0; i < (DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 12); i++) { + allJobs.add(persistJobForTesting(SCOPE1, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); + } + // At least one job should have state, and it should be older than the recency limit. + Job lastJobWithState = allJobs.get(DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY + 1); + int attemptNumber = persistAttemptForJobHistoryTesting(lastJobWithState, LOG_PATH.toString(), + LocalDateTime.ofEpochSecond(lastJobWithState.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true); + lastJobWithState = jobPersistence.getJob(lastJobWithState.getId()); // reloads with attempts + // sanity check + assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null); + + // ----- remove me after troubleshooting ------ + String sql1 = "select count(*) as rowcount from jobs where (jobs.created_at < (CAST('2021-06-20' as TIMESTAMP) - interval '15' day) ) "; + String sql2 = "select max(job_id) as latest_job_id_with_state from (\n" + + " select jobs.scope, \n" + + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" + + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists\n" + + " from jobs left join attempts on jobs.id = attempts.job_id \n" + + " group by scope, jobs.id\n" + + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" + + " order by scope, jobs.created_at desc, jobs.id desc\n" + + " ) jobs_with_state group by scope \n"; + String sql3 = " select job_id from (\n" + + " select jobs.scope, \n" + + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency,\n" + + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status\n" + + " from jobs \n" + + " group by scope, jobs.id\n" + + " order by scope, jobs.created_at desc, jobs.id desc\n" + + " ) jobs_by_recency \n" + + " where recency <= '10' \n"; + List ids = database.query(ctx -> ctx.fetch(sql3).getValues("job_id", Integer.class)); + // TODO: JENNY on TUESDAY. It's weird that each piece of the sql query correctly checks out as to + // which ids it returns, yet when run together as a delete, it doesn't actually delete anything, and + // the jobs returned after the purge are the same ones as were there before purging. Maybe there is + // something funky happening with transactional commits not occuring? + // ----- remove me after troubleshooting ------ + + // Scenario 1 run purge + List beforePurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 999, 0); + jobPersistence.purgeJobHistory(fakeNow); + List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 999, 0); + + // Scenario 1 test - contains most-recent plus saved-state, and no more than that + assertEquals(afterPurge.size(), DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 1); + // Scenario 1 test - most-recent are actually the most recent + for (int i = 0; i < (DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS); i++) { + assert (afterPurge.get(i).equals(allJobs.get(i))); + } + // Scenario 1 test - job with state is kept despite being older + assert (afterPurge.get(DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 1).equals(lastJobWithState)); + + // TODO take this out, just here for reference for now. + int minAge = DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS; + int minRecency = DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY; + int tooManyJobs = DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS; + + // 2. Goal: Purge jobs from connections with a large number of jobs, except keep the most recent 10 + // and keep the latest with state. + // Against one connection/scope, + // - Setup: create a history of jobs that goes back only 3 days (but includes at least 200 jobs) + // - Setup: the most recent job with state in it should be at least 15 jobs back + // - Assert: ensure that after purging, there are 11 jobs left, including the one with the most + // recent state. + // - Assert: ensure that after purging, there are 11 jobs left, including the 10 most recent + // - Assert: ensure that after purging, all other job history has been deleted. + // + + } + } } From a03953ed5d283f53d24dd39cb193604e18d601d1 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 7 Jul 2021 17:28:04 -0500 Subject: [PATCH 2/9] Created test cases that handle variations of job history purging configuration --- .../persistence/DefaultJobPersistence.java | 27 +-- .../DefaultJobPersistenceTest.java | 165 ++++++++++-------- 2 files changed, 102 insertions(+), 90 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 9309219d0cd2..a328cab0b442 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -82,9 +82,10 @@ public class DefaultJobPersistence implements JobPersistence { - public static final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS = 10; - public static final int JOB_HISTORY_MINIMUM_RECENCY = 10; - public static final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = 100; + // not final because job history test case manipulates these. + public static int JOB_HISTORY_MINIMUM_AGE_IN_DAYS = 15; + public static int JOB_HISTORY_MINIMUM_RECENCY = 5; + public static int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = 10; private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class); private static final JSONFormat DB_JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); @@ -492,7 +493,8 @@ public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { + ") counts on jobs.scope = counts.scope \n" + "where \n" + " -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS \n" - + " (jobs.created_at < (CAST(? as TIMESTAMP) - interval '" + JOB_HISTORY_MINIMUM_AGE_IN_DAYS + "' day) or counts.jobCount > ?) \n" + + " (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '" + (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1) + + "' day) or counts.jobCount > ?) \n" + "and jobs.id not in (\n" + " -- cannot be the last job with saved state \n" + " select max(job_id) as latest_job_id_with_state from (\n" @@ -503,23 +505,22 @@ public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { + " group by scope, jobs.id\n" + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_with_state group by scope \n" + + " ) jobs_with_state group by scope\n " + ") and jobs.id not in (\n" + " -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope\n" - + " select job_id from (\n" - + " select jobs.scope, \n" - + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency,\n" - + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status\n" - + " from jobs \n" + + " select id from (\n" + + " select jobs.scope, jobs.id, jobs.created_at,\n" + + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency\n" + + " from jobs\n" + " group by scope, jobs.id\n" + " order by scope, jobs.created_at desc, jobs.id desc\n" + " ) jobs_by_recency \n" - + " where recency <= ? \n" + + " where recency <= ?\n" + "))"; // JENNY TODO: figure out error handling around this, messaging, etc. Plus test cases. - database.transaction(ctx -> ctx.execute(JOB_HISTORY_PURGE_SQL, asOfDate, - JOB_HISTORY_MINIMUM_AGE_IN_DAYS, + final Integer rows = database.query(ctx -> ctx.execute(JOB_HISTORY_PURGE_SQL, + asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, JOB_HISTORY_MINIMUM_RECENCY)); } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index b614de32c042..4c0e8a1942e5 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -82,6 +82,8 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; @@ -1006,6 +1008,7 @@ void testResetJobCancelled() throws IOException { } private Job persistJobForTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate) throws IOException, SQLException { + String when = runDate.toString(); Optional id = database.query( ctx -> ctx.fetch( "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + @@ -1052,102 +1055,110 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat return attemptNumber; } - @Test + /** + * Testing job history deletion is sensitive to exactly how the constants are configured for + * controlling deletion logic. Thus, the test case injects overrides for those constants, testing a + * comprehensive set of combinations to make sure that the logic is robust to reasonable + * configurations. Extreme configurations such as zero-day retention period are not covered. + * + * Business rules for deletions 1. Job must be older than X days or its conn has excessive number of + * jobs 2. Job cannot be one of the last N jobs on that conn (last N jobs are always kept). 3. Job + * cannot be holding the most recent saved state (most recent saved state is always kept). + * + * @param numJobs How many test jobs to generate; make this enough that all other parameters are + * fully included, for predictable results. + * @param tooManyJobs Takes the place of DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS + * - how many jobs are needed before it ignores date-based age of job when doing deletions. + * @param ageCutoff Takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS - + * retention period in days for the most recent jobs; older than this gets deleted. + * @param recencyCutoff Takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY - + * retention period in number of jobs; at least this many jobs will be retained after + * deletion (provided enough existed in the first place). + * @param lastStatePosition How far back in the list is the job with the latest saved state. This + * can be manipulated to have the saved-state job inside or prior to the retention period. + * @param expectedAfterPurge How many matching jobs are expected after deletion, given the input + * parameters. This was calculated by a human based on understanding the requirements. + * @param goalOfTestScenario Description of the purpose of that test scenario, so it's easier to + * maintain and understand failures. + * + */ @DisplayName("Should purge older job history but maintain certain more recent ones") - void testPurgeJobHistory() throws IOException, SQLException { + @ParameterizedTest + // Cols: numJobs, tooManyJobsCutoff, ageCutoff, recencyCutoff, lastSavedStatePosition, + // expectedAfterPurge, descr + @CsvSource(value = { + "50,100,10,5,9,10,'Validate age cutoff alone'", + "50,100,10,5,13,11,'Validate saved state after age cutoff'", + "50,100,10,15,9,15,'Validate recency cutoff alone'", + "50,100,10,15,17,16,'Validate saved state after recency cutoff'", + "50,20,30,10,9,10,'Validate excess jobs cutoff alone'", + "50,20,30,10,25,11,'Validate saved state after excess jobs cutoff'", + "50,20,30,20,9,20,'Validate recency cutoff with excess jobs cutoff'", + "50,20,30,20,25,21,'Validate saved state after recency and excess jobs cutoff but before age'", + "50,20,30,20,35,21,'Validate saved state after recency and excess jobs cutoff and after age'" + }, + delimiter = ',') + void testPurgeJobHistory(int numJobs, + int tooManyJobs, + int ageCutoff, + int recencyCutoff, + int lastStatePosition, + int expectedAfterPurge, + String goalOfTestScenario) + throws IOException, SQLException { // a couple of connectors for testing against final String SCOPE1 = UUID.randomUUID().toString(); - final String SCOPE2 = UUID.randomUUID().toString(); - // Business Rules for purging a job: - // Job must be older than X days or its conn has excessive number of jobs - // Job cannot be one of the last 10 jobs on that conn (last 10 jobs are always kept). - // Job cannot be holding the most recent saved state (most recent saved state is always kept). + DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS = ageCutoff; + DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = tooManyJobs; + DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY = recencyCutoff; - // Test Scenarios - // 1. Goal: Purge jobs older than X days from connections with jobs X days old, except keep the most - // recent 10 and keep the latest with state. + // Goal: Set up jobs according to the parameters passed in. Then delete according to the rules, and + // make sure the right number of jobs are left. // Against one connection/scope, - // - Setup: create a history of jobs that goes back X + 12 days (but produces no more than one job a + // - Setup: create a history of jobs that goes back many days (but produces no more than one job a // day) - // - Setup: the most recent job with state in it should be at least X + 5 jobs back - // - Assert: ensure that after purging, there are X+1 jobs left (and at least 10), including the one - // with the most recent state. - // - Assert: ensure that after purging, there are X+1 jobs left (and at least 10), including the X - // most recent + // - Setup: the most recent job with state in it should be at least N jobs back + // - Assert: ensure that after purging, there are the right number of jobs left (and at least min + // recency), including the one with the most recent state. + // - Assert: ensure that after purging, there are the right number of jobs left (and at least min + // recency), including the X most recent // - Assert: ensure that after purging, all other job history has been deleted. - // Scenario 1 setup + // Setup LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); List allJobs = new ArrayList<>(); - for (int i = 0; i < (DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 12); i++) { + + // Created in reverse chronological order; id order is the inverse of old-to-new date order. + // That simplifies comparison math for test cases, even though it's unintuitive for human viewing. + for (int i = 0; i < numJobs; i++) { allJobs.add(persistJobForTesting(SCOPE1, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); } - // At least one job should have state, and it should be older than the recency limit. - Job lastJobWithState = allJobs.get(DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY + 1); - int attemptNumber = persistAttemptForJobHistoryTesting(lastJobWithState, LOG_PATH.toString(), + + // At least one job should have state. Find the desired job and add state to it. + Job lastJobWithState = allJobs.get(lastStatePosition); + persistAttemptForJobHistoryTesting(lastJobWithState, LOG_PATH.toString(), LocalDateTime.ofEpochSecond(lastJobWithState.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true); lastJobWithState = jobPersistence.getJob(lastJobWithState.getId()); // reloads with attempts - // sanity check - assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null); - - // ----- remove me after troubleshooting ------ - String sql1 = "select count(*) as rowcount from jobs where (jobs.created_at < (CAST('2021-06-20' as TIMESTAMP) - interval '15' day) ) "; - String sql2 = "select max(job_id) as latest_job_id_with_state from (\n" - + " select jobs.scope, \n" - + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" - + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists\n" - + " from jobs left join attempts on jobs.id = attempts.job_id \n" - + " group by scope, jobs.id\n" - + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" - + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_with_state group by scope \n"; - String sql3 = " select job_id from (\n" - + " select jobs.scope, \n" - + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency,\n" - + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status\n" - + " from jobs \n" - + " group by scope, jobs.id\n" - + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_by_recency \n" - + " where recency <= '10' \n"; - List ids = database.query(ctx -> ctx.fetch(sql3).getValues("job_id", Integer.class)); - // TODO: JENNY on TUESDAY. It's weird that each piece of the sql query correctly checks out as to - // which ids it returns, yet when run together as a delete, it doesn't actually delete anything, and - // the jobs returned after the purge are the same ones as were there before purging. Maybe there is - // something funky happening with transactional commits not occuring? - // ----- remove me after troubleshooting ------ - - // Scenario 1 run purge - List beforePurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 999, 0); - jobPersistence.purgeJobHistory(fakeNow); - List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 999, 0); + // sanity check that the attempt does have saved state so the purge history sql detects it correctly + assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null, + goalOfTestScenario + " - missing saved state on job that was supposed to have it."); - // Scenario 1 test - contains most-recent plus saved-state, and no more than that - assertEquals(afterPurge.size(), DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 1); - // Scenario 1 test - most-recent are actually the most recent - for (int i = 0; i < (DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS); i++) { - assert (afterPurge.get(i).equals(allJobs.get(i))); - } - // Scenario 1 test - job with state is kept despite being older - assert (afterPurge.get(DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS + 1).equals(lastJobWithState)); + // Execute the job history purge and check what jobs are left. + jobPersistence.purgeJobHistory(fakeNow); + List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 9999, 0); - // TODO take this out, just here for reference for now. - int minAge = DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS; - int minRecency = DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY; - int tooManyJobs = DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS; + // Test - contains expected number of jobs and no more than that + assertEquals(expectedAfterPurge, afterPurge.size(), goalOfTestScenario + " - Incorrect number of jobs remain after deletion."); - // 2. Goal: Purge jobs from connections with a large number of jobs, except keep the most recent 10 - // and keep the latest with state. - // Against one connection/scope, - // - Setup: create a history of jobs that goes back only 3 days (but includes at least 200 jobs) - // - Setup: the most recent job with state in it should be at least 15 jobs back - // - Assert: ensure that after purging, there are 11 jobs left, including the one with the most - // recent state. - // - Assert: ensure that after purging, there are 11 jobs left, including the 10 most recent - // - Assert: ensure that after purging, all other job history has been deleted. - // + // Test - most-recent are actually the most recent by date (fyi id order is inverse of + // date order due to creation above). + for (int i = 0; i < Math.min(ageCutoff, recencyCutoff); i++) { + assertEquals(allJobs.get(i).getId(), afterPurge.get(i).getId(), goalOfTestScenario + " - Incorrect sort order after deletion."); + } + // Test - job with state is kept despite being older + assertTrue(afterPurge.contains(lastJobWithState), goalOfTestScenario + " - Missing last job with saved state after deletion."); } } From e2bbf593f20ee5c406cbf2cbc0c8563567198958 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 7 Jul 2021 17:29:59 -0500 Subject: [PATCH 3/9] Typo fix --- .../scheduler/persistence/DefaultJobPersistenceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 4c0e8a1942e5..3e7926cefc15 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1085,7 +1085,7 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat @DisplayName("Should purge older job history but maintain certain more recent ones") @ParameterizedTest // Cols: numJobs, tooManyJobsCutoff, ageCutoff, recencyCutoff, lastSavedStatePosition, - // expectedAfterPurge, descr + // expectedAfterPurge, description @CsvSource(value = { "50,100,10,5,9,10,'Validate age cutoff alone'", "50,100,10,5,13,11,'Validate saved state after age cutoff'", From 30db1cb7ec0054011db2bc9981b369b3448a5394 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Thu, 8 Jul 2021 16:18:59 -0500 Subject: [PATCH 4/9] Expanded test cases to control for job history on multiple connections at once. --- .../persistence/DefaultJobPersistence.java | 1 - .../DefaultJobPersistenceTest.java | 78 +++++++++++-------- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 4b44458a9a90..07d0625463fc 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -513,7 +513,6 @@ private List listTables(final String schema) throws IOException { @Override public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { - // JENNY TODO: Figure out how to accommodate timezone for asOfDate. final String JOB_HISTORY_PURGE_SQL = "delete from jobs where jobs.id in (" + "select jobs.id \n" + "from jobs \n" diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 3e7926cefc15..0a9d39f106b5 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1061,10 +1061,22 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat * comprehensive set of combinations to make sure that the logic is robust to reasonable * configurations. Extreme configurations such as zero-day retention period are not covered. * - * Business rules for deletions 1. Job must be older than X days or its conn has excessive number of - * jobs 2. Job cannot be one of the last N jobs on that conn (last N jobs are always kept). 3. Job - * cannot be holding the most recent saved state (most recent saved state is always kept). + * Business rules for deletions. 1. Job must be older than X days or its conn has excessive number + * of jobs 2. Job cannot be one of the last N jobs on that conn (last N jobs are always kept). 3. + * Job cannot be holding the most recent saved state (most recent saved state is always kept). * + * Testing Goal: Set up jobs according to the parameters passed in. Then delete according to the rules, and + * make sure the right number of jobs are left. Against one connection/scope, + *
    + *
  1. Setup: create a history of jobs that goes back many days (but produces + * no more than one job a day)
  2. + *
  3. Setup: the most recent job with state in it should be at least N jobs back
  4. + *
  5. Assert: ensure that after purging, there are the right number of jobs left (and at least min + * recency), including the one with the most recent state.
  6. + *
  7. Assert: ensure that after purging, there are the right number of jobs left (and at least min + * recency), including the X most recent
  8. + *
  9. Assert: ensure that after purging, all other job history has been deleted.
  10. + *
* @param numJobs How many test jobs to generate; make this enough that all other parameters are * fully included, for predictable results. * @param tooManyJobs Takes the place of DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS @@ -1086,7 +1098,7 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat @ParameterizedTest // Cols: numJobs, tooManyJobsCutoff, ageCutoff, recencyCutoff, lastSavedStatePosition, // expectedAfterPurge, description - @CsvSource(value = { + @CsvSource({ "50,100,10,5,9,10,'Validate age cutoff alone'", "50,100,10,5,13,11,'Validate saved state after age cutoff'", "50,100,10,15,9,15,'Validate recency cutoff alone'", @@ -1096,8 +1108,7 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat "50,20,30,20,9,20,'Validate recency cutoff with excess jobs cutoff'", "50,20,30,20,25,21,'Validate saved state after recency and excess jobs cutoff but before age'", "50,20,30,20,35,21,'Validate saved state after recency and excess jobs cutoff and after age'" - }, - delimiter = ',') + }) void testPurgeJobHistory(int numJobs, int tooManyJobs, int ageCutoff, @@ -1106,61 +1117,62 @@ void testPurgeJobHistory(int numJobs, int expectedAfterPurge, String goalOfTestScenario) throws IOException, SQLException { - // a couple of connectors for testing against - final String SCOPE1 = UUID.randomUUID().toString(); + final String CURRENT_SCOPE = UUID.randomUUID().toString(); + // Decoys - these jobs will help mess up bad sql queries, even though they shouldn't be deleted. + final String DECOY_SCOPE = UUID.randomUUID().toString(); + + // Reconfigure constants to test various combinations of tuning knobs and make sure all work. DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS = ageCutoff; DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = tooManyJobs; DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY = recencyCutoff; - // Goal: Set up jobs according to the parameters passed in. Then delete according to the rules, and - // make sure the right number of jobs are left. - // Against one connection/scope, - // - Setup: create a history of jobs that goes back many days (but produces no more than one job a - // day) - // - Setup: the most recent job with state in it should be at least N jobs back - // - Assert: ensure that after purging, there are the right number of jobs left (and at least min - // recency), including the one with the most recent state. - // - Assert: ensure that after purging, there are the right number of jobs left (and at least min - // recency), including the X most recent - // - Assert: ensure that after purging, all other job history has been deleted. - - // Setup LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); - List allJobs = new ArrayList<>(); - // Created in reverse chronological order; id order is the inverse of old-to-new date order. - // That simplifies comparison math for test cases, even though it's unintuitive for human viewing. + // Jobs are created in reverse chronological order; id order is the inverse of old-to-new date order. + // The most-recent job is in allJobs[0] which means keeping the 10 most recent is [0-9], simplifying + // testing math as we don't have to care how many jobs total existed and were deleted. + List allJobs = new ArrayList<>(); + List decoyJobs = new ArrayList<>(); for (int i = 0; i < numJobs; i++) { - allJobs.add(persistJobForTesting(SCOPE1, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); + allJobs.add(persistJobForTesting(CURRENT_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); + decoyJobs.add(persistJobForTesting(DECOY_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); } // At least one job should have state. Find the desired job and add state to it. - Job lastJobWithState = allJobs.get(lastStatePosition); - persistAttemptForJobHistoryTesting(lastJobWithState, LOG_PATH.toString(), - LocalDateTime.ofEpochSecond(lastJobWithState.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true); - lastJobWithState = jobPersistence.getJob(lastJobWithState.getId()); // reloads with attempts + Job lastJobWithState = addStateToJob(allJobs.get(lastStatePosition)); + addStateToJob(decoyJobs.get(lastStatePosition-1)); + addStateToJob(decoyJobs.get(lastStatePosition+1)); + + // An older job with state should also exist, so we ensure we picked the most-recent with queries. + Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition+1)); + // sanity check that the attempt does have saved state so the purge history sql detects it correctly assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null, goalOfTestScenario + " - missing saved state on job that was supposed to have it."); // Execute the job history purge and check what jobs are left. jobPersistence.purgeJobHistory(fakeNow); - List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, SCOPE1, 9999, 0); + List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, CURRENT_SCOPE, 9999, 0); // Test - contains expected number of jobs and no more than that assertEquals(expectedAfterPurge, afterPurge.size(), goalOfTestScenario + " - Incorrect number of jobs remain after deletion."); - // Test - most-recent are actually the most recent by date (fyi id order is inverse of - // date order due to creation above). + // Test - most-recent are actually the most recent by date (see above, reverse order) for (int i = 0; i < Math.min(ageCutoff, recencyCutoff); i++) { assertEquals(allJobs.get(i).getId(), afterPurge.get(i).getId(), goalOfTestScenario + " - Incorrect sort order after deletion."); } - // Test - job with state is kept despite being older + // Test - job with latest state is always kept despite being older than some cutoffs assertTrue(afterPurge.contains(lastJobWithState), goalOfTestScenario + " - Missing last job with saved state after deletion."); } + private Job addStateToJob(Job job) throws IOException, SQLException { + persistAttemptForJobHistoryTesting(job, LOG_PATH.toString(), + LocalDateTime.ofEpochSecond(job.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true); + return jobPersistence.getJob(job.getId()); // reload job to include its attempts + } + } } From ff60e2679482ca79ef8e6ed04e83f166b56594f1 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Thu, 8 Jul 2021 16:39:11 -0500 Subject: [PATCH 5/9] Handle latest job with saved state correctly regardless of order of ids --- .../scheduler/persistence/DefaultJobPersistence.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 07d0625463fc..43adc9da381a 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -524,16 +524,17 @@ public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { + " (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '" + (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1) + "' day) or counts.jobCount > ?) \n" + "and jobs.id not in (\n" - + " -- cannot be the last job with saved state \n" - + " select max(job_id) as latest_job_id_with_state from (\n" + + " -- cannot be the most recent job with saved state \n" + + " select job_id as latest_job_id_with_state from (\n" + " select jobs.scope, \n" + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" - + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists\n" + + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists,\n" + + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as stateRecency\n" + " from jobs left join attempts on jobs.id = attempts.job_id \n" + " group by scope, jobs.id\n" + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_with_state group by scope\n " + + " ) jobs_with_state where stateRecency=1\n " + ") and jobs.id not in (\n" + " -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope\n" + " select id from (\n" From ff758ed921ab4f04a3af3caaa7b27f9452eff65f Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Thu, 8 Jul 2021 16:47:55 -0500 Subject: [PATCH 6/9] Whitespace --- .../persistence/DefaultJobPersistenceTest.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 0a9d39f106b5..bd8317281d14 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1065,11 +1065,11 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat * of jobs 2. Job cannot be one of the last N jobs on that conn (last N jobs are always kept). 3. * Job cannot be holding the most recent saved state (most recent saved state is always kept). * - * Testing Goal: Set up jobs according to the parameters passed in. Then delete according to the rules, and - * make sure the right number of jobs are left. Against one connection/scope, + * Testing Goal: Set up jobs according to the parameters passed in. Then delete according to the + * rules, and make sure the right number of jobs are left. Against one connection/scope, *
    - *
  1. Setup: create a history of jobs that goes back many days (but produces - * no more than one job a day)
  2. + *
  3. Setup: create a history of jobs that goes back many days (but produces no more than one job a + * day)
  4. *
  5. Setup: the most recent job with state in it should be at least N jobs back
  6. *
  7. Assert: ensure that after purging, there are the right number of jobs left (and at least min * recency), including the one with the most recent state.
  8. @@ -1077,6 +1077,7 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat * recency), including the X most recent *
  9. Assert: ensure that after purging, all other job history has been deleted.
  10. *
+ * * @param numJobs How many test jobs to generate; make this enough that all other parameters are * fully included, for predictable results. * @param tooManyJobs Takes the place of DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS @@ -1129,7 +1130,8 @@ void testPurgeJobHistory(int numJobs, LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); - // Jobs are created in reverse chronological order; id order is the inverse of old-to-new date order. + // Jobs are created in reverse chronological order; id order is the inverse of old-to-new date + // order. // The most-recent job is in allJobs[0] which means keeping the 10 most recent is [0-9], simplifying // testing math as we don't have to care how many jobs total existed and were deleted. List allJobs = new ArrayList<>(); @@ -1141,11 +1143,11 @@ void testPurgeJobHistory(int numJobs, // At least one job should have state. Find the desired job and add state to it. Job lastJobWithState = addStateToJob(allJobs.get(lastStatePosition)); - addStateToJob(decoyJobs.get(lastStatePosition-1)); - addStateToJob(decoyJobs.get(lastStatePosition+1)); + addStateToJob(decoyJobs.get(lastStatePosition - 1)); + addStateToJob(decoyJobs.get(lastStatePosition + 1)); // An older job with state should also exist, so we ensure we picked the most-recent with queries. - Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition+1)); + Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition + 1)); // sanity check that the attempt does have saved state so the purge history sql detects it correctly assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null, From e1d9533b201b883d4fb008c2efdd575bc2762bb6 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Fri, 9 Jul 2021 11:50:32 -0500 Subject: [PATCH 7/9] Externalized sql. Cleaned up constants. --- .../airbyte/scheduler/app/SchedulerApp.java | 1 + .../persistence/DefaultJobPersistence.java | 75 ++++++++----------- .../scheduler/persistence/JobPersistence.java | 2 +- .../src/main/resources/job_history_purge.sql | 33 ++++++++ .../DefaultJobPersistenceTest.java | 8 +- 5 files changed, 68 insertions(+), 51 deletions(-) create mode 100644 airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 62064bfe32c7..e9b9fdceea8b 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -153,6 +153,7 @@ public void start() throws IOException { () -> { MDC.setContextMap(mdc); jobCleaner.run(); + jobPersistence.purgeJobHistory(); }, CLEANING_DELAY.toSeconds(), CLEANING_DELAY.toSeconds(), diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 43adc9da381a..66ea5816da6b 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -31,6 +31,7 @@ import com.google.common.collect.Sets; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.text.Names; import io.airbyte.commons.text.Sqls; import io.airbyte.commons.version.AirbyteVersion; @@ -82,10 +83,10 @@ public class DefaultJobPersistence implements JobPersistence { - // not final because job history test case manipulates these. - public static int JOB_HISTORY_MINIMUM_AGE_IN_DAYS = 15; - public static int JOB_HISTORY_MINIMUM_RECENCY = 5; - public static int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = 10; + // not static because job history test case manipulates these. + private final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS; + private final int JOB_HISTORY_MINIMUM_RECENCY; + private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS; private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class); private static final Set SYSTEM_SCHEMA = Set @@ -124,13 +125,19 @@ public class DefaultJobPersistence implements JobPersistence { private final Supplier timeSupplier; @VisibleForTesting - DefaultJobPersistence(Database database, Supplier timeSupplier) { + DefaultJobPersistence(Database database, Supplier timeSupplier, + int minimumAgeInDays, + int excessiveNumberOfJobs, + int minimumRecencyCount) { this.database = new ExceptionWrappingDatabase(database); this.timeSupplier = timeSupplier; + JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays; + JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs; + JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount; } public DefaultJobPersistence(Database database) { - this(database, Instant::now); + this(database, Instant::now, 30, 500, 10); } @Override @@ -512,45 +519,23 @@ private List listTables(final String schema) throws IOException { } @Override - public void purgeJobHistory(LocalDateTime asOfDate) throws IOException { - final String JOB_HISTORY_PURGE_SQL = "delete from jobs where jobs.id in (" - + "select jobs.id \n" - + "from jobs \n" - + "left join (\n" - + " select scope, count(jobs.id) as jobCount from jobs group by scope\n" - + ") counts on jobs.scope = counts.scope \n" - + "where \n" - + " -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS \n" - + " (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '" + (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1) - + "' day) or counts.jobCount > ?) \n" - + "and jobs.id not in (\n" - + " -- cannot be the most recent job with saved state \n" - + " select job_id as latest_job_id_with_state from (\n" - + " select jobs.scope, \n" - + " jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, \n" - + " bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists,\n" - + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as stateRecency\n" - + " from jobs left join attempts on jobs.id = attempts.job_id \n" - + " group by scope, jobs.id\n" - + " having bool_or(attempts.\"output\" -> 'sync' -> 'state' -> 'state' is not null) = true\n" - + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_with_state where stateRecency=1\n " - + ") and jobs.id not in (\n" - + " -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope\n" - + " select id from (\n" - + " select jobs.scope, jobs.id, jobs.created_at,\n" - + " row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency\n" - + " from jobs\n" - + " group by scope, jobs.id\n" - + " order by scope, jobs.created_at desc, jobs.id desc\n" - + " ) jobs_by_recency \n" - + " where recency <= ?\n" - + "))"; - - final Integer rows = database.query(ctx -> ctx.execute(JOB_HISTORY_PURGE_SQL, - asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), - JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, - JOB_HISTORY_MINIMUM_RECENCY)); + public void purgeJobHistory() { + purgeJobHistory(LocalDateTime.now()); + } + + @VisibleForTesting + public void purgeJobHistory(LocalDateTime asOfDate) { + try { + String JOB_HISTORY_PURGE_SQL = MoreResources.readResource("job_history_purge.sql"); + // interval '?' days cannot use a ? bind, so we're using %d instead. + String sql = String.format(JOB_HISTORY_PURGE_SQL, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1)); + final Integer rows = database.query(ctx -> ctx.execute(sql, + asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), + JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, + JOB_HISTORY_MINIMUM_RECENCY)); + } catch (IOException e) { + throw new RuntimeException(e); + } } private List listAllTables(final String schema) throws IOException { diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 4c1e49126fa4..7ed680ee6e85 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -190,6 +190,6 @@ public interface JobPersistence { * * @throws IOException */ - void purgeJobHistory(LocalDateTime asOfDate) throws IOException; + void purgeJobHistory(); } diff --git a/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql new file mode 100644 index 000000000000..6db302477c46 --- /dev/null +++ b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql @@ -0,0 +1,33 @@ +delete from jobs where jobs.id in ( + select jobs.id + from jobs + left join ( + select scope, count(jobs.id) as jobCount from jobs group by scope + ) counts on jobs.scope = counts.scope + where + -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS + (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '%d' day) or counts.jobCount > ?) + and jobs.id not in ( + -- cannot be the most recent job with saved state + select job_id as latest_job_id_with_state from ( + select jobs.scope, jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, + bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists, + row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as stateRecency + from jobs + left join attempts on jobs.id = attempts.job_id + group by scope, jobs.id + having bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' is not null) = true + order by scope, jobs.created_at desc, jobs.id desc + ) jobs_with_state where stateRecency=1 + ) and jobs.id not in ( + -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope + select id from ( + select jobs.scope, jobs.id, jobs.created_at, + row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency + from jobs + group by scope, jobs.id + order by scope, jobs.created_at desc, jobs.id desc + ) jobs_by_recency + where recency <= ? + ) +) diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index bd8317281d14..9ea6eafebdee 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -168,7 +168,7 @@ public void setup() throws Exception { timeSupplier = mock(Supplier.class); when(timeSupplier.get()).thenReturn(NOW); - jobPersistence = new DefaultJobPersistence(database, timeSupplier); + jobPersistence = new DefaultJobPersistence(database, timeSupplier, 30, 500, 10); } @AfterEach @@ -1124,9 +1124,7 @@ void testPurgeJobHistory(int numJobs, final String DECOY_SCOPE = UUID.randomUUID().toString(); // Reconfigure constants to test various combinations of tuning knobs and make sure all work. - DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS = ageCutoff; - DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = tooManyJobs; - DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY = recencyCutoff; + DefaultJobPersistence jobPersistence = new DefaultJobPersistence(database, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff); LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); @@ -1154,7 +1152,7 @@ void testPurgeJobHistory(int numJobs, goalOfTestScenario + " - missing saved state on job that was supposed to have it."); // Execute the job history purge and check what jobs are left. - jobPersistence.purgeJobHistory(fakeNow); + ((DefaultJobPersistence)jobPersistence).purgeJobHistory(fakeNow); List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, CURRENT_SCOPE, 9999, 0); // Test - contains expected number of jobs and no more than that From 5039d1240b3e59782a011d871f47e044a21aebba Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Fri, 9 Jul 2021 12:12:14 -0500 Subject: [PATCH 8/9] Cleaned up test case persistence code and structure --- .../persistence/DefaultJobPersistence.java | 9 +++++---- .../scheduler/persistence/JobPersistence.java | 1 - .../persistence/DefaultJobPersistenceTest.java | 18 ++++++++++++------ 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 66ea5816da6b..23d9d4ef3daf 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -125,10 +125,11 @@ public class DefaultJobPersistence implements JobPersistence { private final Supplier timeSupplier; @VisibleForTesting - DefaultJobPersistence(Database database, Supplier timeSupplier, - int minimumAgeInDays, - int excessiveNumberOfJobs, - int minimumRecencyCount) { + DefaultJobPersistence(Database database, + Supplier timeSupplier, + int minimumAgeInDays, + int excessiveNumberOfJobs, + int minimumRecencyCount) { this.database = new ExceptionWrappingDatabase(database); this.timeSupplier = timeSupplier; JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays; diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 7ed680ee6e85..73ff78050d53 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -31,7 +31,6 @@ import io.airbyte.scheduler.models.JobStatus; import java.io.IOException; import java.nio.file.Path; -import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 9ea6eafebdee..e38a97c8bac8 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1007,7 +1007,14 @@ void testResetJobCancelled() throws IOException { assertEquals(JobStatus.CANCELLED, updated.getStatus()); } - private Job persistJobForTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate) throws IOException, SQLException { + } + + @Nested + @DisplayName("When purging job history") + class PurgeJobHistory { + + private Job persistJobForJobHistoryTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate) + throws IOException, SQLException { String when = runDate.toString(); Optional id = database.query( ctx -> ctx.fetch( @@ -1026,7 +1033,7 @@ private Job persistJobForTesting(String scope, JobConfig jobConfig, JobStatus st return jobPersistence.getJob(id.get()); } - private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDateTime runDate, boolean shouldHaveState) + private void persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDateTime runDate, boolean shouldHaveState) throws IOException, SQLException { String attemptOutputWithState = "{\n" + " \"sync\": {\n" @@ -1052,7 +1059,6 @@ private int persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDat .findFirst() .map(r -> r.get("attempt_number", Integer.class)) .orElseThrow(() -> new RuntimeException("This should not happen"))); - return attemptNumber; } /** @@ -1135,8 +1141,8 @@ void testPurgeJobHistory(int numJobs, List allJobs = new ArrayList<>(); List decoyJobs = new ArrayList<>(); for (int i = 0; i < numJobs; i++) { - allJobs.add(persistJobForTesting(CURRENT_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); - decoyJobs.add(persistJobForTesting(DECOY_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); + allJobs.add(persistJobForJobHistoryTesting(CURRENT_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); + decoyJobs.add(persistJobForJobHistoryTesting(DECOY_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i))); } // At least one job should have state. Find the desired job and add state to it. @@ -1152,7 +1158,7 @@ void testPurgeJobHistory(int numJobs, goalOfTestScenario + " - missing saved state on job that was supposed to have it."); // Execute the job history purge and check what jobs are left. - ((DefaultJobPersistence)jobPersistence).purgeJobHistory(fakeNow); + ((DefaultJobPersistence) jobPersistence).purgeJobHistory(fakeNow); List afterPurge = jobPersistence.listJobs(ConfigType.SYNC, CURRENT_SCOPE, 9999, 0); // Test - contains expected number of jobs and no more than that From b94f2181bfcb4277bac006d9fef068bda5d071c0 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Fri, 9 Jul 2021 14:11:24 -0500 Subject: [PATCH 9/9] Whitespace and formatting per standard tooling. --- .../src/main/resources/job_history_purge.sql | 131 +++++++++++++----- 1 file changed, 99 insertions(+), 32 deletions(-) diff --git a/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql index 6db302477c46..931634f0c4b2 100644 --- a/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql +++ b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql @@ -1,33 +1,100 @@ -delete from jobs where jobs.id in ( - select jobs.id - from jobs - left join ( - select scope, count(jobs.id) as jobCount from jobs group by scope - ) counts on jobs.scope = counts.scope - where - -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS - (jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - interval '%d' day) or counts.jobCount > ?) - and jobs.id not in ( - -- cannot be the most recent job with saved state - select job_id as latest_job_id_with_state from ( - select jobs.scope, jobs.id as job_id, jobs.config_type, jobs.created_at, jobs.status, - bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' is not null) as outputStateExists, - row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as stateRecency - from jobs - left join attempts on jobs.id = attempts.job_id - group by scope, jobs.id - having bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' is not null) = true - order by scope, jobs.created_at desc, jobs.id desc - ) jobs_with_state where stateRecency=1 - ) and jobs.id not in ( - -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope - select id from ( - select jobs.scope, jobs.id, jobs.created_at, - row_number() OVER (PARTITION BY scope ORDER BY jobs.created_at desc, jobs.id desc) as recency - from jobs - group by scope, jobs.id - order by scope, jobs.created_at desc, jobs.id desc - ) jobs_by_recency - where recency <= ? +DELETE +FROM + jobs +WHERE + jobs.id IN( + SELECT + jobs.id + FROM + jobs + LEFT JOIN( + SELECT + SCOPE, + COUNT( jobs.id ) AS jobCount + FROM + jobs + GROUP BY + SCOPE + ) counts ON + jobs.scope = counts.scope + WHERE + -- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS +( + jobs.created_at <( + TO_TIMESTAMP( + ?, + 'YYYY-MM-DD' + )- INTERVAL '%d' DAY + ) + OR counts.jobCount >? + ) + AND jobs.id NOT IN( + -- cannot be the most recent job with saved state + SELECT + job_id AS latest_job_id_with_state + FROM + ( + SELECT + jobs.scope, + jobs.id AS job_id, + jobs.config_type, + jobs.created_at, + jobs.status, + bool_or( + attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL + ) AS outputStateExists, + ROW_NUMBER() OVER( + PARTITION BY SCOPE + ORDER BY + jobs.created_at DESC, + jobs.id DESC + ) AS stateRecency + FROM + jobs + LEFT JOIN attempts ON + jobs.id = attempts.job_id + GROUP BY + SCOPE, + jobs.id + HAVING + bool_or( + attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL + )= TRUE + ORDER BY + SCOPE, + jobs.created_at DESC, + jobs.id DESC + ) jobs_with_state + WHERE + stateRecency = 1 + ) + AND jobs.id NOT IN( + -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope + SELECT + id + FROM + ( + SELECT + jobs.scope, + jobs.id, + jobs.created_at, + ROW_NUMBER() OVER( + PARTITION BY SCOPE + ORDER BY + jobs.created_at DESC, + jobs.id DESC + ) AS recency + FROM + jobs + GROUP BY + SCOPE, + jobs.id + ORDER BY + SCOPE, + jobs.created_at DESC, + jobs.id DESC + ) jobs_by_recency + WHERE + recency <=? + ) ) -)