Skip to content

Commit

Permalink
feat(core): Recover deleted code
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Feb 20, 2023
1 parent 2c3ca8a commit c94efe8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
37 changes: 37 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
Expand All @@ -15,8 +16,10 @@
import io.kestra.core.runners.ExecutorService;
import io.kestra.core.services.*;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import jakarta.inject.Inject;
Expand All @@ -32,6 +35,7 @@
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Singleton
@JdbcRunnerEnabled
Expand Down Expand Up @@ -105,8 +109,18 @@ public class JdbcExecutor implements ExecutorInterface {
@Inject
private AbstractJdbcExecutorStateStorage executorStateStorage;

@Inject
private FlowTopologyService flowTopologyService;

@Inject
private AbstractJdbcFlowTopologyRepository flowTopologyRepository;

protected List<Flow> allFlows;

@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
private QueueInterface<Flow> flowQueue;

@SneakyThrows
@Override
public void run() {
Expand Down Expand Up @@ -147,6 +161,29 @@ public void run() {
);

schedulerDelayThread.start();

flowQueue.receive(
FlowTopology.class,
flow -> {
if (flow == null) {
return;
}

flowTopologyRepository.save(
flow,
(flow.isDeleted() ?
Stream.<FlowTopology>empty() :
flowTopologyService
.topology(
flow,
this.allFlows.stream()
)
)
.distinct()
.collect(Collectors.toList())
);
}
);
}

private void executionQueue(Execution message) {
Expand Down
18 changes: 16 additions & 2 deletions ui/src/components/flows/FlowRoot.vue
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import BottomLine from "../../components/layout/BottomLine.vue";
import TriggerFlow from "../../components/flows/TriggerFlow.vue";
import Overview from "./Overview.vue";
import FlowDependencies from "./FlowDependencies.vue";
export default {
mixins: [RouteContext],
Expand All @@ -40,7 +41,8 @@
return {
tabIndex: undefined,
mounted: false,
previousFlow: undefined
previousFlow: undefined,
depedenciesCount: undefined
};
},
watch: {
Expand All @@ -60,6 +62,11 @@
if (this.flow) {
this.previousFlow = this.flowKey();
this.$store.dispatch("flow/loadGraph", this.flow);
this.$http
.get(`/api/v1/flows/${this.flow.namespace}/${this.flow.id}/dependencies`)
.then(response => {
this.depedenciesCount = response.data && response.data.nodes ? response.data.nodes.length - 1 : 0;
})
}
});
}
Expand Down Expand Up @@ -134,7 +141,14 @@
title: this.$t("logs"),
});
}
if (this.user && this.flow && this.user.isAllowed(permission.FLOW, action.READ, this.flow.namespace)){
tabs.push({
name: "dependencies",
component: FlowDependencies,
title: this.$t("dependencies"),
count: this.depedenciesCount
})
}
return tabs;
},
activeTabName() {
Expand Down

0 comments on commit c94efe8

Please sign in to comment.