fix(job-run): fix dupe allocs, re-enable drain all (#1128)
MasterPtato committed Sep 28, 2024
1 parent 2e3217e commit d019e01
Showing 6 changed files with 59 additions and 46 deletions.
3 changes: 0 additions & 3 deletions svc/pkg/cluster/src/workflows/datacenter/scale.rs
@@ -249,9 +249,6 @@ async fn inner(
// Sort job servers by allocated memory
servers.sort_by_key(|server| memory_by_server.get(&server.server_id));

// TODO: remove
tracing::info!(server_ids=?servers.iter().map(|s| s.server_id).collect::<Vec<_>>(), ?memory_by_server, "server topo");

// TODO: RVT-3732 Sort gg and ats servers by cpu usage
// servers.sort_by_key

1 change: 1 addition & 0 deletions svc/pkg/job-run/src/lib.rs
@@ -10,6 +10,7 @@ pub fn registry() -> WorkflowResult<Registry> {

let mut registry = Registry::new();
registry.register_workflow::<drain_all::Workflow>()?;
registry.register_workflow::<drain_all::Workflow2>()?;

Ok(registry)
}
21 changes: 10 additions & 11 deletions svc/pkg/job-run/src/workers/drain_all.rs
@@ -3,17 +3,16 @@ use proto::backend::pkg::*;

#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
// TODO: Disabled for now
// chirp_workflow::compat::workflow(
// ctx,
// crate::workflows::drain_all::Input {
// nomad_node_id: ctx.nomad_node_id.clone(),
// drain_timeout: ctx.drain_timeout,
// },
// )
// .await?
// .dispatch()
// .await?;
chirp_workflow::compat::workflow(
ctx,
crate::workflows::drain_all::Input2 {
nomad_node_id: ctx.nomad_node_id.clone(),
drain_timeout: ctx.drain_timeout,
},
)
.await?
.dispatch()
.await?;

Ok(())
}
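
For context, the worker above consumes `job_run::msg::drain_all` and now simply forwards it into the v2 workflow. A hypothetical publisher sketch is shown below, mirroring the `msg!` pattern used for the stop message later in this commit; the key argument and exact macro shape are assumptions and not part of this diff:

```rust
// Hypothetical sketch: emitting the drain-all message that the worker above
// consumes. Field names are taken from what the worker reads off the message;
// the key argument (`&nomad_node_id`) is an assumption.
msg!([ctx] job_run::msg::drain_all(&nomad_node_id) {
	nomad_node_id: nomad_node_id.clone(),
	drain_timeout: drain_timeout_ms,
})
.await?;
```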
6 changes: 1 addition & 5 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
@@ -331,11 +331,7 @@ async fn update_db(
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill job"
);
tracing::warn!(?err, ?alloc_id, "error while trying to manually kill job");
}
}
}
45 changes: 45 additions & 0 deletions svc/pkg/job-run/src/workflows/drain_all.rs
@@ -44,6 +44,33 @@ pub async fn job_run_drain_all(ctx: &mut WorkflowCtx, input: &Input) -> GlobalRe
Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Input2 {
pub nomad_node_id: String,
pub drain_timeout: u64,
}

#[workflow(Workflow2)]
pub async fn job_run_drain_all2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> {
// We fetch here so that when we kill allocs later, we don't refetch new job runs that might be on the
// nomad node. Only allocs fetched at this time will be killed.
let job_runs = ctx
.activity(FetchJobRunsInput {
nomad_node_id: input.nomad_node_id.clone(),
})
.await?;

ctx.sleep(input.drain_timeout.saturating_sub(DRAIN_PADDING))
.await?;

ctx.activity(StopJobRuns2Input {
run_ids: job_runs.iter().map(|jr| jr.run_id).collect(),
})
.await?;

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct FetchJobRunsInput {
nomad_node_id: String,
@@ -95,6 +122,24 @@ async fn stop_job_runs(ctx: &ActivityCtx, input: &StopJobRunsInput) -> GlobalRes
Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct StopJobRuns2Input {
run_ids: Vec<Uuid>,
}

#[activity(StopJobRuns2)]
async fn stop_job_runs2(ctx: &ActivityCtx, input: &StopJobRuns2Input) -> GlobalResult<()> {
for run_id in &input.run_ids {
msg!([ctx] job_run::msg::stop(run_id) {
run_id: Some((*run_id).into()),
skip_kill_alloc: false,
})
.await?;
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocsInput {
nomad_node_id: String,
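
The comment in `job_run_drain_all2` carries the core of the dupe-allocs fix: the run list is captured once, before the drain sleep, so runs scheduled onto the node afterwards are never stopped. Below is a standalone sketch of that snapshot-then-stop ordering in plain Rust with tokio; it is not Rivet's `chirp_workflow` API, and the padding constant is an assumed value.

```rust
use std::time::Duration;

// Assumed value; stands in for the DRAIN_PADDING constant used by the workflow.
const DRAIN_PADDING_MS: u64 = 30_000;

async fn drain_node(
	fetch_run_ids: impl Fn() -> Vec<u64>,
	stop_run: impl Fn(u64),
	drain_timeout_ms: u64,
) {
	// Snapshot first: only the runs present when the drain starts get stopped.
	let snapshot = fetch_run_ids();

	// Wait out the drain window (minus padding), like ctx.sleep(...) above.
	tokio::time::sleep(Duration::from_millis(
		drain_timeout_ms.saturating_sub(DRAIN_PADDING_MS),
	))
	.await;

	// Runs scheduled onto the node during the sleep are not in `snapshot`,
	// so they are left alone; only the original snapshot is stopped.
	for run_id in snapshot {
		stop_run(run_id);
	}
}
```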
29 changes: 2 additions & 27 deletions svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs
@@ -66,34 +66,9 @@ async fn worker(
}

pipe.query_async(&mut ctx.redis_mm().await?).await?;
} else {
let mut script = REDIS_SCRIPT.prepare_invoke();

script.arg(lobby_rows.len());

for lobby in lobby_rows {
script
.key(util_mm::key::lobby_config(lobby.lobby_id))
.key(util_mm::key::lobby_player_ids(lobby.lobby_id))
.key(util_mm::key::lobby_available_spots(
lobby.namespace_id,
datacenter_id,
lobby.lobby_group_id,
util_mm::JoinKind::Normal,
))
.key(util_mm::key::lobby_available_spots(
lobby.namespace_id,
datacenter_id,
lobby.lobby_group_id,
util_mm::JoinKind::Party,
))
.arg(lobby.lobby_id.to_string())
.arg(lobby.max_players_normal)
.arg(lobby.max_players_party);
}

script.invoke_async(&mut ctx.redis_mm().await?).await?;
}

// NOTE: Don't do anything on undrain

Ok(())
}
