Skip to content

Commit

Permalink
feat(core): allow passing the list of values for the each task in a list
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Mar 13, 2023
1 parent c72d6d2 commit 103ad64
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 15 deletions.
35 changes: 28 additions & 7 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,36 @@ public static List<NextTaskRun> resolveParallelNexts(
private final static TypeReference<List<Object>> TYPE_REFERENCE = new TypeReference<>() {};
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

public static List<ResolvedTask> resolveEachTasks(RunContext runContext, TaskRun parentTaskRun, List<Task> tasks, String value) throws IllegalVariableEvaluationException {
String renderValue = runContext.render(value);

public static List<ResolvedTask> resolveEachTasks(RunContext runContext, TaskRun parentTaskRun, List<Task> tasks, Object value) throws IllegalVariableEvaluationException {
List<Object> values;
try {
values = MAPPER.readValue(renderValue, TYPE_REFERENCE);
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);

if(value instanceof String) {
String renderValue = runContext.render((String) value);
try {
values = MAPPER.readValue(renderValue, TYPE_REFERENCE);
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
}
else if(value instanceof List) {
values = new ArrayList<>(((List<?>) value).size());
for(Object obj: (List<Object>) value) {
if(obj instanceof String){
values.add(runContext.render((String) obj));
}
else if(obj instanceof Map) {
//JSON or YAML map
values.add(runContext.render((Map) obj));
}
else {
throw new IllegalVariableEvaluationException("Unknown value element type: " + obj.getClass());
}
}
}
else {
throw new IllegalVariableEvaluationException("Unknown value type: " + value.getClass());
}


List<Object> distinctValue = values
.stream()
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/EachParallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
),
@Example(
code = {
"value: ",
"- value 1",
"- value 2",
"- value 3",
"tasks:",
" - id: each-value",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
),
@Example(
title = "Handling each value in parallel but only 1 child task for each value at the same time.",
code = {
Expand Down Expand Up @@ -80,7 +92,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@NotBlank
@Builder.Default
@Schema(
title = "Number of concurrent parrallels tasks",
title = "Number of concurrent parallel tasks",
description = "If the value is `0`, no limit exist and all the tasks will start at the same time"
)
@PluginProperty
Expand All @@ -89,7 +101,12 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@NotNull
@NotBlank
@PluginProperty(dynamic = true)
private String value;
@Schema(
title = "The list of values for this task",
description = "The value car be passed as a String, a list of String, or a list of objects",
anyOf = {String.class, Object[].class}
)
private Object value;

@Valid
@PluginProperty
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/EachSequential.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,31 @@
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
)
),
@Example(
code = {
"value: ",
"- value 1",
"- value 2",
"- value 3",
"tasks:",
" - id: each-value",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
),
}
)
public class EachSequential extends Sequential implements FlowableTask<VoidOutput> {
@NotNull
@NotBlank
@PluginProperty(dynamic = true)
private String value;
@Schema(
title = "The list of values for this task",
description = "The value car be passed as a String, a list of String, or a list of objects",
anyOf = {String.class, Object[].class}
)
private Object value;

@Valid
@PluginProperty
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 55;
public static long FLOWS_COUNT = 56;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.core.tasks.flows;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
Expand Down Expand Up @@ -45,6 +44,15 @@ void object() throws TimeoutException {
assertThat((String) execution.getTaskRunList().get(6).getOutputs().get("value"), containsString("json > JSON > [\"my-complex\"]"));
}

@Test
void objectInList() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-object-in-list");

assertThat(execution.getTaskRunList(), hasSize(8));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat((String) execution.getTaskRunList().get(6).getOutputs().get("value"), containsString("json > JSON > [\"my-complex\"]"));
}

@Test
void sequentialNested() throws TimeoutException, InternalException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested");
Expand Down
31 changes: 31 additions & 0 deletions core/src/test/resources/flows/valids/each-object-in-list.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
id: each-object-in-list
namespace: io.kestra.tests

tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachSequential
value:
- value 1
- {"key": "my-key", "value": "my-value"}
- key: my-complex
value:
sub: 1
bool: true

tasks:
- id: is-json
type: io.kestra.core.tasks.flows.Switch
value: "{{ taskrun.value is json }}"
cases:
"false":
- id: not-json
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > STRING > {{parent.taskrun.value}}"
defaults:
- id: json
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > JSON > {{ parent.taskrun.value | jq('.key') }} > {{ parent.taskrun.value | jq('.value') }}"

- id: 2_end
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ namespace: io.kestra.tests
tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachParallel
value: '["value 1", "value 2", "value 3"]'
value:
- value 1
- value 2
- value 3
tasks:
- id: 2-1_seq
type: io.kestra.core.tasks.flows.Sequential
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachSequential
value: '["s1", "s2", "s3"]'
value: ["s1", "s2", "s3"]
tasks:
- id: 1-1_return
type: io.kestra.core.tasks.debugs.Return
Expand Down

0 comments on commit 103ad64

Please sign in to comment.