Skip to content

Commit

Permalink
fix(core): avoid restarting errors task (#1056)
Browse files Browse the repository at this point in the history
close #1032
  • Loading branch information
loicmathieu authored Mar 10, 2023
1 parent c78cb93 commit 922e5a2
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 10 deletions.
14 changes: 13 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import javax.validation.Valid;
import javax.validation.constraints.*;


@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
Expand Down Expand Up @@ -171,6 +170,19 @@ private Stream<Task> allTasksWithChilds(Task task) {
}
}

/**
 * Gathers the error-handler tasks declared in this flow.
 * <p>
 * The result contains, in order, the {@code errors} of every flowable task returned by
 * {@code allTasksWithChilds()} (flowable tasks without errors are skipped), followed by the
 * flow-level {@code errors} when some are defined. No traversal is performed here beyond what
 * {@code allTasksWithChilds()} already provides.
 *
 * @return a new mutable list of error tasks; empty when the flow declares none
 */
public List<Task> allErrorsWithChilds() {
    List<Task> collected = new ArrayList<>();

    // Error handlers attached to flowable tasks, in traversal order.
    for (var candidate : allTasksWithChilds()) {
        if (!candidate.isFlowable()) {
            continue;
        }

        var localErrors = ((FlowableTask<?>) candidate).getErrors();
        if (localErrors != null) {
            collected.addAll(localErrors);
        }
    }

    // Error handlers declared on the flow itself are appended last.
    var globalErrors = this.getErrors();
    if (globalErrors != null && !globalErrors.isEmpty()) {
        collected.addAll(globalErrors);
    }

    return collected;
}

public Task findTaskByTaskId(String taskId) throws InternalException {
return allTasks()
.flatMap(t -> t.findById(taskId).stream())
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public Execution restart(final Execution execution, @Nullable Integer revision)

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getState().getCurrent().isFailed()
);

Expand All @@ -86,6 +85,11 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId)
.forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r)));

// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChilds()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

// Build and launch new execution
Execution newExecution = execution
.childExecution(
Expand All @@ -97,7 +101,7 @@ public Execution restart(final Execution execution, @Nullable Integer revision)
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}

private Set<String> taskRunToRestart(Execution execution, Flow flow, Predicate<TaskRun> predicate) throws InternalException {
private Set<String> taskRunToRestart(Execution execution, Predicate<TaskRun> predicate) {
// Original tasks to be restarted
Set<String> finalTaskRunToRestart = this
.taskRunWithAncestors(
Expand Down Expand Up @@ -128,7 +132,6 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

Expand Down Expand Up @@ -187,7 +190,6 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type

Set<String> taskRunToRestart = this.taskRunToRestart(
execution,
flow,
taskRun -> taskRun.getId().equals(taskRunId)
);

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 53;
public static long FLOWS_COUNT = 55;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
74 changes: 73 additions & 1 deletion core/src/test/java/io/kestra/core/runners/RestartCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RestartCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

public void restartFailed() throws Exception {
public void restartFailedThenSuccess() throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "restart_last_failed").orElseThrow();

Execution firstExecution = runnerUtils.runOne(flow.getNamespace(), flow.getId(), Duration.ofSeconds(60));
Expand Down Expand Up @@ -74,6 +74,78 @@ public void restartFailed() throws Exception {
.forEach(state -> assertThat(state.getCurrent(), is(State.Type.SUCCESS)));
}

/**
 * Restarts a failed execution of the {@code restart_always_failed} flow and checks that it fails
 * again under the same execution id.
 * <p>
 * The first run is expected to end FAILED with two task runs. The execution returned by
 * {@code restart} is expected to hold a single task run, and the finished restarted execution
 * two task runs, the first of them with two attempts.
 *
 * @throws Exception if running, restarting or awaiting the execution fails
 */
public void restartFailedThenFailureWithGlobalErrors() throws Exception {
    Flow alwaysFailing = flowRepository.findById("io.kestra.tests", "restart_always_failed").orElseThrow();

    Execution initialRun = runnerUtils.runOne(
        alwaysFailing.getNamespace(),
        alwaysFailing.getId(),
        Duration.ofSeconds(60)
    );

    // First run: FAILED, two task runs, the first one being the failed one.
    assertThat(initialRun.getState().getCurrent(), is(State.Type.FAILED));
    assertThat(initialRun.getTaskRunList(), hasSize(2));
    assertThat(initialRun.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.FAILED));

    // Trigger the restart and wait for an execution that reaches FAILED.
    Execution rerun = runnerUtils.awaitExecution(
        candidate -> candidate.getState().getCurrent() == State.Type.FAILED,
        throwRunnable(() -> {
            Thread.sleep(1000);
            Execution restarted = executionService.restart(initialRun, null);
            executionQueue.emit(restarted);

            // The restarted execution keeps the original id and holds only one task run.
            assertThat(restarted, notNullValue());
            assertThat(restarted.getId(), is(initialRun.getId()));
            assertThat(restarted.getParentId(), nullValue());
            assertThat(restarted.getTaskRunList().size(), is(1));
            assertThat(restarted.getState().getCurrent(), is(State.Type.RESTARTED));
        }),
        Duration.ofSeconds(60)
    );

    // Same execution, no parent, two task runs again once finished.
    assertThat(rerun, notNullValue());
    assertThat(rerun.getId(), is(initialRun.getId()));
    assertThat(rerun.getParentId(), nullValue());
    assertThat(rerun.getTaskRunList().size(), is(2));

    // The first task run was attempted a second time.
    assertThat(rerun.getTaskRunList().get(0).getAttempts().size(), is(2));

    assertThat(rerun.getState().getCurrent(), is(State.Type.FAILED));
}

/**
 * Restarts a failed execution of the {@code restart_local_errors} flow and checks that it fails
 * again under the same execution id.
 * <p>
 * The first run is expected to end FAILED with five task runs, the one at index 3 being failed.
 * The execution returned by {@code restart} is expected to hold four task runs, and the finished
 * restarted execution five task runs, the one at index 3 with two attempts.
 *
 * @throws Exception if running, restarting or awaiting the execution fails
 */
public void restartFailedThenFailureWithLocalErrors() throws Exception {
    Flow localErrorsFlow = flowRepository.findById("io.kestra.tests", "restart_local_errors").orElseThrow();

    Execution initialRun = runnerUtils.runOne(
        localErrorsFlow.getNamespace(),
        localErrorsFlow.getId(),
        Duration.ofSeconds(60)
    );

    // First run: FAILED, five task runs, the fourth one being the failed one.
    assertThat(initialRun.getState().getCurrent(), is(State.Type.FAILED));
    assertThat(initialRun.getTaskRunList(), hasSize(5));
    assertThat(initialRun.getTaskRunList().get(3).getState().getCurrent(), is(State.Type.FAILED));

    // Trigger the restart and wait for an execution that reaches FAILED.
    Execution rerun = runnerUtils.awaitExecution(
        candidate -> candidate.getState().getCurrent() == State.Type.FAILED,
        throwRunnable(() -> {
            Thread.sleep(1000);
            Execution restarted = executionService.restart(initialRun, null);
            executionQueue.emit(restarted);

            // The restarted execution keeps the original id and holds one task run fewer.
            assertThat(restarted, notNullValue());
            assertThat(restarted.getId(), is(initialRun.getId()));
            assertThat(restarted.getParentId(), nullValue());
            assertThat(restarted.getTaskRunList().size(), is(4));
            assertThat(restarted.getState().getCurrent(), is(State.Type.RESTARTED));
        }),
        Duration.ofSeconds(60)
    );

    // Same execution, no parent, five task runs again once finished.
    assertThat(rerun, notNullValue());
    assertThat(rerun.getId(), is(initialRun.getId()));
    assertThat(rerun.getParentId(), nullValue());
    assertThat(rerun.getTaskRunList().size(), is(5));

    // The task run at index 3 was attempted a second time.
    assertThat(rerun.getTaskRunList().get(3).getAttempts().size(), is(2));

    assertThat(rerun.getState().getCurrent(), is(State.Type.FAILED));
}

public void replay() throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "restart-each").orElseThrow();

Expand Down
14 changes: 12 additions & 2 deletions core/src/test/java/io/kestra/core/runners/RestartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@ public class RestartTest extends AbstractMemoryRunnerTest {
private RestartCaseTest restartCaseTest;

@Test
void restartFailed() throws Exception {
restartCaseTest.restartFailed();
void restartFailedThenSuccess() throws Exception {
restartCaseTest.restartFailedThenSuccess();
}

/**
 * Delegates to {@code RestartCaseTest#restartFailedThenFailureWithGlobalErrors()}, which holds
 * the actual scenario and assertions.
 */
@Test
void restartFailedThenFailureWithGlobalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithGlobalErrors();
}

/**
 * Delegates to {@code RestartCaseTest#restartFailedThenFailureWithLocalErrors()}, which holds
 * the actual scenario and assertions.
 */
@Test
void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}

@Test
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/resources/flows/valids/restart_always_failed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Test fixture: a flow with a single task that always fails plus an `errors` handler.
# Loaded by RestartCaseTest.restartFailedThenFailureWithGlobalErrors via
# findById("io.kestra.tests", "restart_always_failed").
id: restart_always_failed
namespace: io.kestra.tests

tasks:
# The only regular task; `exit 1` makes the Bash command return a non-zero status.
- id: failStep
type: io.kestra.core.tasks.scripts.Bash
description: "This fails"
commands:
- 'exit 1'
# Error handler for the flow; the restart test expects its task run to be absent from the restarted execution.
errors:
- id: errorHandler
type: io.kestra.core.tasks.debugs.Echo
format: I'm failing {{task.id}}
level: INFO
27 changes: 27 additions & 0 deletions core/src/test/resources/flows/valids/restart_local_errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Test fixture: a flow containing a Sequential task with a failing child and an `errors` handler.
# Loaded by RestartCaseTest.restartFailedThenFailureWithLocalErrors via
# findById("io.kestra.tests", "restart_local_errors").
id: restart_local_errors
namespace: io.kestra.tests

tasks:
- id: before
type: io.kestra.core.tasks.debugs.Echo
format: I'm before

- id: sequential
type: io.kestra.core.tasks.flows.Sequential
tasks:
- id: close
type: io.kestra.core.tasks.debugs.Echo
format: I'm close to fail
# `exit 1` makes the Bash command return a non-zero status.
- id: failStep
type: io.kestra.core.tasks.scripts.Bash
description: "This fails"
commands:
- 'exit 1'
# NOTE(review): indentation was lost in this view; presumably these errors belong to the
# `sequential` task (the test name says "local errors") - confirm against the real file.
errors:
- id: errorHandler
type: io.kestra.core.tasks.debugs.Echo
format: I'm failing {{task.id}}

- id: after
type: io.kestra.core.tasks.debugs.Echo
format: I'm after
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void eachParallelNested() throws TimeoutException {

@Test
void restartFailed() throws Exception {
restartCaseTest.restartFailed();
restartCaseTest.restartFailedThenSuccess();
}

@Test
Expand Down

0 comments on commit 922e5a2

Please sign in to comment.