diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index e6ddea70899..649f5027ed3 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -8,6 +8,7 @@ import io.kestra.core.models.flows.State; import io.kestra.core.models.hierarchies.AbstractGraphTask; import io.kestra.core.models.hierarchies.GraphCluster; +import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.core.repositories.FlowRepositoryInterface; @@ -62,7 +63,6 @@ public Execution restart(final Execution execution, @Nullable Integer revision) Set taskRunToRestart = this.taskRunToRestart( execution, - flow, taskRun -> taskRun.getState().getCurrent().isFailed() ); @@ -86,6 +86,10 @@ public Execution restart(final Execution execution, @Nullable Integer revision) this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId) .forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r))); + // We need to remove global error tasks and flowable error tasks if any + Set errorTasks = this.errorTaskIds(flow); + errorTasks.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId()))); + // Build and launch new execution Execution newExecution = execution .childExecution( @@ -97,7 +101,7 @@ public Execution restart(final Execution execution, @Nullable Integer revision) return revision != null ? 
newExecution.withFlowRevision(revision) : newExecution; } - private Set taskRunToRestart(Execution execution, Flow flow, Predicate predicate) throws InternalException { + private Set taskRunToRestart(Execution execution, Predicate predicate) { // Original tasks to be restarted Set finalTaskRunToRestart = this .taskRunWithAncestors( @@ -128,7 +132,6 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I Set taskRunToRestart = this.taskRunToRestart( execution, - flow, taskRun -> taskRun.getId().equals(taskRunId) ); @@ -187,7 +190,6 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type Set taskRunToRestart = this.taskRunToRestart( execution, - flow, taskRun -> taskRun.getId().equals(taskRunId) ); @@ -303,6 +305,25 @@ private Set removeWorkerTask(Flow flow, Execution execution, Set .collect(Collectors.toSet()); } + /** + * Collects the error-handler tasks declared on the flow's top-level flowable tasks and on the flow itself. + * Flowable tasks whose errors list is null are skipped, mirroring the null check applied to the flow-level errors. + * + * @param flow the flow whose error tasks are collected + * @return a mutable set of error tasks (tasks, not ids, despite the method name) + */ + private Set errorTaskIds(Flow flow) { + var allErrors = flow.getTasks().stream() + .filter(Task::isFlowable) + .filter(task -> ((FlowableTask) task).getErrors() != null) + .flatMap(task -> ((FlowableTask) task).getErrors().stream()) + .collect(Collectors.toCollection(HashSet::new)); + if(flow.getErrors() != null && !flow.getErrors().isEmpty()) { + allErrors.addAll(flow.getErrors()); + } + return allErrors; + } + private Set getAncestors(Execution execution, TaskRun taskRun) { return Stream .concat( diff --git a/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java b/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java index be8b0f71d2c..dff7e52a835 100644 --- a/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java @@ -34,7 +34,7 @@ public class RestartCaseTest { @Named(QueueFactoryInterface.EXECUTION_NAMED) private QueueInterface executionQueue; - public void restartFailed() throws Exception { + public void restartFailedThenSuccess() throws Exception { Flow flow = flowRepository.findById("io.kestra.tests", "restart_last_failed").orElseThrow(); Execution 
firstExecution = runnerUtils.runOne(flow.getNamespace(), flow.getId(), Duration.ofSeconds(60)); @@ -74,6 +74,42 @@ public void restartFailed() throws Exception { .forEach(state -> assertThat(state.getCurrent(), is(State.Type.SUCCESS))); } + public void restartFailedThenFailure() throws Exception { + Flow flow = flowRepository.findById("io.kestra.tests", "restart_always_failed").orElseThrow(); + + Execution firstExecution = runnerUtils.runOne(flow.getNamespace(), flow.getId(), Duration.ofSeconds(60)); + + assertThat(firstExecution.getState().getCurrent(), is(State.Type.FAILED)); + assertThat(firstExecution.getTaskRunList(), hasSize(2)); + assertThat(firstExecution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED)); + + // wait + Execution finishedRestartedExecution = runnerUtils.awaitExecution( + execution -> execution.getState().getCurrent() == State.Type.FAILED, + throwRunnable(() -> { + Thread.sleep(1000); + Execution restartedExec = executionService.restart(firstExecution, null); + executionQueue.emit(restartedExec); + + assertThat(restartedExec, notNullValue()); + assertThat(restartedExec.getId(), is(firstExecution.getId())); + assertThat(restartedExec.getParentId(), nullValue()); + assertThat(restartedExec.getTaskRunList().size(), is(1)); + assertThat(restartedExec.getState().getCurrent(), is(State.Type.RESTARTED)); + }), + Duration.ofSeconds(60) + ); + + assertThat(finishedRestartedExecution, notNullValue()); + assertThat(finishedRestartedExecution.getId(), is(firstExecution.getId())); + assertThat(finishedRestartedExecution.getParentId(), nullValue()); + assertThat(finishedRestartedExecution.getTaskRunList().size(), is(2)); + + assertThat(finishedRestartedExecution.getTaskRunList().get(0).getAttempts().size(), is(2)); + + assertThat(finishedRestartedExecution.getState().getCurrent(), is(State.Type.FAILED)); + } + public void replay() throws Exception { Flow flow = flowRepository.findById("io.kestra.tests", "restart-each").orElseThrow(); 
diff --git a/core/src/test/java/io/kestra/core/runners/RestartTest.java b/core/src/test/java/io/kestra/core/runners/RestartTest.java index 53d74cc6191..13fa368fac4 100644 --- a/core/src/test/java/io/kestra/core/runners/RestartTest.java +++ b/core/src/test/java/io/kestra/core/runners/RestartTest.java @@ -9,10 +9,16 @@ public class RestartTest extends AbstractMemoryRunnerTest { private RestartCaseTest restartCaseTest; @Test - void restartFailed() throws Exception { - restartCaseTest.restartFailed(); + void restartFailedThenSuccess() throws Exception { + restartCaseTest.restartFailedThenSuccess(); } + @Test + void restartFailedThenFailure() throws Exception { + restartCaseTest.restartFailedThenFailure(); + } + + @Test void replay() throws Exception { restartCaseTest.replay(); diff --git a/core/src/test/resources/flows/valids/restart_always_failed.yaml b/core/src/test/resources/flows/valids/restart_always_failed.yaml new file mode 100644 index 00000000000..46091988227 --- /dev/null +++ b/core/src/test/resources/flows/valids/restart_always_failed.yaml @@ -0,0 +1,14 @@ +id: restart_always_failed +namespace: io.kestra.tests + +tasks: + - id: failStep + type: io.kestra.core.tasks.scripts.Bash + description: "This fails" + commands: + - 'exit 1' +errors: + - id: errorHandler + type: io.kestra.core.tasks.debugs.Echo + format: I'm failing {{task.id}} + level: INFO diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java index dcc280c5672..3c1ed913ab2 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java @@ -143,7 +143,7 @@ void eachParallelNested() throws TimeoutException { @Test void restartFailed() throws Exception { - restartCaseTest.restartFailed(); + restartCaseTest.restartFailedThenSuccess(); } @Test