Skip to content

Commit

Permalink
fix(mm, ds): fix dupe alloc killing (#1124)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->
Fixes RVTEE-585
## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Sep 28, 2024
1 parent 1c75e3b commit dcdb06a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 49 deletions.
33 changes: 16 additions & 17 deletions svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use chirp_workflow::prelude::*;

use crate::util::{signal_allocation, NOMAD_CONFIG, NOMAD_REGION};
use crate::util::{NOMAD_CONFIG, NOMAD_REGION};

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2);
Expand All @@ -15,6 +15,7 @@ pub struct Input {

#[workflow]
pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let job_id = unwrap_ref!(input.alloc.job_id);
let alloc_id = unwrap_ref!(input.alloc.ID);
let nomad_node_id = unwrap_ref!(input.alloc.node_id, "alloc has no node id");

Expand Down Expand Up @@ -64,8 +65,8 @@ pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) ->
.await?;

if db_res.kill_alloc {
ctx.activity(KillAllocInput {
alloc_id: alloc_id.clone(),
ctx.activity(DeleteJobInput {
job_id: job_id.clone(),
})
.await?;
}
Expand Down Expand Up @@ -227,37 +228,35 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
.unwrap_or_default();

if kill_alloc {
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing job");
}

Ok(UpdateDbOutput { kill_alloc })
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocInput {
alloc_id: String,
struct DeleteJobInput {
job_id: String,
}

#[activity(KillAlloc)]
async fn kill_alloc(ctx: &ActivityCtx, input: &KillAllocInput) -> GlobalResult<()> {
if let Err(err) = signal_allocation(
#[activity(DeleteJob)]
async fn kill_alloc(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<()> {
if let Err(err) = nomad_client::apis::jobs_api::delete_job(
&NOMAD_CONFIG,
&input.alloc_id,
None,
&input.job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client_old::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?input.alloc_id,
"error while trying to manually kill allocation"
?input.job_id,
"error while trying to manually kill job"
);
}

Expand Down
21 changes: 11 additions & 10 deletions svc/pkg/job-run/src/workers/drain_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use proto::backend::pkg::*;

#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
chirp_workflow::compat::workflow(
ctx,
crate::workflows::drain_all::Input {
nomad_node_id: ctx.nomad_node_id.clone(),
drain_timeout: ctx.drain_timeout,
},
)
.await?
.dispatch()
.await?;
// TODO: Disabled for now
// chirp_workflow::compat::workflow(
// ctx,
// crate::workflows::drain_all::Input {
// nomad_node_id: ctx.nomad_node_id.clone(),
// drain_timeout: ctx.drain_timeout,
// },
// )
// .await?
// .dispatch()
// .await?;

Ok(())
}
23 changes: 9 additions & 14 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;

use crate::{
util::{signal_allocation, NOMAD_REGION},
workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
};
use crate::{util::NOMAD_REGION, workers::NEW_NOMAD_CONFIG};

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down Expand Up @@ -320,26 +317,24 @@ async fn update_db(
.map(|id| id != &alloc_id)
.unwrap_or_default()
{
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing job");

if let Err(err) = signal_allocation(
&NOMAD_CONFIG,
&alloc_id,
None,
if let Err(err) = nomad_client_new::apis::jobs_api::delete_job(
&NEW_NOMAD_CONFIG,
&job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill allocation"
"error while trying to manually kill job"
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/linode/src/util/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub async fn wait_disk_ready(client: &Client, linode_id: u64, disk_id: u64) -> G
loop {
let res = client
.inner()
.get(&format!(
.get(format!(
"https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}"
))
.send()
Expand Down
8 changes: 4 additions & 4 deletions svc/pkg/linode/src/util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Client {
let res = self
.request(
self.inner
.get(&format!("https://api.linode.com/v4{endpoint}")),
.get(format!("https://api.linode.com/v4{endpoint}")),
None,
false,
)
Expand All @@ -132,7 +132,7 @@ impl Client {
pub async fn delete(&self, endpoint: &str) -> GlobalResult<()> {
self.request(
self.inner
.delete(&format!("https://api.linode.com/v4{endpoint}")),
.delete(format!("https://api.linode.com/v4{endpoint}")),
None,
true,
)
Expand All @@ -149,7 +149,7 @@ impl Client {
let res = self
.request(
self.inner
.post(&format!("https://api.linode.com/v4{endpoint}"))
.post(format!("https://api.linode.com/v4{endpoint}"))
.header("content-type", "application/json"),
Some(body),
false,
Expand All @@ -162,7 +162,7 @@ impl Client {
pub async fn post_no_res(&self, endpoint: &str, body: serde_json::Value) -> GlobalResult<()> {
self.request(
self.inner
.post(&format!("https://api.linode.com/v4{endpoint}"))
.post(format!("https://api.linode.com/v4{endpoint}"))
.header("content-type", "application/json"),
Some(body),
false,
Expand Down
1 change: 1 addition & 0 deletions svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ struct CreateInstanceOutput {
}

#[activity(CreateInstance)]
#[timeout = 120]
async fn create_instance(
ctx: &ActivityCtx,
input: &CreateInstanceInput,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down

0 comments on commit dcdb06a

Please sign in to comment.