Skip to content

Commit

Permalink
fix(#1032): avoid restarting errors task
Browse files Browse the repository at this point in the history
Otherwise they would run unconditionally and the flow would end up SUCCESSFUL
  • Loading branch information
loicmathieu committed Mar 10, 2023
1 parent ae18ca3 commit f41756e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 8 deletions.
21 changes: 17 additions & 4 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
Expand Down Expand Up @@ -62,7 +63,6 @@ public Execution restart(final Execution execution, @Nullable Integer revision)

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getState().getCurrent().isFailed()
);

Expand All @@ -86,6 +86,10 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId)
.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

// We need to remove global error tasks and flowable error tasks if any
Set<Task> errorTasks = this.errorTaskIds(flow);
errorTasks.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

// Build and launch new execution
Execution newExecution = execution
.childExecution(
Expand All @@ -97,7 +101,7 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}

private Set<String> taskRunToRestart(Execution execution, Flow flow, Predicate<TaskRun> predicate) throws InternalException {
private Set<String> taskRunToRestart(Execution execution, Predicate<TaskRun> predicate) {
// Original tasks to be restarted
Set<String> finalTaskRunToRestart = this
.taskRunWithAncestors(
Expand Down Expand Up @@ -128,7 +132,6 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

Expand Down Expand Up @@ -187,7 +190,6 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

Expand Down Expand Up @@ -303,6 +305,17 @@ private Set<String> removeWorkerTask(Flow flow, Execution execution, Set<String>
.collect(Collectors.toSet());
}

/**
 * Collects the error-handler tasks declared on a flow.
 *
 * <p>The result is the union of the {@code errors} tasks of every top-level
 * flowable task of the flow and of the flow's own {@code errors} list.
 * Only the tasks returned by {@code flow.getTasks()} are inspected; flowable
 * tasks nested inside other tasks are not visited.
 *
 * <p>Despite its name, this method returns the {@link Task} objects, not their ids.
 *
 * @param flow the flow whose error tasks are collected
 * @return a mutable set of error tasks, empty when the flow declares none
 */
private Set<Task> errorTaskIds(Flow flow) {
    Set<Task> allErrors = flow.getTasks().stream()
        .filter(Task::isFlowable)
        .map(task -> ((FlowableTask<?>) task).getErrors())
        // a flowable task without an errors section must not trigger a NullPointerException
        .filter(errors -> errors != null)
        .flatMap(errors -> errors.stream())
        .collect(Collectors.toCollection(HashSet::new));

    // global (flow-level) error tasks, if any
    if (flow.getErrors() != null) {
        allErrors.addAll(flow.getErrors());
    }

    return allErrors;
}

private Set<String> getAncestors(Execution execution, TaskRun taskRun) {
return Stream
.concat(
Expand Down
38 changes: 37 additions & 1 deletion core/src/test/java/io/kestra/core/runners/RestartCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RestartCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

public void restartFailed() throws Exception {
public void restartFailedThenSuccess() throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "restart_last_failed").orElseThrow();

Execution firstExecution = runnerUtils.runOne(flow.getNamespace(), flow.getId(), Duration.ofSeconds(60));
Expand Down Expand Up @@ -74,6 +74,42 @@ public void restartFailed() throws Exception {
.forEach(state -> assertThat(state.getCurrent(), is(State.Type.SUCCESS)));
}

/**
 * Restarts a failed execution of the {@code restart_always_failed} flow and
 * checks that the restarted execution ends up FAILED again.
 *
 * <p>The first run must be FAILED with two task runs. The restarted execution
 * keeps the same id, has no parent, holds a single task run and is in the
 * RESTARTED state. Once finished, it must again hold two task runs, the first
 * one having two attempts, and be FAILED.
 *
 * @throws Exception if running, restarting or awaiting the execution fails
 */
public void restartFailedThenFailure() throws Exception {
    final Duration timeout = Duration.ofSeconds(60);
    Flow flow = flowRepository.findById("io.kestra.tests", "restart_always_failed").orElseThrow();

    // initial run: expected to fail
    Execution initial = runnerUtils.runOne(flow.getNamespace(), flow.getId(), timeout);

    assertThat(initial.getState().getCurrent(), is(State.Type.FAILED));
    assertThat(initial.getTaskRunList(), hasSize(2));
    assertThat(initial.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));

    // restart the failed execution, then wait until it reaches FAILED again
    Execution finished = runnerUtils.awaitExecution(
        current -> current.getState().getCurrent() == State.Type.FAILED,
        throwRunnable(() -> {
            Thread.sleep(1000);
            Execution restarted = executionService.restart(initial, null);
            executionQueue.emit(restarted);

            assertThat(restarted, notNullValue());
            assertThat(restarted.getId(), is(initial.getId()));
            assertThat(restarted.getParentId(), nullValue());
            assertThat(restarted.getTaskRunList().size(), is(1));
            assertThat(restarted.getState().getCurrent(), is(State.Type.RESTARTED));
        }),
        timeout
    );

    // same execution, no parent, two task runs again
    assertThat(finished, notNullValue());
    assertThat(finished.getId(), is(initial.getId()));
    assertThat(finished.getParentId(), nullValue());
    assertThat(finished.getTaskRunList().size(), is(2));

    // the failing task has been attempted a second time
    assertThat(finished.getTaskRunList().get(0).getAttempts().size(), is(2));

    assertThat(finished.getState().getCurrent(), is(State.Type.FAILED));
}

public void replay() throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "restart-each").orElseThrow();

Expand Down
10 changes: 8 additions & 2 deletions core/src/test/java/io/kestra/core/runners/RestartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ public class RestartTest extends AbstractMemoryRunnerTest {
private RestartCaseTest restartCaseTest;

@Test
void restartFailed() throws Exception {
restartCaseTest.restartFailed();
void restartFailedThenSuccess() throws Exception {
restartCaseTest.restartFailedThenSuccess();
}

/**
 * Runs the "restart failed, then failure" scenario by delegating to
 * {@link RestartCaseTest#restartFailedThenFailure()}.
 */
@Test
void restartFailedThenFailure() throws Exception {
restartCaseTest.restartFailedThenFailure();
}


@Test
void replay() throws Exception {
restartCaseTest.replay();
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/resources/flows/valids/restart_always_failed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Test flow loaded by the restart tests under id "restart_always_failed"
# in namespace "io.kestra.tests".
id: restart_always_failed
namespace: io.kestra.tests

tasks:
# Bash task whose only command is 'exit 1'.
- id: failStep
type: io.kestra.core.tasks.scripts.Bash
description: "This fails"
commands:
- 'exit 1'
# Flow-level error tasks: a single Echo task at INFO level.
errors:
- id: errorHandler
type: io.kestra.core.tasks.debugs.Echo
format: I'm failing {{task.id}}
level: INFO
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void eachParallelNested() throws TimeoutException {

@Test
void restartFailed() throws Exception {
restartCaseTest.restartFailed();
restartCaseTest.restartFailedThenSuccess();
}

@Test
Expand Down

0 comments on commit f41756e

Please sign in to comment.