Skip to content

Commit

Permalink
fix(kafka-runner): handle flow with multiple trigger that trigger mul…
Browse files Browse the repository at this point in the history
…tiple times next flow
  • Loading branch information
tchiotludo committed Feb 23, 2022
1 parent 53d2f1b commit 7120872
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class DailyExecutionStatistics {
State.Type.SUCCESS, 0L,
State.Type.WARNING, 0L,
State.Type.FAILED, 0L,
State.Type.KILLED, 0L
State.Type.KILLED, 0L,
State.Type.PAUSED, 0L
));

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@ triggers:
- type: io.kestra.core.models.conditions.types.ExecutionFlowCondition
namespace: io.kestra.tests
flowId: trigger-flow
- id: listen-flow-failed
type: io.kestra.core.models.triggers.types.Flow
conditions:
- type: io.kestra.core.models.conditions.types.ExecutionStatusCondition
in:
- FAILED
- type: io.kestra.core.models.conditions.types.ExecutionFlowCondition
namespace: io.kestra.tests
flowId: other-flow
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ void dailyGroupByFlowStatistics() {
DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10);

assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(8));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));

assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(second.getExecutionCounts().size(), is(8));
assertThat(second.getExecutionCounts().size(), is(9));
assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L));
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));

Expand All @@ -193,7 +193,7 @@ void dailyGroupByFlowStatistics() {
assertThat(result.get("io.kestra.unittest").size(), is(1));
full = result.get("io.kestra.unittest").get("*").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(8));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L));
Expand All @@ -217,7 +217,7 @@ void dailyStatistics() {
);

assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(8));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));

assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L));
Expand All @@ -243,7 +243,7 @@ void taskRunsDailyStatistics() {
);

assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(8));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));

assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ public void init(ProcessorContext context) {

@Override
public Iterable<KeyValue<String, ExecutorFlowTrigger>> transform(String key, Executor value) {
// flowWithFlowTrigger return 1 result per flow per trigger but since we analysed the whole flow on FlowTrigger
// we deduplicate by flow
return flowService.flowWithFlowTrigger(kafkaFlowExecutor.allLastVersion().stream())
.stream()
.collect(Collectors.toMap(o -> o.getFlow().uidWithoutRevision(), p -> p, (p, q) -> p)).values()
.stream()
.map(f -> KeyValue.pair(f.getFlow().uidWithoutRevision(), new ExecutorFlowTrigger(f.getFlow(), value.getExecution())))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,17 @@ void flowTrigger() {
io.kestra.runner.kafka.streams.ExecutorFlowTrigger executorFlowTrigger = executorFlowTriggerOutput().readRecord().value();
assertThat(executorFlowTrigger.getFlowHavingTrigger().getId(), is("trigger-flow-listener-no-inputs"));

assertThat(executorFlowTriggerOutput().isEmpty(), is(true));

startStream(this.executorFlowTrigger);

executorFlowTriggerInput().pipeInput(executorFlowTrigger.getFlowHavingTrigger().uid(), executorFlowTrigger);

Execution triggerExecution = executionOutput().readRecord().getValue();
assertThat(triggerExecution.getState().getCurrent(), is(State.Type.CREATED));
assertThat(triggerExecution.getFlowId(), is("trigger-flow-listener-no-inputs"));

assertThat(executionOutput().isEmpty(), is(true));
}

@Test
Expand Down

0 comments on commit 7120872

Please sign in to comment.