Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve each tasks #1060

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,36 @@ public static List<NextTaskRun> resolveParallelNexts(
private final static TypeReference<List<Object>> TYPE_REFERENCE = new TypeReference<>() {};
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

public static List<ResolvedTask> resolveEachTasks(RunContext runContext, TaskRun parentTaskRun, List<Task> tasks, String value) throws IllegalVariableEvaluationException {
String renderValue = runContext.render(value);

public static List<ResolvedTask> resolveEachTasks(RunContext runContext, TaskRun parentTaskRun, List<Task> tasks, Object value) throws IllegalVariableEvaluationException {
List<Object> values;
try {
values = MAPPER.readValue(renderValue, TYPE_REFERENCE);
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);

if(value instanceof String) {
String renderValue = runContext.render((String) value);
try {
values = MAPPER.readValue(renderValue, TYPE_REFERENCE);
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
}
else if(value instanceof List) {
values = new ArrayList<>(((List<?>) value).size());
for(Object obj: (List<Object>) value) {
if(obj instanceof String){
values.add(runContext.render((String) obj));
}
else if(obj instanceof Map) {
//JSON or YAML map
values.add(runContext.render((Map) obj));
}
else {
throw new IllegalVariableEvaluationException("Unknown value element type: " + obj.getClass());
}
}
}
else {
throw new IllegalVariableEvaluationException("Unknown value type: " + value.getClass());
}


List<Object> distinctValue = values
.stream()
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/debugs/Echo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Simple debugging task that log a renderer value.",
title = "Debugging task that logs a rendered value.",
description = "This task is mostly useful for debugging purpose.\n\n" +
"This one allow you to logs inputs or outputs variables for example, or to debug some templated functions."
"It allows you to log inputs or outputs variables or to debug some templated functions."
)
@Plugin(
examples = {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/debugs/Return.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Simple debugging task that return a renderer value.",
title = "Debugging task that returns a rendered value.",
description = "This task is mostly useful for debugging purpose.\n\n" +
"This one allow you to see inputs or outputs variables for example, or to debug some templated functions."
"It allows you to see inputs or outputs variables or to debug some templated functions."
)
@Plugin(
examples = {
Expand All @@ -34,7 +34,7 @@
)
public class Return extends Task implements RunnableTask<Return.Output> {
@Schema(
title = "The templatized string to render"
title = "The templated string to render"
)
@PluginProperty(dynamic = true)
private String format;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
@NoArgsConstructor
@Schema(
title = "List execution counts for a list of flow",
description = "Mostly use for send an alert if a conditions is meet about execution counts."
description = "Can be used to send an alert if a condition is met about execution counts."
)
@Plugin(
examples = {
@Example(
title = "Send a slack notification if no execution for a flow on last 24h",
title = "Send a slack notification if no execution for a flow on the last 24h",
full = true,
code = {
"id: executions-count",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Allow a task to failed",
description = "If any child tasks failed, the flow will stop child tasks, but will continue the main flow."
title = "Allow a list of task to fail",
description = "If any child tasks failed, the flow will stop executing child tasks, but will continue on the main flow execution."
)
@Plugin(
examples = {
Expand Down
35 changes: 26 additions & 9 deletions core/src/main/java/io/kestra/core/tasks/flows/EachParallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Execute a tasks for a list of value in parallel.",
description = "For each `value`, `tasks` will be executed\n" +
"The value must be valid json string representing an arrays, like `[\"value1\", \"value2\"]` or `[{\"key\":\"value1\"}, {\"key\":\"value2\"}]` \n" +
"The current value is available on vars `{{ taskrun.value }}`.\n" +
"The task list will be executed in parallel, for example if you have a 3 value with each one 2 tasks, all the " +
"6 tasks will be computed in parallel with out any garantee on the order.\n" +
title = "Execute a task for a list of values in parallel.",
description = "For each `value`, the `tasks` list will be executed\n" +
"The value must be valid json string representing an arrays, like `[\"value1\", \"value2\"]` or `[{\"key\":\"value1\"}, {\"key\":\"value2\"}]` or an array of valid JSON strings.\n" +
"The current value is available on the variable `{{ taskrun.value }}`.\n" +
"The task list will be executed in parallel, for example if you have a 3 values with 2 tasks, all the " +
"6 tasks will be computed in parallel without any guarantee on the order.\n" +
"If you want to have each value in parallel, but no concurrent task for each value, you need to wrap the tasks " +
"with a `Sequential` tasks"
)
Expand All @@ -54,7 +54,19 @@
}
),
@Example(
title = "Handling each value in parralel but only 1 child task for each value at the same time.",
code = {
"value: ",
"- value 1",
"- value 2",
"- value 3",
"tasks:",
" - id: each-value",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
),
@Example(
title = "Handling each value in parallel but only 1 child task for each value at the same time.",
code = {
"value: '[\"value 1\", \"value 2\", \"value 3\"]'",
"tasks:",
Expand All @@ -80,7 +92,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@NotBlank
@Builder.Default
@Schema(
title = "Number of concurrent parrallels tasks",
title = "Number of concurrent parallel tasks",
description = "If the value is `0`, no limit exist and all the tasks will start at the same time"
)
@PluginProperty
Expand All @@ -89,7 +101,12 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
@NotNull
@NotBlank
@PluginProperty(dynamic = true)
private String value;
@Schema(
title = "The list of values for this task",
description = "The value car be passed as a String, a list of String, or a list of objects",
anyOf = {String.class, Object[].class}
)
private Object value;

@Valid
@PluginProperty
Expand Down
29 changes: 23 additions & 6 deletions core/src/main/java/io/kestra/core/tasks/flows/EachSequential.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Execute a tasks for a list of value sequentially",
description = "For each `value`, `tasks` will be executed\n" +
"The value must be valid json string representing an arrays, like `[\"value1\", \"value2\"]` or `[{\"key\":\"value1\"}, {\"key\":\"value2\"}]` \n" +
"The current value is available on vars `{{ taskrun.value }}`."
title = "Execute a task for a list of values sequentially",
description = "For each `value`, the `tasks` list will be executed\n" +
"The value must be valid json string representing an arrays, like `[\"value1\", \"value2\"]` or `[{\"key\":\"value1\"}, {\"key\":\"value2\"}]` or an array of valid JSON strings.\n" +
"The current value is available on the variable `{{ taskrun.value }}`."
)
@Plugin(
examples = {
Expand All @@ -51,14 +51,31 @@
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
)
),
@Example(
code = {
"value: ",
"- value 1",
"- value 2",
"- value 3",
"tasks:",
" - id: each-value",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ task.id }} with current value '{{ taskrun.value }}'\"",
}
),
}
)
public class EachSequential extends Sequential implements FlowableTask<VoidOutput> {
@NotNull
@NotBlank
@PluginProperty(dynamic = true)
private String value;
@Schema(
title = "The list of values for this task",
description = "The value car be passed as a String, a list of String, or a list of objects",
anyOf = {String.class, Object[].class}
)
private Object value;

@Valid
@PluginProperty
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Plugin(
examples = {
@Example(
title = "Trigger another flow, passing some file and arguments",
title = "Trigger another flow, passing some files and arguments as inputs",
code = {
"namespace: io.kestra.tests",
"flowId: my-sub-flows",
Expand All @@ -54,13 +54,13 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {

@NotNull
@Schema(
title = "The flowId to trigger"
title = "The identifier of the flow to trigger"
)
@PluginProperty(dynamic = true)
private String flowId;

@Schema(
title = "The revision of the flow you want to trigger",
title = "The revision of the flow to trigger",
description = "By default, we trigger the last version."
)
@PluginProperty(dynamic = true)
Expand Down Expand Up @@ -90,7 +90,7 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {

@Schema(
title = "Extract outputs from triggered executions.",
description = "Allow to specify key value (with value renderered), in order to extract any outputs from " +
description = "Allow to specify key value (with value rendered), in order to extract any outputs from " +
"triggered execution."
)
@PluginProperty(dynamic = true)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/flows/Parallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Process task in parallel",
description = "This task processes tasks in parallel. It makes it convinient to process many tasks at once."
title = "Process tasks in parallel",
description = "This task processes tasks in parallel. It makes it convenient to process many tasks at once."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -66,7 +66,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
@NotBlank
@Builder.Default
@Schema(
title = "Number of concurrent parrallels tasks",
title = "Number of concurrent parallel tasks",
description = "If the value is `0`, no limit exist and all the tasks will start at the same time"
)
@PluginProperty
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Pause.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Pause current execution and wait for a manual approval or a delay"
title = "Pause the current execution and wait for a manual approval (changing the task state from the UI) or a delay"
)
@Plugin(
examples = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/Sequential.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Process tasks ones after others sequentially",
description = "Mostly use in order to group tasks."
title = "Process tasks one after the other sequentially",
description = "Used to group tasks."
)
@Plugin(
examples = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/Switch.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Process some tasks conditionnaly depending on a contextual value",
description = "Allow some workflow based on context variables, allow you to branch your based on previous task."
title = "Process some tasks conditionally depending on a contextual value",
description = "Allow some workflow based on context variables, for example branch a flow based on a previous task."
)
@Plugin(
examples = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
@NoArgsConstructor
@Slf4j
@Schema(
title = "Include a resuable template inside a flow"
title = "Include a reusable template inside a flow"
)
@Plugin(
examples = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/scripts/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@NoArgsConstructor
@Schema(
title = "Execute a Node.js script",
description = "With this Node task, we can execute a full javascript script.\n" +
"The task will create a temprorary folder for every tasks and allows you to install some npm packages defined in an optional `package.json` file.\n" +
description = "With the Node task, you can execute a full javascript script.\n" +
"The task will create a temporary folder for each tasks and allows to install some npm packages defined in an optional `package.json` file.\n" +
"\n" +
"By convention, you need to define at least a `main.js` files in `inputFiles` that will be the script used.\n" +
"You can also add as many javascript files as you need in `inputFiles`.\n" +
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/scripts/Python.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@NoArgsConstructor
@Schema(
title = "Execute a Python script",
description = "With this Python task, we can execute a full python script.\n" +
"The task will create a fresh `virtualenv` for every tasks and allow you to install some python package define in `requirements` property.\n" +
description = "With the Python task, you can execute a full Python script.\n" +
"The task will create a fresh `virtualenv` for every tasks and allows to install some Python package define in `requirements` property.\n" +
"\n" +
"By convention, you need to define at least a `main.py` files in `inputFiles` that will be the script used.\n" +
"But you are also able to add as many script as you need in `inputFiles`.\n" +
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/states/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Delete a state from internal storage."
title = "Delete a state from the state store."
)
@Plugin(
examples = {
@Example(
title = "Delete a state isolated by flow with `default` state name ",
title = "Delete the default state for the current flow",
code = {
"id: getState",
"type: io.kestra.core.tasks.states.Delete",
},
full = true
),
@Example(
title = "Delete the `myState` state for the current flow",
code = {
"id: getState",
"type: io.kestra.core.tasks.states.Delete",
"name: myState",
},
full = true
)
}
)
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/states/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,26 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Get a state from internal storage."
title = "Get a state from the state store."
)
@Plugin(
examples = {
@Example(
title = "Get a state isolated by flow with `default` state name ",
title = "Get the default state for the current flow",
code = {
"id: getState",
"type: io.kestra.core.tasks.states.Get",
},
full = true
),
@Example(
title = "Get the `myState` state for the current flow",
code = {
"id: getState",
"type: io.kestra.core.tasks.states.Get",
"name: myState",
},
full = true
)
}
)
Expand Down
Loading