diff --git a/cli/src/main/java/io/kestra/cli/commands/flows/FlowTestCommand.java b/cli/src/main/java/io/kestra/cli/commands/flows/FlowTestCommand.java index 49dfb8b35b8..3645d254fc2 100644 --- a/cli/src/main/java/io/kestra/cli/commands/flows/FlowTestCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/flows/FlowTestCommand.java @@ -98,8 +98,8 @@ public Integer call() throws Exception { runnerUtils.runOne( all.get(0), (flow, execution) -> runnerUtils.typedInputs(flow, execution, inputs), - Duration.ofHours(1) - ); + Duration.ofHours(1), + null); runner.close(); } catch (MissingRequiredInput e) { diff --git a/core/src/main/java/io/kestra/core/models/executions/Execution.java b/core/src/main/java/io/kestra/core/models/executions/Execution.java index 28255982ed0..fecb1ba777a 100644 --- a/core/src/main/java/io/kestra/core/models/executions/Execution.java +++ b/core/src/main/java/io/kestra/core/models/executions/Execution.java @@ -50,6 +50,9 @@ public class Execution implements DeletedInterface { @With Map inputs; + @With + Map labels; + @With Map variables; @@ -93,6 +96,7 @@ public Execution withState(State.Type state) { this.flowRevision, this.taskRunList, this.inputs, + this.labels, this.variables, this.state.withState(state), this.parentId, @@ -122,6 +126,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException { this.flowRevision, newTaskRunList, this.inputs, + this.labels, this.variables, this.state, this.parentId, @@ -139,6 +144,7 @@ public Execution childExecution(String childExecutionId, List taskRunLi this.flowRevision, taskRunList, this.inputs, + this.labels, this.variables, state, childExecutionId != null ? this.getId() : null, diff --git a/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java index 4f22c4af9df..8755cb055c4 100644 --- a/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/ExecutionRepositoryInterface.java @@ -34,7 +34,8 @@ ArrayListTotal find( @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, - @Nullable List state + @Nullable List state, + @Nullable Map labels ); Flowable find( @@ -43,7 +44,8 @@ Flowable find( @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, - @Nullable List state + @Nullable List state, + @Nullable Map labels ); ArrayListTotal findTaskRun( diff --git a/core/src/main/java/io/kestra/core/runners/RunnerUtils.java b/core/src/main/java/io/kestra/core/runners/RunnerUtils.java index 97188182bdb..ac09bfb816e 100644 --- a/core/src/main/java/io/kestra/core/runners/RunnerUtils.java +++ b/core/src/main/java/io/kestra/core/runners/RunnerUtils.java @@ -260,41 +260,41 @@ private Map handleNestedInputs(Map inputs) { } public Execution runOne(String namespace, String flowId) throws TimeoutException { - return this.runOne(namespace, flowId, null, null, null); + return this.runOne(namespace, flowId, null, null, null, null); } public Execution runOne(String namespace, String flowId, Integer revision) throws TimeoutException { - return this.runOne(namespace, flowId, revision, null, null); + return this.runOne(namespace, flowId, revision, null, null, null); } public Execution runOne(String namespace, String flowId, Integer revision, BiFunction> inputs) throws TimeoutException { - return this.runOne(namespace, flowId, revision, inputs, null); + return this.runOne(namespace, flowId, revision, inputs, null, null); } public Execution runOne(String namespace, String flowId, Duration duration) throws TimeoutException { - return this.runOne(namespace, flowId, null, null, duration); + return this.runOne(namespace, flowId, null, null, duration, null); } - public Execution runOne(String namespace, String flowId, Integer revision, BiFunction> inputs, Duration duration) throws TimeoutException { + public Execution runOne(String namespace, String flowId, Integer revision, BiFunction> inputs, Duration duration, Map labels) throws TimeoutException { return this.runOne( flowRepository .findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty()) .orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")), inputs, - duration - ); + duration, + labels); } public Execution runOne(Flow flow, BiFunction> inputs) throws TimeoutException { - return this.runOne(flow, inputs, null); + return this.runOne(flow, inputs, null, null); } - public Execution runOne(Flow flow, BiFunction> inputs, Duration duration) throws TimeoutException { + public Execution runOne(Flow flow, BiFunction> inputs, Duration duration, Map labels) throws TimeoutException { if (duration == null) { duration = Duration.ofSeconds(15); } - Execution execution = this.newExecution(flow, inputs); + Execution execution = this.newExecution(flow, inputs, labels); return this.awaitExecution(isTerminatedExecution(execution, flow), () -> { this.executionQueue.emit(execution); @@ -320,7 +320,7 @@ public Execution runOneUntilPaused(Flow flow, BiFunction { this.executionQueue.emit(execution); @@ -369,7 +369,7 @@ private Predicate isTerminatedChildExecution(Execution parentExecutio return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e); } - public Execution newExecution(Flow flow, BiFunction> inputs) { + public Execution newExecution(Flow flow, BiFunction> inputs, Map labels) { Execution execution = Execution.builder() .id(IdUtils.create()) .namespace(flow.getNamespace()) @@ -382,6 +382,10 @@ public Execution newExecution(Flow flow, BiFunction { PurgeResult.PurgeResultBuilder builder = PurgeResult.builder(); diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java index 004c6d7fa63..6ae4b87710a 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java @@ -67,11 +67,17 @@ public class Flow extends Task implements RunnableTask { private Integer revision; @Schema( - title = "The input to pass to the triggered flow" + title = "The inputs to pass to the triggered flow" ) @PluginProperty(dynamic = true) private Map inputs; + @Schema( + title = "The labels to pass to the triggered flow" + ) + @PluginProperty(dynamic = true) + private Map labels; + @Builder.Default @Schema( title = "Wait the end of the execution.", @@ -112,6 +118,13 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl } } + Map labels = new HashMap<>(); + if (this.labels != null) { + for (Map.Entry entry: this.labels.entrySet()) { + labels.put(entry.getKey(), runContext.render(entry.getValue())); + } + } + Map flowVars = (Map) runContext.getVariables().get("flow"); io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask( @@ -130,8 +143,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl return runnerUtils .newExecution( flow, - (f, e) -> runnerUtils.typedInputs(f, e, inputs) - ) + (f, e) -> runnerUtils.typedInputs(f, e, inputs), + labels) .withTrigger(ExecutionTrigger.builder() .id(this.getId()) .type(this.getType()) diff --git a/core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java b/core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java index 5f5aa88399d..1e42e1f8003 100644 --- a/core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java +++ b/core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java @@ -122,7 +122,7 @@ protected boolean isTriggerChild(Flow parent, Flow child) { List triggers = ListUtils.emptyOnNull(child.getTriggers()); // simulated execution - Execution execution = runnerUtils.newExecution(parent, (f, e) -> null); + Execution execution = runnerUtils.newExecution(parent, (f, e) -> null, null); // keep only flow trigger List flowTriggers = triggers diff --git a/core/src/test/java/io/kestra/core/models/executions/ExecutionTest.java b/core/src/test/java/io/kestra/core/models/executions/ExecutionTest.java index 6e00a59da54..caf5bebe1bc 100644 --- a/core/src/test/java/io/kestra/core/models/executions/ExecutionTest.java +++ b/core/src/test/java/io/kestra/core/models/executions/ExecutionTest.java @@ -5,6 +5,7 @@ import io.kestra.core.models.flows.State; import java.util.Collections; +import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -148,4 +149,14 @@ void originalId() { ); assertThat(restart2.getOriginalId(), is(execution.getId())); } + + @Test + void labels() { + final Execution execution = Execution.builder() + .labels(Map.of("test", "test-value")) + .build(); + + assertThat(execution.getLabels().size(), is(1)); + assertThat(execution.getLabels().get("test"), is("test-value")); + } } diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java index 38ad4e453ad..e7b5b30cb09 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java @@ -106,11 +106,11 @@ protected void inject() { protected void find() { inject(); - ArrayListTotal executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null); + ArrayListTotal executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null, null); assertThat(executions.getTotal(), is(28L)); assertThat(executions.size(), is(10)); - executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED)); + executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null); assertThat(executions.getTotal(), is(8L)); } @@ -118,11 +118,11 @@ protected void find() { protected void findWithSort() { inject(); - ArrayListTotal executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null); + ArrayListTotal executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null, null); assertThat(executions.getTotal(), is(28L)); assertThat(executions.size(), is(10)); - executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED)); + executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null); assertThat(executions.getTotal(), is(8L)); } diff --git a/core/src/test/java/io/kestra/core/tasks/executions/FailTest.java b/core/src/test/java/io/kestra/core/tasks/executions/FailTest.java index 67a8ecaa99e..53388df146e 100644 --- a/core/src/test/java/io/kestra/core/tasks/executions/FailTest.java +++ b/core/src/test/java/io/kestra/core/tasks/executions/FailTest.java @@ -17,7 +17,7 @@ public class FailTest extends AbstractMemoryRunnerTest { @Test void failOnSwitch() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-switch", null, - (f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(1)); assertThat(execution.findTaskRunsByTaskId("switch").get(0).getState().getCurrent(), is(State.Type.FAILED)); @@ -27,7 +27,7 @@ void failOnSwitch() throws TimeoutException { @Test void failOnCondition() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null, - (f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.FAILED)); @@ -37,7 +37,7 @@ void failOnCondition() throws TimeoutException { @Test void dontFailOnCondition() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null, - (f, e) -> Map.of("param", "success") , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", "success") , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(3)); assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java index 85170232452..57040fc92e9 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/FlowCaseTest.java @@ -57,8 +57,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count "task-flow", null, (f, e) -> ImmutableMap.of("string", input), - Duration.ofMinutes(1) - ); + Duration.ofMinutes(1), + null); countDownLatch.await(1, TimeUnit.MINUTES); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/IfTest.java b/core/src/test/java/io/kestra/core/tasks/flows/IfTest.java index 710f6a1b91d..98ad44e8eb0 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/IfTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/IfTest.java @@ -16,21 +16,21 @@ class IfTest extends AbstractMemoryRunnerTest { @Test void ifTruthy() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", true) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", "true") , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", "true") , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", 1) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", 1) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); @@ -40,28 +40,28 @@ void ifTruthy() throws TimeoutException { @Test void ifFalsy() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", false) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", "false") , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", "false") , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", 0) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", 0) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null, - (f, e) -> Map.of("param", -0) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", -0) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); @@ -69,18 +69,18 @@ void ifFalsy() throws TimeoutException { // We cannot test null as inputs cannot be null } - + @Test void ifWithoutElse() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null, - (f, e) -> Map.of("param", true) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(2)); assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null, - (f, e) -> Map.of("param", false) , Duration.ofSeconds(120)); + (f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null); assertThat(execution.getTaskRunList(), hasSize(1)); assertThat(execution.findTaskRunsByTaskId("when-true").isEmpty(), is(true)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java b/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java index 439dbed4ed9..34d7bacf964 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java @@ -60,8 +60,8 @@ public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInter "with-string", "myString", "with-optional", "myOpt" )), - Duration.ofSeconds(60) - ); + Duration.ofSeconds(60), + null); assertThat(execution.getTaskRunList(), hasSize(4)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/WorkerTest.java b/core/src/test/java/io/kestra/core/tasks/flows/WorkerTest.java index 1646ef1d06f..bc7c680c607 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/WorkerTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/WorkerTest.java @@ -39,8 +39,8 @@ void each() throws TimeoutException { public static class Suite { public void success(RunnerUtils runnerUtils) throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null, - (f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60) - ); + (f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60), + null); assertThat(execution.getTaskRunList(), hasSize(4)); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); @@ -49,8 +49,8 @@ public void success(RunnerUtils runnerUtils) throws TimeoutException { public void failed(RunnerUtils runnerUtils) throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null, - (f, e) -> ImmutableMap.of("failed", "true"), Duration.ofSeconds(60) - ); + (f, e) -> ImmutableMap.of("failed", "true"), Duration.ofSeconds(60), + null); assertThat(execution.getTaskRunList(), hasSize(3)); assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); diff --git a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java index f42cce65216..8f77bdbc833 100644 --- a/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java +++ b/jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java @@ -7,8 +7,12 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.jooq.Condition; +import org.jooq.Field; +import org.jooq.impl.DSL; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Singleton @H2RepositoryEnabled @@ -22,4 +26,20 @@ public H2ExecutionRepository(ApplicationContext applicationContext, AbstractJdbc protected Condition findCondition(String query) { return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); } + + @Override + protected Condition labelsFilter(Map labels) { + return DSL.and(labels.entrySet() + .stream() + .map(pair -> { + final Field field = DSL.field("JQ_STRING(\"value\", '.labels." + pair.getKey() + "')", String.class); + + if (pair.getValue() == null) { + return field.isNotNull(); + } else { + return field.eq(pair.getValue()); + } + } + ).collect(Collectors.toList())); + } } diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java index ed4dfcba550..4cfbf289a80 100644 --- a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java @@ -7,8 +7,12 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.jooq.Condition; +import org.jooq.Field; +import org.jooq.impl.DSL; import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; @Singleton @MysqlRepositoryEnabled @@ -22,4 +26,20 @@ public MysqlExecutionRepository(ApplicationContext applicationContext, AbstractJ protected Condition findCondition(String query) { return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "flow_id", "id"), query); } + + @Override + protected Condition labelsFilter(Map labels) { + return DSL.and(labels.entrySet() + .stream() + .map(pair -> { + final Field field = DSL.field("JSON_VALUE(value, '$.labels." + pair.getKey() + "' NULL ON EMPTY)", String.class); + + if (pair.getValue() == null) { + return field.isNotNull(); + } else { + return field.eq(pair.getValue()); + } + } + ).collect(Collectors.toList())); + } } diff --git a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresExecutionRepository.java b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresExecutionRepository.java index e3b43403adc..b4cc6d7186d 100644 --- a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresExecutionRepository.java +++ b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresExecutionRepository.java @@ -2,18 +2,19 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.State; -import io.kestra.core.repositories.ExecutionRepositoryInterface; import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository; import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage; import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.jooq.Condition; +import org.jooq.Field; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @Singleton @@ -36,6 +37,22 @@ protected Condition statesFilter(List state) { ); } + @Override + protected Condition labelsFilter(Map labels) { + return DSL.and(labels.entrySet() + .stream() + .map(pair -> { + final Field field = DSL.field("value #>> '{labels, " + pair.getKey() + "}'", String.class); + + if (pair.getValue() == null) { + return field.isNotNull(); + } else { + return field.eq(pair.getValue()); + } + } + ).collect(Collectors.toList())); + } + @Override protected Condition findCondition(String query) { return this.jdbcRepository.fullTextCondition(Collections.singletonList("fulltext"), query); diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java index 2f6160076a3..63e5a303994 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java @@ -98,6 +98,9 @@ protected Condition statesFilter(List state) { .in(state.stream().map(Enum::name).collect(Collectors.toList())); } + abstract protected Condition labelsFilter(Map labels); + + @Override public ArrayListTotal find( Pageable pageable, @Nullable String query, @@ -105,7 +108,8 @@ public ArrayListTotal find( @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, - @Nullable List state + @Nullable List state, + @Nullable Map labels ) { return this.jdbcRepository .getDslContextWrapper() @@ -119,7 +123,8 @@ public ArrayListTotal find( flowId, startDate, endDate, - state + state, + labels ); return this.jdbcRepository.fetchPage(context, select, pageable); @@ -133,7 +138,8 @@ public Flowable find( @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, - @Nullable List state + @Nullable List state, + @Nullable Map labels ) { return Flowable.create( emitter -> this.jdbcRepository @@ -148,7 +154,8 @@ public Flowable find( flowId, startDate, endDate, - state + state, + labels ); select.fetch() @@ -168,7 +175,8 @@ private SelectConditionStep> findSelect( @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, - @Nullable List state + @Nullable List state, + @Nullable Map labels ) { SelectConditionStep> select = context .select( @@ -192,6 +200,10 @@ private SelectConditionStep> findSelect( select = select.and(this.statesFilter(state)); } + if (labels != null) { + select = select.and(this.labelsFilter(labels)); + } + return select; } diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java index 3e7de3ed205..e2183fa5497 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java @@ -15,7 +15,6 @@ import jakarta.inject.Inject; import jakarta.inject.Named; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -81,63 +80,63 @@ void init() throws IOException, URISyntaxException { @Test void full() throws TimeoutException, QueueException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "full", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "full", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(13)); } @Test void logs() throws TimeoutException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "logs", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "logs", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(3)); } @Test void errors() throws TimeoutException, QueueException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "errors", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "errors", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(7)); } @Test void sequential() throws TimeoutException, QueueException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "sequential", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "sequential", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(11)); } @Test void parallel() throws TimeoutException, QueueException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(8)); } @Test void parallelNested() throws TimeoutException, QueueException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel-nested", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "parallel-nested", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(11)); } @Test void eachSequentialNested() throws TimeoutException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(23)); } @Test void eachParallel() throws TimeoutException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(8)); } @Test void eachParallelNested() throws TimeoutException { - Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-nested", null, null, Duration.ofSeconds(60)); + Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-nested", null, null, Duration.ofSeconds(60), null); assertThat(execution.getTaskRunList(), hasSize(11)); } diff --git a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryExecutionRepository.java b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryExecutionRepository.java index 4d8b04b8832..7b4cda9c60b 100644 --- a/repository-memory/src/main/java/io/kestra/repository/memory/MemoryExecutionRepository.java +++ b/repository-memory/src/main/java/io/kestra/repository/memory/MemoryExecutionRepository.java @@ -28,12 +28,12 @@ public Boolean isTaskRunEnabled() { } @Override - public ArrayListTotal find(Pageable pageable, String query, String namespace, String flowId, ZonedDateTime startDate, ZonedDateTime endDate, List state) { + public ArrayListTotal find(Pageable pageable, String query, String namespace, String flowId, ZonedDateTime startDate, ZonedDateTime endDate, List state, @Nullable Map labels) { throw new UnsupportedOperationException(); } @Override - public Flowable find(@Nullable String query, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List state) { + public Flowable find(@Nullable String query, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List state, @Nullable Map labels) { return null; } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index a5e350ea519..2647363d590 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -7,6 +7,7 @@ import io.kestra.core.runners.RunContextFactory; import io.kestra.webserver.responses.BulkErrorResponse; import io.kestra.webserver.responses.BulkResponse; +import io.kestra.webserver.utils.RequestUtils; import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.core.convert.format.Format; @@ -69,6 +70,7 @@ import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -123,7 +125,8 @@ public PagedResults find( @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, - @Parameter(description = "A state filter") @Nullable @QueryValue List state + @Parameter(description = "A state filter") @Nullable @QueryValue List state, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels ) { return PagedResults.of(executionRepository.find( PageableUtils.from(page, size, sort, executionRepository.sortMapping()), @@ -132,7 +135,8 @@ public PagedResults find( flowId, startDate, endDate, - state + state, + RequestUtils.toMap(labels) )); } @@ -310,7 +314,8 @@ public HttpResponse deleteByQuery( @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, - @Parameter(description = "A state filter") @Nullable @QueryValue List state + @Parameter(description = "A state filter") @Nullable @QueryValue List state, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels ) { Integer count = executionRepository .find( @@ -319,7 +324,8 @@ public HttpResponse deleteByQuery( flowId, startDate, endDate, - state + state, + RequestUtils.toMap(labels) ) .map(e -> { executionRepository.delete(e); @@ -429,7 +435,7 @@ private Execution webhook( public Execution trigger( @Parameter(description = "The flow namespace") @PathVariable String namespace, @Parameter(description = "The flow id") @PathVariable String id, - @Parameter(description = "The inputs") @Nullable @Body Map inputs, + @Parameter(description = "The inputs and labels") @Nullable @Body Map inputsAndLabels, @Parameter(description = "The inputs of type file") @Nullable @Part Publisher files, @Parameter(description = "If the server will wait the end of the execution") @QueryValue(defaultValue = "false") Boolean wait, @Parameter(description = "The flow revision or latest if null") @QueryValue Optional revision @@ -445,7 +451,8 @@ public Execution trigger( Execution current = runnerUtils.newExecution( find.get(), - (flow, execution) -> runnerUtils.typedInputs(flow, execution, inputs, files) + (flow, execution) -> runnerUtils.typedInputs(flow, execution, RequestUtils.extractInputs(inputsAndLabels), files), + RequestUtils.extractLabels(inputsAndLabels) ); executionQueue.emit(current); @@ -610,12 +617,12 @@ public MutableHttpResponse restartByIds( } } if (invalids.size() > 0) { - return HttpResponse.badRequest(BulkErrorResponse - .builder() - .message("invalid bulk restart") - .invalids(invalids) - .build() - ); + return HttpResponse.badRequest(BulkErrorResponse + .builder() + .message("invalid bulk restart") + .invalids(invalids) + .build() + ); } for (Execution execution : executions) { Execution restart = executionService.restart(execution, null); @@ -635,7 +642,8 @@ public HttpResponse restartByQuery( @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, - @Parameter(description = "A state filter") @Nullable @QueryValue List state + @Parameter(description = "A state filter") @Nullable @QueryValue List state, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels ) { Integer count = executionRepository .find( @@ -644,7 +652,8 @@ public HttpResponse restartByQuery( flowId, startDate, endDate, - state + state, + RequestUtils.toMap(labels) ) .map(e -> { Execution restart = executionService.restart(e, null); @@ -779,11 +788,11 @@ public MutableHttpResponse killByIds( if (invalids.size() > 0) { return HttpResponse.badRequest(BulkErrorResponse - .builder() - .message("invalid bulk kill") - .invalids(invalids) - .build() - ); + .builder() + .message("invalid bulk kill") + .invalids(invalids) + .build() + ); } executions.forEach(execution -> { @@ -806,7 +815,8 @@ public HttpResponse killByQuery( @Parameter(description = "A flow id filter") @Nullable @QueryValue String flowId, @Parameter(description = "The start datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime startDate, @Parameter(description = "The end datetime") @Nullable @Format("yyyy-MM-dd'T'HH:mm[:ss][.SSS][XXX]") @QueryValue ZonedDateTime endDate, - @Parameter(description = "A state filter") @Nullable @QueryValue List state + @Parameter(description = "A state filter") @Nullable @QueryValue List state, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels ) { Integer count = executionRepository .find( @@ -815,7 +825,8 @@ public HttpResponse killByQuery( flowId, startDate, endDate, - state + state, + RequestUtils.toMap(labels) ) .map(e -> { if (!e.getState().isRunning()) { diff --git a/webserver/src/main/java/io/kestra/webserver/utils/RequestUtils.java b/webserver/src/main/java/io/kestra/webserver/utils/RequestUtils.java index 4684206f2dd..3e16d7a59a6 100644 --- a/webserver/src/main/java/io/kestra/webserver/utils/RequestUtils.java +++ b/webserver/src/main/java/io/kestra/webserver/utils/RequestUtils.java @@ -6,9 +6,14 @@ import java.util.AbstractMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; public class RequestUtils { + private static final String LABEL_PREFIX = "label-"; + private static final Pattern LABEL_PATTERN = Pattern.compile("^" + LABEL_PREFIX + ".+$"); + public static Map toMap(List queryString) { return queryString == null ? null : queryString .stream() @@ -25,4 +30,30 @@ public static Map toMap(List queryString) { }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + + public static Map extractLabels(Map inputs) { + return inputs == null ? null : inputs.entrySet() + .stream() + .filter(RequestUtils::isLabel) + .collect(Collectors.toMap(entry -> removePrefix(entry.getKey()), Map.Entry::getValue)); + } + + public static Map extractInputs(Map inputs) { + return inputs == null ? null : inputs.entrySet() + .stream() + .filter(entry -> !isLabel(entry)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + protected static boolean isLabel(Map.Entry entry) { + final Matcher labelMatcher = LABEL_PATTERN.matcher(entry.getKey()); + return labelMatcher.matches(); + } + + protected static String removePrefix(String labelName) { + if (labelName.indexOf(LABEL_PREFIX) == -1) { + throw new HttpStatusException(HttpStatus.UNPROCESSABLE_ENTITY, "Invalid label parameter"); + } + return labelName.substring(labelName.indexOf(LABEL_PREFIX) + LABEL_PREFIX.length()); + } } diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java index 90e7d859108..10867bbdbf7 100644 --- a/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java +++ b/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java @@ -86,7 +86,7 @@ private Execution triggerExecution(String namespace, String flowId, MultipartBod } - private MultipartBody createInputsFlowBody() { + private MultipartBody createInputsLabelsFlowBody() { // Trigger execution File applicationFile = new File(Objects.requireNonNull( ExecutionControllerTest.class.getClassLoader().getResource("application.yml") @@ -103,29 +103,33 @@ private MultipartBody createInputsFlowBody() { .addPart("instant", "2019-10-06T18:27:49Z") .addPart("files", "file", MediaType.TEXT_PLAIN_TYPE, applicationFile) .addPart("files", "optionalFile", MediaType.TEXT_XML_TYPE, logbackFile) + .addPart("label-a", "label-1") + .addPart("label-b", "label-2") .build(); } - private Execution triggerInputsFlowExecution(Boolean wait) { - MultipartBody requestBody = createInputsFlowBody(); + private Execution triggerInputsLabelsFlowExecution(Boolean wait) { + MultipartBody requestBody = createInputsLabelsFlowBody(); return triggerExecution(TESTS_FLOW_NS, "inputs", requestBody, wait); } @Test void trigger() { - Execution result = triggerInputsFlowExecution(false); + Execution result = triggerInputsLabelsFlowExecution(false); assertThat(result.getState().getCurrent(), is(State.Type.CREATED)); assertThat(result.getFlowId(), is("inputs")); assertThat(result.getInputs().get("float"), is(42.42)); assertThat(result.getInputs().get("file").toString(), startsWith("kestra:///io/kestra/tests/inputs/executions/")); assertThat(result.getInputs().get("file").toString(), startsWith("kestra:///io/kestra/tests/inputs/executions/")); + assertThat(result.getLabels().get("a"), is("label-1")); + assertThat(result.getLabels().get("b"), is("label-2")); } @Test void triggerAndWait() { - Execution result = triggerInputsFlowExecution(true); + Execution result = triggerInputsLabelsFlowExecution(true); assertThat(result.getState().getCurrent(), is(State.Type.SUCCESS)); assertThat(result.getTaskRunList().size(), is(5)); @@ -133,7 +137,7 @@ void triggerAndWait() { @Test void get() { - Execution result = triggerInputsFlowExecution(false); + Execution result = triggerInputsLabelsFlowExecution(false); // Get the triggered execution by execution id Execution foundExecution = client.retrieve( @@ -170,7 +174,7 @@ void findByFlowId() { @Test void triggerAndFollow() { - Execution result = triggerInputsFlowExecution(false); + Execution result = triggerInputsLabelsFlowExecution(false); RxSseClient sseClient = embeddedServer.getApplicationContext().createBean(RxSseClient.class, embeddedServer.getURL()); @@ -494,6 +498,4 @@ void webhook() { assertThat(execution.getTrigger().getVariables().get("body"), is("{\\\"a\\\":\\\"\\\",\\\"b\\\":{\\\"c\\\":{\\\"d\\\":{\\\"e\\\":\\\"\\\",\\\"f\\\":\\\"1\\\"}}}}")); } - - } diff --git a/webserver/src/test/java/io/kestra/webserver/utils/RequestUtilsTest.java b/webserver/src/test/java/io/kestra/webserver/utils/RequestUtilsTest.java new file mode 100644 index 00000000000..764c1f51858 --- /dev/null +++ b/webserver/src/test/java/io/kestra/webserver/utils/RequestUtilsTest.java @@ -0,0 +1,49 @@ +package io.kestra.webserver.utils; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.hamcrest.Matchers.*; +import static org.hamcrest.MatcherAssert.assertThat; + +class RequestUtilsTest { + @Test + void extractLabels() { + final Map inputParam = Map.of( + "label-1", "labelA", "test", "testValue", "label-2", "labelB" + ); + + final Map extracted = RequestUtils.extractLabels(inputParam); + + assertThat(extracted.size(), is(2)); + assertThat(extracted.get("1"), is("labelA")); + assertThat(extracted.get("2"), is("labelB")); + } + + @Test + void extractLabelsNullCheck() { + final Map extracted = RequestUtils.extractLabels(null); + + assertThat(extracted, is(nullValue())); + } + + @Test + void extractInputs() { + final Map inputParam = Map.of( + "label-1", "labelA", "test", "testValue", "label-2", "labelB" + ); + + final Map extracted = RequestUtils.extractInputs(inputParam); + + assertThat(extracted.size(), is(1)); + assertThat(extracted.get("test"), is("testValue")); + } + + @Test + void extractInputsNullCheck() { + final Map extracted = RequestUtils.extractInputs(null); + + assertThat(extracted, is(nullValue())); + } +} \ No newline at end of file