Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/restart parent #6799

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface QueueFactoryInterface {
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";

QueueInterface<Execution> execution();

Expand Down Expand Up @@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
WorkerTriggerResultQueueInterface workerTriggerResultQueue();

QueueInterface<SubflowExecutionResult> subflowExecutionResult();

QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
}
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public String key(Object object) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ public static <T extends Task & ExecutableTask<?>> Optional<SubflowExecution<?>>
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision(),
"taskRunId", currentTaskRun.getId()
"taskRunId", currentTaskRun.getId(),
"taskId", currentTaskRun.getTaskId()
));
if (currentTaskRun.getOutputs() != null) {
variables.put("taskRunOutputs", currentTaskRun.getOutputs());
}
if (currentTaskRun.getValue() != null) {
variables.put("taskRunValue", currentTaskRun.getValue());
}
Expand Down Expand Up @@ -278,4 +282,27 @@ private static State.Type findTerminalState(Map<String, Integer> iterations, boo
}
return State.Type.SUCCESS;
}

public static SubflowExecutionResult subflowExecutionResultFromChildExecution(RunContext runContext, Flow flow, Execution execution, ExecutableTask<?> executableTask, TaskRun taskRun) {
try {
return executableTask
.createSubflowExecutionResult(runContext, taskRun, flow, execution)
.orElse(null);
} catch (Exception e) {
log.error("Unable to create the Subflow Execution Result", e);
// we return a fail subflow execution result to end the flow
return SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun.withState(State.Type.FAILED).withAttempts(List.of(TaskRunAttempt.builder().state(new State().withState(State.Type.FAILED)).build())))
.build();
}
}

public static boolean isSubflow(Execution execution) {
return execution.getTrigger() != null && (
"io.kestra.plugin.core.flow.Subflow".equals(execution.getTrigger().getType()) ||
"io.kestra.plugin.core.flow.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType())
);
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Executor {
private final List<WorkerTrigger> workerTriggers = new ArrayList<>();
private WorkerJob workerJobToResubmit;
private State.Type originalState;
private SubflowExecutionEnd subflowExecutionEnd;
private SubflowExecutionEnd joinedSubflowExecutionEnd;

/**
* The sequence id should be incremented each time the execution is persisted after mutation.
Expand Down Expand Up @@ -67,6 +69,10 @@ public Executor(SubflowExecutionResult subflowExecutionResult) {
this.joinedSubflowExecutionResult = subflowExecutionResult;
}

public Executor(SubflowExecutionEnd subflowExecutionEnd) {
this.joinedSubflowExecutionEnd = subflowExecutionEnd;
}

public Executor(WorkerJob workerJob) {
this.workerJobToResubmit = workerJob;
}
Expand Down Expand Up @@ -169,6 +175,11 @@ public Executor withExecutionKilled(final List<ExecutionKilledExecution> executi
return this;
}

public Executor withSubflowExecutionEnd(SubflowExecutionEnd subflowExecutionEnd) {
this.subflowExecutionEnd = subflowExecutionEnd;
return this;
}

public Executor serialize() {
return new Executor(
this.execution,
Expand Down
22 changes: 18 additions & 4 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.*;
Expand Down Expand Up @@ -873,17 +874,21 @@ private Executor handleExecutableTask(final Executor executor) {
);
} else {
executions.addAll(subflowExecutions);
if (!executableTask.waitForExecution()) {
// send immediately all workerTaskResult to ends the executable task
Optional<FlowWithSource> flow = flowExecutorInterface.findByExecution(subflowExecutions.getFirst().getExecution());
if (flow.isPresent()) {
// add SubflowExecutionResults to notify parents
for (SubflowExecution<?> subflowExecution : subflowExecutions) {
Optional<SubflowExecutionResult> subflowExecutionResult = executableTask.createSubflowExecutionResult(
runContext,
subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS),
executor.getFlow(),
// if we didn't wait for the execution, we directly set the state to SUCCESS
executableTask.waitForExecution() ? subflowExecution.getParentTaskRun() : subflowExecution.getParentTaskRun().withState(State.Type.SUCCESS),
flow.get(),
subflowExecution.getExecution()
);
subflowExecutionResult.ifPresent(subflowExecutionResults::add);
}
} else {
log.error("Unable to find flow for execution {}", subflowExecutions.getFirst().getExecution().getId());
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -1030,6 +1035,15 @@ public void log(Logger log, Boolean in, SubflowExecutionResult value) {
);
}

public void log(Logger log, Boolean in, SubflowExecutionEnd value) {
log.debug(
"{} {} : {}",
in ? "<< IN " : ">> OUT",
value.getClass().getSimpleName(),
value.toStringState()
);
}

public void log(Logger log, Boolean in, Execution value) {
log.debug(
"{} {} [key='{}']\n{}",
Expand Down
33 changes: 33 additions & 0 deletions core/src/main/java/io/kestra/core/runners/SubflowExecutionEnd.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SubflowExecutionEnd {
private Execution childExecution;
private String parentExecutionId;
private String taskRunId;
private String taskId;
private State.Type state;
private Map<String, Object> outputs;

public String toStringState() {
return "SubflowExecutionEnd(" +
"childExecutionId=" + this.getChildExecution().getId() +
", parentExecutionId=" + this.getParentExecutionId() +
", taskId=" + this.getTaskId() +
", taskRunId=" + this.getTaskRunId() +
", state=" + this.getState().toString() +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private Execution markAs(final Execution execution, Flow flow, String taskRunId,
taskRun -> taskRun.getId().equals(taskRunId)
);

Execution newExecution = execution;
Execution newExecution = execution.withMetadata(execution.getMetadata().nextAttempt());

for (String s : taskRunToRestart) {
TaskRun originalTaskRun = newExecution.findTaskRunByTaskRunId(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junitpioneer.jupiter.RetryingTest;
Expand Down Expand Up @@ -90,6 +92,9 @@ public abstract class AbstractRunnerTest {
@Inject
private SLATestCase slaTestCase;

@Inject
private ChangeStateTestCase changeStateTestCase;

@Test
@ExecuteFlow("flows/valids/full.yaml")
void full(Execution execution) {
Expand Down Expand Up @@ -173,6 +178,18 @@ void restartMultiple() throws Exception {
restartCaseTest.restartMultiple();
}

@Test
@LoadFlows({"flows/valids/restart_always_failed.yaml"})
void restartFailedThenFailureWithGlobalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithGlobalErrors();
}

@RetryingTest(5)
@LoadFlows({"flows/valids/restart_local_errors.yaml"})
void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}

@Test
@LoadFlows({"flows/valids/restart-parent.yaml", "flows/valids/restart-child.yaml"})
void restartSubflow() throws Exception {
Expand Down Expand Up @@ -244,7 +261,7 @@ void flowWaitSuccess() throws Exception {
"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml"})
void flowWaitFailed() throws Exception {
flowCaseTest.waitFailed();
flowCaseTest.waitFailed();
}

@Test
Expand Down Expand Up @@ -342,6 +359,12 @@ protected void forEachItemSubflowOutputs() throws Exception {
forEachItemCaseTest.forEachItemWithSubflowOutputs();
}

@Test
@LoadFlows({"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"})
void restartForEachItem() throws Exception {
forEachItemCaseTest.restartForEachItem();
}

@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
Expand Down Expand Up @@ -468,4 +491,16 @@ void multipleIf() throws TimeoutException, QueueException {
assertThat(execution.getTaskRunList(), hasSize(12));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
@ExecuteFlow("flows/valids/failed-first.yaml")
public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
changeStateTestCase.changeStateShouldEndsInSuccess(execution);
}

@Test
@LoadFlows({"flows/valids/failed-first.yaml", "flows/valids/subflow-parent-of-failed.yaml"})
public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
changeStateTestCase.changeStateInSubflowShouldEndsParentFlowInSuccess();
}
}
113 changes: 113 additions & 0 deletions core/src/test/java/io/kestra/core/runners/ChangeStateTestCase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@Singleton
public class ChangeStateTestCase {
@Inject
private FlowRepositoryInterface flowRepository;

@Inject
private ExecutionService executionService;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

@Inject
private RunnerUtils runnerUtils;

public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// await for the last execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().getCurrent() == State.Type.SUCCESS) {
lastExecution.set(exec);
latch.countDown();
}
});

Flow flow = flowRepository.findByExecution(execution);
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);

assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(lastExecution.get().getTaskRunList(), hasSize(2));
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
// await for the subflow execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if ("failed-first".equals(exec.getFlowId()) && exec.getState().getCurrent() == State.Type.FAILED) {
lastExecution.set(exec);
latch.countDown();
}
});

// run the parent flow
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "subflow-parent-of-failed");
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// assert on the subflow
assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent(), is(State.Type.FAILED));
assertThat(lastExecution.get().getTaskRunList(), hasSize(1));
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED));

// await for the parent execution
CountDownLatch parentLatch = new CountDownLatch(1);
AtomicReference<Execution> lastParentExecution = new AtomicReference<>();
receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().isTerminated()) {
lastParentExecution.set(exec);
parentLatch.countDown();
}
});

// restart the subflow
Flow flow = flowRepository.findByExecution(lastExecution.get());
Execution markedAs = executionService.markAs(lastExecution.get(), flow, lastExecution.get().getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);

// assert for the parent flow
assertThat(parentLatch.await(10, TimeUnit.SECONDS), is(true));
receivedExecutions.blockLast();
assertThat(lastParentExecution.get().getState().getCurrent(), is(State.Type.FAILED)); // FIXME should be success but it's FAILED on unit tests
assertThat(lastParentExecution.get().getTaskRunList(), hasSize(1));
assertThat(lastParentExecution.get().getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}
}
Loading
Loading