Skip to content

Commit

Permalink
fix(mm): move runtime aggregate logic into query
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato authored and NathanFlurry committed Jun 30, 2024
1 parent 3fa0611 commit a334684
Showing 1 changed file with 72 additions and 115 deletions.
187 changes: 72 additions & 115 deletions svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,11 @@
use std::collections::{HashMap, HashSet};

use futures_util::stream::StreamExt;
use proto::backend::pkg::*;
use rivet_operation::prelude::*;

// NOTE: There's a bug in mm-lobby-cleanup that will upsert rows
#[derive(Debug, sqlx::FromRow)]
struct LobbyRow {
struct RegionRow {
namespace_id: Uuid,
lobby_id: Option<Uuid>,
region_id: Option<Uuid>,
create_ts: Option<i64>,
stop_ts: Option<i64>,
}

#[derive(Default)]
struct RegionAggregate {
query_start: i64,
query_end: i64,

/// Total time in milliseconds for each (namespace_id, region_id)
total_time: HashMap<(Uuid, Uuid), i64>,

/// Lobbies that are included in the aggregation.
processed_lobby_ids: HashSet<Uuid>,
}

impl RegionAggregate {
fn process_lobby(&mut self, lobby_row: &LobbyRow) {
// Unwrap values or ignore row
let (lobby_id, region_id, create_ts) = if let (Some(a), Some(b), Some(c)) =
(lobby_row.lobby_id, lobby_row.region_id, lobby_row.create_ts)
{
(a, b, c)
} else {
tracing::warn!(?lobby_row, "missing data in lobby row history");
return;
};

// Check it's not already registered
if self.processed_lobby_ids.contains(&lobby_id) {
tracing::info!(%lobby_id, "lobby already processed");
return;
}

// Derive start and stop ts
let start_ts = create_ts;
let stop_ts = lobby_row.stop_ts.unwrap_or(self.query_end);

// Filter out lobbies that did not overlap with the given range
if start_ts > self.query_end || stop_ts <= self.query_start {
return;
}

// Add duration masked with the query range
let duration = i64::min(stop_ts, self.query_end) - i64::max(start_ts, self.query_start);
*self
.total_time
.entry((lobby_row.namespace_id, region_id))
.or_insert(0) += duration;
self.processed_lobby_ids.insert(lobby_id);
}
region_id: Uuid,
total_time: i64,
}

#[operation(name = "mm-lobby-runtime-aggregate")]
Expand All @@ -74,64 +19,76 @@ async fn handle(
.collect::<Vec<_>>();
tracing::info!(?namespace_ids, "namespaces");

let mut region_aggregate = RegionAggregate {
query_start: ctx.query_start,
query_end: ctx.query_end,
..Default::default()
};
let regions = ctx
.cache()
.immutable()
.fetch_all_proto(
"mm.lobby_runtime",
namespace_ids,
|mut cache, namespace_ids| {
let query_start = ctx.query_start;
let query_end = ctx.query_end;
let ctx = ctx.base();
async move {
// Aggregate all lobbies that finished during the given query span.
//
// We do this after querying the lobbies that are still running in order to
// ensure that we capture all lobbies in all states that may have stopped
// while generating this aggregation.
//
// Use AS OF SYSTEM TIME to reduce contention.
// https://www.cockroachlabs.com/docs/v22.2/performance-best-practices-overview#use-as-of-system-time-to-decrease-conflicts-with-long-running-queries
let region_rows = sql_fetch_all!(
[ctx, RegionRow]
"
SELECT
namespace_id,
region_id,
SUM(
CASE
-- Lobbies stopped during the query window
WHEN stop_ts > $2 AND stop_ts <= $3 THEN
stop_ts - GREATEST(create_ts, $2)
-- Lobbies created during the query window, these may already be stopped after query_end
WHEN create_ts > $2 AND create_ts <= $3 THEN
LEAST(stop_ts, $3) - create_ts
-- Lobbies still running that overlap with the query window
WHEN stop_ts IS NULL AND create_ts <= $3 THEN
$3 - create_ts
ELSE 0
END
) AS total_time
FROM db_mm_state.lobbies AS OF SYSTEM TIME '-5s'
WHERE namespace_id = ANY($1)
AND (
(stop_ts > $2 AND stop_ts <= $3)
OR (create_ts > $2 AND create_ts <= $3)
OR (stop_ts IS NULL AND create_ts <= $3)
)
GROUP BY namespace_id, region_id
",
&namespace_ids,
query_start,
query_end,
)
.await?;

// Aggregate all lobbies that finished during the given query span.
//
// We do this after querying the lobbies that are still running in order to
// ensure that we capture all lobbies in all states that may have stopped
// while generating this aggregation.
//
// `LobbyAggregate` handles deduplication of aggregated lobbies from the
// previous step.
//
// Use AS OF SYSTEM TIME to reduce contention.
// https://www.cockroachlabs.com/docs/v22.2/performance-best-practices-overview#use-as-of-system-time-to-decrease-conflicts-with-long-running-queries
let crdb = ctx.crdb().await?;
let mut lobby_rows = sql_fetch!(
[ctx, LobbyRow, &crdb]
"
SELECT namespace_id, lobby_id, region_id, create_ts, stop_ts
FROM db_mm_state.lobbies AS OF SYSTEM TIME '-5s'
WHERE namespace_id = ANY($1) AND (
-- Lobbies stopped during the query window
(stop_ts > $2 AND stop_ts <= $3) OR
-- Lobbies created during the query window, these may already be stopped after query_end
(create_ts > $2 AND create_ts <= $3) OR
-- Lobbies still running that overlap with the query window
(stop_ts IS NULL AND create_ts <= $3)
)
",
&namespace_ids,
ctx.query_start,
ctx.query_end,
);
while let Some(lobby_row) = lobby_rows.next().await {
let lobby_row = lobby_row?;
region_aggregate.process_lobby(&lobby_row);
}
tracing::info!(
total_time = ?region_aggregate.total_time,
processed_len = ?region_aggregate.processed_lobby_ids.len(),
"aggregated all lobbies"
);
for row in region_rows {
cache.resolve(
&row.namespace_id,
mm::lobby_runtime_aggregate::response::NamespaceUsage {
namespace_id: Some(row.namespace_id.into()),
region_id: Some(row.region_id.into()),
total_time: row.total_time,
},
);
}

// Build response
let usage = region_aggregate
.total_time
.into_iter()
.map(|((namespace_id, region_id), total_time)| {
mm::lobby_runtime_aggregate::response::NamespaceUsage {
namespace_id: Some(namespace_id.into()),
region_id: Some(region_id.into()),
total_time,
}
})
.collect::<Vec<_>>();
Ok(cache)
}
},
)
.await?;

Ok(mm::lobby_runtime_aggregate::Response { usage })
Ok(mm::lobby_runtime_aggregate::Response { usage: regions })
}

0 comments on commit a334684

Please sign in to comment.