Skip to content

Commit

Permalink
fix(clusters): fix dns and unrecoverable error bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 23, 2024
1 parent 8f90076 commit 4b5411d
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 14 deletions.
24 changes: 19 additions & 5 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
time::{DurationToMillis, TsToMillis},
GlobalErrorExt, Location,
},
workflow::{Workflow, WorkflowInput},
worker,
workflow::{Workflow, WorkflowInput},
};

// Time to delay a workflow from retrying after an error
Expand Down Expand Up @@ -763,10 +763,24 @@ impl WorkflowCtx {
res: GlobalResult<T>,
) -> GlobalResult<GlobalResult<T>> {
match res {
Err(err) if !err.is_workflow_recoverable() => {
self.location_idx += 1;

Ok(Err(err))
Err(GlobalError::Raw(inner_err)) => {
match inner_err.downcast::<WorkflowError>() {
Ok(inner_err) => {
// Despite "history diverged" errors being unrecoverable, they should not be returned
// by this function because the state of the history is already messed up and no new
// workflow items can be run.
if !inner_err.is_recoverable()
&& !matches!(*inner_err, WorkflowError::HistoryDiverged(_))
{
return Ok(Err(GlobalError::raw(inner_err)));
} else {
return Err(GlobalError::raw(inner_err));
}
}
Err(err) => {
return Err(GlobalError::Raw(err));
}
}
}
Err(err) => Err(err),
Ok(x) => Ok(Ok(x)),
Expand Down
30 changes: 27 additions & 3 deletions svc/pkg/cluster/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,32 @@ pub(crate) async fn create_dns_record(
cf::dns::DnsContent::TXT { .. } => "TXT",
cf::dns::DnsContent::SRV { .. } => "SRV",
};
let list_records_res =
get_dns_record(cf_token, zone_id, record_name, dns_type).await?;

// Find record to delete
let list_records_res = match content {
cf::dns::DnsContent::A { .. } => {
get_dns_record(cf_token, zone_id, record_name, dns_type).await?
}
cf::dns::DnsContent::TXT { .. } => {
// Get DNS record with content comparison
client
.request(&cf::dns::ListDnsRecords {
zone_identifier: zone_id,
params: cf::dns::ListDnsRecordsParams {
record_type: Some(content.clone()),
name: Some(record_name.to_string()),
..Default::default()
},
})
.await?
.result
.into_iter()
.next()
}
_ => {
unimplemented!("must configure whether to search for records via content vs no content for this DNS record type");
}
};

if let Some(record) = list_records_res {
delete_dns_record(client, zone_id, &record.id).await?;
Expand Down Expand Up @@ -140,7 +164,7 @@ pub(crate) async fn delete_dns_record(
Ok(())
}

/// Fetches the dns record by name.
/// Fetches a dns record by name and type, not content.
async fn get_dns_record(
cf_token: &str,
zone_id: &str,
Expand Down
12 changes: 9 additions & 3 deletions svc/pkg/cluster/src/workflows/server/dns_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct GetDnsRecordsInput {
server_id: Uuid,
}

#[derive(Debug, sqlx::FromRow, Serialize, Deserialize, Hash)]
#[derive(Debug, Default, sqlx::FromRow, Serialize, Deserialize, Hash)]
struct GetDnsRecordsOutput {
dns_record_id: Option<String>,
secondary_dns_record_id: Option<String>,
Expand All @@ -62,7 +62,7 @@ async fn get_dns_records(
ctx: &ActivityCtx,
input: &GetDnsRecordsInput,
) -> GlobalResult<GetDnsRecordsOutput> {
sql_fetch_one!(
let row = sql_fetch_optional!(
[ctx, GetDnsRecordsOutput]
"
SELECT dns_record_id, secondary_dns_record_id
Expand All @@ -73,7 +73,13 @@ async fn get_dns_records(
",
&input.server_id,
)
.await
.await?;

if row.is_none() {
tracing::warn!("server has no DNS record row");
}

Ok(row.unwrap_or_default())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
9 changes: 8 additions & 1 deletion svc/pkg/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,14 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
}
}

cleanup(ctx, input, &dc.provider, provider_server_workflow_id, true).await?;
cleanup(
ctx,
input,
&dc.provider,
provider_server_workflow_id,
state.has_dns,
)
.await?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
Ok(_) => {}
// If we cannot recover a setup error, send a failed signal
Err(err) => {
tracing::warn!("unrecoverable setup");
tracing::warn!(?err, "unrecoverable setup");

// TODO: Cleanup

Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult
Ok(x) => x,
// If we cannot recover a provisioning error, send a failed signal and clean up resources
Err(err) => {
tracing::warn!("unrecoverable provision, cleaning up");
tracing::warn!(?err, "unrecoverable provision, cleaning up");

ctx.dispatch_workflow(cleanup::Input {
api_token: input.api_token.clone(),
Expand Down

0 comments on commit 4b5411d

Please sign in to comment.