Skip to content

Commit

Permalink
fix(core): each can be blocked if there is a warning state
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jul 6, 2022
1 parent 0d3de92 commit 26ba57e
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved

List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);

if (errorsFlow.size() > 0 || this.hasFailed(resolvedTasks)) {
if (errorsFlow.size() > 0 || this.hasFailed(resolvedTasks, parentTaskRun)) {
return resolvedErrors == null ? new ArrayList<>() : resolvedErrors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static Optional<State.Type> resolveState(
TaskRun parentTaskRun,
RunContext runContext
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);

if (currentTasks == null) {
runContext.logger().warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ void eachWithExecution() throws TimeoutException, IllegalVariableEvaluationExcep
Flow flow = this.parse("flows/valids/each-sequential.yaml");
FlowGraph flowGraph = FlowGraph.of(flow, execution);

assertThat(flowGraph.getNodes().size(), is(11));
assertThat(flowGraph.getEdges().size(), is(12));
assertThat(flowGraph.getClusters().size(), is(1));
assertThat(flowGraph.getNodes().size(), is(17));
assertThat(flowGraph.getEdges().size(), is(18));
assertThat(flowGraph.getClusters().size(), is(4));

assertThat(edge(flowGraph, "1-1_value 1", "1-1_value 2").getRelation().getValue(), is("value 2"));
assertThat(edge(flowGraph, "1-1_value 2", "1-1_value 3").getRelation().getValue(), is("value 3"));
assertThat(edge(flowGraph, "1-2_value 3", "1_each_.*_end"), is(notNullValue()));
assertThat(edge(flowGraph, "1-2_value 3", "failed_value 3_.*_end"), is(notNullValue()));

assertThat(edge(flowGraph, "1-2_value 1","1-2_value 2").getRelation().getValue(), is("value 2"));
assertThat(edge(flowGraph, "1-2_value 2","1-2_value 3").getRelation().getValue(), is("value 3"));
assertThat(edge(flowGraph, "failed_value 1_.*","failed_value 2_.*_root").getRelation().getValue(), is("value 2"));
assertThat(edge(flowGraph, "1-1_value 2","1-1_value 3").getRelation().getValue(), is("value 3"));
}

private Flow parse(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class EachSequentialTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential");

assertThat(execution.getTaskRunList(), hasSize(8));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getTaskRunList(), hasSize(11));
assertThat(execution.getState().getCurrent(), is(State.Type.WARNING));
}

@Test
Expand Down
10 changes: 7 additions & 3 deletions core/src/test/resources/flows/valids/each-sequential.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ tasks:
- id: 1-1
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"
- id: 1-2
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"
- id: failed
type: io.kestra.core.tasks.flows.AllowFailure
tasks:
- id: 1-2
type: io.kestra.core.tasks.scripts.Bash
commands:
- "exit {{ parent.taskrun.value == 'value 1' ? 1 : 0 }}"
- id: 2_end
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

0 comments on commit 26ba57e

Please sign in to comment.