From c94efe86fa705e993bd0df14fb4698b74c01f0d9 Mon Sep 17 00:00:00 2001 From: Yann C Date: Mon, 20 Feb 2023 12:26:08 +0100 Subject: [PATCH] feat(core): Recover deleted code --- .../io/kestra/jdbc/runner/JdbcExecutor.java | 37 +++++++++++++++++++ ui/src/components/flows/FlowRoot.vue | 18 ++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 96112593f07..dbc3236ae19 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -7,6 +7,7 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.FlowRepositoryInterface; @@ -15,8 +16,10 @@ import io.kestra.core.runners.ExecutorService; import io.kestra.core.services.*; import io.kestra.core.tasks.flows.Template; +import io.kestra.core.topologies.FlowTopologyService; import io.kestra.core.utils.Await; import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository; +import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository; import io.micronaut.context.ApplicationContext; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import jakarta.inject.Inject; @@ -32,6 +35,7 @@ import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; +import java.util.stream.Stream; @Singleton @JdbcRunnerEnabled @@ -105,8 +109,18 @@ public class JdbcExecutor implements ExecutorInterface { @Inject private AbstractJdbcExecutorStateStorage executorStateStorage; + @Inject + private FlowTopologyService flowTopologyService; + + @Inject + private AbstractJdbcFlowTopologyRepository flowTopologyRepository; + protected List allFlows; + @Inject + @Named(QueueFactoryInterface.FLOW_NAMED) + private QueueInterface flowQueue; + @SneakyThrows @Override public void run() { @@ -147,6 +161,29 @@ public void run() { ); schedulerDelayThread.start(); + + flowQueue.receive( + FlowTopology.class, + flow -> { + if (flow == null) { + return; + } + + flowTopologyRepository.save( + flow, + (flow.isDeleted() ? + Stream.empty() : + flowTopologyService + .topology( + flow, + this.allFlows.stream() + ) + ) + .distinct() + .collect(Collectors.toList()) + ); + } + ); } private void executionQueue(Execution message) { diff --git a/ui/src/components/flows/FlowRoot.vue b/ui/src/components/flows/FlowRoot.vue index a05da1ded4d..0389ef74616 100644 --- a/ui/src/components/flows/FlowRoot.vue +++ b/ui/src/components/flows/FlowRoot.vue @@ -28,6 +28,7 @@ import BottomLine from "../../components/layout/BottomLine.vue"; import TriggerFlow from "../../components/flows/TriggerFlow.vue"; import Overview from "./Overview.vue"; + import FlowDependencies from "./FlowDependencies.vue"; export default { mixins: [RouteContext], @@ -40,7 +41,8 @@ return { tabIndex: undefined, mounted: false, - previousFlow: undefined + previousFlow: undefined, + depedenciesCount: undefined }; }, watch: { @@ -60,6 +62,11 @@ if (this.flow) { this.previousFlow = this.flowKey(); this.$store.dispatch("flow/loadGraph", this.flow); + this.$http + .get(`/api/v1/flows/${this.flow.namespace}/${this.flow.id}/dependencies`) + .then(response => { + this.depedenciesCount = response.data && response.data.nodes ? response.data.nodes.length - 1 : 0; + }) } }); } @@ -134,7 +141,14 @@ title: this.$t("logs"), }); } - + if (this.user && this.flow && this.user.isAllowed(permission.FLOW, action.READ, this.flow.namespace)){ + tabs.push({ + name: "dependencies", + component: FlowDependencies, + title: this.$t("dependencies"), + count: this.depedenciesCount + }) + } return tabs; }, activeTabName() {