diff --git a/packages/common/chirp-workflow/core/src/utils.rs b/packages/common/chirp-workflow/core/src/utils.rs index c2a7763e16..0ba16c2191 100644 --- a/packages/common/chirp-workflow/core/src/utils.rs +++ b/packages/common/chirp-workflow/core/src/utils.rs @@ -50,12 +50,11 @@ pub mod time { impl TsToMillis for Instant { fn to_millis(self) -> GlobalResult { let now_instant = Instant::now(); - let now_system_time = SystemTime::now(); let system_time = if self >= now_instant { - now_system_time.checked_add(self.duration_since(now_instant)) + SystemTime::now().checked_add(self.duration_since(now_instant)) } else { - now_system_time.checked_sub(now_instant.duration_since(self)) + SystemTime::now().checked_sub(now_instant.duration_since(self)) }; let ms = unwrap!(system_time, "invalid timestamp") @@ -67,6 +66,12 @@ pub mod time { } } + impl TsToMillis for tokio::time::Instant { + fn to_millis(self) -> GlobalResult { + self.into_std().to_millis() + } + } + impl TsToMillis for SystemTime { fn to_millis(self) -> GlobalResult { let ms = self diff --git a/packages/services/ds/db/servers/migrations/20241115184305_add_durable.up.sql b/packages/services/ds/db/servers/migrations/20241115184305_add_durable.up.sql index 27da2a3c83..9a48913bf0 100644 --- a/packages/services/ds/db/servers/migrations/20241115184305_add_durable.up.sql +++ b/packages/services/ds/db/servers/migrations/20241115184305_add_durable.up.sql @@ -1,3 +1,5 @@ ALTER TABLE servers RENAME COLUMN kill_timeout_ms TO lifecycle_kill_timeout_ms, - ADD COLUMN lifecycle_durable BOOLEAN DEFAULT false; + ADD COLUMN lifecycle_durable BOOLEAN NOT NULL DEFAULT false, + ADD COLUMN reschedule_retry_count INT NOT NULL DEFAULT 0, + ADD COLUMN last_reschedule_retry_ts INT; diff --git a/packages/services/ds/src/workflows/server/mod.rs b/packages/services/ds/src/workflows/server/mod.rs index 666ce63a37..604575b05c 100644 --- a/packages/services/ds/src/workflows/server/mod.rs +++ b/packages/services/ds/src/workflows/server/mod.rs @@ -542,7 +542,7 @@ struct GetServerMetaInput { image_id: Uuid, } -#[derive(Debug, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] struct GetServerMetaOutput { project_id: Uuid, project_slug: String, @@ -582,7 +582,8 @@ async fn get_server_meta( let project_id = unwrap!(env.game_id).as_uuid(); let projects_res = op!([ctx] game_get { game_ids: vec![project_id.into()], - }).await?; + }) + .await?; let project = unwrap!(projects_res.games.first()); Ok(GetServerMetaOutput { @@ -646,6 +647,37 @@ async fn update_image(ctx: &ActivityCtx, input: &UpdateImageInput) -> GlobalResu Ok(()) } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct UpdateRescheduleRetryInput { + server_id: Uuid, + reset: bool, +} + +#[activity(UpdateRescheduleRetry)] +async fn update_reschedule_retry( + ctx: &ActivityCtx, + input: &UpdateRescheduleRetryInput, +) -> GlobalResult { + let (retry_count,) = sql_fetch_one!( + [ctx, (i64,)] + " + UPDATE db_ds.servers + SET + reschedule_retry_count = COALESCE($2, reschedule_retry_count + 1), + last_reschedule_retry_ts = COALESCE($3, last_reschedule_retry_ts) + WHERE server_id = $1 + -- Return value before update + RETURNING reschedule_retry_count - 1 + ", + input.server_id, + input.reset.then_some(0), + (!input.reset).then(util::timestamp::now), + ) + .await?; + + Ok(retry_count) +} + #[message("ds_server_create_complete")] pub struct CreateComplete {} diff --git a/packages/services/ds/src/workflows/server/pegboard/mod.rs b/packages/services/ds/src/workflows/server/pegboard/mod.rs index b6bb8c4f4a..949e57c47d 100644 --- a/packages/services/ds/src/workflows/server/pegboard/mod.rs +++ b/packages/services/ds/src/workflows/server/pegboard/mod.rs @@ -10,8 +10,9 @@ use util::serde::AsHashableExt; use super::{ resolve_image_artifact_url, CreateComplete, Destroy, Drain, DrainState, Failed, - GetServerMetaInput, InsertDbInput, Port, Ready, SetConnectableInput, UpdateImageInput, Upgrade, - UpgradeComplete, UpgradeStarted, DRAIN_PADDING_MS, TRAEFIK_GRACE_PERIOD, + GetServerMetaInput, GetServerMetaOutput, InsertDbInput, Port, Ready, SetConnectableInput, + UpdateImageInput, UpdateRescheduleRetryInput, Upgrade, UpgradeComplete, UpgradeStarted, + DRAIN_PADDING_MS, TRAEFIK_GRACE_PERIOD, }; use crate::types::{ GameGuardProtocol, HostProtocol, NetworkMode, Routing, ServerLifecycle, ServerResources, @@ -19,6 +20,9 @@ use crate::types::{ pub mod destroy; +/// Time to delay an actor from rescheduling after a rescheduling failure (no available dc's). +const BASE_RETRY_TIMEOUT_MS: usize = 2000; + #[derive(Serialize, Deserialize)] struct StateRes { signal_actor: bool, @@ -44,7 +48,7 @@ pub(crate) struct Input { #[workflow] pub(crate) async fn ds_server_pegboard(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { - let res = setup(ctx, input, true, None).await; + let res = setup(ctx, input, SetupCtx::Init).await; match ctx.catch_unrecoverable(res)? { Ok(_actor_id) => {} Err(err) => { @@ -75,18 +79,43 @@ pub(crate) async fn ds_server_pegboard(ctx: &mut WorkflowCtx, input: &Input) -> .send() .await?; - if let Some(sig) = wait_actor_ready(ctx, input.server_id).await? { - // Destroyed early - ctx.workflow(destroy::Input { - server_id: input.server_id, - override_kill_timeout_ms: sig.override_kill_timeout_ms, - signal_actor: true, - }) - .output() - .await?; + let _client_id = match ctx.listen::().await? { + Init::ActorStateUpdate(sig) => match sig.state { + pp::ActorState::Allocated { client_id } => client_id, + pp::ActorState::FailedToAllocate => { + ctx.msg(Failed { + message: "Failed to allocate (no availability).".into(), + }) + .tag("server_id", input.server_id) + .send() + .await?; - return Ok(()); - } + ctx.workflow(destroy::Input { + server_id: input.server_id, + override_kill_timeout_ms: None, + signal_actor: false, + }) + .output() + .await?; + + bail!("failed to allocate actor"); + } + state => bail!("unexpected actor state: {state:?}"), + }, + Init::Destroy(sig) => { + tracing::debug!("destroying before actor start"); + + ctx.workflow(destroy::Input { + server_id: input.server_id, + override_kill_timeout_ms: sig.override_kill_timeout_ms, + signal_actor: true, + }) + .output() + .await?; + + return Ok(()); + } + }; let state_res = ctx .repeat(|ctx| { @@ -102,14 +131,21 @@ pub(crate) async fn ds_server_pegboard(ctx: &mut WorkflowCtx, input: &Input) -> .await?; } pp::ActorState::Running { ports, .. } => { - ctx.activity(UpdatePortsInput { - server_id: input.server_id, - datacenter_id: input.datacenter_id, - ports, - }) + ctx.join(( + activity(UpdatePortsInput { + server_id: input.server_id, + datacenter_id: input.datacenter_id, + ports, + }), + // Reset retry count only after the actor becomes running + activity(UpdateRescheduleRetryInput { + server_id: input.server_id, + reset: true, + }), + )) .await?; - // Wait for Traefik to be ready + // Wait for Traefik to poll ports and update GG ctx.sleep(TRAEFIK_GRACE_PERIOD).await?; let updated = ctx @@ -276,37 +312,58 @@ pub(crate) async fn ds_server_pegboard(ctx: &mut WorkflowCtx, input: &Input) -> Ok(()) } +enum SetupCtx { + Init, + Reschedule { new_image_id: Option }, +} + +#[derive(Clone)] +struct ActorSetupCtx { + actor_id: Uuid, + server_meta: GetServerMetaOutput, + resources: pp::Resources, + image_artifact_url: String, +} + async fn setup( ctx: &mut WorkflowCtx, input: &Input, - insert_db: bool, - new_image_id: Option, -) -> GlobalResult { - if insert_db { - ctx.activity(InsertDbInput { - server_id: input.server_id, - env_id: input.env_id, - datacenter_id: input.datacenter_id, - cluster_id: input.cluster_id, - tags: input.tags.as_hashable(), - resources: input.resources.clone(), - lifecycle: input.lifecycle.clone(), - image_id: input.image_id, - args: input.args.clone(), - network_mode: input.network_mode, - environment: input.environment.as_hashable(), - network_ports: input.network_ports.as_hashable(), - }) - .await?; - } else if let Some(image_id) = new_image_id { - ctx.activity(UpdateImageInput { - server_id: input.server_id, - image_id, - }) - .await?; - } + setup: SetupCtx, +) -> GlobalResult { + let image_id = match &setup { + SetupCtx::Init => { + ctx.activity(InsertDbInput { + server_id: input.server_id, + env_id: input.env_id, + datacenter_id: input.datacenter_id, + cluster_id: input.cluster_id, + tags: input.tags.as_hashable(), + resources: input.resources.clone(), + lifecycle: input.lifecycle.clone(), + image_id: input.image_id, + args: input.args.clone(), + network_mode: input.network_mode, + environment: input.environment.as_hashable(), + network_ports: input.network_ports.as_hashable(), + }) + .await?; + + input.image_id + } + SetupCtx::Reschedule { new_image_id } => { + if let Some(image_id) = *new_image_id { + ctx.activity(UpdateImageInput { + server_id: input.server_id, + image_id, + }) + .await?; - let image_id = new_image_id.unwrap_or(input.image_id); + image_id + } else { + input.image_id + } + } + }; let server_meta = ctx .activity(GetServerMetaInput { @@ -330,98 +387,25 @@ async fn setup( image_id, server_id: input.server_id, build_upload_id: server_meta.build_upload_id, - build_file_name: server_meta.build_file_name, + build_file_name: server_meta.build_file_name.clone(), dc_build_delivery_method: server_meta.dc_build_delivery_method, }), )) .await?; - ctx.signal(pp::Command::StartActor { + let actor_setup = ActorSetupCtx { actor_id, - config: Box::new(pp::ActorConfig { - image: pp::Image { - artifact_url: image_artifact_url, - kind: match server_meta.build_kind { - BuildKind::DockerImage => pp::ImageKind::DockerImage, - BuildKind::OciBundle => pp::ImageKind::OciBundle, - BuildKind::JavaScript => pp::ImageKind::JavaScript, - }, - compression: match server_meta.build_compression { - BuildCompression::None => pp::ImageCompression::None, - BuildCompression::Lz4 => pp::ImageCompression::Lz4, - }, - }, - root_user_enabled: input.root_user_enabled, - env: input.environment.as_hashable(), - ports: input - .network_ports - .iter() - .map(|(port_label, port)| match port.routing { - Routing::GameGuard { protocol, .. } => ( - crate::util::pegboard_normalize_port_label(port_label), - pp::Port { - target: port.internal_port, - protocol: match protocol { - GameGuardProtocol::Http - | GameGuardProtocol::Https - | GameGuardProtocol::Tcp - | GameGuardProtocol::TcpTls => pp::TransportProtocol::Tcp, - GameGuardProtocol::Udp => pp::TransportProtocol::Udp, - }, - routing: pp::PortRouting::GameGuard, - }, - ), - Routing::Host { protocol } => ( - crate::util::pegboard_normalize_port_label(port_label), - pp::Port { - target: port.internal_port, - protocol: match protocol { - HostProtocol::Tcp => pp::TransportProtocol::Tcp, - HostProtocol::Udp => pp::TransportProtocol::Udp, - }, - routing: pp::PortRouting::Host, - }, - ), - }) - .collect(), - network_mode: match input.network_mode { - NetworkMode::Bridge => pp::NetworkMode::Bridge, - NetworkMode::Host => pp::NetworkMode::Host, - }, - resources, - owner: pp::ActorOwner::DynamicServer { - server_id: input.server_id, - }, - metadata: util::serde::Raw::new(&pp::ActorMetadata { - tags: input.tags.as_hashable(), - // Represents when the pegboard actor was created, not the ds workflow. - create_ts: ctx.ts(), - project: pp::ActorMetadataProject { - project_id: server_meta.project_id, - slug: server_meta.project_slug, - }, - environment: pp::ActorMetadataEnvironment { - env_id: input.env_id, - slug: server_meta.env_slug, - }, - datacenter: pp::ActorMetadataDatacenter { - name_id: server_meta.dc_name_id, - display_name: server_meta.dc_display_name, - }, - cluster: pp::ActorMetadataCluster { - cluster_id: input.cluster_id, - }, - build: pp::ActorMetadataBuild { - build_id: input.image_id, - }, - })?, - }), - }) - .tag("datacenter_id", input.datacenter_id) - .send() - .await?; + server_meta, + resources, + image_artifact_url, + }; - Ok(actor_id) + // Rescheduling handles spawning the actor manually + if let SetupCtx::Init = setup { + spawn_actor(ctx, input, &actor_setup).await?; + } + + Ok(actor_setup) } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -496,39 +480,97 @@ async fn select_resources( }) } -/// Returns the destroy signal if the dynamic server was destroyed. -async fn wait_actor_ready(ctx: &mut WorkflowCtx, server_id: Uuid) -> GlobalResult> { - let _client_id = match ctx.listen::().await? { - Init::ActorStateUpdate(sig) => match sig.state { - pp::ActorState::Allocated { client_id } => client_id, - pp::ActorState::FailedToAllocate => { - ctx.msg(Failed { - message: "Failed to allocate (no availability).".into(), - }) - .tag("server_id", server_id) - .send() - .await?; - - ctx.workflow(destroy::Input { - server_id, - override_kill_timeout_ms: None, - signal_actor: false, +async fn spawn_actor( + ctx: &mut WorkflowCtx, + input: &Input, + actor_setup: &ActorSetupCtx, +) -> GlobalResult<()> { + ctx.signal(pp::Command::StartActor { + actor_id: actor_setup.actor_id, + config: Box::new(pp::ActorConfig { + image: pp::Image { + artifact_url: actor_setup.image_artifact_url.clone(), + kind: match actor_setup.server_meta.build_kind { + BuildKind::DockerImage => pp::ImageKind::DockerImage, + BuildKind::OciBundle => pp::ImageKind::OciBundle, + BuildKind::JavaScript => pp::ImageKind::JavaScript, + }, + compression: match actor_setup.server_meta.build_compression { + BuildCompression::None => pp::ImageCompression::None, + BuildCompression::Lz4 => pp::ImageCompression::Lz4, + }, + }, + root_user_enabled: input.root_user_enabled, + env: input.environment.as_hashable(), + ports: input + .network_ports + .iter() + .map(|(port_label, port)| match port.routing { + Routing::GameGuard { protocol, .. } => ( + crate::util::pegboard_normalize_port_label(port_label), + pp::Port { + target: port.internal_port, + protocol: match protocol { + GameGuardProtocol::Http + | GameGuardProtocol::Https + | GameGuardProtocol::Tcp + | GameGuardProtocol::TcpTls => pp::TransportProtocol::Tcp, + GameGuardProtocol::Udp => pp::TransportProtocol::Udp, + }, + routing: pp::PortRouting::GameGuard, + }, + ), + Routing::Host { protocol } => ( + crate::util::pegboard_normalize_port_label(port_label), + pp::Port { + target: port.internal_port, + protocol: match protocol { + HostProtocol::Tcp => pp::TransportProtocol::Tcp, + HostProtocol::Udp => pp::TransportProtocol::Udp, + }, + routing: pp::PortRouting::Host, + }, + ), }) - .output() - .await?; - - bail!("failed to allocate actor"); - } - state => bail!("unexpected actor state: {state:?}"), - }, - Init::Destroy(sig) => { - tracing::debug!("destroying before actor start"); - - return Ok(Some(sig)); - } - }; + .collect(), + network_mode: match input.network_mode { + NetworkMode::Bridge => pp::NetworkMode::Bridge, + NetworkMode::Host => pp::NetworkMode::Host, + }, + resources: actor_setup.resources.clone(), + owner: pp::ActorOwner::DynamicServer { + server_id: input.server_id, + }, + metadata: util::serde::Raw::new(&pp::ActorMetadata { + tags: input.tags.as_hashable(), + // Represents when the pegboard actor was created, not the ds workflow. + create_ts: ctx.ts(), + project: pp::ActorMetadataProject { + project_id: actor_setup.server_meta.project_id, + slug: actor_setup.server_meta.project_slug.clone(), + }, + environment: pp::ActorMetadataEnvironment { + env_id: input.env_id, + slug: actor_setup.server_meta.env_slug.clone(), + }, + datacenter: pp::ActorMetadataDatacenter { + name_id: actor_setup.server_meta.dc_name_id.clone(), + display_name: actor_setup.server_meta.dc_display_name.clone(), + }, + cluster: pp::ActorMetadataCluster { + cluster_id: input.cluster_id, + }, + build: pp::ActorMetadataBuild { + build_id: input.image_id, + }, + })?, + }), + }) + .tag("datacenter_id", input.datacenter_id) + .send() + .await?; - Ok(None) + Ok(()) } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -650,27 +692,58 @@ async fn reschedule_actor( }) .await?; - let res = setup(ctx, &input, false, new_image_id).await; - match ctx.catch_unrecoverable(res)? { - Ok(_actor_id) => {} - Err(err) => { - tracing::error!(?err, "unrecoverable reschedule"); + let actor_setup = setup(ctx, &input, SetupCtx::Reschedule { new_image_id }).await?; - ctx.workflow(destroy::Input { - server_id: input.server_id, - override_kill_timeout_ms: None, - signal_actor: false, - }) - .output() - .await?; + // Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate. + ctx.repeat(|ctx| { + let input = input.clone(); + let actor_setup = actor_setup.clone(); - // Throw the original error from the setup activities - return Err(err); - } - }; + async move { + // Get and increment retry count + let retry_count = ctx + .activity(UpdateRescheduleRetryInput { + server_id: input.server_id, + reset: false, + }) + .await?; - // Wait for new actor to be ready - wait_actor_ready(ctx, input.server_id).await + // Don't sleep for first retry + if retry_count > 0 { + // Determine next backoff sleep duration + let mut backoff = rivet_util::Backoff::new_at( + 8, + None, + BASE_RETRY_TIMEOUT_MS, + 500, + (retry_count - 1).try_into()?, + ); + let next = backoff.step().expect("should not have max retry"); + + // Sleep for backoff + ctx.sleep_until(next).await?; + } + + spawn_actor(ctx, &input, &actor_setup).await?; + + match ctx.listen::().await? { + Init::ActorStateUpdate(sig) => match sig.state { + pp::ActorState::Allocated { + client_id: _client_id, + } => return Ok(Loop::Break(None)), + pp::ActorState::FailedToAllocate => return Ok(Loop::Continue), + state => bail!("unexpected actor state: {state:?}"), + }, + Init::Destroy(sig) => { + tracing::debug!("destroying before actor start"); + + return Ok(Loop::Break(Some(sig))); + } + }; + } + .boxed() + }) + .await } #[derive(Debug, Serialize, Deserialize, Hash)]