Skip to content

Commit

Permalink
Add Execution labels
Browse files Browse the repository at this point in the history
* Execution now features labels as the Flow does.
* The labels are passed in the same way the Flow inputs.
* The label names are prefixed with `label-`.
* Executions can be filtered using the labels.

Room for improvement:

* Index the labels within DB.
* Rework the labels/inputs passing - use a custom binding?
* Add a label filter to the Execution UI.

close kestra-io#906
  • Loading branch information
yuri1969 authored and tchiotludo committed May 16, 2023
1 parent 46e1e57 commit 5547bfb
Show file tree
Hide file tree
Showing 24 changed files with 292 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public Integer call() throws Exception {
runnerUtils.runOne(
all.get(0),
(flow, execution) -> runnerUtils.typedInputs(flow, execution, inputs),
Duration.ofHours(1)
);
Duration.ofHours(1),
null);

runner.close();
} catch (MissingRequiredInput e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class Execution implements DeletedInterface {
@With
Map<String, Object> inputs;

@With
Map<String, String> labels;

@With
Map<String, Object> variables;

Expand Down Expand Up @@ -93,6 +96,7 @@ public Execution withState(State.Type state) {
this.flowRevision,
this.taskRunList,
this.inputs,
this.labels,
this.variables,
this.state.withState(state),
this.parentId,
Expand Down Expand Up @@ -122,6 +126,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.flowRevision,
newTaskRunList,
this.inputs,
this.labels,
this.variables,
this.state,
this.parentId,
Expand All @@ -139,6 +144,7 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.flowRevision,
taskRunList,
this.inputs,
this.labels,
this.variables,
state,
childExecutionId != null ? this.getId() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ ArrayListTotal<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

Flowable<Execution> find(
Expand All @@ -43,7 +44,8 @@ Flowable<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

ArrayListTotal<TaskRun> findTaskRun(
Expand Down
28 changes: 16 additions & 12 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,41 +260,41 @@ private Map<String, Object> handleNestedInputs(Map<String, Object> inputs) {
}

public Execution runOne(String namespace, String flowId) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, null);
return this.runOne(namespace, flowId, null, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision) throws TimeoutException {
return this.runOne(namespace, flowId, revision, null, null);
return this.runOne(namespace, flowId, revision, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(namespace, flowId, revision, inputs, null);
return this.runOne(namespace, flowId, revision, inputs, null, null);
}

public Execution runOne(String namespace, String flowId, Duration duration) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, duration);
return this.runOne(namespace, flowId, null, null, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
return this.runOne(
flowRepository
.findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration
);
duration,
labels);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(flow, inputs, null);
return this.runOne(flow, inputs, null, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, labels);

return this.awaitExecution(isTerminatedExecution(execution, flow), () -> {
this.executionQueue.emit(execution);
Expand All @@ -320,7 +320,7 @@ public Execution runOneUntilPaused(Flow flow, BiFunction<Flow, Execution, Map<St
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, null);

return this.awaitExecution(isPausedExecution(execution), () -> {
this.executionQueue.emit(execution);
Expand Down Expand Up @@ -369,7 +369,7 @@ private Predicate<Execution> isTerminatedChildExecution(Execution parentExecutio
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e);
}

public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) {
public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Map<String, String> labels) {
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
Expand All @@ -382,6 +382,10 @@ public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String,
execution = execution.withInputs(inputs.apply(flow, execution));
}

if (labels != null) {
execution = execution.withLabels(labels);
}

return execution;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public PurgeResult purge(
flowId,
null,
endDate,
state
state,
null
)
.map(execution -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
private Integer revision;

@Schema(
title = "The input to pass to the triggered flow"
title = "The inputs to pass to the triggered flow"
)
@PluginProperty(dynamic = true)
private Map<String, String> inputs;

@Schema(
title = "The labels to pass to the triggered flow"
)
@PluginProperty(dynamic = true)
private Map<String, String> labels;

@Builder.Default
@Schema(
title = "Wait the end of the execution.",
Expand Down Expand Up @@ -112,6 +118,13 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}
}

Map<String, String> labels = new HashMap<>();
if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
labels.put(entry.getKey(), runContext.render(entry.getValue()));
}
}

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
Expand All @@ -130,8 +143,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
return runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs)
)
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(this.getId())
.type(this.getType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected boolean isTriggerChild(Flow parent, Flow child) {
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());

// simulated execution
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null);
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null, null);

// keep only flow trigger
List<io.kestra.core.models.triggers.types.Flow> flowTriggers = triggers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.flows.State;

import java.util.Collections;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -148,4 +149,14 @@ void originalId() {
);
assertThat(restart2.getOriginalId(), is(execution.getId()));
}

@Test
void labels() {
final Execution execution = Execution.builder()
.labels(Map.of("test", "test-value"))
.build();

assertThat(execution.getLabels().size(), is(1));
assertThat(execution.getLabels().get("test"), is("test-value"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ protected void inject() {
protected void find() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

@Test
protected void findWithSort() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class FailTest extends AbstractMemoryRunnerTest {
@Test
void failOnSwitch() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-switch", null,
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.findTaskRunsByTaskId("switch").get(0).getState().getCurrent(), is(State.Type.FAILED));
Expand All @@ -27,7 +27,7 @@ void failOnSwitch() throws TimeoutException {
@Test
void failOnCondition() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null,
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.FAILED));
Expand All @@ -37,7 +37,7 @@ void failOnCondition() throws TimeoutException {
@Test
void dontFailOnCondition() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null,
(f, e) -> Map.of("param", "success") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "success") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count
"task-flow",
null,
(f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1)
);
Duration.ofMinutes(1),
null);

countDownLatch.await(1, TimeUnit.MINUTES);

Expand Down
20 changes: 10 additions & 10 deletions core/src/test/java/io/kestra/core/tasks/flows/IfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ class IfTest extends AbstractMemoryRunnerTest {
@Test
void ifTruthy() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", 1) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", 1) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
Expand All @@ -40,47 +40,47 @@ void ifTruthy() throws TimeoutException {
@Test
void ifFalsy() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "false") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "false") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", 0) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", 0) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", -0) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", -0) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

// We cannot test null as inputs cannot be null
}

@Test
void ifWithoutElse() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null,
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null);
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.findTaskRunsByTaskId("when-true").isEmpty(), is(true));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInter
"with-string", "myString",
"with-optional", "myOpt"
)),
Duration.ofSeconds(60)
);
Duration.ofSeconds(60),
null);

assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
Loading

0 comments on commit 5547bfb

Please sign in to comment.