Skip to content

Commit

Permalink
WiP Initial rough PgSQL version
Browse files Browse the repository at this point in the history
* Add tests?
* Doc?

Fixes kestra-io#906
  • Loading branch information
yuri1969 committed Apr 22, 2023
1 parent 29424af commit 861390e
Show file tree
Hide file tree
Showing 23 changed files with 273 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public Integer call() throws Exception {
runnerUtils.runOne(
all.get(0),
(flow, execution) -> runnerUtils.typedInputs(flow, execution, inputs),
Duration.ofHours(1)
);
Duration.ofHours(1),
null);

runner.close();
} catch (MissingRequiredInput e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class Execution implements DeletedInterface {
@With
Map<String, Object> inputs;

@With
Map<String, String> labels;

@With
Map<String, Object> variables;

Expand Down Expand Up @@ -93,6 +96,7 @@ public Execution withState(State.Type state) {
this.flowRevision,
this.taskRunList,
this.inputs,
this.labels,
this.variables,
this.state.withState(state),
this.parentId,
Expand Down Expand Up @@ -122,6 +126,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.flowRevision,
newTaskRunList,
this.inputs,
this.labels,
this.variables,
this.state,
this.parentId,
Expand All @@ -139,6 +144,7 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.flowRevision,
taskRunList,
this.inputs,
this.labels,
this.variables,
state,
childExecutionId != null ? this.getId() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ ArrayListTotal<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

Flowable<Execution> find(
Expand All @@ -43,7 +44,8 @@ Flowable<Execution> find(
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels
);

ArrayListTotal<TaskRun> findTaskRun(
Expand Down
28 changes: 16 additions & 12 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,41 +253,41 @@ private Map<String, Object> handleNestedInputs(Map<String, Object> inputs) {
}

public Execution runOne(String namespace, String flowId) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, null);
return this.runOne(namespace, flowId, null, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision) throws TimeoutException {
return this.runOne(namespace, flowId, revision, null, null);
return this.runOne(namespace, flowId, revision, null, null, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(namespace, flowId, revision, inputs, null);
return this.runOne(namespace, flowId, revision, inputs, null, null);
}

public Execution runOne(String namespace, String flowId, Duration duration) throws TimeoutException {
return this.runOne(namespace, flowId, null, null, duration);
return this.runOne(namespace, flowId, null, null, duration, null);
}

public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
public Execution runOne(String namespace, String flowId, Integer revision, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
return this.runOne(
flowRepository
.findById(namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration
);
duration,
labels);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) throws TimeoutException {
return this.runOne(flow, inputs, null);
return this.runOne(flow, inputs, null, null);
}

public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration) throws TimeoutException {
public Execution runOne(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Duration duration, Map<String, String> labels) throws TimeoutException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, labels);

return this.awaitExecution(isTerminatedExecution(execution, flow), () -> {
this.executionQueue.emit(execution);
Expand All @@ -313,7 +313,7 @@ public Execution runOneUntilPaused(Flow flow, BiFunction<Flow, Execution, Map<St
duration = Duration.ofSeconds(15);
}

Execution execution = this.newExecution(flow, inputs);
Execution execution = this.newExecution(flow, inputs, null);

return this.awaitExecution(isPausedExecution(execution), () -> {
this.executionQueue.emit(execution);
Expand Down Expand Up @@ -362,7 +362,7 @@ private Predicate<Execution> isTerminatedChildExecution(Execution parentExecutio
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && conditionService.isTerminatedWithListeners(flow, e);
}

public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs) {
public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String, Object>> inputs, Map<String, String> labels) {
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
Expand All @@ -375,6 +375,10 @@ public Execution newExecution(Flow flow, BiFunction<Flow, Execution, Map<String,
execution = execution.withInputs(inputs.apply(flow, execution));
}

if (labels != null) {
execution = execution.withLabels(labels);
}

return execution;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public PurgeResult purge(
flowId,
null,
endDate,
state
state,
null
)
.map(execution -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public class Flow extends Task implements RunnableTask<Flow.Output> {
@PluginProperty(dynamic = true)
private Map<String, String> inputs;

@Schema(
title = "The label to pass to the triggered flow"
)
@PluginProperty(dynamic = true)
private Map<String, String> labels;

@Builder.Default
@Schema(
title = "Wait the end of the execution.",
Expand Down Expand Up @@ -112,6 +118,13 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}
}

Map<String, String> labels = new HashMap<>();
if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
labels.put(entry.getKey(), runContext.render(entry.getValue()));
}
}

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
Expand All @@ -130,8 +143,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
return runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs)
)
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(this.getId())
.type(this.getType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected boolean isTriggerChild(Flow parent, Flow child) {
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());

// simulated execution
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null);
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null, null);

// keep only flow trigger
List<io.kestra.core.models.triggers.types.Flow> flowTriggers = triggers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ protected void inject() {
protected void find() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

@Test
protected void findWithSort() {
inject();

ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null);
ArrayListTotal<Execution> executions = executionRepository.find(Pageable.from(1, 10, Sort.of(Sort.Order.desc("id"))), null, null, null, null, null, null, null);
assertThat(executions.getTotal(), is(28L));
assertThat(executions.size(), is(10));

executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED));
executions = executionRepository.find(Pageable.from(1, 10), null, null, null, null, null, List.of(State.Type.RUNNING, State.Type.FAILED), null);
assertThat(executions.getTotal(), is(8L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class FailTest extends AbstractMemoryRunnerTest {
@Test
void failOnSwitch() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-switch", null,
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.findTaskRunsByTaskId("switch").get(0).getState().getCurrent(), is(State.Type.FAILED));
Expand All @@ -27,7 +27,7 @@ void failOnSwitch() throws TimeoutException {
@Test
void failOnCondition() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null,
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "fail") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.FAILED));
Expand All @@ -37,7 +37,7 @@ void failOnCondition() throws TimeoutException {
@Test
void dontFailOnCondition() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "fail-on-condition", null,
(f, e) -> Map.of("param", "success") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "success") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.findTaskRunsByTaskId("fail").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ void run(String input, State.Type fromState, State.Type triggerState, int count
"task-flow",
null,
(f, e) -> ImmutableMap.of("string", input),
Duration.ofMinutes(1)
);
Duration.ofMinutes(1),
null);

countDownLatch.await(1, TimeUnit.MINUTES);

Expand Down
20 changes: 10 additions & 10 deletions core/src/test/java/io/kestra/core/tasks/flows/IfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ class IfTest extends AbstractMemoryRunnerTest {
@Test
void ifTruthy() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "true") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", 1) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", 1) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
Expand All @@ -40,47 +40,47 @@ void ifTruthy() throws TimeoutException {
@Test
void ifFalsy() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", "false") , Duration.ofSeconds(120));
(f, e) -> Map.of("param", "false") , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", 0) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", 0) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-condition", null,
(f, e) -> Map.of("param", -0) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", -0) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-false").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

// We cannot test null as inputs cannot be null
}

@Test
void ifWithoutElse() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null,
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", true) , Duration.ofSeconds(120), null);

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.findTaskRunsByTaskId("when-true").get(0).getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

execution = runnerUtils.runOne("io.kestra.tests", "if-without-else", null,
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120));
(f, e) -> Map.of("param", false) , Duration.ofSeconds(120), null);
assertThat(execution.getTaskRunList(), hasSize(1));
assertThat(execution.findTaskRunsByTaskId("when-true").isEmpty(), is(true));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInter
"with-string", "myString",
"with-optional", "myOpt"
)),
Duration.ofSeconds(60)
);
Duration.ofSeconds(60),
null);

assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/io/kestra/core/tasks/flows/WorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ void each() throws TimeoutException {
public static class Suite {
public void success(RunnerUtils runnerUtils) throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null,
(f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60)
);
(f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60),
null);

assertThat(execution.getTaskRunList(), hasSize(4));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
Expand All @@ -49,8 +49,8 @@ public void success(RunnerUtils runnerUtils) throws TimeoutException {

public void failed(RunnerUtils runnerUtils) throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null,
(f, e) -> ImmutableMap.of("failed", "true"), Duration.ofSeconds(60)
);
(f, e) -> ImmutableMap.of("failed", "true"), Duration.ofSeconds(60),
null);

assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
Expand Down
Loading

0 comments on commit 861390e

Please sign in to comment.