From 63aab8f3fd56b107ae2a5a4af25cadc962e87f8a Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Thu, 2 Jun 2022 22:35:18 +0200 Subject: [PATCH] feat(jdbc): add support for executor delayed --- cli/src/main/resources/application.yml | 3 + .../io/kestra/core/utils/ExecutorsUtils.java | 10 ++ .../io/kestra/core/tasks/flows/PauseTest.java | 107 +++++++----- .../mysql/MysqlExecutionDelayStorage.java | 15 ++ .../migrations/mysql/V1__initial.sql | 22 ++- jdbc-mysql/src/test/resources/application.yml | 5 +- .../PostgresExecutionDelayStorage.java | 15 ++ .../kestra/runner/postgres/PostgresQueue.java | 8 +- .../migrations/postgres/V1__initial.sql | 12 +- .../src/test/resources/application.yml | 3 + .../kestra/jdbc/AbstractJdbcRepository.java | 16 +- .../runner/AbstractExecutionDelayStorage.java | 48 ++++++ .../io/kestra/jdbc/runner/JdbcExecutor.java | 158 ++++++++++++------ .../java/io/kestra/jdbc/runner/JdbcQueue.java | 25 ++- .../kestra/jdbc/runner/JdbcQueueIndexer.java | 2 - .../kestra/jdbc/runner/JdbcQueueService.java | 3 + .../io/kestra/jdbc/runner/JdbcRunnerTest.java | 18 +- 17 files changed, 349 insertions(+), 121 deletions(-) create mode 100644 jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlExecutionDelayStorage.java create mode 100644 jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresExecutionDelayStorage.java create mode 100644 jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutionDelayStorage.java diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 2e2caf1e50..29d47cf356 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -233,6 +233,9 @@ kestra: executorstate: table: "executorstate" cls: io.kestra.jdbc.runner.JdbcExecutorState + executordelayed: + table: "executordelayed" + cls: io.kestra.core.runners.ExecutionDelay elasticsearch: defaults: diff --git a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java index 827cdea8f6..cba2c7e37f 100644 --- a/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java +++ b/core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java @@ -62,6 +62,15 @@ public ExecutorService singleThreadExecutor(String name) { ); } + public ExecutorService singleThreadScheduledExecutor(String name) { + return this.wrap( + name, + Executors.newSingleThreadScheduledExecutor( + threadFactoryBuilder.build(name + "_%d") + ) + ); + } + private ExecutorService wrap(String name, ExecutorService executorService) { return ExecutorServiceMetrics.monitor( meterRegistry, @@ -69,4 +78,5 @@ private ExecutorService wrap(String name, ExecutorService executorService) { name ); } + } diff --git a/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java index 2d00412e9a..d61455e50d 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java @@ -7,9 +7,11 @@ import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.AbstractMemoryRunnerTest; +import io.kestra.core.runners.RunnerUtils; import io.kestra.core.services.ExecutionService; import jakarta.inject.Inject; import jakarta.inject.Named; +import jakarta.inject.Singleton; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -19,57 +21,72 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -class PauseTest extends AbstractMemoryRunnerTest { +public class PauseTest extends AbstractMemoryRunnerTest { @Inject - ExecutionService executionService; - - @Inject - FlowRepositoryInterface flowRepository; - - @Inject - @Named(QueueFactoryInterface.EXECUTION_NAMED) - protected QueueInterface executionQueue; + Suite suite; @Test void run() throws Exception { - Flow flow = flowRepository.findById("io.kestra.tests", "pause").orElseThrow(); - Execution execution = runnerUtils.runOne("io.kestra.tests", "pause", Duration.ofSeconds(120)); - - assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); - assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED)); - assertThat(execution.getTaskRunList(), hasSize(1)); - - Execution restarted = executionService.markAs( - execution, - execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(), - State.Type.RUNNING - ); - - execution = runnerUtils.awaitExecution( - e -> e.getState().getCurrent() == State.Type.SUCCESS, - () -> executionQueue.emit(restarted), - Duration.ofSeconds(120) - ); - - assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + suite.run(runnerUtils); } @Test - void runDelay() throws Exception { - Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay"); - - assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); - assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED)); - assertThat(execution.getTaskRunList(), hasSize(1)); - - execution = runnerUtils.awaitExecution( - e -> e.getState().getCurrent() == State.Type.SUCCESS, - () -> {}, - Duration.ofSeconds(30) - ); - - assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L)); - assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L)); - assertThat(execution.getTaskRunList(), hasSize(3)); + void failed() throws Exception { + suite.runDelay(runnerUtils); + } + + @Singleton + public static class Suite { + @Inject + ExecutionService executionService; + + @Inject + FlowRepositoryInterface flowRepository; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + protected QueueInterface executionQueue; + + public void run(RunnerUtils runnerUtils) throws Exception { + Flow flow = flowRepository.findById("io.kestra.tests", "pause").orElseThrow(); + Execution execution = runnerUtils.runOne("io.kestra.tests", "pause", Duration.ofSeconds(120)); + + assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + + Execution restarted = executionService.markAs( + execution, + execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(), + State.Type.RUNNING + ); + + execution = runnerUtils.awaitExecution( + e -> e.getState().getCurrent() == State.Type.SUCCESS, + () -> executionQueue.emit(restarted), + Duration.ofSeconds(120) + ); + + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + } + + public void runDelay(RunnerUtils runnerUtils) throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay"); + + assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + + execution = runnerUtils.awaitExecution( + e -> e.getState().getCurrent() == State.Type.SUCCESS, + () -> {}, + Duration.ofSeconds(30) + ); + + assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L)); + assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L)); + assertThat(execution.getTaskRunList(), hasSize(3)); + } } + } \ No newline at end of file diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlExecutionDelayStorage.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlExecutionDelayStorage.java new file mode 100644 index 0000000000..9bc71c0d8a --- /dev/null +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlExecutionDelayStorage.java @@ -0,0 +1,15 @@ +package io.kestra.runner.mysql; + +import io.kestra.core.runners.ExecutionDelay; +import io.kestra.jdbc.runner.AbstractExecutionDelayStorage; +import io.kestra.repository.mysql.MysqlRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Singleton; + +@Singleton +@MysqlQueueEnabled +public class MysqlExecutionDelayStorage extends AbstractExecutionDelayStorage { + public MysqlExecutionDelayStorage(ApplicationContext applicationContext) { + super(new MysqlRepository<>(ExecutionDelay.class, applicationContext)); + } +} diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql index bff67c6b6f..dbc222b1bb 100644 --- a/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql @@ -197,8 +197,8 @@ CREATE TABLE multipleconditions ( ) ) ) STORED NOT NULL, - INDEX namespace__flow_id__condition_id (namespace, flow_id, condition_id), - INDEX start_date__end_date (start_date, end_date) + INDEX ix_namespace__flow_id__condition_id (namespace, flow_id, condition_id), + INDEX ix_start_date__end_date (start_date, end_date) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -212,3 +212,21 @@ CREATE TABLE executorstate ( `key` VARCHAR(250) NOT NULL PRIMARY KEY, `value` JSON NOT NULL ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + + +CREATE TABLE executordelayed ( + `key` VARCHAR(250) NOT NULL PRIMARY KEY, + `value` JSON NOT NULL, + `date` DATETIME(6) GENERATED ALWAYS AS ( + IF( + SUBSTRING(value ->> '$.date', LENGTH(value ->> '$.date'), LENGTH(value ->> '$.date')) = 'Z', + STR_TO_DATE(value ->> '$.date', '%Y-%m-%dT%H:%i:%s.%fZ'), + CONVERT_TZ( + STR_TO_DATE(SUBSTRING(value ->> '$.date', 1, LENGTH(value ->> '$.date') - 6), '%Y-%m-%dT%H:%i:%s.%f'), + SUBSTRING(value ->> '$.date', LENGTH(value ->> '$.date') - 5, 5), + 'UTC' + ) + ) + ) STORED NOT NULL, + INDEX ix_date (`date`) +) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/jdbc-mysql/src/test/resources/application.yml b/jdbc-mysql/src/test/resources/application.yml index 8b3f96aa73..81124ce704 100644 --- a/jdbc-mysql/src/test/resources/application.yml +++ b/jdbc-mysql/src/test/resources/application.yml @@ -50,4 +50,7 @@ kestra: cls: io.kestra.core.runners.WorkerTaskExecution executorstate: table: "executorstate" - cls: io.kestra.jdbc.runner.JdbcExecutorState \ No newline at end of file + cls: io.kestra.jdbc.runner.JdbcExecutorState + executordelayed: + table: "executordelayed" + cls: io.kestra.core.runners.ExecutionDelay \ No newline at end of file diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresExecutionDelayStorage.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresExecutionDelayStorage.java new file mode 100644 index 0000000000..97fd44ef60 --- /dev/null +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresExecutionDelayStorage.java @@ -0,0 +1,15 @@ +package io.kestra.runner.postgres; + +import io.kestra.core.runners.ExecutionDelay; +import io.kestra.jdbc.runner.AbstractExecutionDelayStorage; +import io.kestra.repository.postgres.PostgresRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Singleton; + +@Singleton +@PostgresQueueEnabled +public class PostgresExecutionDelayStorage extends AbstractExecutionDelayStorage { + public PostgresExecutionDelayStorage(ApplicationContext applicationContext) { + super(new PostgresRepository<>(ExecutionDelay.class, applicationContext)); + } +} diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueue.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueue.java index 7914ea4561..975ba7c30a 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueue.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueue.java @@ -28,7 +28,7 @@ protected Map, Object> produceFields(String key, T message) { map.put( DSL.field(DSL.quotedName("type")), - DSL.field("CAST(? AS " + this.prefix + "queue_type)", this.cls.getName()) + DSL.field("CAST(? AS queue_type)", this.cls.getName()) ); return map; @@ -43,7 +43,7 @@ protected Result receiveFetch(DSLContext ctx, @NonNull Integer offset) { "FROM " + table.getName() + "\n" + "WHERE 1 = 1" + "\n" + (offset != 0 ? "AND \"offset\" > ?" + "\n" : "") + - "AND type = CAST(? AS " + this.prefix + "queue_type)" + "\n" + + "AND type = CAST(? AS queue_type)" + "\n" + "ORDER BY \"offset\" ASC" + "\n" + "LIMIT 10" + "\n" + "FOR UPDATE SKIP LOCKED", @@ -62,9 +62,9 @@ protected Result receiveFetch(DSLContext ctx, String consumerGroup) { "FROM " + table.getName() + "\n" + "WHERE (" + " \"consumers\" IS NULL" + "\n" + - " OR NOT(CAST(? AS " + this.prefix + "queue_consumers) = ANY(\"consumers\"))" + "\n" + + " OR NOT(CAST(? AS queue_consumers) = ANY(\"consumers\"))" + "\n" + ")" + "\n" + - "AND type = CAST(? AS " + this.prefix + "queue_type)" + "\n" + + "AND type = CAST(? AS queue_type)" + "\n" + "ORDER BY \"offset\" ASC" + "\n" + "LIMIT 10" + "\n" + "FOR UPDATE SKIP LOCKED", diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql index 7c05198d63..9c5803d2a9 100644 --- a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql @@ -242,4 +242,14 @@ CREATE TABLE workertaskexecutions ( CREATE TABLE executorstate ( key VARCHAR(250) NOT NULL PRIMARY KEY, value JSONB NOT NULL -); \ No newline at end of file +); + + +/* ----------------------- executorstate ----------------------- */ +CREATE TABLE executordelayed ( + key VARCHAR(250) NOT NULL PRIMARY KEY, + value JSONB NOT NULL, + date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'date')) STORED +); + +CREATE INDEX executordelayed_date ON executordelayed (date); diff --git a/jdbc-postgres/src/test/resources/application.yml b/jdbc-postgres/src/test/resources/application.yml index 972f1a0b55..f59f9ca0b2 100644 --- a/jdbc-postgres/src/test/resources/application.yml +++ b/jdbc-postgres/src/test/resources/application.yml @@ -51,3 +51,6 @@ kestra: executorstate: table: "executorstate" cls: io.kestra.jdbc.runner.JdbcExecutorState + executordelayed: + table: "executordelayed" + cls: io.kestra.core.runners.ExecutionDelay \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java index df04115ec5..1c6f156dbe 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java @@ -87,12 +87,16 @@ public void persist(T entity, DSLContext dslContext, Map, Object> } public void delete(T entity) { - dslContext.transaction(configuration -> - DSL.using(configuration) - .delete(table) - .where(DSL.field(DSL.quotedName("key")).eq(key(entity))) - .execute() - ); + dslContext.transaction(configuration -> { + this.delete(DSL.using(configuration), entity); + }); + } + + public void delete(DSLContext dslContext, T entity) { + dslContext + .delete(table) + .where(DSL.field(DSL.quotedName("key")).eq(key(entity))) + .execute(); } public T map(R record) { diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutionDelayStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutionDelayStorage.java new file mode 100644 index 0000000000..1749dcc125 --- /dev/null +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractExecutionDelayStorage.java @@ -0,0 +1,48 @@ +package io.kestra.jdbc.runner; + +import io.kestra.core.runners.ExecutionDelay; +import io.kestra.jdbc.AbstractJdbcRepository; +import io.kestra.jdbc.repository.AbstractRepository; +import org.jooq.Field; +import org.jooq.Record1; +import org.jooq.SelectConditionStep; +import org.jooq.impl.DSL; + +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.function.Consumer; + +public abstract class AbstractExecutionDelayStorage extends AbstractRepository { + protected AbstractJdbcRepository jdbcRepository; + + public AbstractExecutionDelayStorage(AbstractJdbcRepository jdbcRepository) { + this.jdbcRepository = jdbcRepository; + } + + public void get(Consumer consumer) { + ZonedDateTime now = ZonedDateTime.now(); + + this.jdbcRepository + .getDslContext() + .transaction(configuration -> { + SelectConditionStep> select = DSL + .using(configuration) + .select(DSL.field("value")) + .from(this.jdbcRepository.getTable()) + .where( + DSL.field("date").lessOrEqual(now.toOffsetDateTime()) + ); + + this.jdbcRepository.fetch(select) + .forEach(executionDelay -> { + consumer.accept(executionDelay); + jdbcRepository.delete(executionDelay); + }); + }); + } + + public void save(ExecutionDelay executionDelay) { + Map, Object> fields = this.jdbcRepository.persistFields(executionDelay); + this.jdbcRepository.persist(executionDelay, fields); + } +} diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index a9ccaa96a2..c0cf987050 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -11,14 +11,14 @@ import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.runners.*; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.FlowListenersInterface; -import io.kestra.core.services.FlowService; -import io.kestra.core.services.TaskDefaultService; +import io.kestra.core.runners.Executor; +import io.kestra.core.runners.ExecutorService; +import io.kestra.core.services.*; import io.kestra.core.tasks.flows.Template; import io.kestra.core.utils.Await; import io.kestra.jdbc.repository.AbstractExecutionRepository; import io.micronaut.context.ApplicationContext; +import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -30,12 +30,17 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.concurrent.*; import java.util.stream.Collectors; @Singleton @JdbcRunnerEnabled @Slf4j public class JdbcExecutor implements ExecutorInterface { + private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor(); + + private Boolean isShutdown = false; + @Inject private ApplicationContext applicationContext; @@ -91,6 +96,12 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private AbstractWorkerTaskExecutionStorage workerTaskExecutionStorage; + @Inject + private ExecutionService executionService; + + @Inject + private AbstractExecutionDelayStorage abstractExecutionDelayStorage; + private List allFlows; @SneakyThrows @@ -105,10 +116,38 @@ public void run() { this.executionQueue.receive(Executor.class, this::executionQueue); this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue); + + ScheduledFuture handle = schedulerDelay.scheduleAtFixedRate( + this::executionDelaySend, + 0, + 1, + TimeUnit.SECONDS + ); + + // look at exception on the main thread + Thread schedulerDelayThread = new Thread( + () -> { + Await.until(handle::isDone); + + try { + handle.get(); + } catch (ExecutionException | InterruptedException e) { + if (e.getCause().getClass() != CannotCreateTransactionException.class) { + log.error("Executor fatal exception", e); + + applicationContext.close(); + Runtime.getRuntime().exit(1); + } + } + }, + "jdbc-delay" + ); + + schedulerDelayThread.start(); } private void executionQueue(Execution message) { - executionRepository.lock(message.getId(), pair -> { + Executor result = executionRepository.lock(message.getId(), pair -> { Execution execution = pair.getLeft(); JdbcExecutorState executorState = pair.getRight(); @@ -128,14 +167,6 @@ private void executionQueue(Execution message) { ); } - if (executor.getException() != null) { - toExecution( - handleFailedExecutionFromExecutor(executor, executor.getException()) - ); - } else if (executor.isExecutionUpdated()) { - toExecution(executor); - } - // worker task if (executor.getWorkerTasks().size() > 0) { List workerTasksDedup = executor @@ -164,39 +195,11 @@ private void executionQueue(Execution message) { .forEach(workerTaskResultQueue::emit); } - -// // schedulerDelay -// if (executor.getExecutionDelays().size() > 0) { -// executor.getExecutionDelays() -// .forEach(workerTaskResultDelay -> { -// long between = ChronoUnit.MICROS.between(Instant.now(), workerTaskResultDelay.getDate()); -// -// if (between <= 0) { -// between = 1; -// } -// -// schedulerDelay.schedule( -// () -> { -// try { -// ExecutionState executionState = EXECUTIONS.get(workerTaskResultDelay.getExecutionId()); -// -// Execution markAsExecution = executionService.markAs( -// executionState.execution, -// workerTaskResultDelay.getTaskRunId(), -// State.Type.RUNNING -// ); -// -// executionQueue.emit(markAsExecution); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// }, -// between, -// TimeUnit.MICROSECONDS -// ); -// }); -// } - + // schedulerDelay + if (executor.getExecutionDelays().size() > 0) { + executor.getExecutionDelays() + .forEach(executionDelay -> abstractExecutionDelayStorage.save(executionDelay)); + } // worker task execution watchers if (executor.getWorkerTaskExecutions().size() > 0) { @@ -206,10 +209,14 @@ private void executionQueue(Execution message) { .getWorkerTaskExecutions() .forEach(workerTaskExecution -> { String log = "Create new execution for flow '" + - workerTaskExecution.getExecution().getNamespace() + "'." + workerTaskExecution.getExecution().getFlowId() + - "' with id '" + workerTaskExecution.getExecution().getId() + "' from task '" + workerTaskExecution.getTask().getId() + + workerTaskExecution.getExecution() + .getNamespace() + "'." + workerTaskExecution.getExecution().getFlowId() + + "' with id '" + workerTaskExecution.getExecution() + .getId() + "' from task '" + workerTaskExecution.getTask().getId() + "' and taskrun '" + workerTaskExecution.getTaskRun().getId() + - (workerTaskExecution.getTaskRun().getValue() != null ? " (" + workerTaskExecution.getTaskRun().getValue() + ")" : "") + "'"; + (workerTaskExecution.getTaskRun() + .getValue() != null ? " (" + workerTaskExecution.getTaskRun() + .getValue() + ")" : "") + "'"; JdbcExecutor.log.info(log); @@ -268,6 +275,10 @@ private void executionQueue(Execution message) { executorState ); }); + + if (result != null) { + this.toExecution(result); + } } @@ -330,6 +341,19 @@ private void workerTaskResultQueue(WorkerTaskResult message) { } private void toExecution(Executor executor) { + boolean shouldSend = false; + + if (executor.getException() != null) { + executor = handleFailedExecutionFromExecutor(executor, executor.getException()); + shouldSend = true; + } else if (executor.isExecutionUpdated()) { + shouldSend = true; + } + + if (!shouldSend) { + return; + } + if (log.isDebugEnabled()) { executorService.log(log, false, executor); } @@ -343,7 +367,6 @@ private void toExecution(Executor executor) { } } - private Flow transform(Flow flow, Execution execution) { try { flow = Template.injectTemplate( @@ -358,6 +381,39 @@ private Flow transform(Flow flow, Execution execution) { return taskDefaultService.injectDefaults(flow, execution); } + private void executionDelaySend() { + if (isShutdown) { + return; + } + + abstractExecutionDelayStorage.get(executionDelay -> { + Executor result = executionRepository.lock(executionDelay.getExecutionId(), pair -> { + Executor executor = new Executor(pair.getLeft(), null); + + try { + Execution markAsExecution = executionService.markAs( + pair.getKey(), + executionDelay.getTaskRunId(), + State.Type.RUNNING + ); + + executor = executor.withExecution(markAsExecution, "pausedRestart"); + } catch (Exception e) { + executor = handleFailedExecutionFromExecutor(executor, e); + } + + return Pair.of( + executor, + pair.getRight() + ); + }); + + if (result != null) { + this.toExecution(result); + } + }); + } + private boolean deduplicateNexts(Execution execution, JdbcExecutorState executorState, List taskRuns) { return taskRuns .stream() @@ -414,6 +470,8 @@ private Executor handleFailedExecutionFromExecutor(Executor executor, Exception @Override public void close() throws IOException { + isShutdown = true; + schedulerDelay.shutdown(); executionQueue.close(); workerTaskQueue.close(); workerTaskResultQueue.close(); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index c62802b728..63aa03810c 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -12,6 +12,7 @@ import io.kestra.core.utils.IdUtils; import io.kestra.jdbc.JdbcConfiguration; import io.micronaut.context.ApplicationContext; +import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jooq.*; @@ -25,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import javax.sql.DataSource; @Slf4j public abstract class JdbcQueue implements QueueInterface { @@ -38,12 +40,14 @@ public abstract class JdbcQueue implements QueueInterface { protected final DSLContext dslContext; - protected final Table table; + protected final DataSource dataSource; - protected final String prefix; + protected final Table table; protected final JdbcQueueIndexer jdbcQueueIndexer; + protected Boolean isShutdown = false; + public JdbcQueue(Class cls, ApplicationContext applicationContext) { if (poolExecutor == null) { ExecutorsUtils executorsUtils = applicationContext.getBean(ExecutorsUtils.class); @@ -53,11 +57,11 @@ public JdbcQueue(Class cls, ApplicationContext applicationContext) { this.queueService = applicationContext.getBean(QueueService.class); this.cls = cls; this.dslContext = applicationContext.getBean(DSLContext.class); + this.dataSource = applicationContext.getBean(DataSource.class); JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class); this.table = DSL.table(jdbcConfiguration.tableConfig("queues").getTable()); - this.prefix = ""; // @TODO this.jdbcQueueIndexer = applicationContext.getBean(JdbcQueueIndexer.class); } @@ -178,12 +182,19 @@ public Runnable receive(Class consumerGroup, Consumer consumer) { }); } + @SuppressWarnings("BusyWait") private Runnable poll(Runnable runnable) { AtomicBoolean running = new AtomicBoolean(true); poolExecutor.execute(() -> { - while (running.get()) { - runnable.run(); + while (running.get() && !this.isShutdown) { + try { + runnable.run(); + } catch (CannotCreateTransactionException e) { + if (log.isDebugEnabled()) { + log.debug("Can't poll on receive", e); + } + } try { Thread.sleep(500); @@ -213,10 +224,12 @@ private void send(Result fetch, Consumer consumer) { @Override public void pause() { - // @TODO + this.isShutdown = true; } @Override public void close() throws IOException { + this.isShutdown = true; + poolExecutor.shutdown(); } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java index 594a423af3..44af37776a 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java @@ -22,8 +22,6 @@ public class JdbcQueueIndexer { public JdbcQueueIndexer(ApplicationContext applicationContext) { applicationContext.getBeansOfType(JdbcIndexerInterface.class) .forEach(saveRepositoryInterface -> { -// Class genericInterfaces = (Class) saveRepositoryInterface.getClass().getGenericInterfaces()[0]; -// String typeName = ((ParameterizedType) genericInterfaces.getGenericInterfaces()[0]).getActualTypeArguments()[0].getTypeName(); String typeName = ((ParameterizedType) ((Class) saveRepositoryInterface.getClass() .getGenericSuperclass()).getGenericInterfaces()[1]).getActualTypeArguments()[0].getTypeName(); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueService.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueService.java index 977f03801c..3c4cf17b62 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueService.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueService.java @@ -1,6 +1,7 @@ package io.kestra.jdbc.runner; import io.kestra.core.queues.QueueService; +import io.kestra.core.runners.ExecutionDelay; import io.micronaut.context.annotation.Replaces; import jakarta.inject.Singleton; @@ -10,6 +11,8 @@ public class JdbcQueueService extends QueueService{ public String key(Object object) { if (object.getClass() == JdbcExecutorState.class) { return ((JdbcExecutorState) object).getExecutionId(); + } else if (object.getClass() == ExecutionDelay.class) { + return ((ExecutionDelay) object).getExecutionId(); } return super.key(object); diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java index dc7ac061b4..6692633e58 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java @@ -8,10 +8,7 @@ import io.kestra.core.repositories.LocalFlowRepositoryLoader; import io.kestra.core.repositories.TemplateRepositoryInterface; import io.kestra.core.runners.*; -import io.kestra.core.tasks.flows.EachSequentialTest; -import io.kestra.core.tasks.flows.FlowCaseTest; -import io.kestra.core.tasks.flows.TemplateTest; -import io.kestra.core.tasks.flows.WorkerTest; +import io.kestra.core.tasks.flows.*; import io.kestra.core.utils.TestsUtils; import io.kestra.jdbc.JdbcTestUtils; import io.micronaut.test.extensions.junit5.annotation.MicronautTest; @@ -68,6 +65,9 @@ public abstract class JdbcRunnerTest { @Inject private WorkerTest.Suite workerTest; + @Inject + private PauseTest.Suite pauseTest; + @BeforeEach void init() throws IOException, URISyntaxException { jdbcTestUtils.drop(); @@ -212,4 +212,14 @@ public void workerFailed() throws Exception { public void workerEach() throws Exception { workerTest.each(runnerUtils); } + + @Test + public void pauseRun() throws Exception { + pauseTest.run(runnerUtils); + } + + @Test + public void pauseRunDelay() throws Exception { + pauseTest.runDelay(runnerUtils); + } }