From 431958e046863484b6a537414576dd1b601c9846 Mon Sep 17 00:00:00 2001 From: dvince Date: Tue, 13 Aug 2024 13:31:22 -0400 Subject: [PATCH 1/5] WIP - cleaning up workflows. This is the service to blow away tombstoned edges/nodes at startup --- .../service/data/WorkflowCleanupService.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java new file mode 100644 index 0000000000..5655da27be --- /dev/null +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java @@ -0,0 +1,44 @@ +package software.uncharted.terarium.hmiserver.service.data; + +import jakarta.annotation.PostConstruct; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +@Slf4j +public class WorkflowCleanupService { + + private final RedissonClient redissonClient; + private final WorkflowService workflowService; + private RLock lock; + + @PostConstruct + void init() { + lock = redissonClient.getLock("cleaner_upper_lock_name"); + } + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady(final ApplicationReadyEvent event) { + try { + if (lock.tryLock(1, 100, TimeUnit.SECONDS)) { + try { + System.out.println("cleaning up"); + //workflowService + } finally { + lock.unlock(); + } + } else { + log.info("Someone else is cleaning this up"); + } + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } +} From fae406ecf917e16c72c8afa6b8ecab52ee7b0609 Mon Sep 17 00:00:00 2001 From: dvince Date: Mon, 19 Aug 2024 14:39:21 -0400 Subject: [PATCH 2/5] git blew away my stuff --- .../repository/data/WorkflowRepository.java | 26 +++++++- .../service/data/WorkflowService.java | 60 +++++++++++++++---- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java index 7dcc9eddbb..51b05c8d42 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java @@ -1,9 +1,33 @@ package software.uncharted.terarium.hmiserver.repository.data; +import java.util.List; import java.util.UUID; +import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow; import software.uncharted.terarium.hmiserver.repository.PSCrudSoftDeleteRepository; @Repository -public interface WorkflowRepository extends PSCrudSoftDeleteRepository {} +public interface WorkflowRepository extends PSCrudSoftDeleteRepository { + /** + * Find all workflows who have at least one edge or node marked for deletion + */ + @Query( + value = "SELECT w.* FROM Workflow w " + + " cross join json_array_elements(edges) as edgesData" + + " where edgesData->>'isDeleted' = 'true'", + nativeQuery = true + ) + List findEdgesToBeDeleted(); + + /** + * Find all workflows who have at least one edge or node marked for deletion + */ + @Query( + value = "SELECT w.* FROM Workflow w " + + " cross join json_array_elements(nodes) as edgesData" + + " where edgesData->>'isDeleted' = 'true'", + nativeQuery = true + ) + List findNodesToBeDeleted(); +} diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java index c201ac9bac..e5025edc9a 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java @@ -5,10 +5,11 @@ import io.micrometer.observation.annotation.Observed; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import org.springframework.stereotype.Service; import software.uncharted.terarium.hmiserver.configuration.Config; @@ -59,6 +60,43 @@ public String getAssetAlias() { return elasticConfig.getWorkflowAlias(); } + @Observed(name = "function_profile") + public Set findWorkflowsToClean() { + final Set workflows = new HashSet<>(); + workflows.addAll(repository.findEdgesToBeDeleted()); + workflows.addAll(repository.findNodesToBeDeleted()); + return workflows; + } + + /** + * Update an asset. + * + * @param asset The asset to update + * @return The updated asset + * @throws IOException If there is an error updating the asset + * @throws IllegalArgumentException If the asset tries to move from permanent to temporary + */ + //@Override + //@Observed(name = "function_profile") + //public Optional> updateWorkflows(final List assets, final UUID projectId, final Schema.Permission hasWritePermission) + // throws IOException, IllegalArgumentException { + /*final Optional updated = super.updateAsset(asset, projectId, hasWritePermission); + + if (updated.isEmpty()) { + return Optional.empty(); + } + + if (!updated.get().getTemporary() && updated.get().getPublicAsset()) { + elasticService.index(getAssetAlias(), updated.get().getId().toString(), updated); + } + + if (updated.get().getTemporary() || !updated.get().getPublicAsset()) { + elasticService.delete(getAssetAlias(), updated.get().getId().toString()); + } + + return updated;*/ + //} + @Override @Observed(name = "function_profile") public Workflow createAsset(final Workflow asset, final UUID projectId, final Schema.Permission hasWritePermission) @@ -120,8 +158,8 @@ public Optional updateAsset( //////////////////////////////////////////////////////////////////////////////// if (dbWorkflowNodes != null && dbWorkflowNodes.size() > 0) { for (int index = 0; index < dbWorkflowNodes.size(); index++) { - WorkflowNode dbNode = dbWorkflowNodes.get(index); - WorkflowNode node = nodeMap.get(dbNode.getId()); + final WorkflowNode dbNode = dbWorkflowNodes.get(index); + final WorkflowNode node = nodeMap.get(dbNode.getId()); if (node == null) continue; if (node.getIsDeleted() != null && node.getIsDeleted() == true) { @@ -130,8 +168,8 @@ public Optional updateAsset( continue; } - JsonNode nodeContent = this.objectMapper.valueToTree(node); - JsonNode dbNodeContent = this.objectMapper.valueToTree(dbNode); + final JsonNode nodeContent = this.objectMapper.valueToTree(node); + final JsonNode dbNodeContent = this.objectMapper.valueToTree(dbNode); if (nodeContent.equals(dbNodeContent) == true) { nodeMap.remove(node.getId()); @@ -156,8 +194,8 @@ public Optional updateAsset( if (dbWorkflowEdges != null && dbWorkflowEdges.size() > 0) { for (int index = 0; index < dbWorkflowEdges.size(); index++) { - WorkflowEdge dbEdge = dbWorkflowEdges.get(index); - WorkflowEdge edge = edgeMap.get(dbEdge.getId()); + final WorkflowEdge dbEdge = dbWorkflowEdges.get(index); + final WorkflowEdge edge = edgeMap.get(dbEdge.getId()); if (edge == null) continue; if (edge.getIsDeleted() != null && edge.getIsDeleted() == true) { @@ -166,8 +204,8 @@ public Optional updateAsset( continue; } - JsonNode edgeContent = this.objectMapper.valueToTree(edge); - JsonNode dbEdgeContent = this.objectMapper.valueToTree(dbEdge); + final JsonNode edgeContent = this.objectMapper.valueToTree(edge); + final JsonNode dbEdgeContent = this.objectMapper.valueToTree(dbEdge); if (edgeContent.equals(dbEdgeContent) == true) { edgeMap.remove(edge.getId()); @@ -192,10 +230,10 @@ public Optional updateAsset( //////////////////////////////////////////////////////////////////////////////// // Handle new nodes or edges //////////////////////////////////////////////////////////////////////////////// - for (Map.Entry pair : nodeMap.entrySet()) { + for (final Map.Entry pair : nodeMap.entrySet()) { dbWorkflowNodes.add(pair.getValue()); } - for (Map.Entry pair : edgeMap.entrySet()) { + for (final Map.Entry pair : edgeMap.entrySet()) { dbWorkflowEdges.add(pair.getValue()); } From 54dfcc31f57c99515d96d48564d524cc52cdc89f Mon Sep 17 00:00:00 2001 From: dvince Date: Mon, 19 Aug 2024 15:01:28 -0400 Subject: [PATCH 3/5] cleaning edges and nodes --- .../dataservice/workflow/WorkflowEdge.java | 6 +++ .../dataservice/workflow/WorkflowNode.java | 6 +++ .../service/data/WorkflowCleanupService.java | 15 ++++++- .../service/data/WorkflowService.java | 39 +++++-------------- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowEdge.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowEdge.java index 2d5f173824..4db5baf34d 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowEdge.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowEdge.java @@ -43,4 +43,10 @@ public WorkflowEdge clone(final UUID workflowId, final UUID source, final UUID t clone.setTargetPortId(targetPortId); return clone; } + + public boolean getIsDeleted() { + if (this.isDeleted == null) return false; + + return this.isDeleted; + } } diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowNode.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowNode.java index f3c53044de..9e2ea26576 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowNode.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/models/dataservice/workflow/WorkflowNode.java @@ -57,4 +57,10 @@ public WorkflowNode clone(final UUID workflowId) { clone.setWorkflowId(workflowId); return clone; } + + public boolean getIsDeleted() { + if (this.isDeleted == null) return false; + + return this.isDeleted; + } } diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java index 5655da27be..db86c202bc 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowCleanupService.java @@ -1,6 +1,8 @@ package software.uncharted.terarium.hmiserver.service.data; import jakarta.annotation.PostConstruct; +import java.io.IOException; +import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -9,6 +11,9 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow; +import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowEdge; +import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowNode; @Service @RequiredArgsConstructor @@ -29,8 +34,14 @@ public void onApplicationReady(final ApplicationReadyEvent event) { try { if (lock.tryLock(1, 100, TimeUnit.SECONDS)) { try { - System.out.println("cleaning up"); - //workflowService + final Set workflows = workflowService.findWorkflowsToClean(); + for (final Workflow workflow : workflows) { + workflow.getEdges().removeIf(WorkflowEdge::getIsDeleted); + workflow.getNodes().removeIf(WorkflowNode::getIsDeleted); + } + workflowService.updateWorkflows(workflows); + } catch (final IOException e) { + throw new RuntimeException(e); } finally { lock.unlock(); } diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java index 508a1bf5ca..4eb5feb2cc 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.observation.annotation.Observed; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -44,34 +45,12 @@ public Set findWorkflowsToClean() { return workflows; } - /** - * Update an asset. - * - * @param asset The asset to update - * @return The updated asset - * @throws IOException If there is an error updating the asset - * @throws IllegalArgumentException If the asset tries to move from permanent to temporary - */ - //@Override - //@Observed(name = "function_profile") - //public Optional> updateWorkflows(final List assets, final UUID projectId, final Schema.Permission hasWritePermission) - // throws IOException, IllegalArgumentException { - /*final Optional updated = super.updateAsset(asset, projectId, hasWritePermission); - - if (updated.isEmpty()) { - return Optional.empty(); - } - - if (!updated.get().getTemporary() && updated.get().getPublicAsset()) { - elasticService.index(getAssetAlias(), updated.get().getId().toString(), updated); - } - - if (updated.get().getTemporary() || !updated.get().getPublicAsset()) { - elasticService.delete(getAssetAlias(), updated.get().getId().toString()); - } - - return updated;*/ - //} + @Observed(name = "function_profile") + public Optional> updateWorkflows(final Collection assets) + throws IOException, IllegalArgumentException { + final List workflows = repository.saveAll(assets); + return Optional.of(workflows); + } @Override @Observed(name = "function_profile") @@ -138,7 +117,7 @@ public Optional updateAsset( final WorkflowNode node = nodeMap.get(dbNode.getId()); if (node == null) continue; - if (node.getIsDeleted() != null && node.getIsDeleted() == true) { + if (node.getIsDeleted()) { dbNode.setIsDeleted(true); nodeMap.remove(node.getId()); continue; @@ -174,7 +153,7 @@ public Optional updateAsset( final WorkflowEdge edge = edgeMap.get(dbEdge.getId()); if (edge == null) continue; - if (edge.getIsDeleted() != null && edge.getIsDeleted() == true) { + if (edge.getIsDeleted()) { dbEdge.setIsDeleted(true); edgeMap.remove(edge.getId()); continue; From e07ab74c847e96e6ad90ff3a90dc706940ff735a Mon Sep 17 00:00:00 2001 From: dvince Date: Mon, 19 Aug 2024 15:06:44 -0400 Subject: [PATCH 4/5] Update name --- .../hmiserver/repository/data/WorkflowRepository.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java index 51b05c8d42..98d18e8baa 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java @@ -25,8 +25,8 @@ public interface WorkflowRepository extends PSCrudSoftDeleteRepository>'isDeleted' = 'true'", + " cross join json_array_elements(nodes) as nodesData" + + " where nodesData->>'isDeleted' = 'true'", nativeQuery = true ) List findNodesToBeDeleted(); From 557d433ffa812f94896c621648d41cb537705c44 Mon Sep 17 00:00:00 2001 From: dvince Date: Wed, 21 Aug 2024 11:02:43 -0400 Subject: [PATCH 5/5] PR feedback --- .../hmiserver/repository/data/WorkflowRepository.java | 4 ++-- .../terarium/hmiserver/service/data/WorkflowService.java | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java index 98d18e8baa..0be3e2cd62 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/repository/data/WorkflowRepository.java @@ -18,7 +18,7 @@ public interface WorkflowRepository extends PSCrudSoftDeleteRepository>'isDeleted' = 'true'", nativeQuery = true ) - List findEdgesToBeDeleted(); + List findWorkflowsWithEdgesToBeDeleted(); /** * Find all workflows who have at least one edge or node marked for deletion @@ -29,5 +29,5 @@ public interface WorkflowRepository extends PSCrudSoftDeleteRepository>'isDeleted' = 'true'", nativeQuery = true ) - List findNodesToBeDeleted(); + List findWorkflowsWithNodesToBeDeleted(); } diff --git a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java index 4eb5feb2cc..82ce860022 100644 --- a/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java +++ b/packages/server/src/main/java/software/uncharted/terarium/hmiserver/service/data/WorkflowService.java @@ -14,12 +14,10 @@ import java.util.UUID; import org.springframework.stereotype.Service; import software.uncharted.terarium.hmiserver.configuration.Config; -import software.uncharted.terarium.hmiserver.configuration.ElasticsearchConfiguration; import software.uncharted.terarium.hmiserver.models.dataservice.workflow.Workflow; import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowEdge; import software.uncharted.terarium.hmiserver.models.dataservice.workflow.WorkflowNode; import software.uncharted.terarium.hmiserver.repository.data.WorkflowRepository; -import software.uncharted.terarium.hmiserver.service.elasticsearch.ElasticsearchService; import software.uncharted.terarium.hmiserver.service.s3.S3ClientService; import software.uncharted.terarium.hmiserver.utils.rebac.Schema; @@ -40,8 +38,8 @@ public WorkflowService( @Observed(name = "function_profile") public Set findWorkflowsToClean() { final Set workflows = new HashSet<>(); - workflows.addAll(repository.findEdgesToBeDeleted()); - workflows.addAll(repository.findNodesToBeDeleted()); + workflows.addAll(repository.findWorkflowsWithEdgesToBeDeleted()); + workflows.addAll(repository.findWorkflowsWithNodesToBeDeleted()); return workflows; }