From 604c6a72281deddb8c5a84447868233e149de393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 15 Jan 2025 16:57:30 +0100 Subject: [PATCH] feat(core, jdbc): change the state of a subflow restart parent execution --- .../core/queues/QueueFactoryInterface.java | 3 + .../io/kestra/core/queues/QueueService.java | 2 + .../kestra/core/runners/ExecutableUtils.java | 30 +++- .../java/io/kestra/core/runners/Executor.java | 11 ++ .../kestra/core/runners/ExecutorService.java | 22 ++- .../core/runners/SubflowExecutionEnd.java | 27 ++++ .../core/runners/AbstractRunnerTest.java | 37 ++++- .../core/runners/ChangeStateTestCase.java | 113 ++++++++++++++ .../core/runners/ExecutionServiceTest.java | 12 ++ .../io/kestra/core/runners/RestartTest.java | 53 ------- .../valids/subflow-parent-of-failed.yaml | 8 + .../io/kestra/runner/h2/H2QueueFactory.java | 8 + .../runner/h2/H2SubflowExecutionStorage.java | 16 -- .../h2/V1_29__subflow_execution_end.sql | 18 +++ .../h2/H2SubflowExecutionStorageTest.java | 7 - .../runner/mysql/MysqlQueueFactory.java | 8 + .../mysql/MysqlSubflowExecutionStorage.java | 16 -- .../mysql/V1_29__subflow_execution_end.sql | 18 +++ .../MysqlSubflowExecutionStorageTest.java | 7 - .../runner/postgres/PostgresQueueFactory.java | 8 + .../PostgresSubflowExecutionStorage.java | 15 -- .../postgres/V1_29__subflow_execution_end.sql | 1 + .../PostgresSubflowExecutionStorageTest.java | 17 --- .../kestra/jdbc/JdbcTableConfigsFactory.java | 6 - .../AbstractJdbcSubflowExecutionStorage.java | 73 --------- .../io/kestra/jdbc/runner/JdbcExecutor.java | 140 ++++++++++-------- .../runner/AbstractSubflowExecutionTest.java | 78 ---------- 27 files changed, 398 insertions(+), 356 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java create mode 100644 core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java delete mode 100644 core/src/test/java/io/kestra/core/runners/RestartTest.java create mode 100644 core/src/test/resources/flows/valids/subflow-parent-of-failed.yaml delete mode 100644 jdbc-h2/src/main/java/io/kestra/runner/h2/H2SubflowExecutionStorage.java create mode 100644 jdbc-h2/src/main/resources/migrations/h2/V1_29__subflow_execution_end.sql delete mode 100644 jdbc-h2/src/test/java/io/kestra/runner/h2/H2SubflowExecutionStorageTest.java delete mode 100644 jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorage.java create mode 100644 jdbc-mysql/src/main/resources/migrations/mysql/V1_29__subflow_execution_end.sql delete mode 100644 jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorageTest.java delete mode 100644 jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorage.java create mode 100644 jdbc-postgres/src/main/resources/migrations/postgres/V1_29__subflow_execution_end.sql delete mode 100644 jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java delete mode 100644 jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java delete mode 100644 jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java diff --git a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java index b6e57592b32..7f290b89d66 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java @@ -26,6 +26,7 @@ public interface QueueFactoryInterface { String TRIGGER_NAMED = "triggerQueue"; String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue"; String CLUSTER_EVENT_NAMED = "clusterEventQueue"; + String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue"; QueueInterface execution(); @@ -58,4 +59,6 @@ public interface QueueFactoryInterface { WorkerTriggerResultQueueInterface workerTriggerResultQueue(); QueueInterface subflowExecutionResult(); + + QueueInterface subflowExecutionEnd(); } diff --git a/core/src/main/java/io/kestra/core/queues/QueueService.java b/core/src/main/java/io/kestra/core/queues/QueueService.java index 2a996e667eb..0349938b571 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueService.java +++ b/core/src/main/java/io/kestra/core/queues/QueueService.java @@ -29,6 +29,8 @@ public String key(Object object) { return null; } else if (object.getClass() == ExecutionRunning.class) { return ((ExecutionRunning) object).getExecution().getId(); + } else if (object.getClass() == SubflowExecutionEnd.class) { + return ((SubflowExecutionEnd) object).getExecutionId(); } else { throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'"); } diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index 923f1bb2524..db15dd1a4c4 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -146,7 +146,8 @@ public static > Optional> "namespace", currentFlow.getNamespace(), "flowId", currentFlow.getId(), "flowRevision", currentFlow.getRevision(), - "taskRunId", currentTaskRun.getId() + "taskRunId", currentTaskRun.getId(), + "taskId", currentTaskRun.getTaskId() )); if (currentTaskRun.getValue() != null) { variables.put("taskRunValue", currentTaskRun.getValue()); @@ -185,6 +186,10 @@ private static List