Skip to content

Commit

Permalink
fix(mm): call mm-lobby-cleanup from mm-gc even for preemptive lobbies…
Browse files Browse the repository at this point in the history
… without sql row (#856)

<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
NathanFlurry committed Jun 6, 2024
1 parent 9370e9e commit 5315a9a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 12 deletions.
4 changes: 3 additions & 1 deletion svc/pkg/mm/ops/lobby-find-fail/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ async fn handle(
futs.push(fail_query(ctx.clone(), redis, query_id, ctx.error_code).boxed());
}
}
futures_util::future::try_join_all(futs).await?;
if !futs.is_empty() {
futures_util::future::try_join_all(futs).await?;
}

Ok(mm::lobby_find_fail::Response {})
}
Expand Down
9 changes: 8 additions & 1 deletion svc/pkg/mm/ops/lobby-find-lobby-query-list/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use proto::backend::pkg::*;
use redis::AsyncCommands;
use rivet_operation::prelude::*;

const MAX_COUNT: isize = 16;

#[operation(name = "mm-lobby-find-lobby-query-list")]
async fn handle(
ctx: OperationContext<mm::lobby_find_lobby_query_list::Request>,
Expand All @@ -11,7 +13,7 @@ async fn handle(
let query_ids = ctx
.redis_mm()
.await?
.zrange::<_, Vec<String>>(util_mm::key::lobby_find_queries(lobby_id), 0, -1)
.zrange::<_, Vec<String>>(util_mm::key::lobby_find_queries(lobby_id), 0, MAX_COUNT - 1)
.await?
.iter()
.map(String::as_str)
Expand All @@ -20,5 +22,10 @@ async fn handle(
.map(common::Uuid::from)
.collect::<Vec<common::Uuid>>();

if query_ids.len() as isize == MAX_COUNT {
tracing::warn!("too many find queries, short circuiting to prevent bad things from happening");
return Ok(mm::lobby_find_lobby_query_list::Response { query_ids: Vec::new() })
}

Ok(mm::lobby_find_lobby_query_list::Response { query_ids })
}
17 changes: 17 additions & 0 deletions svc/pkg/mm/worker/src/workers/lobby_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_cleanup::Message>) -> Glob
// This is idempotent, don't raise error
tracing::info!("lobby not present in redis");

remove_from_redis_without_config(&mut redis_mm, lobby_id).await?;

false
};

Expand Down Expand Up @@ -256,3 +258,18 @@ async fn remove_from_redis(

Ok(())
}

/// Removes the Redis entries for a lobby that can be addressed with only the lobby ID.
///
/// Unlinks the lobby's config and tags keys and removes the lobby from the unready sorted set, all
/// inside a single atomic pipeline.
///
/// NOTE(review): presumably the remaining lobby indexes need values from the lobby config to build
/// their keys, hence the name; confirm against `remove_from_redis`.
///
/// # Errors
///
/// Returns an error if the Redis pipeline fails to execute.
async fn remove_from_redis_without_config(
    redis_mm: &mut RedisPool,
    lobby_id: Uuid,
) -> GlobalResult<()> {
    let mut pipe = redis::pipe();

    // Pin the reply type to `()` explicitly. Leaving it to be inferred from `?;` relies on the
    // never-type fallback, which is linted (`dependency_on_unit_never_type_fallback`) and changes
    // in the 2024 edition.
    let _: () = pipe
        .atomic()
        .unlink(util_mm::key::lobby_config(lobby_id))
        .unlink(util_mm::key::lobby_tags(lobby_id))
        .zrem(util_mm::key::lobby_unready(), lobby_id.to_string())
        .query_async(redis_mm)
        .await?;

    Ok(())
}
25 changes: 15 additions & 10 deletions svc/pkg/mm/worker/src/workers/lobby_stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ struct LobbyRow {
async fn worker(ctx: &OperationContext<mm::msg::lobby_stop::Message>) -> GlobalResult<()> {
let lobby_id = unwrap_ref!(ctx.lobby_id).as_uuid();

if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
}

// Fetch the lobby.
//
// This also ensures that mm-lobby-find or mm-lobby-create
Expand Down Expand Up @@ -38,24 +43,24 @@ async fn worker(ctx: &OperationContext<mm::msg::lobby_stop::Message>) -> GlobalR
.await?;
tracing::info!(?lobby_row, "lobby row");

let Some(lobby_row) = lobby_row else {
if ctx.req_dt() > util::duration::minutes(5) {
tracing::error!("discarding stale message");
return Ok(());
} else {
retry_bail!("lobby not found, may be race condition with insertion");
}
};

// conflicting locks on the lobby row
// Cleanup the lobby ASAP.
//
// To avoid conflicting locks on the lobby row, do the cleanup after the SQL query but before the
// bail below, in case the lobby does not exist in the db. lobby_cleanup will remove it from Redis
// appropriately.
//
// This will also be called in `job-run-cleanup`, but this is idempotent.
msg!([ctx] mm::msg::lobby_cleanup(lobby_id) {
lobby_id: Some(lobby_id.into()),
})
.await?;

let Some(lobby_row) = lobby_row else {
// Don't use `retry_bail` because this will retry frequently, and we need to call
// `mm::msg::lobby_cleanup` first
bail!("lobby not found, may be race condition with insertion");
};

// Stop the job. This will call cleanup and delete the lobby row.
if let Some(run_id) = lobby_row.run_id {
msg!([ctx] job_run::msg::stop(run_id) {
Expand Down

0 comments on commit 5315a9a

Please sign in to comment.