Skip to content

Commit

Permalink
fix(core): handle subflow retries
Browse files Browse the repository at this point in the history
closes #4823
  • Loading branch information
brian-mulier-p committed Sep 10, 2024
1 parent 1bb8ad9 commit cf065cc
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 11 deletions.
12 changes: 4 additions & 8 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
import io.kestra.core.storages.Storage;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

@Slf4j
public final class ExecutableUtils {
Expand All @@ -44,12 +40,12 @@ public static State.Type guessState(Execution execution, boolean transmitFailed,
}

public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
return SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(parentTaskrun.getState().getCurrent())
.parentTaskRun(parentTaskrun.withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(parentTaskrun.getState()).build())
))
.parentTaskRun(parentTaskrun.withAttempts(attempts))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.kestra.core.services.ExecutionService;
import io.kestra.core.services.LogService;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.Subflow;
import io.kestra.plugin.core.flow.WaitFor;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.WorkingDirectory;
Expand Down Expand Up @@ -483,7 +484,7 @@ private Executor handleChildWorkerTaskResult(Executor executor) throws Exception
*/
if (!executor.getExecution().getState().isRetrying() &&
taskRun.getState().isFailed() &&
task instanceof RunnableTask<?> &&
(task instanceof RunnableTask<?> || task instanceof Subflow) &&
(task.getRetry() != null || executor.getFlow().getRetry() != null || (parentTask != null && parentTask.getRetry() != null))
) {
Instant nextRetryDate;
Expand Down
13 changes: 13 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/flow/RetryCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,19 @@ public void retryFlowable() throws TimeoutException {
assertThat(execution.getTaskRunList().get(1).attemptNumber(), is(3));
}

public void retrySubflow() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests",
"retry-subflow",
null,
null
);

assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.getTaskRunList().get(0).getAttempts().size(), is(3));
}

public void retryFlowableChild() throws TimeoutException {
Execution execution = runnerUtils.runOne(
null,
Expand Down
11 changes: 11 additions & 0 deletions core/src/test/resources/flows/valids/retry-subflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id: retry-subflow
namespace: io.kestra.tests
retry:
type: constant
interval: PT1S
maxAttempt: 3
tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: failed-first
8 changes: 6 additions & 2 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -917,18 +917,22 @@ private boolean deduplicateWorkerTask(Execution execution, ExecutorState executo

private boolean deduplicateSubflowExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
// There can be multiple executions for the same task, so we need to deduplicated with the worker task execution iteration
String deduplicationKey = taskRun.getId() + (taskRun.getIteration() == null ? "" : "-" + taskRun.getIteration());
String deduplicationKey = deduplicationKey(taskRun);
State.Type current = executorState.getSubflowExecutionDeduplication().get(deduplicationKey);

if (current == taskRun.getState().getCurrent()) {
log.trace("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}, taskId '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId());
log.trace("Duplicate SubflowExecution on execution '{}' for taskRun '{}', value '{}', taskId '{}', attempt '{}'", execution.getId(), taskRun.getId(), taskRun.getValue(), taskRun.getTaskId(), taskRun.getAttempts().size() + 1);
return false;
} else {
executorState.getSubflowExecutionDeduplication().put(deduplicationKey, taskRun.getState().getCurrent());
return true;
}
}

private String deduplicationKey(TaskRun taskRun) {
return taskRun.getId() + (taskRun.getAttempts() != null ? "-" + taskRun.getAttempts().size() : "") + (taskRun.getIteration() == null ? "" : "-" + taskRun.getIteration());
}

private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ void retryFlowable() throws TimeoutException {
retryCaseTest.retryFlowable();
}

@Test
void retrySubflow() throws TimeoutException {
retryCaseTest.retrySubflow();
}

@Test
void retryFlowableChild() throws TimeoutException {
retryCaseTest.retryFlowableChild();
Expand Down

0 comments on commit cf065cc

Please sign in to comment.