Skip to content

Commit

Permalink
fix(core): ForEach graph should be sequential when concurrencyLimit=1
Browse files Browse the repository at this point in the history
Fixes #4474
  • Loading branch information
loicmathieu committed Jul 30, 2024
1 parent b84d8fd commit 7a63960
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions core/src/main/java/io/kestra/plugin/core/flow/ForEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.kestra.core.utils.GraphUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.PositiveOrZero;
import lombok.*;
import lombok.experimental.SuperBuilder;

Expand Down Expand Up @@ -147,6 +148,7 @@ public class ForEach extends Sequential implements FlowableTask<VoidOutput> {
)
private Object values;

@PositiveOrZero
@NotNull
@Builder.Default
@Schema(
Expand All @@ -164,13 +166,23 @@ public class ForEach extends Sequential implements FlowableTask<VoidOutput> {
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);

GraphUtils.parallel(
subGraph,
this.getTasks(),
this.getErrors(),
taskRun,
execution
);
if (concurrencyLimit == 1) {
GraphUtils.sequential(
subGraph,
this.getTasks(),
this.getErrors(),
taskRun,
execution
);
} else {
GraphUtils.parallel(
subGraph,
this.getTasks(),
this.getErrors(),
taskRun,
execution
);
}

return subGraph;
}
Expand Down

0 comments on commit 7a63960

Please sign in to comment.