Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dvince/clean up workflows #4499

Merged
merged 9 commits into from
Aug 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ public WorkflowEdge clone(final UUID workflowId, final UUID source, final UUID t
clone.setTargetPortId(targetPortId);
return clone;
}

public boolean getIsDeleted() {
if (this.isDeleted == null) return false;

return this.isDeleted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,10 @@ public WorkflowNode clone(final UUID workflowId) {
clone.setWorkflowId(workflowId);
return clone;
}

public boolean getIsDeleted() {
if (this.isDeleted == null) return false;

return this.isDeleted;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
package software.uncharted.terarium.hmiserver.repository.data;

import java.util.List;
import java.util.UUID;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow;
import software.uncharted.terarium.hmiserver.repository.PSCrudSoftDeleteRepository;

@Repository
public interface WorkflowRepository extends PSCrudSoftDeleteRepository<Workflow, UUID> {}
public interface WorkflowRepository extends PSCrudSoftDeleteRepository<Workflow, UUID> {
/**
* Find all workflows who have at least one edge or node marked for deletion
*/
@Query(
value = "SELECT w.* FROM Workflow w " +
" cross join json_array_elements(edges) as edgesData" +
" where edgesData->>'isDeleted' = 'true'",
nativeQuery = true
)
List<Workflow> findWorkflowsWithEdgesToBeDeleted();

/**
* Find all workflows who have at least one edge or node marked for deletion
*/
@Query(
value = "SELECT w.* FROM Workflow w " +
" cross join json_array_elements(nodes) as nodesData" +
" where nodesData->>'isDeleted' = 'true'",
nativeQuery = true
)
List<Workflow> findWorkflowsWithNodesToBeDeleted();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package software.uncharted.terarium.hmiserver.service.data;

import jakarta.annotation.PostConstruct;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowEdge;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowNode;

@Service
@RequiredArgsConstructor
@Slf4j
public class WorkflowCleanupService {

private final RedissonClient redissonClient;
private final WorkflowService workflowService;
private RLock lock;

@PostConstruct
void init() {
lock = redissonClient.getLock("cleaner_upper_lock_name");
}

@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady(final ApplicationReadyEvent event) {
try {
if (lock.tryLock(1, 100, TimeUnit.SECONDS)) {
try {
final Set<Workflow> workflows = workflowService.findWorkflowsToClean();
for (final Workflow workflow : workflows) {
workflow.getEdges().removeIf(WorkflowEdge::getIsDeleted);
workflow.getNodes().removeIf(WorkflowNode::getIsDeleted);
}
workflowService.updateWorkflows(workflows);
} catch (final IOException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
} else {
log.info("Someone else is cleaning this up");
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.observation.annotation.Observed;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.springframework.stereotype.Service;
import software.uncharted.terarium.hmiserver.configuration.Config;
import software.uncharted.terarium.hmiserver.configuration.ElasticsearchConfiguration;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowEdge;
import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowNode;
import software.uncharted.terarium.hmiserver.repository.data.WorkflowRepository;
import software.uncharted.terarium.hmiserver.service.elasticsearch.ElasticsearchService;
import software.uncharted.terarium.hmiserver.service.s3.S3ClientService;
import software.uncharted.terarium.hmiserver.utils.rebac.Schema;

Expand All @@ -35,6 +35,21 @@ public WorkflowService(
super(objectMapper, config, projectService, projectAssetService, repository, s3ClientService, Workflow.class);
}

@Observed(name = "function_profile")
public Set<Workflow> findWorkflowsToClean() {
final Set<Workflow> workflows = new HashSet<>();
workflows.addAll(repository.findWorkflowsWithEdgesToBeDeleted());
workflows.addAll(repository.findWorkflowsWithNodesToBeDeleted());
return workflows;
}

@Observed(name = "function_profile")
public Optional<List<Workflow>> updateWorkflows(final Collection<Workflow> assets)
throws IOException, IllegalArgumentException {
final List<Workflow> workflows = repository.saveAll(assets);
return Optional.of(workflows);
}

@Override
@Observed(name = "function_profile")
public Workflow createAsset(final Workflow asset, final UUID projectId, final Schema.Permission hasWritePermission)
Expand Down Expand Up @@ -96,18 +111,18 @@ public Optional<Workflow> updateAsset(
////////////////////////////////////////////////////////////////////////////////
if (dbWorkflowNodes != null && dbWorkflowNodes.size() > 0) {
for (int index = 0; index < dbWorkflowNodes.size(); index++) {
WorkflowNode dbNode = dbWorkflowNodes.get(index);
WorkflowNode node = nodeMap.get(dbNode.getId());
final WorkflowNode dbNode = dbWorkflowNodes.get(index);
final WorkflowNode node = nodeMap.get(dbNode.getId());

if (node == null) continue;
if (node.getIsDeleted() != null && node.getIsDeleted() == true) {
if (node.getIsDeleted()) {
dbNode.setIsDeleted(true);
nodeMap.remove(node.getId());
continue;
}

JsonNode nodeContent = this.objectMapper.valueToTree(node);
JsonNode dbNodeContent = this.objectMapper.valueToTree(dbNode);
final JsonNode nodeContent = this.objectMapper.valueToTree(node);
final JsonNode dbNodeContent = this.objectMapper.valueToTree(dbNode);

if (nodeContent.equals(dbNodeContent) == true) {
nodeMap.remove(node.getId());
Expand All @@ -132,18 +147,18 @@ public Optional<Workflow> updateAsset(

if (dbWorkflowEdges != null && dbWorkflowEdges.size() > 0) {
for (int index = 0; index < dbWorkflowEdges.size(); index++) {
WorkflowEdge dbEdge = dbWorkflowEdges.get(index);
WorkflowEdge edge = edgeMap.get(dbEdge.getId());
final WorkflowEdge dbEdge = dbWorkflowEdges.get(index);
final WorkflowEdge edge = edgeMap.get(dbEdge.getId());

if (edge == null) continue;
if (edge.getIsDeleted() != null && edge.getIsDeleted() == true) {
if (edge.getIsDeleted()) {
dbEdge.setIsDeleted(true);
edgeMap.remove(edge.getId());
continue;
}

JsonNode edgeContent = this.objectMapper.valueToTree(edge);
JsonNode dbEdgeContent = this.objectMapper.valueToTree(dbEdge);
final JsonNode edgeContent = this.objectMapper.valueToTree(edge);
final JsonNode dbEdgeContent = this.objectMapper.valueToTree(dbEdge);

if (edgeContent.equals(dbEdgeContent) == true) {
edgeMap.remove(edge.getId());
Expand All @@ -168,10 +183,10 @@ public Optional<Workflow> updateAsset(
////////////////////////////////////////////////////////////////////////////////
// Handle new nodes or edges
////////////////////////////////////////////////////////////////////////////////
for (Map.Entry<UUID, WorkflowNode> pair : nodeMap.entrySet()) {
for (final Map.Entry<UUID, WorkflowNode> pair : nodeMap.entrySet()) {
dbWorkflowNodes.add(pair.getValue());
}
for (Map.Entry<UUID, WorkflowEdge> pair : edgeMap.entrySet()) {
for (final Map.Entry<UUID, WorkflowEdge> pair : edgeMap.entrySet()) {
dbWorkflowEdges.add(pair.getValue());
}

Expand Down
Loading