Skip to content

Commit

Permalink
feat(jdbc): cleanup queue and executor state
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 7, 2022
1 parent 4122422 commit 90372ac
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
check:
name: Check & Publish
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
Expand Down
5 changes: 5 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ kestra:
max-poll-interval: 1000ms
poll-switch-interval: 5s

cleaner:
initial-delay: 1h
fixed-delay: 1h
retention: 7d

elasticsearch:
defaults:
indice-prefix: "kestra_"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ CREATE TABLE queues (
key VARCHAR(250) NOT NULL,
value JSONB NOT NULL,
consumers queue_consumers[],
updated timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP
updated TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX queues_type__consumers ON queues (type, consumers, "offset");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public void save(DSLContext dslContext, JdbcExecutorState jdbcExecutorState) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(jdbcExecutorState);
this.jdbcRepository.persist(jdbcExecutorState, dslContext, fields);
}

public void delete(Execution execution) {
this.jdbcRepository.delete(new JdbcExecutorState(execution.getId()));
}
}
65 changes: 65 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleaner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.kestra.jdbc.runner;

import io.kestra.core.queues.QueueException;
import io.kestra.jdbc.DSLContextWrapper;
import io.kestra.jdbc.JdbcConfiguration;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Record;
import org.jooq.Table;
import org.jooq.impl.DSL;

import java.time.Duration;
import java.time.ZonedDateTime;

@Singleton
@JdbcRunnerEnabled
@Slf4j
@Requires(property = "kestra.jdbc.cleaner")
public class JdbcCleaner {
private final DSLContextWrapper dslContextWrapper;
private final Configuration configuration;

protected final Table<Record> queueTable;

@Inject
public JdbcCleaner(ApplicationContext applicationContext) {
JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class);

this.dslContextWrapper = applicationContext.getBean(DSLContextWrapper.class);
this.configuration = applicationContext.getBean(Configuration.class);

this.queueTable = DSL.table(jdbcConfiguration.tableConfig("queues").getTable());
}

public void deleteQueue() throws QueueException {
dslContextWrapper.transaction(configuration -> {
int deleted = DSL
.using(configuration)
.delete(this.queueTable)
.where(
DSL.field("updated")
.lessOrEqual(ZonedDateTime.now().minus(this.configuration.getRetention()).toOffsetDateTime())
)
.execute();
log.info("Cleaned {} records from {}", deleted, this.queueTable.getName());
});
}

@Scheduled(initialDelay = "${kestra.jdbc.cleaner.initial-delay}", fixedDelay = "${kestra.jdbc.cleaner.fixed-delay}")
public void report() {
deleteQueue();
}

@ConfigurationProperties("kestra.jdbc.cleaner")
@Getter
public static class Configuration {
Duration retention;
}
}
7 changes: 7 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private AbstractExecutionDelayStorage abstractExecutionDelayStorage;

@Inject
private AbstractExecutorStateStorage executorStateStorage;

private List<Flow> allFlows;

@SneakyThrows
Expand Down Expand Up @@ -278,6 +281,10 @@ private void executionQueue(Execution message) {

if (result != null) {
this.toExecution(result);

if (executorService.canBePurged(result)) {
executorStateStorage.delete(result.getExecution());
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ public void close() throws IOException {
@ConfigurationProperties("kestra.jdbc.queues")
@Getter
public static class Configuration {
Duration minPollInterval;
Duration maxPollInterval;
Duration pollSwitchInterval;
Duration minPollInterval = Duration.ofMillis(100);
Duration maxPollInterval = Duration.ofMillis(500);
Duration pollSwitchInterval = Duration.ofSeconds(30);
}
}

0 comments on commit 90372ac

Please sign in to comment.