Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job history purging #4575

Merged
merged 10 commits into from
Jul 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public void start() throws IOException {
() -> {
MDC.setContextMap(mdc);
jobCleaner.run();
jobPersistence.purgeJobHistory();
},
CLEANING_DELAY.toSeconds(),
CLEANING_DELAY.toSeconds(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -82,6 +83,11 @@

public class DefaultJobPersistence implements JobPersistence {

// Job-history purge tuning knobs. Deliberately instance fields rather than static:
// the job-history purge test constructs a persistence instance with custom values.
// The constant-style names signal that they are fixed for the object's lifetime.
private final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS;
private final int JOB_HISTORY_MINIMUM_RECENCY;
private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class);
private static final Set<String> SYSTEM_SCHEMA = Set
.of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal",
Expand Down Expand Up @@ -119,13 +125,20 @@ public class DefaultJobPersistence implements JobPersistence {
private final Supplier<Instant> timeSupplier;

/**
 * Creates a job persistence with explicit job-history purge settings.
 *
 * @param database the jobs database
 * @param timeSupplier clock used when stamping job records; injectable for deterministic tests
 * @param minimumAgeInDays jobs younger than this many days are never purged
 * @param excessiveNumberOfJobs per-scope job count above which older jobs become purgeable
 * @param minimumRecencyCount the most recent N jobs per scope are always retained
 */
@VisibleForTesting
DefaultJobPersistence(Database database,
                      Supplier<Instant> timeSupplier,
                      int minimumAgeInDays,
                      int excessiveNumberOfJobs,
                      int minimumRecencyCount) {
  this.database = new ExceptionWrappingDatabase(database);
  this.timeSupplier = timeSupplier;
  JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays;
  JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs;
  JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount;
}

/**
 * Creates a job persistence with the default purge policy: jobs become purgeable once
 * 30 days old, or once a scope exceeds 500 jobs; the 10 most recent jobs per scope
 * (and the latest job with saved state) are always retained.
 */
public DefaultJobPersistence(Database database) {
this(database, Instant::now, 30, 500, 10);
}

@Override
Expand Down Expand Up @@ -506,6 +519,26 @@ private List<String> listTables(final String schema) throws IOException {
}
}

/** Purges eligible job history rows, evaluated as of the current wall-clock time. */
@Override
public void purgeJobHistory() {
  final LocalDateTime asOfDate = LocalDateTime.now();
  purgeJobHistory(asOfDate);
}

/**
 * Purges job history relative to {@code asOfDate}, keeping per scope the most recent
 * job with saved state and the {@code JOB_HISTORY_MINIMUM_RECENCY} most recent jobs.
 *
 * @param asOfDate the reference date against which job age is evaluated
 * @throws RuntimeException wrapping any {@link IOException} from reading or running the SQL
 */
@VisibleForTesting
public void purgeJobHistory(LocalDateTime asOfDate) {
  try {
    final String purgeSqlTemplate = MoreResources.readResource("job_history_purge.sql");
    // interval '?' days cannot use a ? bind, so we're using %d instead.
    final String sql = String.format(purgeSqlTemplate, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1));
    // ISO_LOCAL_DATE renders yyyy-MM-dd with the calendar year. The previous pattern
    // "YYYY-MM-dd" used Java's week-based year ("YYYY"), which yields the wrong year
    // for dates near year boundaries.
    final Integer rows = database.query(ctx -> ctx.execute(sql,
        asOfDate.format(DateTimeFormatter.ISO_LOCAL_DATE),
        JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS,
        JOB_HISTORY_MINIMUM_RECENCY));
    LOGGER.info("Purged {} job history rows.", rows);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private List<String> listAllTables(final String schema) throws IOException {
if (schema != null) {
return database.query(context -> context.meta().getSchemas(schema).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,11 @@ public interface JobPersistence {
*/
void importDatabase(String airbyteVersion, Map<DatabaseSchema, Stream<JsonNode>> data) throws IOException;

/**
 * Purges job history while ensuring that the latest saved-state information is maintained.
 * This method declares no checked exceptions; implementations are expected to wrap any
 * I/O failure in an unchecked exception.
 */
void purgeJobHistory();

}
100 changes: 100 additions & 0 deletions airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
-- Purges old job history while keeping, per connection/scope:
--   * the most recent job whose attempts carry saved state, and
--   * the N most recent jobs (minimum recency).
-- This file is a template: it passes through Java String.format (the format
-- placeholder below becomes minimum-age-in-days minus one, since INTERVAL cannot
-- take a JDBC bind), then receives three bind parameters in order:
--   1. as-of date, rendered yyyy-MM-dd
--   2. excessive number of jobs per scope
--   3. minimum recency count
-- NOTE: do not add percent or question-mark characters in comments here; they would
-- be picked up by String.format / bind-parameter parsing.
DELETE
FROM
jobs
WHERE
jobs.id IN(
SELECT
jobs.id
FROM
jobs
-- per-scope total job counts, used for the excessive-number-of-jobs check
LEFT JOIN(
SELECT
SCOPE,
COUNT( jobs.id ) AS jobCount
FROM
jobs
GROUP BY
SCOPE
) counts ON
jobs.scope = counts.scope
WHERE
-- job must be at least MINIMUM_AGE_IN_DAYS old or connection has more than EXCESSIVE_NUMBER_OF_JOBS
(
jobs.created_at <(
TO_TIMESTAMP(
?,
'YYYY-MM-DD'
)- INTERVAL '%d' DAY
)
OR counts.jobCount >?
)
AND jobs.id NOT IN(
-- cannot be the most recent job with saved state
SELECT
job_id AS latest_job_id_with_state
FROM
(
-- rank each scope's jobs that have any attempt with non-null sync state,
-- newest first; rank 1 is the latest job with state and must be kept
SELECT
jobs.scope,
jobs.id AS job_id,
jobs.config_type,
jobs.created_at,
jobs.status,
bool_or(
attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL
) AS outputStateExists,
ROW_NUMBER() OVER(
PARTITION BY SCOPE
ORDER BY
jobs.created_at DESC,
jobs.id DESC
) AS stateRecency
FROM
jobs
LEFT JOIN attempts ON
jobs.id = attempts.job_id
GROUP BY
SCOPE,
jobs.id
HAVING
bool_or(
attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL
)= TRUE
ORDER BY
SCOPE,
jobs.created_at DESC,
jobs.id DESC
) jobs_with_state
WHERE
stateRecency = 1
)
AND jobs.id NOT IN(
-- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope
SELECT
id
FROM
(
-- rank each scope's jobs newest first; ranks up to the recency bind are kept
SELECT
jobs.scope,
jobs.id,
jobs.created_at,
ROW_NUMBER() OVER(
PARTITION BY SCOPE
ORDER BY
jobs.created_at DESC,
jobs.id DESC
) AS recency
FROM
jobs
GROUP BY
SCOPE,
jobs.id
ORDER BY
SCOPE,
jobs.created_at DESC,
jobs.id DESC
) jobs_by_recency
WHERE
recency <=?
)
)
Loading