Skip to content

Commit

Permalink
feat(core): add variable function currentEachOutput() (#924)
Browse files Browse the repository at this point in the history
Co-authored-by: Ludovic DEHON <tchiot.ludo@gmail.com>
  • Loading branch information
loicmathieu and tchiotludo authored Jan 26, 2023
1 parent e8c5ff9 commit 5c30c05
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.kestra.core.runners.pebble;

import io.pebbletemplates.pebble.extension.*;
import io.pebbletemplates.pebble.node.expression.RangeExpression;
import io.pebbletemplates.pebble.operator.Associativity;
import io.pebbletemplates.pebble.operator.BinaryOperator;
import io.pebbletemplates.pebble.operator.BinaryOperatorImpl;
import io.pebbletemplates.pebble.operator.UnaryOperator;
import io.pebbletemplates.pebble.tokenParser.TokenParser;
import io.kestra.core.runners.pebble.expression.NullCoalescingExpression;
import io.kestra.core.runners.pebble.filters.*;
import io.kestra.core.runners.pebble.functions.CurrentEachOutputFunction;
import io.kestra.core.runners.pebble.functions.JsonFunction;
import io.kestra.core.runners.pebble.functions.NowFunction;
import io.kestra.core.runners.pebble.tests.JsonTest;
Expand Down Expand Up @@ -81,6 +81,7 @@ public Map<String, Function> getFunctions() {

tests.put("now", new NowFunction());
tests.put("json", new JsonFunction());
tests.put("currentEachOutput", new CurrentEachOutputFunction());

return tests;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.kestra.core.runners.pebble.functions;

import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CurrentEachOutputFunction implements Function {

@SuppressWarnings("unchecked")
@Override
public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
if (!args.containsKey("outputs")) {
throw new PebbleException(null, "The 'currentEachOutput' function expects an argument 'outputs'.", lineNumber, self.getName());
}

if (!(args.get("outputs") instanceof Map)) {
throw new PebbleException(null, "The 'currentEachOutput' function expects an argument 'outputs' with type map.", lineNumber, self.getName());
}

Map<?, ?> outputs = (Map<?, ?>) args.get("outputs");
List<Map<?, ?>> parents = (List<Map<?, ?>>) context.getVariable("parents");
if (parents != null && !parents.isEmpty()) {
Collections.reverse(parents);
for (Map<?, ?> parent : parents) {
outputs = (Map<?, ?>) outputs.get(((Map<?, ?>) parent.get("taskrun")).get("value"));
}
}
Map<?, ?> taskrun = (Map<?, ?>) context.getVariable("taskrun");

return outputs.get(taskrun.get("value"));
}

@Override
public List<String> getArgumentNames() {
return List.of("outputs");
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/io/kestra/core/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

public class Helpers {
public static long FLOWS_COUNT = 51;
public static long FLOWS_COUNT = 52;

public static ApplicationContext applicationContext() throws URISyntaxException {
return applicationContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.kestra.core.tasks.flows;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

public class CurrentEachOutputFunctionTest extends AbstractMemoryRunnerTest {
@Test
void parallel() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "test-current-output");

var output1 = (Map<String, Object>) execution.outputs().get("1-1-1_return");
var outputv11 = (Map<String, Object>) output1.get("v11");
var outputv11v21 = (Map<String, Object>) outputv11.get("v21");
assertThat(((Map<String, Object>) outputv11v21.get("v31")).get("value"), is(equalTo("return-v11-v21-v31")));
assertThat(((Map<String, Object>) outputv11v21.get("v32")).get("value"), is(equalTo("return-v11-v21-v32")));
var outputv11v22 = (Map<String, Object>) outputv11.get("v22");
assertThat(((Map<String, Object>) outputv11v22.get("v31")).get("value"), is(equalTo("return-v11-v22-v31")));
assertThat(((Map<String, Object>) outputv11v22.get("v32")).get("value"), is(equalTo("return-v11-v22-v32")));
var outputv12 = (Map<String, Object>) output1.get("v12");
var outputv12v21 = (Map<String, Object>) outputv12.get("v21");
assertThat(((Map<String, Object>) outputv12v21.get("v31")).get("value"), is(equalTo("return-v12-v21-v31")));
assertThat(((Map<String, Object>) outputv12v21.get("v32")).get("value"), is(equalTo("return-v12-v21-v32")));
var outputv12v22 = (Map<String, Object>) outputv12.get("v22");
assertThat(((Map<String, Object>) outputv12v22.get("v31")).get("value"), is(equalTo("return-v12-v22-v31")));
assertThat(((Map<String, Object>) outputv12v22.get("v32")).get("value"), is(equalTo("return-v12-v22-v32")));

var output2 = (Map<String, Object>) execution.outputs().get("2-1_return");
assertThat(((Map<String, Object>) output2.get("v41")).get("value"), is(equalTo("return-v41")));
assertThat(((Map<String, Object>) output2.get("v42")).get("value"), is(equalTo("return-v42")));
}
}
47 changes: 47 additions & 0 deletions core/src/test/resources/flows/valids/current-output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
id: test-current-output
namespace: io.kestra.tests
tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachSequential
value: '["v11", "v12"]'
tasks:
- id: 1-1_each
type: io.kestra.core.tasks.flows.EachSequential
value: '["v21", "v22"]'
tasks:
- id: 1-1-1_each
type: io.kestra.core.tasks.flows.EachSequential
value: '["v31", "v32"]'
tasks:
- id: 1-1-1_output
type: io.kestra.core.tasks.debugs.Return
format: "{{ parents[1].taskrun.value }}-{{ parents[0].taskrun.value }}-{{ taskrun.value }}"
- id: 1-1-1_return
type: io.kestra.core.tasks.debugs.Return
#format: "return-{{ outputs['1-1-1_output'][parents[1].taskrun.value][parents[0].taskrun.value][taskrun.value].value }}"
format: "return-{{ currentEachOutput(outputs['1-1-1_output']).value }}"
- id: 2_each
type: io.kestra.core.tasks.flows.EachSequential
value: '["v41", "v42"]'
tasks:
- id: 2-1_output
type: io.kestra.core.tasks.debugs.Return
format: "{{ taskrun.value }}"
- id: 2-1_return
type: io.kestra.core.tasks.debugs.Return
#format: "return-{{ outputs['1-1-1_output'][taskrun.value].value }}"
format: "return-{{ currentEachOutput(outputs['2-1_output']).value }}"
- id: 2_sequential
type: io.kestra.core.tasks.flows.Sequential
tasks:
- id: 2-1_sequential
type: io.kestra.core.tasks.flows.Sequential
tasks:
- id: 2-1-1_output
type: io.kestra.core.tasks.debugs.Return
format: "{{ taskrun.id }}"
- id: 2-1-1_return
type: io.kestra.core.tasks.debugs.Return
format: "return-{{ outputs['2-1-1_output'].value }}"


0 comments on commit 5c30c05

Please sign in to comment.