Skip to content

Commit

Permalink
fix(workflows): rename signals, improve failure handling for server i…
Browse files Browse the repository at this point in the history
…nstall (#1043)

<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Aug 13, 2024
1 parent 63a7601 commit 40cb84a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 58 deletions.
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ macro_rules! join_signal {
)else*

else {
unreachable!("received signal that wasn't queried for");
unreachable!("received signal that wasn't queried for: {}, expected {:?}", name, &[$($signals::NAME),*]);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions svc/pkg/cluster/src/workflows/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
Ok(())
}

#[message("cluster-create-complete")]
#[message("cluster_create_complete")]
pub struct CreateComplete {}

#[signal("cluster-game-link")]
#[signal("cluster_game_link")]
pub struct GameLink {
pub game_id: Uuid,
}

#[signal("cluster-datacenter-create")]
#[signal("cluster_datacenter_create")]
pub struct DatacenterCreate {
pub datacenter_id: Uuid,
pub name_id: String,
Expand Down
10 changes: 5 additions & 5 deletions svc/pkg/cluster/src/workflows/datacenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,19 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
Ok(())
}

#[signal("cluster-datacenter-update")]
#[signal("cluster_datacenter_update")]
pub struct Update {
pub pools: Vec<PoolUpdate>,
pub prebakes_enabled: Option<bool>,
}

#[signal("cluster-datacenter-scale")]
#[signal("cluster_datacenter_scale")]
pub struct Scale {}

#[signal("cluster-datacenter-tls-renew")]
#[signal("cluster_datacenter_tls_renew")]
pub struct TlsRenew {}

#[signal("cluster-datacenter-server-create")]
#[signal("cluster_datacenter_server_create")]
pub struct ServerCreate {
pub server_id: Uuid,
pub pool_type: PoolType,
Expand All @@ -238,7 +238,7 @@ pub struct ServerCreate {

join_signal!(Main, [Update, Scale, ServerCreate, TlsRenew]);

#[message("cluster-datacenter-create-complete")]
#[message("cluster_datacenter_create_complete")]
pub struct CreateComplete {}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
120 changes: 76 additions & 44 deletions svc/pkg/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,32 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob

// Install components on server
if !already_installed {
ctx.workflow(install::Input {
datacenter_id: input.datacenter_id,
server_id: Some(input.server_id),
public_ip,
pool_type: input.pool_type.clone(),
initialize_immediately: true,
})
.await?;
let install_res = ctx
.workflow(install::Input {
datacenter_id: input.datacenter_id,
server_id: Some(input.server_id),
public_ip,
pool_type: input.pool_type.clone(),
initialize_immediately: true,
})
.await;

// If the server failed all attempts to install, clean it up
match ctx.catch_unrecoverable(install_res)? {
Ok(_) => {}
Err(err) => {
tracing::warn!(?err, "failed installing server, cleaning up");

ctx.activity(MarkDestroyedInput {
server_id: input.server_id,
})
.await?;

cleanup(ctx, input, &dc.provider, provider_server_workflow_id, false).await?;

return Err(err);
}
}
}

// Scale to get rid of tainted servers
Expand Down Expand Up @@ -279,34 +297,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
}
}

// Cleanup DNS
if let PoolType::Gg = input.pool_type {
ctx.workflow(dns_delete::Input {
server_id: input.server_id,
})
.await?;
}

// Cleanup server
match dc.provider {
Provider::Linode => {
tracing::info!(server_id=?input.server_id, "destroying linode server");

ctx.tagged_signal(
&json!({
"server_id": input.server_id,
}),
linode::workflows::server::Destroy {},
)
.await?;

// Wait for workflow to complete
ctx.wait_for_workflow::<linode::workflows::server::Workflow>(
provider_server_workflow_id,
)
.await?;
}
}
cleanup(ctx, input, &dc.provider, provider_server_workflow_id, true).await?;

Ok(())
}
Expand Down Expand Up @@ -677,6 +668,47 @@ async fn set_drain_complete(ctx: &ActivityCtx, input: &SetDrainCompleteInput) ->
Ok(())
}

async fn cleanup(
ctx: &mut WorkflowCtx,
input: &Input,
provider: &Provider,
provider_server_workflow_id: Uuid,
cleanup_dns: bool,
) -> GlobalResult<()> {
if cleanup_dns {
// Cleanup DNS
if let PoolType::Gg = input.pool_type {
ctx.workflow(dns_delete::Input {
server_id: input.server_id,
})
.await?;
}
}

// Cleanup server
match provider {
Provider::Linode => {
tracing::info!(server_id=?input.server_id, "destroying linode server");

ctx.tagged_signal(
&json!({
"server_id": input.server_id,
}),
linode::workflows::server::Destroy {},
)
.await?;

// Wait for workflow to complete
ctx.wait_for_workflow::<linode::workflows::server::Workflow>(
provider_server_workflow_id,
)
.await?;
}
}

Ok(())
}

/// Finite state machine for handling server updates.
struct State {
draining: bool,
Expand Down Expand Up @@ -782,30 +814,30 @@ type ProvisionComplete = linode::workflows::server::ProvisionComplete;
type ProvisionFailed = linode::workflows::server::ProvisionFailed;
join_signal!(pub(crate) Linode, [ProvisionComplete, ProvisionFailed]);

#[signal("cluster-server-drain")]
#[signal("cluster_server_drain")]
pub struct Drain {}

#[signal("cluster-server-undrain")]
#[signal("cluster_server_undrain")]
pub struct Undrain {}

#[signal("cluster-server-taint")]
#[signal("cluster_server_taint")]
pub struct Taint {}

#[signal("cluster-server-dns-create")]
#[signal("cluster_server_dns_create")]
pub struct DnsCreate {}

#[signal("cluster-server-dns-delete")]
#[signal("cluster_server_dns_delete")]
pub struct DnsDelete {}

#[signal("cluster-server-destroy")]
#[signal("cluster_server_destroy")]
pub struct Destroy {}

#[signal("cluster-server-nomad-registered")]
#[signal("cluster_server_nomad_registered")]
pub struct NomadRegistered {
pub node_id: String,
}

#[signal("cluster-server-nomad-drain-complete")]
#[signal("cluster_server_nomad_drain_complete")]
pub struct NomadDrainComplete {}

join_signal!(
Expand Down
4 changes: 2 additions & 2 deletions svc/pkg/linode/src/workflows/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ async fn create_custom_image(
Ok(create_image_res.id)
}

#[signal("linode-image-create-complete")]
#[signal("linode_image_create_complete")]
pub struct CreateComplete {
pub image_id: String,
}

#[signal("linode-image-destroy")]
#[signal("linode_image_destroy")]
pub struct Destroy {}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
6 changes: 3 additions & 3 deletions svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,15 @@ async fn get_public_ip(ctx: &ActivityCtx, input: &GetPublicIpInput) -> GlobalRes
api::get_public_ip(&client, input.linode_id).await
}

#[signal("linode-server-provision-complete")]
#[signal("linode_server_provision_complete")]
pub struct ProvisionComplete {
pub linode_id: u64,
pub public_ip: Ipv4Addr,
pub boot_disk_id: u64,
}

#[signal("linode-server-provision-failed")]
#[signal("linode_server_provision_failed")]
pub struct ProvisionFailed {}

#[signal("linode-server-destroy")]
#[signal("linode_server_destroy")]
pub struct Destroy {}

0 comments on commit 40cb84a

Please sign in to comment.