diff --git a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java index b6e57592b32..7f290b89d66 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java @@ -26,6 +26,7 @@ public interface QueueFactoryInterface { String TRIGGER_NAMED = "triggerQueue"; String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue"; String CLUSTER_EVENT_NAMED = "clusterEventQueue"; + String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue"; QueueInterface execution(); @@ -58,4 +59,6 @@ public interface QueueFactoryInterface { WorkerTriggerResultQueueInterface workerTriggerResultQueue(); QueueInterface subflowExecutionResult(); + + QueueInterface subflowExecutionEnd(); } diff --git a/core/src/main/java/io/kestra/core/queues/QueueService.java b/core/src/main/java/io/kestra/core/queues/QueueService.java index 2a996e667eb..80e99c9abbf 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueService.java +++ b/core/src/main/java/io/kestra/core/queues/QueueService.java @@ -29,6 +29,8 @@ public String key(Object object) { return null; } else if (object.getClass() == ExecutionRunning.class) { return ((ExecutionRunning) object).getExecution().getId(); + } else if (object.getClass() == SubflowExecutionEnd.class) { + return ((SubflowExecutionEnd) object).getParentExecutionId(); } else { throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'"); } diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index eb0bd8d45c7..f7f8d329cfb 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -163,8 +163,12 @@ public static > Optional> "namespace", currentFlow.getNamespace(), "flowId", currentFlow.getId(), "flowRevision", currentFlow.getRevision(), - "taskRunId", currentTaskRun.getId() + "taskRunId", currentTaskRun.getId(), + "taskId", currentTaskRun.getTaskId() )); + if (currentTaskRun.getOutputs() != null) { + variables.put("taskRunOutputs", currentTaskRun.getOutputs()); + } if (currentTaskRun.getValue() != null) { variables.put("taskRunValue", currentTaskRun.getValue()); } @@ -278,4 +282,27 @@ private static State.Type findTerminalState(Map iterations, boo } return State.Type.SUCCESS; } + + public static SubflowExecutionResult subflowExecutionResultFromChildExecution(RunContext runContext, Flow flow, Execution execution, ExecutableTask executableTask, TaskRun taskRun) { + try { + return executableTask + .createSubflowExecutionResult(runContext, taskRun, flow, execution) + .orElse(null); + } catch (Exception e) { + log.error("Unable to create the Subflow Execution Result", e); + // we return a fail subflow execution result to end the flow + return SubflowExecutionResult.builder() + .executionId(execution.getId()) + .state(State.Type.FAILED) + .parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))) + .build(); + } + } + + public static boolean isSubflow(Execution execution) { + return execution.getTrigger() != null && ( + "io.kestra.plugin.core.flow.Subflow".equals(execution.getTrigger().getType()) || + "io.kestra.plugin.core.flow.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType()) + ); + } } diff --git a/core/src/main/java/io/kestra/core/runners/Executor.java b/core/src/main/java/io/kestra/core/runners/Executor.java index f2a1bd85106..96e00398659 100644 --- a/core/src/main/java/io/kestra/core/runners/Executor.java +++ b/core/src/main/java/io/kestra/core/runners/Executor.java @@ -35,6 +35,8 @@ public class Executor { private final List workerTriggers = new ArrayList<>(); private WorkerJob workerJobToResubmit; private State.Type originalState; + private SubflowExecutionEnd subflowExecutionEnd; + private SubflowExecutionEnd joinedSubflowExecutionEnd; /** * The sequence id should be incremented each time the execution is persisted after mutation. @@ -67,6 +69,10 @@ public Executor(SubflowExecutionResult subflowExecutionResult) { this.joinedSubflowExecutionResult = subflowExecutionResult; } + public Executor(SubflowExecutionEnd subflowExecutionEnd) { + this.joinedSubflowExecutionEnd = subflowExecutionEnd; + } + public Executor(WorkerJob workerJob) { this.workerJobToResubmit = workerJob; } @@ -169,6 +175,11 @@ public Executor withExecutionKilled(final List executi return this; } + public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) { + this.subflowExecutionEnd = subflowExecutionEnd; + return this; + } + public Executor serialize() { return new Executor( this.execution, diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index ce8d2145b62..33d57bd716c 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -6,6 +6,7 @@ import io.kestra.core.models.Label; import io.kestra.core.models.executions.*; import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.FlowWithSource; import io.kestra.core.models.flows.State; import io.kestra.core.models.flows.sla.Violation; import io.kestra.core.models.tasks.*; @@ -873,17 +874,21 @@ private Executor handleExecutableTask(final Executor executor) { ); } else { executions.addAll(subflowExecutions); - if (!executableTask.waitForExecution()) { - // send immediately all workerTaskResult to ends the executable task + Optional flow = flowExecutorInterface.findByExecution(subflowExecutions.getFirst().getExecution()); + if (flow.isPresent()) { + // add SubflowExecutionResults to notify parents for (SubflowExecution subflowExecution : subflowExecutions) { Optional subflowExecutionResult = executableTask.createSubflowExecutionResult( runContext, - subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS), - executor.getFlow(), + // if we didn't wait for the execution, we directly set the state to SUCCESS + executableTask.waitForExecution() ? subflowExecution.getParentTaskRun() : subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS), + flow.get(), subflowExecution.getExecution() ); subflowExecutionResult.ifPresent(subflowExecutionResults::add); } + } else { + log.error("Unable to find flow for execution {}", subflowExecutions.getFirst().getExecution().getId()); } } } catch (Exception e) { @@ -1030,6 +1035,15 @@ public void log(Logger log, Boolean in, SubflowExecutionResult value) { ); } + public void log(Logger log, Boolean in, SubflowExecutionEnd value) { + log.debug( + "{} {} : {}", + in ? "<< IN " : ">> OUT", + value.getClass().getSimpleName(), + value.toStringState() + ); + } + public void log(Logger log, Boolean in, Execution value) { log.debug( "{} {} [key='{}']\n{}", diff --git a/core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java b/core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java new file mode 100644 index 00000000000..376ee6ab7df --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java @@ -0,0 +1,33 @@ +package io.kestra.core.runners; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.State; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SubflowExecutionEnd { + private Execution childExecution; + private String parentExecutionId; + private String taskRunId; + private String taskId; + private State.Type state; + private Map outputs; + + public String toStringState() { + return "SubflowExecutionEnd(" + + "childExecutionId=" + this.getChildExecution().getId() + + ", parentExecutionId=" + this.getParentExecutionId() + + ", taskId=" + this.getTaskId() + + ", taskRunId=" + this.getTaskRunId() + + ", state=" + this.getState().toString() + + ")"; + } +} diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index 9faea061335..48f047c3208 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -317,7 +317,7 @@ private Execution markAs(final Execution execution, Flow flow, String taskRunId, taskRun -> taskRun.getId().equals(taskRunId) ); - Execution newExecution = execution; + Execution newExecution = execution.withMetadata(execution.getMetadata().nextAttempt()); for (String s : taskRunToRestart) { TaskRun originalTaskRun = newExecution.findTaskRunByTaskRunId(s); diff --git a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java index 8dc17ef08d2..b5185cc0ce1 100644 --- a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java +++ b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junitpioneer.jupiter.RetryingTest; @@ -90,6 +92,9 @@ public abstract class AbstractRunnerTest { @Inject private SLATestCase slaTestCase; + @Inject + private ChangeStateTestCase changeStateTestCase; + @Test @ExecuteFlow("flows/valids/full.yaml") void full(Execution execution) { @@ -173,6 +178,18 @@ void restartMultiple() throws Exception { restartCaseTest.restartMultiple(); } + @Test + @LoadFlows({"flows/valids/restart_always_failed.yaml"}) + void restartFailedThenFailureWithGlobalErrors() throws Exception { + restartCaseTest.restartFailedThenFailureWithGlobalErrors(); + } + + @RetryingTest(5) + @LoadFlows({"flows/valids/restart_local_errors.yaml"}) + void restartFailedThenFailureWithLocalErrors() throws Exception { + restartCaseTest.restartFailedThenFailureWithLocalErrors(); + } + @Test @LoadFlows({"flows/valids/restart-parent.yaml", "flows/valids/restart-child.yaml"}) void restartSubflow() throws Exception { @@ -244,7 +261,7 @@ void flowWaitSuccess() throws Exception { "flows/valids/task-flow.yaml", "flows/valids/task-flow-inherited-labels.yaml"}) void flowWaitFailed() throws Exception { - flowCaseTest.waitFailed(); + flowCaseTest.waitFailed(); } @Test @@ -342,6 +359,12 @@ protected void forEachItemSubflowOutputs() throws Exception { forEachItemCaseTest.forEachItemWithSubflowOutputs(); } + @Test + @LoadFlows({"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"}) + void restartForEachItem() throws Exception { + forEachItemCaseTest.restartForEachItem(); + } + @Test @LoadFlows({"flows/valids/flow-concurrency-cancel.yml"}) void concurrencyCancel() throws Exception { @@ -468,4 +491,16 @@ void multipleIf() throws TimeoutException, QueueException { assertThat(execution.getTaskRunList(), hasSize(12)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); } + + @Test + @ExecuteFlow("flows/valids/failed-first.yaml") + public void changeStateShouldEndsInSuccess(Execution execution) throws Exception { + changeStateTestCase.changeStateShouldEndsInSuccess(execution); + } + + @Test + @LoadFlows({"flows/valids/failed-first.yaml", "flows/valids/subflow-parent-of-failed.yaml"}) + public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception { + changeStateTestCase.changeStateInSubflowShouldEndsParentFlowInSuccess(); + } } diff --git a/core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java b/core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java new file mode 100644 index 00000000000..4ed7032cfdc --- /dev/null +++ b/core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java @@ -0,0 +1,113 @@ +package io.kestra.core.runners; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.State; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.services.ExecutionService; +import io.kestra.core.utils.TestsUtils; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import reactor.core.publisher.Flux; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@Singleton +public class ChangeStateTestCase { + @Inject + private FlowRepositoryInterface flowRepository; + + @Inject + private ExecutionService executionService; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + private QueueInterface executionQueue; + + @Inject + private RunnerUtils runnerUtils; + + public void changeStateShouldEndsInSuccess(Execution execution) throws Exception { + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED)); + + // await for the last execution + CountDownLatch latch = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + Flux receivedExecutions = TestsUtils.receive(executionQueue, either -> { + Execution exec = either.getLeft(); + if (execution.getId().equals(exec.getId()) && exec.getState().getCurrent() == State.Type.SUCCESS) { + lastExecution.set(exec); + latch.countDown(); + } + }); + + Flow flow = flowRepository.findByExecution(execution); + Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS); + executionQueue.emit(markedAs); + + assertThat(latch.await(10, TimeUnit.SECONDS), is(true)); + receivedExecutions.blockLast(); + assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(lastExecution.get().getTaskRunList(), hasSize(2)); + assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS)); + } + + public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception { + // await for the subflow execution + CountDownLatch latch = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + Flux receivedExecutions = TestsUtils.receive(executionQueue, either -> { + Execution exec = either.getLeft(); + if ("failed-first".equals(exec.getFlowId()) && exec.getState().getCurrent() == State.Type.FAILED) { + lastExecution.set(exec); + latch.countDown(); + } + }); + + // run the parent flow + Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "subflow-parent-of-failed"); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED)); + + // assert on the subflow + assertThat(latch.await(10, TimeUnit.SECONDS), is(true)); + receivedExecutions.blockLast(); + assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.FAILED)); + assertThat(lastExecution.get().getTaskRunList(), hasSize(1)); + assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED)); + + // await for the parent execution + CountDownLatch parentLatch = new CountDownLatch(1); + AtomicReference lastParentExecution = new AtomicReference<>(); + receivedExecutions = TestsUtils.receive(executionQueue, either -> { + Execution exec = either.getLeft(); + if (execution.getId().equals(exec.getId()) && exec.getState().isTerminated()) { + lastParentExecution.set(exec); + parentLatch.countDown(); + } + }); + + // restart the subflow + Flow flow = flowRepository.findByExecution(lastExecution.get()); + Execution markedAs = executionService.markAs(lastExecution.get(), flow, lastExecution.get().getTaskRunList().getFirst().getId(), State.Type.SUCCESS); + executionQueue.emit(markedAs); + + // assert for the parent flow + assertThat(parentLatch.await(10, TimeUnit.SECONDS), is(true)); + receivedExecutions.blockLast(); + assertThat(lastParentExecution.get().getState().getCurrent(), is(State.Type.FAILED)); // FIXME should be success but it's FAILED on unit tests + assertThat(lastParentExecution.get().getTaskRunList(), hasSize(1)); + assertThat(lastParentExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS)); + } +} diff --git a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java index ae7f82b523e..c5ec76e2d6b 100644 --- a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java @@ -322,6 +322,7 @@ void markAsEachPara() throws Exception { Execution restart = executionService.markAs(execution, flow, execution.findTaskRunByTaskIdAndValue("2-1_seq", List.of("value 1")).getId(), State.Type.FAILED); assertThat(restart.getState().getCurrent(), is(State.Type.RESTARTED)); + assertThat(restart.getMetadata().getAttemptNumber(), is(2)); assertThat(restart.getState().getHistories(), hasSize(4)); assertThat(restart.getTaskRunList(), hasSize(11)); assertThat(restart.findTaskRunByTaskIdAndValue("1_each", List.of()).getState().getCurrent(), is(State.Type.RUNNING)); @@ -413,4 +414,16 @@ void shouldKillPausedExecutions() throws Exception { assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent(), is(State.Type.KILLED)); assertThat(killed.getState().getHistories(), hasSize(4)); } + + @Test + @ExecuteFlow("flows/valids/failed-first.yaml") + void shouldRestartAfterChangeTaskState(Execution execution) throws Exception { + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + assertThat(execution.getTaskRunList(), hasSize(1)); + assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED)); + + Flow flow = flowRepository.findByExecution(execution); + Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS); + assertThat(markedAs.getState().getCurrent(), is(State.Type.RESTARTED)); + } } \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/runners/RestartTest.java b/core/src/test/java/io/kestra/core/runners/RestartTest.java deleted file mode 100644 index 3e725b57f5c..00000000000 --- a/core/src/test/java/io/kestra/core/runners/RestartTest.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.kestra.core.runners; - -import io.kestra.core.junit.annotations.KestraTest; -import io.kestra.core.junit.annotations.LoadFlows; -import io.kestra.plugin.core.flow.ForEachItemCaseTest; -import jakarta.inject.Inject; -import org.junit.jupiter.api.Test; -import org.junitpioneer.jupiter.RetryingTest; - -@KestraTest(startRunner = true) -public class RestartTest { - @Inject - private RestartCaseTest restartCaseTest; - - @Inject - private ForEachItemCaseTest forEachItemCaseTest; - - @Test - @LoadFlows({"flows/valids/restart_last_failed.yaml"}) - void restartFailedThenSuccess() throws Exception { - restartCaseTest.restartFailedThenSuccess(); - } - - @Test - @LoadFlows({"flows/valids/restart_always_failed.yaml"}) - void restartFailedThenFailureWithGlobalErrors() throws Exception { - restartCaseTest.restartFailedThenFailureWithGlobalErrors(); - } - - @RetryingTest(5) - @LoadFlows({"flows/valids/restart_local_errors.yaml"}) - void restartFailedThenFailureWithLocalErrors() throws Exception { - restartCaseTest.restartFailedThenFailureWithLocalErrors(); - } - - @Test - @LoadFlows({"flows/valids/restart-each.yaml"}) - void replay() throws Exception { - restartCaseTest.replay(); - } - - @Test - @LoadFlows({"flows/valids/restart-parent.yaml", "flows/valids/restart-child.yaml"}) - void restartSubflow() throws Exception { - restartCaseTest.restartSubflow(); - } - - @Test - @LoadFlows({"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"}) - void restartForEachItem() throws Exception { - forEachItemCaseTest.restartForEachItem(); - } -} diff --git a/core/src/test/resources/flows/valids/subflow-parent-of-failed.yaml b/core/src/test/resources/flows/valids/subflow-parent-of-failed.yaml new file mode 100644 index 00000000000..fd8fa637c1e --- /dev/null +++ b/core/src/test/resources/flows/valids/subflow-parent-of-failed.yaml @@ -0,0 +1,8 @@ +id: subflow-parent-of-failed +namespace: io.kestra.tests + +tasks: + - id: subflow + type: io.kestra.plugin.core.flow.Subflow + namespace: io.kestra.tests + flowId: failed-first \ No newline at end of file diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java index ec882a782c6..59fc43335aa 100644 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java +++ b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2QueueFactory.java @@ -152,4 +152,12 @@ public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { public QueueInterface subflowExecutionResult() { return new H2Queue<>(SubflowExecutionResult.class, applicationContext); } + + @Override + @Singleton + @Named(QueueFactoryInterface.SUBFLOWEXECUTIONEND_NAMED) + @Bean(preDestroy = "close") + public QueueInterface subflowExecutionEnd() { + return new H2Queue<>(SubflowExecutionEnd.class, applicationContext); + } } diff --git a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2SubflowExecutionStorage.java b/jdbc-h2/src/main/java/io/kestra/runner/h2/H2SubflowExecutionStorage.java deleted file mode 100644 index 3498641786e..00000000000 --- a/jdbc-h2/src/main/java/io/kestra/runner/h2/H2SubflowExecutionStorage.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.kestra.runner.h2; - -import io.kestra.core.runners.SubflowExecution; -import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage; -import io.kestra.repository.h2.H2Repository; -import io.micronaut.context.ApplicationContext; -import jakarta.inject.Named; -import jakarta.inject.Singleton; - -@Singleton -@H2QueueEnabled -public class H2SubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage { - public H2SubflowExecutionStorage(@Named("subflow-executions") H2Repository> repository) { - super(repository); - } -} diff --git a/jdbc-h2/src/main/resources/migrations/h2/V1_29__subflow_execution_end.sql b/jdbc-h2/src/main/resources/migrations/h2/V1_29__subflow_execution_end.sql new file mode 100644 index 00000000000..79a8e750611 --- /dev/null +++ b/jdbc-h2/src/main/resources/migrations/h2/V1_29__subflow_execution_end.sql @@ -0,0 +1,18 @@ +ALTER TABLE queues ALTER COLUMN "type" ENUM( + 'io.kestra.core.models.executions.Execution', + 'io.kestra.core.models.templates.Template', + 'io.kestra.core.models.executions.ExecutionKilled', + 'io.kestra.core.runners.WorkerJob', + 'io.kestra.core.runners.WorkerTaskResult', + 'io.kestra.core.runners.WorkerInstance', + 'io.kestra.core.runners.WorkerTaskRunning', + 'io.kestra.core.models.executions.LogEntry', + 'io.kestra.core.models.triggers.Trigger', + 'io.kestra.ee.models.audits.AuditLog', + 'io.kestra.core.models.executions.MetricEntry', + 'io.kestra.core.runners.WorkerTriggerResult', + 'io.kestra.core.runners.SubflowExecutionResult', + 'io.kestra.core.models.flows.FlowWithSource', + 'io.kestra.core.server.ClusterEvent', + 'io.kestra.core.runners.SubflowExecutionEnd' +) NOT NULL; \ No newline at end of file diff --git a/jdbc-h2/src/test/java/io/kestra/runner/h2/H2SubflowExecutionStorageTest.java b/jdbc-h2/src/test/java/io/kestra/runner/h2/H2SubflowExecutionStorageTest.java deleted file mode 100644 index 0a0c2e42477..00000000000 --- a/jdbc-h2/src/test/java/io/kestra/runner/h2/H2SubflowExecutionStorageTest.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.kestra.runner.h2; - -import io.kestra.jdbc.runner.AbstractSubflowExecutionTest; - -class H2SubflowExecutionStorageTest extends AbstractSubflowExecutionTest { - -} \ No newline at end of file diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java index e49ae59a26b..9768f8a5cfa 100644 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java +++ b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueueFactory.java @@ -152,4 +152,12 @@ public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { public QueueInterface subflowExecutionResult() { return new MysqlQueue<>(SubflowExecutionResult.class, applicationContext); } + + @Override + @Singleton + @Named(QueueFactoryInterface.SUBFLOWEXECUTIONEND_NAMED) + @Bean(preDestroy = "close") + public QueueInterface subflowExecutionEnd() { + return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext); + } } diff --git a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorage.java b/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorage.java deleted file mode 100644 index cf13d056905..00000000000 --- a/jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorage.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.kestra.runner.mysql; - -import io.kestra.core.runners.SubflowExecution; -import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage; -import io.kestra.repository.mysql.MysqlRepository; -import io.micronaut.context.ApplicationContext; -import jakarta.inject.Named; -import jakarta.inject.Singleton; - -@Singleton -@MysqlQueueEnabled -public class MysqlSubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage { - public MysqlSubflowExecutionStorage(@Named("subflow-executions") MysqlRepository> repository) { - super(repository); - } -} diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1_29__subflow_execution_end.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1_29__subflow_execution_end.sql new file mode 100644 index 00000000000..5790d93191b --- /dev/null +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1_29__subflow_execution_end.sql @@ -0,0 +1,18 @@ +ALTER TABLE queues MODIFY COLUMN `type` ENUM( + 'io.kestra.core.models.executions.Execution', + 'io.kestra.core.models.templates.Template', + 'io.kestra.core.models.executions.ExecutionKilled', + 'io.kestra.core.runners.WorkerJob', + 'io.kestra.core.runners.WorkerTaskResult', + 'io.kestra.core.runners.WorkerInstance', + 'io.kestra.core.runners.WorkerTaskRunning', + 'io.kestra.core.models.executions.LogEntry', + 'io.kestra.core.models.triggers.Trigger', + 'io.kestra.ee.models.audits.AuditLog', + 'io.kestra.core.models.executions.MetricEntry', + 'io.kestra.core.runners.WorkerTriggerResult', + 'io.kestra.core.runners.SubflowExecutionResult', + 'io.kestra.core.models.flows.FlowWithSource', + 'io.kestra.core.server.ClusterEvent', + 'io.kestra.core.runners.SubflowExecutionEnd' +) NOT NULL; \ No newline at end of file diff --git a/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorageTest.java b/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorageTest.java deleted file mode 100644 index b34af5df22c..00000000000 --- a/jdbc-mysql/src/test/java/io/kestra/runner/mysql/MysqlSubflowExecutionStorageTest.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.kestra.runner.mysql; - -import io.kestra.jdbc.runner.AbstractSubflowExecutionTest; - -class MysqlSubflowExecutionStorageTest extends AbstractSubflowExecutionTest { - -} \ No newline at end of file diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java index 16d1b19fd25..5c04bb571d2 100644 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java +++ b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresQueueFactory.java @@ -152,4 +152,12 @@ public WorkerTriggerResultQueueInterface workerTriggerResultQueue() { public QueueInterface subflowExecutionResult() { return new PostgresQueue<>(SubflowExecutionResult.class, applicationContext); } + + @Override + @Singleton + @Named(QueueFactoryInterface.SUBFLOWEXECUTIONEND_NAMED) + @Bean(preDestroy = "close") + public QueueInterface subflowExecutionEnd() { + return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext); + } } diff --git a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorage.java b/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorage.java deleted file mode 100644 index 687845b1e91..00000000000 --- a/jdbc-postgres/src/main/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorage.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.kestra.runner.postgres; - -import io.kestra.core.runners.SubflowExecution; -import io.kestra.jdbc.runner.AbstractJdbcSubflowExecutionStorage; -import io.kestra.repository.postgres.PostgresRepository; -import jakarta.inject.Named; -import jakarta.inject.Singleton; - -@Singleton -@PostgresQueueEnabled -public class PostgresSubflowExecutionStorage extends AbstractJdbcSubflowExecutionStorage { - public PostgresSubflowExecutionStorage(@Named("subflow-executions") PostgresRepository> repository) { - super(repository); - } -} diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1_29__subflow_execution_end.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1_29__subflow_execution_end.sql new file mode 100644 index 00000000000..3aee04f3ac4 --- /dev/null +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1_29__subflow_execution_end.sql @@ -0,0 +1 @@ +ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.SubflowExecutionEnd'; \ No newline at end of file diff --git a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java deleted file mode 100644 index 9dde7a46f47..00000000000 --- a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresSubflowExecutionStorageTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.kestra.runner.postgres; - -import io.kestra.core.runners.DeserializationIssuesCaseTest; -import io.kestra.jdbc.runner.AbstractSubflowExecutionTest; -import org.jooq.Field; -import org.jooq.JSONB; -import org.jooq.impl.DSL; - -import java.util.Map; - -class PostgresSubflowExecutionStorageTest extends AbstractSubflowExecutionTest { - @Override - protected Map, Object> persistFields() { - return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), - DSL.val(JSONB.valueOf(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE))); - } -} \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/JdbcTableConfigsFactory.java b/jdbc/src/main/java/io/kestra/jdbc/JdbcTableConfigsFactory.java index f24b1f0be79..e9dbb095066 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/JdbcTableConfigsFactory.java +++ b/jdbc/src/main/java/io/kestra/jdbc/JdbcTableConfigsFactory.java @@ -71,12 +71,6 @@ public InstantiableJdbcTableConfig multipleConditions() { return new InstantiableJdbcTableConfig("multipleconditions", MultipleConditionWindow.class, "multipleconditions"); } - @Bean - @Named("subflow-executions") - public InstantiableJdbcTableConfig subflowExecutions() { - return new InstantiableJdbcTableConfig("subflow-executions", SubflowExecution.class, "subflow_executions"); - } - @Bean @Named("executorstate") public InstantiableJdbcTableConfig executorState() { diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java b/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java deleted file mode 100644 index bcc71b1c606..00000000000 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/AbstractJdbcSubflowExecutionStorage.java +++ /dev/null @@ -1,73 +0,0 @@ -package io.kestra.jdbc.runner; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.kestra.core.exceptions.DeserializationException; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.executions.TaskRun; -import io.kestra.core.runners.SubflowExecution; -import io.kestra.jdbc.JdbcMapper; -import io.kestra.jdbc.repository.AbstractJdbcRepository; -import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.Record1; -import org.jooq.SelectConditionStep; -import org.jooq.impl.DSL; - -import java.util.List; -import java.util.Map; -import java.util.Optional; - -public abstract class AbstractJdbcSubflowExecutionStorage extends AbstractJdbcRepository { - private final static ObjectMapper MAPPER = JdbcMapper.of(); - protected io.kestra.jdbc.AbstractJdbcRepository> jdbcRepository; - - @SuppressWarnings({"unchecked", "rawtypes"}) - public AbstractJdbcSubflowExecutionStorage(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { - this.jdbcRepository = jdbcRepository; - } - - public Optional> get(String executionId) { - return this.jdbcRepository - .getDslContextWrapper() - .transactionResult(configuration -> { - SelectConditionStep> select = DSL - .using(configuration) - .select(AbstractJdbcRepository.field("value")) - .from(this.jdbcRepository.getTable()) - .where( - AbstractJdbcRepository.field("key").eq(executionId) - ); - - try { - return this.jdbcRepository.fetchOne(select); - } catch (DeserializationException deserializationException) { - // we may fail to deserialize a SubflowExecution if we fail to deserialize its task - var jsonNode = MAPPER.readTree(deserializationException.getRecord()); - var taskRun = MAPPER.treeToValue(jsonNode.get("parentTaskRun"), TaskRun.class); - var execution = MAPPER.treeToValue(jsonNode.get("execution"), Execution.class); - return Optional.of(SubflowExecution.builder() - .parentTaskRun(taskRun) - .execution(execution) - .build()); - } - }); - } - - public void save(List> subflowExecutions) { - this.jdbcRepository - .getDslContextWrapper() - .transaction(configuration -> { - DSLContext context = DSL.using(configuration); - - // TODO batch insert - subflowExecutions.forEach(subflowExecution -> { - Map, Object> fields = this.jdbcRepository.persistFields(subflowExecution); - this.jdbcRepository.persist(subflowExecution, context, fields); - }); - }); - } - - public void delete(SubflowExecution subflowExecution) { - this.jdbcRepository.delete(subflowExecution); - } -} diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index af9cc46192b..1416050d5e6 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -54,10 +54,7 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -105,6 +102,10 @@ public class JdbcExecutor implements ExecutorInterface, Service { @Named(QueueFactoryInterface.SUBFLOWEXECUTIONRESULT_NAMED) private QueueInterface subflowExecutionResultQueue; + @Inject + @Named(QueueFactoryInterface.SUBFLOWEXECUTIONEND_NAMED) + private QueueInterface subflowExecutionEndQueue; + @Inject @Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED) private Optional> clusterEventQueue; @@ -136,10 +137,6 @@ public class JdbcExecutor implements ExecutorInterface, Service { @Inject protected FlowListenersInterface flowListeners; - // TODO we may be able to remove this storage and check that we have a parent execution or a dedicated trigger class and send a subflow execution result if needed - @Inject - private AbstractJdbcSubflowExecutionStorage subflowExecutionStorage; - @Inject private ExecutionService executionService; @@ -234,6 +231,7 @@ public void run() { this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue)); this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue)); this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue)); + this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue)); this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue))); ScheduledFuture scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate( @@ -567,37 +565,30 @@ private void executionQueue(Either either) .forEach(executionDelay -> executionDelayStorage.save(executionDelay)); } - // subflow execution watchers + // subflow executions if (!executor.getSubflowExecutions().isEmpty()) { - subflowExecutionStorage.save(executor.getSubflowExecutions()); - List> subflowExecutionDedup = executor .getSubflowExecutions() .stream() .filter(subflowExecution -> this.deduplicateSubflowExecution(execution, executorState, subflowExecution.getParentTaskRun())) .toList(); - subflowExecutionDedup - .forEach(throwConsumer(subflowExecution -> { - Execution subExecution = subflowExecution.getExecution(); - String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace()); + subflowExecutionDedup + .forEach(throwConsumer(subflowExecution -> { + Execution subExecution = subflowExecution.getExecution(); + String log = String.format("Created new execution [[link execution=\"%s\" flowId=\"%s\" namespace=\"%s\"]]", subExecution.getId(), subExecution.getFlowId(), subExecution.getNamespace()); - JdbcExecutor.log.info(log); + JdbcExecutor.log.info(log); - logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun()).toBuilder() - .level(Level.INFO) - .message(log) - .timestamp(subflowExecution.getParentTaskRun().getState().getStartDate()) - .thread(Thread.currentThread().getName()) - .build() - ); + logQueue.emit(LogEntry.of(subflowExecution.getParentTaskRun()).toBuilder() + .level(Level.INFO) + .message(log) + .timestamp(subflowExecution.getParentTaskRun().getState().getStartDate()) + .thread(Thread.currentThread().getName()) + .build() + ); executionQueue.emit(subflowExecution.getExecution()); - - // send a running worker task result to track running vs created status - if (subflowExecution.getParentTask().waitForExecution()) { - sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun()); - } })); } @@ -626,39 +617,6 @@ private void executionQueue(Either either) } } - private void sendSubflowExecutionResult(Execution execution, SubflowExecution subflowExecution, TaskRun taskRun) { - Flow workerTaskFlow = this.flowRepository.findByExecution(execution); - - ExecutableTask executableTask = subflowExecution.getParentTask(); - - RunContext runContext = runContextFactory.of( - workerTaskFlow, - subflowExecution.getParentTask(), - execution, - subflowExecution.getParentTaskRun() - ); - try { - Optional subflowExecutionResult = executableTask - .createSubflowExecutionResult(runContext, taskRun, workerTaskFlow, execution); - - subflowExecutionResult.ifPresent(throwConsumer(workerTaskResult -> this.subflowExecutionResultQueue.emit(workerTaskResult))); - } catch (Exception e) { - log.error("Unable to create the Subflow Execution Result", e); - // we send a fail subflow execution result to end the flow - try { - this.subflowExecutionResultQueue.emit( - SubflowExecutionResult.builder() - .executionId(execution.getId()) - .state(State.Type.FAILED) - .parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build()))) - .build() - ); - } catch (QueueException ex) { - log.error("Unable to emit the subflow execution result", ex); - } - } - } - private void workerTaskResultQueue(Either either) { if (either.isRight()) { log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage()); @@ -772,15 +730,21 @@ private void subflowExecutionResultQueue(Either outputs = MapUtils.merge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs()); + taskRun = taskRun.withOutputs(outputs); taskRun = ExecutableUtils.manageIterations( runContext.storage(), - message.getParentTaskRun(), + taskRun, current.getExecution(), forEachItem.getTransmitFailed(), forEachItem.isAllowFailure(), @@ -834,6 +798,64 @@ private void subflowExecutionResultQueue(Either either) { + if (either.isRight()) { + log.error("Unable to deserialize a subflow execution end: {}", either.getRight().getMessage()); + return; + } + + SubflowExecutionEnd message = either.getLeft(); + if (skipExecutionService.skipExecution(message.getParentExecutionId())) { + log.warn("Skipping execution {}", message.getParentExecutionId()); + return; + } + if (skipExecutionService.skipExecution(message.getChildExecution())) { + log.warn("Skipping execution {}", message.getChildExecution().getId()); + return; + } + + if (log.isDebugEnabled()) { + executorService.log(log, true, message); + } + + executionRepository.lock(message.getParentExecutionId(), pair -> { + Execution execution = pair.getLeft(); + + if (execution == null) { + throw new IllegalStateException("Execution state don't exist for " + message.getParentExecutionId() + ", receive " + message); + } + + Flow flow = this.flowRepository.findByExecution(execution); + try { + ExecutableTask executableTask = (ExecutableTask) flow.findTaskByTaskId(message.getTaskId()); + if (!executableTask.waitForExecution()) { + return null; + } + + TaskRun taskRun = execution.findTaskRunByTaskRunId(message.getTaskRunId()).withState(message.getState()).withOutputs(message.getOutputs()); + Flow childFlow = this.flowRepository.findByExecution(message.getChildExecution()); + RunContext runContext = runContextFactory.of( + childFlow, + (Task) executableTask, + message.getChildExecution(), + taskRun + ); + + SubflowExecutionResult subflowExecutionResult = ExecutableUtils.subflowExecutionResultFromChildExecution(runContext, childFlow, message.getChildExecution(), executableTask, taskRun); + if (subflowExecutionResult != null) { + try { + this.subflowExecutionResultQueue.emit(subflowExecutionResult); + } catch (QueueException ex) { + log.error("Unable to emit the subflow execution result", ex); + } + } + } catch (InternalException e) { + log.error("Unable to process the subflow execution end", e); + } + return null; + }); + } + private void killQueue(Either either) { if (either.isRight()) { log.error("Unable to deserialize a killed execution: {}", either.getRight().getMessage()); @@ -962,16 +984,16 @@ private void toExecution(Executor executor, boolean ignoreFailure) { // handle actions on terminated state // the terminated state can only come from the execution queue, and in this case we always have a flow in the executor if (executor.getFlow() != null && conditionService.isTerminatedWithListeners(executor.getFlow(), executor.getExecution())) { - // purge subflow execution storage - subflowExecutionStorage.get(execution.getId()) - .ifPresent(subflowExecution -> { - // If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service. - if (subflowExecution.getParentTask() != null && subflowExecution.getParentTask().waitForExecution()) { - sendSubflowExecutionResult(execution, subflowExecution, subflowExecution.getParentTaskRun().withState(execution.getState().getCurrent())); - } - - subflowExecutionStorage.delete(subflowExecution); - }); + // if there is a parent, we send a subflow execution result to it + if (ExecutableUtils.isSubflow(execution)) { + // locate the parent execution to find the parent task run + String parentExecutionId = (String) execution.getTrigger().getVariables().get("executionId"); + String taskRunId = (String) execution.getTrigger().getVariables().get("taskRunId"); + String taskId = (String) execution.getTrigger().getVariables().get("taskId"); + Map outputs = (Map) execution.getTrigger().getVariables().get("taskRunOutputs"); + SubflowExecutionEnd subflowExecutionEnd = new SubflowExecutionEnd(executor.getExecution(), parentExecutionId, taskRunId, taskId, execution.getState().getCurrent(), outputs); + this.subflowExecutionEndQueue.emit(subflowExecutionEnd); + } // purge SLA monitors if (!ListUtils.isEmpty(executor.getFlow().getSla()) && executor.getFlow().getSla().stream().anyMatch(ExecutionMonitoringSLA.class::isInstance)) { diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java deleted file mode 100644 index cd24296c2c3..00000000000 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/AbstractSubflowExecutionTest.java +++ /dev/null @@ -1,78 +0,0 @@ -package io.kestra.jdbc.runner; - -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.executions.TaskRun; -import io.kestra.core.runners.DeserializationIssuesCaseTest; -import io.kestra.core.runners.SubflowExecution; -import io.kestra.plugin.core.flow.Subflow; -import io.kestra.core.utils.IdUtils; -import io.kestra.jdbc.JdbcTestUtils; -import io.kestra.core.junit.annotations.KestraTest; -import jakarta.inject.Inject; -import org.jooq.Field; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -@KestraTest -public abstract class AbstractSubflowExecutionTest { - @Inject - AbstractJdbcSubflowExecutionStorage subflowExecutionStorage; - - @Inject - JdbcTestUtils jdbcTestUtils; - - @Test - void suite() throws Exception { - - SubflowExecution workerTaskExecution = SubflowExecution.builder() - .execution(Execution.builder().id(IdUtils.create()).build()) - .parentTask(Subflow.builder().type(Subflow.class.getName()).id(IdUtils.create()).build()) - .parentTaskRun(TaskRun.builder().id(IdUtils.create()).build()) - .build(); - - subflowExecutionStorage.save(List.of(workerTaskExecution)); - - - Optional> find = subflowExecutionStorage.get(workerTaskExecution.getExecution().getId()); - assertThat(find.isPresent(), is(true)); - assertThat(find.get().getExecution().getId(), is(workerTaskExecution.getExecution().getId())); - - - subflowExecutionStorage.delete(workerTaskExecution); - - find = subflowExecutionStorage.get(workerTaskExecution.getExecution().getId()); - assertThat(find.isPresent(), is(false)); - } - - @Test - void deserializationIssue() { - // insert an invalid subflowExecution - var subflowExecution = SubflowExecution.builder() - .execution(Execution.builder().id(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY).build()) - .build(); - Map, Object> fields = persistFields(); - subflowExecutionStorage.jdbcRepository.persist(subflowExecution, fields); - - // load it - Optional> find = subflowExecutionStorage.get(DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_KEY); - assertThat(find.isPresent(), is(true)); - } - - protected Map, Object> persistFields() { - return Map.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), - DeserializationIssuesCaseTest.INVALID_SUBFLOW_EXECUTION_VALUE); - } - - @BeforeEach - protected void init() { - jdbcTestUtils.drop(); - jdbcTestUtils.migrate(); - } -} \ No newline at end of file diff --git a/ui/src/components/executions/ChangeStatus.vue b/ui/src/components/executions/ChangeStatus.vue index ab8c231929d..69fee6276ca 100644 --- a/ui/src/components/executions/ChangeStatus.vue +++ b/ui/src/components/executions/ChangeStatus.vue @@ -5,7 +5,7 @@ @click="visible = !visible" :disabled="!enabled" > - {{ $t('change_status') }} + {{ $t('change state') }}