Skip to content

Commit

Permalink
Merge pull request #4270 from systeminit/brit/hotfix-action-removal
Browse files Browse the repository at this point in the history
Retry removing action nodes upon completion
  • Loading branch information
stack72 authored Aug 1, 2024
2 parents 038a95e + 53d8952 commit ab3102d
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions lib/dal/src/job/definition/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use si_events::ActionResultState;
use telemetry::prelude::*;
use tryhard::RetryPolicy;
use ulid::Ulid;
use veritech_client::{ActionRunResultSuccess, ResourceStatus};

use crate::{
Expand All @@ -22,8 +23,9 @@ use crate::{
},
producer::{JobProducer, JobProducerResult},
},
workspace_snapshot::graph::WorkspaceSnapshotGraphError,
AccessBuilder, ActionPrototypeId, Component, ComponentId, DalContext, TransactionsError,
Visibility, WsEvent,
Visibility, WorkspaceSnapshotError, WsEvent,
};

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -141,14 +143,25 @@ async fn inner_run(
let maybe_resource = ActionPrototype::run(ctx, prototype_id, component_id).await?;

// Retry process_and_record_execution on a conflict error up to a max
retry_on_conflicts(
let nodes_to_remove: Vec<Ulid> = retry_on_conflicts(
|| process_and_record_execution(ctx.clone(), maybe_resource.as_ref(), action_id),
10,
Duration::from_millis(10),
"si.action_job.process.retries",
"si.action_job.process.retry.process_and_record_execution",
)
.await?;

// check if the things we expected to be removed actually were. The snapshot has already been updated to latest by this point
if !nodes_to_remove.is_empty() {
retry_on_conflicts(
|| ensure_deletes_happened(ctx.clone(), nodes_to_remove.clone()),
10,
Duration::from_millis(10),
"si.action_job.process.retry.ensure_deletes_happened",
)
.await?;
}

// if the action kind was a delete, let's see if any components are ready to be removed that weren't already
let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?;
if prototype.kind == ActionKind::Destroy {
Expand Down Expand Up @@ -183,6 +196,40 @@ async fn inner_run(
Ok(maybe_resource)
}

/// Verifies that every node in `nodes_to_remove` is gone from the workspace snapshot, removing
/// any that can still be looked up.
///
/// The snapshot is refreshed with `update_snapshot_to_visibility` before anything is checked.
/// For each id:
/// - if the lookup succeeds, the node is still present, so it is removed by id;
/// - if the lookup fails with `NodeWithIdNotFound`, the node is already gone and is skipped;
/// - any other lookup error is converted and returned to the caller immediately.
///
/// The context is committed only when at least one node was removed by this call.
#[instrument(name = "action_job.ensure_deletes_happened", level = "info", skip(ctx))]
async fn ensure_deletes_happened(
    mut ctx: DalContext,
    nodes_to_remove: Vec<Ulid>,
) -> JobConsumerResult<()> {
    ctx.update_snapshot_to_visibility().await?;
    let workspace_snapshot = ctx.workspace_snapshot()?;
    let mut needs_commit = false;

    for node_id in nodes_to_remove {
        // Classify the lookup first: present, already gone, or an unrelated failure.
        let still_present = match workspace_snapshot.get_node_weight_by_id(node_id).await {
            Ok(_) => true,
            Err(WorkspaceSnapshotError::WorkspaceSnapshotGraph(
                WorkspaceSnapshotGraphError::NodeWithIdNotFound(_),
            )) => false,
            Err(err) => return Err(err.into()),
        };

        if !still_present {
            trace!(?node_id, "skipping node id: not found");
            continue;
        }

        warn!(?node_id, "removing node with edge because it is lingering");
        workspace_snapshot
            .remove_node_by_id(ctx.vector_clock_id()?, node_id)
            .await?;
        needs_commit = true;
    }

    // Nothing was changed if every node was already gone, so skip the commit in that case.
    if needs_commit {
        ctx.commit().await?;
    }

    Ok(())
}

async fn prepare_for_execution(
ctx: &mut DalContext,
action_id: ActionId,
Expand Down Expand Up @@ -217,7 +264,8 @@ async fn process_and_record_execution(
mut ctx: DalContext,
maybe_resource: Option<&ActionRunResultSuccess>,
action_id: ActionId,
) -> JobConsumerResult<()> {
) -> JobConsumerResult<Vec<Ulid>> {
let mut to_remove_nodes = Vec::new();
ctx.update_snapshot_to_visibility().await?;

let prototype_id = Action::prototype_id(&ctx, action_id).await?;
Expand All @@ -237,14 +285,15 @@ async fn process_and_record_execution(
if resource.status == ResourceStatus::Ok {
// Remove `ActionId` from graph as the execution succeeded
Action::remove_by_id(&ctx, action_id).await?;

to_remove_nodes.push(action_id.into());
if resource.payload.is_none() {
// Clear the resource if the status is ok and we don't have a payload. This could
// be from invoking a delete action directly, rather than deleting the component.
component.clear_resource(&ctx).await?;

if component.to_delete() {
Component::remove(&ctx, component.id()).await?;
to_remove_nodes.push(component.id().into());
} else {
let summary = SummaryDiagramComponent::assemble(
&ctx,
Expand Down Expand Up @@ -287,7 +336,7 @@ async fn process_and_record_execution(
.await?;
ctx.commit().await?;

Ok(())
Ok(to_remove_nodes)
}

#[instrument(
Expand All @@ -311,7 +360,6 @@ async fn process_failed_action(ctx: &DalContext, action_id: ActionId) -> JobCons
.await?;

ctx.commit().await?;

Ok(())
}

Expand Down

0 comments on commit ab3102d

Please sign in to comment.