Skip to content

Commit

Permalink
chore: cleanup runtime aggregate op (#902)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 18, 2024
1 parent 22a1ebd commit 538d9b8
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 222 deletions.
21 changes: 0 additions & 21 deletions lib/convert/src/impls/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,6 @@ pub fn analytics_lobby_summary_from_lobby(
})
}

// TODO: Remove
// impl ApiTryFrom<mm::lobby_runtime_aggregate::response::RegionTierTime>
// for models::CloudRegionTierExpenses
// {
// type Error = GlobalError;

// fn api_try_from(
// value: mm::lobby_runtime_aggregate::response::RegionTierTime,
// ) -> GlobalResult<Self> {
// let uptime_in_seconds = util::div_up!(value.total_time, 1_000);

// Ok(models::CloudRegionTierExpenses {
// namespace_id: unwrap_ref!(value.namespace_id).as_uuid(),
// region_id: unwrap_ref!(value.region_id).as_uuid(),
// tier_name_id: value.tier_name_id,
// lobby_group_name_id: value.lobby_group_name_id,
// uptime: uptime_in_seconds,
// })
// }
// }

impl ApiTryFrom<backend::game::Game> for models::GameHandle {
type Error = GlobalError;

Expand Down
2 changes: 1 addition & 1 deletion lib/convert/src/impls/portal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use proto::backend::{self, pkg::*};
use proto::backend;
use rivet_operation::prelude::*;
use rivet_portal_server::models;

Expand Down
18 changes: 0 additions & 18 deletions proto/backend/billing.proto

This file was deleted.

173 changes: 29 additions & 144 deletions svc/pkg/mm/ops/lobby-runtime-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,33 @@ struct LobbyRow {
namespace_id: Uuid,
lobby_id: Option<Uuid>,
region_id: Option<Uuid>,
lobby_group_id: Option<Uuid>,
create_ts: Option<i64>,
stop_ts: Option<i64>,
}

#[derive(Default)]
struct LobbyAggregate {
struct RegionAggregate {
query_start: i64,
query_end: i64,

/// Total time in milliseconds for each (namespace_id, region_id, lobby_group_id)
total_time: HashMap<(Uuid, Uuid, Uuid), i64>,
/// Total time in milliseconds for each (namespace_id, region_id)
total_time: HashMap<(Uuid, Uuid), i64>,

/// Lobbies that are included in the aggregation.
processed_lobby_ids: HashSet<Uuid>,
}

impl LobbyAggregate {
impl RegionAggregate {
fn process_lobby(&mut self, lobby_row: &LobbyRow) {
// Unwrap values or ignore row
let (lobby_id, region_id, lobby_group_id, create_ts) =
if let (Some(a), Some(b), Some(c), Some(d)) = (
lobby_row.lobby_id,
lobby_row.region_id,
lobby_row.lobby_group_id,
lobby_row.create_ts,
) {
(a, b, c, d)
} else {
tracing::warn!(?lobby_row, "missing data in lobby row history");
return;
};
let (lobby_id, region_id, create_ts) = if let (Some(a), Some(b), Some(c)) =
(lobby_row.lobby_id, lobby_row.region_id, lobby_row.create_ts)
{
(a, b, c)
} else {
tracing::warn!(?lobby_row, "missing data in lobby row history");
return;
};

// Check it's not already registered
if self.processed_lobby_ids.contains(&lobby_id) {
Expand All @@ -62,17 +57,10 @@ impl LobbyAggregate {
let duration = i64::min(stop_ts, self.query_end) - i64::max(start_ts, self.query_start);
*self
.total_time
.entry((lobby_row.namespace_id, region_id, lobby_group_id))
.entry((lobby_row.namespace_id, region_id))
.or_insert(0) += duration;
self.processed_lobby_ids.insert(lobby_id);
}

fn lobby_group_ids(&self) -> HashSet<Uuid> {
self.total_time
.iter()
.map(|((_, _, x), _)| *x)
.collect::<HashSet<Uuid>>()
}
}

#[operation(name = "mm-lobby-runtime-aggregate")]
Expand All @@ -86,10 +74,10 @@ async fn handle(
.collect::<Vec<_>>();
tracing::info!(?namespace_ids, "namespaces");

let mut lobby_aggregate = LobbyAggregate {
let mut region_aggregate = RegionAggregate {
query_start: ctx.query_start,
query_end: ctx.query_end,
..LobbyAggregate::default()
..Default::default()
};

// Aggregate all lobbies that finished during the given query span.
Expand All @@ -107,7 +95,7 @@ async fn handle(
let mut lobby_rows = sql_fetch!(
[ctx, LobbyRow, &crdb]
"
SELECT namespace_id, lobby_id, region_id, lobby_group_id, create_ts, stop_ts
SELECT namespace_id, lobby_id, region_id, create_ts, stop_ts
FROM db_mm_state.lobbies AS OF SYSTEM TIME '-5s'
WHERE namespace_id = ANY($1) AND (
-- Lobbies stopped during the query window
Expand All @@ -124,129 +112,26 @@ async fn handle(
);
while let Some(lobby_row) = lobby_rows.next().await {
let lobby_row = lobby_row?;
lobby_aggregate.process_lobby(&lobby_row);
region_aggregate.process_lobby(&lobby_row);
}
tracing::info!(
total_time = ?lobby_aggregate.total_time,
processed_len = ?lobby_aggregate.processed_lobby_ids.len(),
total_time = ?region_aggregate.total_time,
processed_len = ?region_aggregate.processed_lobby_ids.len(),
"aggregated all lobbies"
);

// Look up region tiers for all lobby groups
let lobby_group_ids = lobby_aggregate
.lobby_group_ids()
.into_iter()
.map(Into::<common::Uuid>::into)
.collect::<Vec<_>>();
let lg_resolve_res = op!([ctx] mm_config_lobby_group_resolve_version {
lobby_group_ids: lobby_group_ids.clone(),
})
.await?;
tracing::info!(
lobby_group_ids_len = ?lobby_group_ids.len(),
versions_len = ?lg_resolve_res.versions.len(),
"resolved lobby group versions"
);

let version_ids = lg_resolve_res
.versions
.iter()
.filter_map(|x| x.version_id.as_ref())
.map(common::Uuid::as_uuid)
.collect::<HashSet<_>>()
.into_iter()
.map(Into::<common::Uuid>::into)
.collect::<Vec<_>>();
let version_res = op!([ctx] mm_config_version_get {
version_ids: version_ids.clone(),
})
.await?;
ensure_eq!(
version_ids.len(),
version_res.versions.len(),
"missing version ids"
);
tracing::info!(versions_len = ?version_res.versions.len(), "fetched mm versions");

// Convert responses
let mut tier_aggregates = HashMap::<(Uuid, Uuid, String, &str), i64>::new(); // (namespace_id, region_id, lobby_group_name_id, tier_name_id) -> time (ms)
for ((namespace_id, region_id, lobby_group_id), total_time) in lobby_aggregate.total_time {
let region_id_proto = Some(common::Uuid::from(region_id));
let lgi_proto = Some(common::Uuid::from(lobby_group_id));

// Find the version ID for the lobby group
let version_id_proto = if let Some(version) = lg_resolve_res
.versions
.iter()
.find(|x| x.lobby_group_id == lgi_proto)
{
&version.version_id
} else {
tracing::warn!(%lobby_group_id, "could not find matching version for lobby group");
continue;
};
let version_id = unwrap_ref!(version_id_proto).as_uuid();

// Find the matching version config
let version_res = if let Some(x) = version_res
.versions
.iter()
.find(|x| x.version_id == *version_id_proto)
{
x
} else {
tracing::warn!(%lobby_group_id, %version_id, "could not find matching version config for version id");
continue;
};
let version_config = unwrap_ref!(version_res.config);
let version_meta = unwrap_ref!(version_res.config_meta);

// Resolve the configured tier name ID
let lobby_group_idx = unwrap!(
version_meta
.lobby_groups
.iter()
.enumerate()
.find(|(_, x)| x.lobby_group_id == lgi_proto),
"could not find matching tier"
)
.0;
let lobby_group_config = unwrap!(version_config.lobby_groups.get(lobby_group_idx));
let lobby_group_region = unwrap!(
lobby_group_config
.regions
.iter()
.find(|x| x.region_id == region_id_proto),
"could not find matching region id config"
);
let tier_name_id = lobby_group_region.tier_name_id.as_str();

// Append to region + tier aggregate
*tier_aggregates
.entry((
namespace_id,
region_id,
lobby_group_config.name_id.clone(),
tier_name_id,
))
.or_insert(0) += total_time;
}

// Build response
let region_tier_times = tier_aggregates
let usage = region_aggregate
.total_time
.into_iter()
.map(
|((namespace_id, region_id, lobby_group_name_id, tier_name_id), total_time)| {
mm::lobby_runtime_aggregate::response::RegionTierTime {
namespace_id: Some(namespace_id.into()),
region_id: Some(region_id.into()),
lobby_group_name_id,
tier_name_id: tier_name_id.to_string(),
total_time,
}
},
)
.map(|((namespace_id, region_id), total_time)| {
mm::lobby_runtime_aggregate::response::NamespaceUsage {
namespace_id: Some(namespace_id.into()),
region_id: Some(region_id.into()),
total_time,
}
})
.collect::<Vec<_>>();

Ok(mm::lobby_runtime_aggregate::Response { region_tier_times })
Ok(mm::lobby_runtime_aggregate::Response { usage })
}
14 changes: 7 additions & 7 deletions svc/pkg/mm/ops/lobby-runtime-aggregate/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ async fn default(ctx: TestCtx) {
.await
.unwrap();

assert_eq!(res.region_tier_times.len(), 2, "ns not found");
for times in &res.region_tier_times {
assert_eq!(res.usage.len(), 2, "ns not found");
for times in &res.usage {
assert!(times.total_time > 0, "should have time");
}
}
Expand Down Expand Up @@ -82,8 +82,8 @@ async fn missing_columns(ctx: TestCtx) {
.await
.unwrap();

assert_eq!(2, res.region_tier_times.len(), "logs not found");
for times in &res.region_tier_times {
assert_eq!(2, res.usage.len(), "logs not found");
for times in &res.usage {
let times_ns_id = times.namespace_id.as_ref().unwrap().as_uuid();
if times_ns_id == fake_namespace_id {
// TODO: Calculate what the total time should be and that it doesn't
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn out_of_range(ctx: TestCtx) {
.await
.unwrap();

assert!(res.region_tier_times.is_empty(), "range check failed");
assert!(res.usage.is_empty(), "range check failed");
}

#[worker_test]
Expand Down Expand Up @@ -171,7 +171,7 @@ async fn min(ctx: TestCtx) {
.unwrap();

assert_eq!(
res.region_tier_times.first().unwrap().total_time,
res.usage.first().unwrap().total_time,
5,
"minimum check failed"
);
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn max(ctx: TestCtx) {
.unwrap();

assert_eq!(
res.region_tier_times.first().unwrap().total_time,
res.usage.first().unwrap().total_time,
5,
"minimum check failed"
);
Expand Down
10 changes: 4 additions & 6 deletions svc/pkg/mm/types/lobby-runtime-aggregate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ message Request {
}

message Response {
message RegionTierTime {
message NamespaceUsage {
reserved 3, 5;

rivet.common.Uuid namespace_id = 1;
rivet.common.Uuid region_id = 2;
string tier_name_id = 3;
// Use the name ID instead of the UUID since we want to combine the same
// expenses across multiple lobby groups
string lobby_group_name_id = 5;
int64 total_time = 4; // in milliseconds
}

repeated RegionTierTime region_tier_times = 1;
repeated NamespaceUsage usage = 1;
}
25 changes: 0 additions & 25 deletions svc/pkg/team/types/billing-aggregate.proto

This file was deleted.

0 comments on commit 538d9b8

Please sign in to comment.