Skip to content

Commit

Permalink
feat(core): move flows dependencies to os (#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye authored Feb 20, 2023
1 parent 20a2896 commit 3efb814
Show file tree
Hide file tree
Showing 31 changed files with 1,308 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ kestra:
settings:
table: "settings"
cls: io.kestra.core.models.Setting
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology

queues:
min-poll-interval: 100ms
Expand Down
4 changes: 4 additions & 0 deletions cli/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ kestra:
type: memory
queue:
type: memory
storage:
type: local
local:
base-path: /tmp/unittest
plugins:
path: /tmp/plugins
repositories:
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/io/kestra/core/models/topologies/FlowNode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.kestra.core.models.topologies;

import io.kestra.core.models.flows.Flow;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;

@Getter
@AllArgsConstructor
@SuperBuilder(toBuilder = true)
public class FlowNode {
@NotNull
String uid;

String namespace;

String id;

public static FlowNode of(Flow flow) {
return FlowNode.builder()
.uid(flow.uidWithoutRevision())
.namespace(flow.getNamespace())
.id(flow.getId())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.core.models.topologies;

public enum FlowRelation {
FLOW_TASK,
FLOW_TRIGGER,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.kestra.core.models.topologies;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import lombok.Value;

import javax.validation.constraints.NotNull;

@Value
@Builder
public class FlowTopology {
@NotNull
FlowNode source;

@NotNull
FlowRelation relation;

@NotNull
FlowNode destination;

@JsonIgnore
public String uid() {
// we use destination as prefix to enable prefixScan on FlowTopologyUpdateTransformer
return destination.getUid() + "|" + source.getUid();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.kestra.core.models.topologies;

import io.kestra.core.models.hierarchies.Graph;
import lombok.*;

import java.util.Set;
import java.util.stream.Collectors;

@Value
@Builder
public class FlowTopologyGraph {
Set<FlowNode> nodes;
Set<Edge> edges;

public static FlowTopologyGraph of(Graph<FlowNode, FlowRelation> graph) {
return FlowTopologyGraph.builder()
.nodes(graph.nodes())
.edges(graph.edges()
.stream()
.map(flowRelationEdge -> new Edge(
flowRelationEdge.getSource().getUid(),
flowRelationEdge.getTarget().getUid(),
flowRelationEdge.getValue()
))
.collect(Collectors.toSet())
)
.build();
}

@Getter
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public static class Edge {
private final String source;
private final String target;
private final FlowRelation relation;
}
}
4 changes: 3 additions & 1 deletion core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.*;

import jakarta.inject.Singleton;

@Singleton
Expand Down Expand Up @@ -47,6 +47,8 @@ public String key(Object object) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == FlowTopology.class) {
return ((FlowTopology) object).uid();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.kestra.core.repositories;


import io.kestra.core.models.topologies.FlowTopology;

import java.util.List;

public interface FlowTopologyRepositoryInterface {
List<FlowTopology> findByFlow(String namespace, String flowId, Boolean destinationOnly);

FlowTopology save(FlowTopology flowTopology);
}
192 changes: 192 additions & 0 deletions core/src/main/java/io/kestra/core/topologies/FlowTopologyService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.kestra.core.topologies;

import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.types.*;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.hierarchies.Graph;
import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.topologies.FlowTopologyGraph;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.ListUtils;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
@Singleton
public class FlowTopologyService {
@Inject
protected ConditionService conditionService;

@Inject
protected RunnerUtils runnerUtils;


public FlowTopologyGraph graph(Stream<FlowTopology> flows, Function<FlowNode, FlowNode> anonymize) {
Graph<FlowNode, FlowRelation> graph = new Graph<>();

flows
.forEach(flowTopology -> {
FlowNode source = anonymize.apply(flowTopology.getSource());
FlowNode destination = anonymize.apply(flowTopology.getDestination());


if (!graph.nodes().contains(source)) {
graph.addNode(source);
}

if (!graph.nodes().contains(destination)) {
graph.addNode(destination);
}

if (!source.getUid().equals(destination.getUid())) {
graph.addEdge(source, destination, flowTopology.getRelation());
}
});

return FlowTopologyGraph.of(graph);
}

public Stream<FlowTopology> topology(Flow child, Stream<Flow> allFlows) {
return allFlows
.flatMap(parent -> Stream.concat(
Stream.ofNullable(this.map(parent, child)),
Stream.ofNullable(this.map(child, parent))
))
.filter(Objects::nonNull);
}

protected FlowTopology map(Flow parent, Flow child) {
// we don't allow self link
if (child.uidWithoutRevision().equals(parent.uidWithoutRevision())) {
return null;
}

FlowRelation relation = this.isChild(parent, child);
if (relation == null) {
return null;
}

FlowNode parentTopology = FlowNode.of(parent);
FlowNode childTopology = FlowNode.of(child);

return FlowTopology.builder()
.source(parentTopology)
.destination(childTopology)
.relation(relation)
.build();
}

@Nullable
public FlowRelation isChild(Flow parent, Flow child) {
if (this.isFlowTaskChild(parent, child)) {
return FlowRelation.FLOW_TASK;
}

if (this.isTriggerChild(parent, child)) {
return FlowRelation.FLOW_TRIGGER;
}

return null;
}

protected boolean isFlowTaskChild(Flow parent, Flow child) {
try {
return parent
.allTasksWithChilds()
.stream()
.filter(t -> t instanceof io.kestra.core.tasks.flows.Flow)
.map(t -> (io.kestra.core.tasks.flows.Flow) t)
.anyMatch(t ->
t.getNamespace().equals(child.getNamespace()) && t.getFlowId().equals(child.getId())
);
} catch (Exception e) {
log.warn("Failed to detect flow task on namespace:'" + parent.getNamespace() + "', flowId:'" + parent.getId() + "'", e);
return false;
}
}

protected boolean isTriggerChild(Flow parent, Flow child) {
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());

// simulated execution
Execution execution = runnerUtils.newExecution(parent, (f, e) -> null);

// keep only flow trigger
List<io.kestra.core.models.triggers.types.Flow> flowTriggers = triggers
.stream()
.filter(t -> t instanceof io.kestra.core.models.triggers.types.Flow)
.map(t -> (io.kestra.core.models.triggers.types.Flow) t)
.collect(Collectors.toList());

if (flowTriggers.size() == 0) {
return false;
}

return flowTriggers
.stream()
.flatMap(flow -> ListUtils.emptyOnNull(flow.getConditions()).stream())
.allMatch(condition -> validateCondition(condition, parent, execution));
}

protected boolean validateCondition(Condition condition, Flow child, Execution execution) {
if (isFilterCondition(condition)) {
return true;
}

if (condition instanceof MultipleCondition) {
List<Condition> multipleConditions = ((MultipleCondition) condition)
.getConditions()
.values()
.stream()
.filter(c -> !isFilterCondition(c))
.collect(Collectors.toList());


return (multipleConditions
.stream()
.filter(c -> !isMandatoryMultipleCondition(c))
.anyMatch(c -> validateCondition(c, child, execution))
) && (
multipleConditions
.stream()
.filter(this::isMandatoryMultipleCondition)
.allMatch(c -> validateCondition(c, child, execution))
);
}

return this.conditionService.isValid(condition, child, execution);
}

protected boolean isMandatoryMultipleCondition(Condition condition) {
return Stream
.of(
VariableCondition.class
)
.anyMatch(aClass -> condition.getClass().isAssignableFrom(aClass));
}

protected boolean isFilterCondition(Condition condition) {
return Stream
.of(
ExecutionStatusCondition.class,
DateTimeBetweenCondition.class,
DayWeekCondition.class,
HasRetryAttemptCondition.class,
WeekendCondition.class
)
.anyMatch(aClass -> condition.getClass().isAssignableFrom(aClass));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.kestra.core.repositories;

import io.kestra.core.models.topologies.FlowNode;
import io.kestra.core.models.topologies.FlowRelation;
import io.kestra.core.models.topologies.FlowTopology;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest(transactional = false)
public abstract class AbstractFlowTopologyRepositoryTest {
@Inject
private FlowTopologyRepositoryInterface flowTopologyRepository;

@Test
void suite() {
flowTopologyRepository.save(FlowTopology.builder()
.relation(FlowRelation.FLOW_TASK)
.source(FlowNode.builder()
.namespace("io.kestra.test")
.id("flow-a")
.build()
)
.destination(FlowNode.builder()
.namespace("io.kestra.test")
.id("flow-b")
.build()
)
.build()
);

List<FlowTopology> list = flowTopologyRepository.findByFlow("io.kestra.test", "flow-a", false);

assertThat(list.size(), is(1));
}
}
Loading

0 comments on commit 3efb814

Please sign in to comment.