diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 50aec9e92d..a18c21896c 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -23,7 +23,7 @@ jobs: check: name: Check & Publish runs-on: ubuntu-latest - timeout-minutes: 45 + timeout-minutes: 60 steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v3 diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index fe0e3c2052..d34ad97248 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -242,6 +242,11 @@ kestra: max-poll-interval: 1000ms poll-switch-interval: 5s + cleaner: + initial-delay: 1h + fixed-delay: 1h + retention: 7d + elasticsearch: defaults: indice-prefix: "kestra_" diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql index 9cf9db9464..c643c8c9ae 100644 --- a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql @@ -99,7 +99,7 @@ CREATE TABLE queues ( key VARCHAR(250) NOT NULL, value JSONB NOT NULL, consumers queue_consumers[], - updated timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP + updated TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX queues_type__consumers ON queues (type, consumers, "offset"); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutorStateStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutorStateStorage.java index e4943d2c01..fd74b03934 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutorStateStorage.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutorStateStorage.java @@ -33,4 +33,8 @@ public void save(DSLContext dslContext, JdbcExecutorState jdbcExecutorState) { Map, Object> fields = this.jdbcRepository.persistFields(jdbcExecutorState); this.jdbcRepository.persist(jdbcExecutorState, dslContext, fields); } + + public void delete(Execution execution) { + this.jdbcRepository.delete(new JdbcExecutorState(execution.getId())); + } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleaner.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleaner.java new file mode 100644 index 0000000000..fccbc7cdad --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleaner.java @@ -0,0 +1,65 @@ +package io.kestra.jdbc.runner; + +import io.kestra.core.queues.QueueException; +import io.kestra.jdbc.DSLContextWrapper; +import io.kestra.jdbc.JdbcConfiguration; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.annotation.ConfigurationProperties; +import io.micronaut.context.annotation.Requires; +import io.micronaut.scheduling.annotation.Scheduled; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.jooq.Record; +import org.jooq.Table; +import org.jooq.impl.DSL; + +import java.time.Duration; +import java.time.ZonedDateTime; + +@Singleton +@JdbcRunnerEnabled +@Slf4j +@Requires(property = "kestra.jdbc.cleaner") +public class JdbcCleaner { + private final DSLContextWrapper dslContextWrapper; + private final Configuration configuration; + + protected final Table queueTable; + + @Inject + public JdbcCleaner(ApplicationContext applicationContext) { + JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class); + + this.dslContextWrapper = applicationContext.getBean(DSLContextWrapper.class); + this.configuration = applicationContext.getBean(Configuration.class); + + this.queueTable = DSL.table(jdbcConfiguration.tableConfig("queues").getTable()); + } + + public void deleteQueue() throws QueueException { + dslContextWrapper.transaction(configuration -> { + int deleted = DSL + .using(configuration) + .delete(this.queueTable) + .where( + DSL.field("updated") + .lessOrEqual(ZonedDateTime.now().minus(this.configuration.getRetention()).toOffsetDateTime()) + ) + .execute(); + log.info("Cleaned {} records from {}", deleted, this.queueTable.getName()); + }); + } + + @Scheduled(initialDelay = "${kestra.jdbc.cleaner.initial-delay}", fixedDelay = "${kestra.jdbc.cleaner.fixed-delay}") + public void report() { + deleteQueue(); + } + + @ConfigurationProperties("kestra.jdbc.cleaner") + @Getter + public static class Configuration { + Duration retention; + } +} diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index c0cf987050..34c83ebd99 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -102,6 +102,9 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private AbstractExecutionDelayStorage abstractExecutionDelayStorage; + @Inject + private AbstractExecutorStateStorage executorStateStorage; + private List allFlows; @SneakyThrows @@ -278,6 +281,10 @@ private void executionQueue(Execution message) { if (result != null) { this.toExecution(result); + + if (executorService.canBePurged(result)) { + executorStateStorage.delete(result.getExecution()); + } } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 5b6dcd1b70..238e7af343 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -258,8 +258,8 @@ public void close() throws IOException { @ConfigurationProperties("kestra.jdbc.queues") @Getter public static class Configuration { - Duration minPollInterval; - Duration maxPollInterval; - Duration pollSwitchInterval; + Duration minPollInterval = Duration.ofMillis(100); + Duration maxPollInterval = Duration.ofMillis(500); + Duration pollSwitchInterval = Duration.ofSeconds(30); } }