Skip to content

Commit

Permalink
feat: add batch actor upgrade (#1480)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->
Fixes RVT-4155
## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Nov 27, 2024
1 parent dea5633 commit c2558d4
Show file tree
Hide file tree
Showing 47 changed files with 1,764 additions and 10 deletions.
126 changes: 117 additions & 9 deletions packages/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use futures_util::{StreamExt, TryStreamExt};
use proto::backend;
use rivet_api::models;
use rivet_convert::{ApiInto, ApiTryInto};
Expand Down Expand Up @@ -439,6 +440,98 @@ pub async fn upgrade(
Ok(json!({}))
}

// MARK: POST /actors/upgrade
pub async fn upgrade_all(
ctx: Ctx<Auth>,
body: models::ActorUpgradeAllActorsRequest,
query: GlobalQuery,
) -> GlobalResult<models::ActorUpgradeAllActorsResponse> {
let CheckOutput { env_id, .. } = ctx.auth().check(ctx.op_ctx(), &query, false).await?;

let tags = unwrap_with!(body.tags, API_BAD_BODY, error = "missing property `tags`");

ensure_with!(
tags.as_object().map(|x| x.len()).unwrap_or_default() <= 64,
API_BAD_BODY,
error = "Too many tags (max 64)."
);

let tags = unwrap_with!(
serde_json::from_value::<HashMap<String, String>>(tags).ok(),
API_BAD_BODY,
error = "`tags` must be `Map<String, String>`"
);

for (k, v) in &tags {
ensure_with!(
k.len() <= 256,
API_BAD_BODY,
error = format!(
"tags[{:?}]: Tag label too large (max 256 bytes).",
&k[..256]
),
);

ensure_with!(
v.len() <= 1024,
API_BAD_BODY,
error = format!("tags[{k:?}]: Tag value too large (max 1024 bytes)."),
);
}

let build_id = resolve_build_id(&ctx, env_id, body.build, body.build_tags.flatten()).await?;

// Work in batches
let mut count = 0;
let mut cursor = None;
loop {
let list_res = ctx
.op(ds::ops::server::list_for_env::Input {
env_id,
tags: tags.clone(),
include_destroyed: false,
cursor,
limit: 10_000,
})
.await?;

count += list_res.server_ids.len();
cursor = list_res.server_ids.last().cloned();

let subs = futures_util::stream::iter(list_res.server_ids.clone())
.map(|server_id| {
ctx.subscribe::<ds::workflows::server::UpgradeStarted>(("server_id", server_id))
})
.buffer_unordered(32)
.try_collect::<Vec<_>>()
.await?;

futures_util::stream::iter(list_res.server_ids)
.map(|server_id| {
ctx.signal(ds::workflows::server::Upgrade { image_id: build_id })
.tag("server_id", server_id)
.send()
})
.buffer_unordered(32)
.try_collect::<Vec<_>>()
.await?;

futures_util::stream::iter(subs)
.map(|mut sub| async move { sub.next().await })
.buffer_unordered(32)
.try_collect::<Vec<_>>()
.await?;

if count % 10_000 != 0 {
break;
}
}

Ok(models::ActorUpgradeAllActorsResponse {
count: count.try_into()?,
})
}

// MARK: GET /actors
#[derive(Debug, Clone, Deserialize)]
pub struct ListQuery {
Expand All @@ -464,15 +557,26 @@ async fn list_actors_inner(
) -> GlobalResult<models::ActorListActorsResponse> {
let CheckOutput { env_id, .. } = ctx.auth().check(ctx.op_ctx(), &query.global, false).await?;

let include_destroyed = query.include_destroyed.unwrap_or(false);

let tags = unwrap_with!(
query
.tags_json
.as_deref()
.map_or(Ok(HashMap::new()), serde_json::from_str)
.ok(),
API_BAD_QUERY_PARAMETER,
parameter = "tags_json",
error = "must be `Map<String, String>`"
);

let list_res = ctx
.op(ds::ops::server::list_for_env::Input {
env_id,
tags: query
.tags_json
.as_deref()
.map_or(Ok(HashMap::new()), serde_json::from_str)?,
include_destroyed: query.include_destroyed.unwrap_or(false),
tags,
include_destroyed,
cursor: query.cursor,
limit: if include_destroyed { 64 } else { 10_000 },
})
.await?;

Expand Down Expand Up @@ -636,26 +740,30 @@ async fn resolve_build_id(
(Some(build_id), None) => Ok(build_id),
// Resolve build from tags
(None, Some(build_tags)) => {
let build_tags = serde_json::from_value::<HashMap<String, String>>(build_tags)?;
let build_tags = unwrap_with!(
serde_json::from_value::<HashMap<String, String>>(build_tags).ok(),
API_BAD_BODY,
error = "`build_tags` must be `Map<String, String>`"
);

ensure_with!(
build_tags.len() < 64,
ACTOR_FAILED_TO_CREATE,
API_BAD_BODY,
error = "Too many build tags (max 64)."
);

for (k, v) in &build_tags {
ensure_with!(
k.len() < 128,
ACTOR_FAILED_TO_CREATE,
API_BAD_BODY,
error = format!(
"build_tags[{:?}]: Build tag label too large (max 128 bytes).",
&k[..128]
)
);
ensure_with!(
v.len() < 256,
ACTOR_FAILED_TO_CREATE,
API_BAD_BODY,
error =
format!("build_tags[{k:?}]: Build tag value too large (max 256 bytes).")
);
Expand Down
25 changes: 25 additions & 0 deletions packages/api/actor/src/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ define_router! {
),
},

"actors" / "upgrade": {
POST: actors::upgrade_all(
query: GlobalQuery,
body: models::ActorUpgradeAllActorsRequest,
opt_auth: true,
rate_limit: {
buckets: [
{ count: 1_000, bucket: duration::minutes(1) },
],
},
),
},

"actors" / Uuid: {
GET: actors::get(
Expand All @@ -91,6 +103,19 @@ define_router! {
),
},

"actors" / Uuid / "upgrade": {
POST: actors::upgrade(
query: GlobalQuery,
body: models::ActorUpgradeActorRequest,
opt_auth: true,
rate_limit: {
buckets: [
{ count: 1_000, bucket: duration::minutes(1) },
],
},
),
},

"actors" / Uuid / "logs": {
GET: logs::get_logs(
query: logs::GetActorLogsQuery,
Expand Down
3 changes: 2 additions & 1 deletion packages/services/ds/src/ops/server/list_for_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Input {
pub tags: HashMap<String, String>,
pub include_destroyed: bool,
pub cursor: Option<Uuid>,
pub limit: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -44,7 +45,7 @@ pub async fn ds_server_list_for_env(ctx: &OperationCtx, input: &Input) -> Global
input.include_destroyed,
input.cursor,
// TODO: Add pagination when OpenGB lobbies no longer uses polling RVTEE-492
if input.include_destroyed { 64 } else { 10_000 },
i64::try_from(input.limit)?,
)
.await?
.into_iter()
Expand Down
22 changes: 22 additions & 0 deletions sdks/api/fern/definition/actor/__package__.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions sdks/api/full/go/actor/actor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c2558d4

Please sign in to comment.