From 53d8952365e3723e5214769d7f6dfbf56bdcd466 Mon Sep 17 00:00:00 2001 From: Brit Myers Date: Thu, 1 Aug 2024 18:12:41 -0400 Subject: [PATCH] Retry removing action nodes upon completion This commit retries removing action nodes upon action completion. This is a workaround for another bug where a "RemoveEdge" update is not being detected for children of the action category node when the "to_rebase" snapshot address has changed in between commit time in the job and the time that the rebaser performs update and conflict detection. Signed-off-by: Nick Gerace Co-authored-by: Brit Myers --- lib/dal/src/job/definition/action.rs | 62 ++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/lib/dal/src/job/definition/action.rs b/lib/dal/src/job/definition/action.rs index 1d4613cd7d..cb4b602b83 100644 --- a/lib/dal/src/job/definition/action.rs +++ b/lib/dal/src/job/definition/action.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use si_events::ActionResultState; use telemetry::prelude::*; use tryhard::RetryPolicy; +use ulid::Ulid; use veritech_client::{ActionRunResultSuccess, ResourceStatus}; use crate::{ @@ -22,8 +23,9 @@ use crate::{ }, producer::{JobProducer, JobProducerResult}, }, + workspace_snapshot::graph::WorkspaceSnapshotGraphError, AccessBuilder, ActionPrototypeId, Component, ComponentId, DalContext, TransactionsError, - Visibility, WsEvent, + Visibility, WorkspaceSnapshotError, WsEvent, }; #[derive(Debug, Deserialize, Serialize)] @@ -141,14 +143,25 @@ async fn inner_run( let maybe_resource = ActionPrototype::run(ctx, prototype_id, component_id).await?; // Retry process_and_record_execution on a conflict error up to a max - retry_on_conflicts( + let nodes_to_remove: Vec = retry_on_conflicts( || process_and_record_execution(ctx.clone(), maybe_resource.as_ref(), action_id), 10, Duration::from_millis(10), - "si.action_job.process.retries", + "si.action_job.process.retry.process_and_record_execution", ) .await?; + // check if 
the things we expected to be removed actually were. The snapshot has already been updated to latest by this point + if !nodes_to_remove.is_empty() { + retry_on_conflicts( + || ensure_deletes_happened(ctx.clone(), nodes_to_remove.clone()), + 10, + Duration::from_millis(10), + "si.action_job.process.retry.ensure_deletes_happened", + ) + .await?; + } + // if the action kind was a delete, let's see if any components are ready to be removed that weren't already let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?; if prototype.kind == ActionKind::Destroy { @@ -183,6 +196,40 @@ async fn inner_run( Ok(maybe_resource) } +#[instrument(name = "action_job.ensure_deletes_happened", level = "info", skip(ctx))] +async fn ensure_deletes_happened( + mut ctx: DalContext, + nodes_to_remove: Vec, +) -> JobConsumerResult<()> { + ctx.update_snapshot_to_visibility().await?; + let snapshot = ctx.workspace_snapshot()?; + let mut removed_at_least_once = false; + + for node_id in nodes_to_remove { + match snapshot.get_node_weight_by_id(node_id).await { + Ok(_) => { + warn!(?node_id, "removing node with edge because it is lingering"); + snapshot + .remove_node_by_id(ctx.vector_clock_id()?, node_id) + .await?; + removed_at_least_once = true; + } + Err(WorkspaceSnapshotError::WorkspaceSnapshotGraph( + WorkspaceSnapshotGraphError::NodeWithIdNotFound(_), + )) => { + trace!(?node_id, "skipping node id: not found"); + } + Err(err) => return Err(err.into()), + } + } + + if removed_at_least_once { + ctx.commit().await?; + } + + Ok(()) +} + async fn prepare_for_execution( ctx: &mut DalContext, action_id: ActionId, @@ -214,7 +261,8 @@ async fn process_and_record_execution( mut ctx: DalContext, maybe_resource: Option<&ActionRunResultSuccess>, action_id: ActionId, -) -> JobConsumerResult<()> { +) -> JobConsumerResult> { + let mut to_remove_nodes = Vec::new(); ctx.update_snapshot_to_visibility().await?; let prototype_id = Action::prototype_id(&ctx, action_id).await?; @@ -234,7 +282,7 @@ 
async fn process_and_record_execution( if resource.status == ResourceStatus::Ok { // Remove `ActionId` from graph as the execution succeeded Action::remove_by_id(&ctx, action_id).await?; - + to_remove_nodes.push(action_id.into()); if resource.payload.is_none() { // Clear the resource if the status is ok and we don't have a payload. This could // be from invoking a delete action directly, rather than deleting the component. @@ -242,6 +290,7 @@ async fn process_and_record_execution( if component.to_delete() { Component::remove(&ctx, component.id()).await?; + to_remove_nodes.push(component.id().into()); } else { let summary = SummaryDiagramComponent::assemble( &ctx, @@ -285,7 +334,7 @@ async fn process_and_record_execution( ctx.commit().await?; - Ok(()) + Ok(to_remove_nodes) } #[instrument(name = "action_job.process_failed_action", skip_all, level = "info")] @@ -305,7 +354,6 @@ async fn process_failed_action(ctx: &DalContext, action_id: ActionId) -> JobCons .await?; ctx.commit().await?; - Ok(()) }