From fca03c494deb0c72193d6bc2a865266f0ee9ecf1 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Tue, 3 Sep 2024 17:12:18 -0700 Subject: [PATCH] feat: add process manager for game server & opengb dev server --- packages/toolchain/src/backend/mod.rs | 4 +- packages/toolchain/src/config/meta.rs | 10 + packages/toolchain/src/game_server.rs | 3 + packages/toolchain/src/lib.rs | 1 + .../{backend_dev.rs => backend_start.rs} | 44 ++-- packages/toolchain/src/tasks/backend_stop.rs | 26 +++ .../toolchain/src/tasks/game_server_start.rs | 35 +++ .../toolchain/src/tasks/game_server_stop.rs | 26 +++ packages/toolchain/src/tasks/mod.rs | 11 +- packages/toolchain/src/util/mod.rs | 1 + .../toolchain/src/util/process_manager.rs | 214 ++++++++++++++++++ 11 files changed, 358 insertions(+), 17 deletions(-) create mode 100644 packages/toolchain/src/game_server.rs rename packages/toolchain/src/tasks/{backend_dev.rs => backend_start.rs} (53%) create mode 100644 packages/toolchain/src/tasks/backend_stop.rs create mode 100644 packages/toolchain/src/tasks/game_server_start.rs create mode 100644 packages/toolchain/src/tasks/game_server_stop.rs create mode 100644 packages/toolchain/src/util/process_manager.rs diff --git a/packages/toolchain/src/backend/mod.rs b/packages/toolchain/src/backend/mod.rs index 47940a4c..f79aa44e 100644 --- a/packages/toolchain/src/backend/mod.rs +++ b/packages/toolchain/src/backend/mod.rs @@ -11,10 +11,12 @@ use uuid::Uuid; use crate::{ config, paths, - util::{cmd::shell_cmd, task}, + util::{cmd::shell_cmd, process_manager::ProcessManagerConfig, task}, ToolchainCtx, }; +pub const PROCESS_MANAGER_DEV: ProcessManagerConfig = ProcessManagerConfig { key: "backend_dev" }; + pub struct BackendCommandOpts { pub command: &'static str, pub opts: serde_json::Value, diff --git a/packages/toolchain/src/config/meta.rs b/packages/toolchain/src/config/meta.rs index c3fec74b..6fa144be 100644 --- a/packages/toolchain/src/config/meta.rs +++ b/packages/toolchain/src/config/meta.rs @@ -23,6 +23,9 @@ pub struct ProjectMeta { pub cluster: Cluster, pub tokens: Tokens, pub environments: HashMap, + + /// Stores the state for all of the process managers. + pub process_managers: HashMap, } impl ProjectMeta { @@ -31,6 +34,7 @@ impl ProjectMeta { cluster: Cluster { api_endpoint }, tokens: Tokens { cloud: cloud_token }, environments: HashMap::new(), + process_managers: HashMap::new(), } } } @@ -58,6 +62,12 @@ pub struct Backend { pub db_url: Option, } +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct ProcessManagerState { + /// PID of the currently running process. If none, no process is running. + pub pid: Option, +} + static SINGLETON: OnceCell> = OnceCell::const_new(); /// Gets the global config instance. diff --git a/packages/toolchain/src/game_server.rs b/packages/toolchain/src/game_server.rs new file mode 100644 index 00000000..32fc3c74 --- /dev/null +++ b/packages/toolchain/src/game_server.rs @@ -0,0 +1,3 @@ +use crate::util::process_manager::ProcessManagerConfig; + +pub const PROCESS_MANAGER: ProcessManagerConfig = ProcessManagerConfig { key: "game_server" }; diff --git a/packages/toolchain/src/lib.rs b/packages/toolchain/src/lib.rs index 4d756b2a..487120dc 100644 --- a/packages/toolchain/src/lib.rs +++ b/packages/toolchain/src/lib.rs @@ -1,6 +1,7 @@ pub mod backend; pub mod config; pub mod game; +pub mod game_server; pub mod paths; pub mod tasks; pub mod toolchain_ctx; diff --git a/packages/toolchain/src/tasks/backend_dev.rs b/packages/toolchain/src/tasks/backend_start.rs similarity index 53% rename from packages/toolchain/src/tasks/backend_dev.rs rename to packages/toolchain/src/tasks/backend_start.rs index 484581f6..97cf4483 100644 --- a/packages/toolchain/src/tasks/backend_dev.rs +++ b/packages/toolchain/src/tasks/backend_start.rs @@ -21,7 +21,7 @@ impl task::Task for Task { type Output = Output; fn name() -> &'static str { - "backend_dev" + "backend_start" } async fn run(task: task::TaskCtx, input: Self::Input) -> Result { @@ -34,18 +34,36 @@ impl task::Task for Task { cmd_env.insert("OPENGB_PORT".into(), input.port.to_string()); cmd_env.insert("OPENGB_HOSTNAME".into(), "0.0.0.0".to_string()); cmd_env.insert("OPENGB_TERM_COLOR".into(), "never".into()); - let exit_code = backend::run_opengb_command_from_task( - task.clone(), - backend::BackendCommandOpts { - command: "dev", - opts: serde_json::json!({ - "project": config_path, - "nonInteractive": true - }), - env: cmd_env, - }, - ) - .await?; + // let exit_code = backend::run_opengb_command_from_task( + // task.clone(), + // backend::BackendCommandOpts { + // command: "dev", + // opts: serde_json::json!({ + // "project": config_path, + // "nonInteractive": true + // }), + // env: cmd_env, + // }, + // ) + // .await?; + + // let cmd = build_opengb_command(opts).await?; + let exit_code = backend::PROCESS_MANAGER_DEV + .start(task.clone(), todo!(), todo!(), todo!()) + .await?; + + // let exit_code = backend::run_opengb_command( + // task.clone(), + // backend::BackendCommandOpts { + // config_path, + // args: vec!["dev".into(), "--non-interactive".into()], + // env: cmd_env, + // cwd: input.cwd.into(), + // ports: vec![(input.port, input.port)], + // enable_postgres: true, + // }, + // ) + // .await?; Ok(Output { exit_code }) } diff --git a/packages/toolchain/src/tasks/backend_stop.rs b/packages/toolchain/src/tasks/backend_stop.rs new file mode 100644 index 00000000..4c70ddcb --- /dev/null +++ b/packages/toolchain/src/tasks/backend_stop.rs @@ -0,0 +1,26 @@ +use anyhow::*; +use serde::{Deserialize, Serialize}; + +use crate::{backend, config, util::task}; + +#[derive(Deserialize)] +pub struct Input {} + +#[derive(Serialize)] +pub struct Output {} + +pub struct Task; + +impl task::Task for Task { + type Input = Input; + type Output = Output; + + fn name() -> &'static str { + "backend_stop" + } + + async fn run(task: task::TaskCtx, input: Self::Input) -> Result { + backend::PROCESS_MANAGER_DEV.stop().await?; + Ok(Output {}) + } +} diff --git a/packages/toolchain/src/tasks/game_server_start.rs b/packages/toolchain/src/tasks/game_server_start.rs new file mode 100644 index 00000000..c4e874e1 --- /dev/null +++ b/packages/toolchain/src/tasks/game_server_start.rs @@ -0,0 +1,35 @@ +use anyhow::*; +use serde::{Deserialize, Serialize}; + +use crate::util::task; + +#[derive(Deserialize)] +pub struct Input { + pub cmd: String, + pub args: Vec, + pub cwd: String, +} + +#[derive(Serialize)] +pub struct Output { + exit_code: i32, +} + +pub struct Task; + +impl task::Task for Task { + type Input = Input; + type Output = Output; + + fn name() -> &'static str { + "game_server_start" + } + + async fn run(task: task::TaskCtx, input: Self::Input) -> Result { + // TODO: Add abiality to pipe logs + let exit_code = crate::game_server::PROCESS_MANAGER + .start(task.clone(), input.cmd, input.args, input.cwd) + .await?; + Ok(Output { exit_code }) + } +} diff --git a/packages/toolchain/src/tasks/game_server_stop.rs b/packages/toolchain/src/tasks/game_server_stop.rs new file mode 100644 index 00000000..056ea201 --- /dev/null +++ b/packages/toolchain/src/tasks/game_server_stop.rs @@ -0,0 +1,26 @@ +use anyhow::*; +use serde::{Deserialize, Serialize}; + +use crate::util::task; + +#[derive(Deserialize)] +pub struct Input {} + +#[derive(Serialize)] +pub struct Output {} + +pub struct Task; + +impl task::Task for Task { + type Input = Input; + type Output = Output; + + fn name() -> &'static str { + "game_server_stop" + } + + async fn run(task: task::TaskCtx, input: Self::Input) -> Result { + crate::game_server::PROCESS_MANAGER.stop().await?; + Ok(Output {}) + } +} diff --git a/packages/toolchain/src/tasks/mod.rs b/packages/toolchain/src/tasks/mod.rs index d2902685..7977c55b 100644 --- a/packages/toolchain/src/tasks/mod.rs +++ b/packages/toolchain/src/tasks/mod.rs @@ -1,10 +1,13 @@ pub mod backend_choose_local_port; -pub mod backend_dev; pub mod backend_sdk_gen; +pub mod backend_start; +pub mod backend_stop; pub mod check_login_state; pub mod check_system_requirements; pub mod deploy; pub mod exec_command; +pub mod game_server_start; +pub mod game_server_stop; pub mod get_bootstrap_data; pub mod get_hub_link; pub mod get_settings_paths; @@ -16,18 +19,20 @@ pub mod wait_for_login; crate::task_registry!( backend_choose_local_port::Task, - backend_dev::Task, backend_sdk_gen::Task, + backend_start::Task, + backend_stop::Task, check_login_state::Task, check_system_requirements::Task, deploy::Task, exec_command::Task, + game_server_start::Task, get_bootstrap_data::Task, get_hub_link::Task, + get_settings_paths::Task, open::Task, show_term::Task, start_device_link::Task, unlink::Task, wait_for_login::Task, - get_settings_paths::Task, ); diff --git a/packages/toolchain/src/util/mod.rs b/packages/toolchain/src/util/mod.rs index 385704f9..88e8f27a 100644 --- a/packages/toolchain/src/util/mod.rs +++ b/packages/toolchain/src/util/mod.rs @@ -4,6 +4,7 @@ pub mod docker; pub mod lz4; pub mod net; pub mod os; +pub mod process_manager; pub mod show_term; pub mod task; pub mod term; diff --git a/packages/toolchain/src/util/process_manager.rs b/packages/toolchain/src/util/process_manager.rs new file mode 100644 index 00000000..02b46333 --- /dev/null +++ b/packages/toolchain/src/util/process_manager.rs @@ -0,0 +1,214 @@ +use anyhow::*; +use std::process::{Command, Stdio}; + +use crate::{config, util::task::TaskCtx}; + +pub struct ProcessManagerConfig { + pub key: &'static str, +} + +impl ProcessManagerConfig { + pub async fn start( + &self, + task: TaskCtx, + command: String, + args: Vec, + current_dir: String, + ) -> Result { + // Check if existing process exists + // + // Preserving the PID in settings serves a few purposes: + // - Some game engines like Unity frequently restart the plugin, so the + // processes need to run independently + // - Game server processes often hog a port, so we need to kill the + // previous process to ensure the port is free + let pid = if let Some(pid) = self.read_meta(|meta| meta.pid).await? { + // Check if PID exists + if is_pid_running(pid).await? { + Some(pid) + } else { + None + } + } else { + None + }; + + // If PID does not exist, spawn a new process + let pid = if let Some(pid) = pid { + pid + } else { + // Spawn process + // TODO: Somehow write output to filesystem + // TODO: Make sure it handles signals correctly + let child = tokio::process::Command::new(command) + .args(args) + .current_dir(current_dir) + .spawn()?; + + // TODO: pipe logs to context + + // TODO: Check for race condition where pid already set + // TODO: Get PID + let pid = 0; + self.mutate_meta(|meta| meta.pid = Some(pid)).await?; + + // TODO: Exit code + 0 + }; + + // TODO: poll if PID is running + + // TODO: If no PID, read output file + + // TODO: use waitpid + + Ok(exit_code) + } + + pub async fn stop(&self) -> Result<()> { + if let Some(pid) = self.mutate_meta(|meta| meta.pid.take()).await? { + // TODO: kill pid + } + + Ok(()) + } + + pub async fn is_running(&self) -> Result { + self.read_meta(|meta| meta.pid.is_some()).await + } + + async fn read_meta T, T>( + &self, + cb: F, + ) -> Result { + config::meta::mutate_project(|meta| { + cb(meta + .process_managers + .entry(self.key.to_string()) + .or_default()) + }) + .await + } + + async fn mutate_meta T, T>( + &self, + cb: F, + ) -> Result { + config::meta::mutate_project(|meta| { + cb(meta + .process_managers + .entry(self.key.to_string()) + .or_default()) + }) + .await + } +} + +/// Checks if a PID is running in a cross-platform way. +async fn is_pid_running(pid: u16) -> Result { + tokio::task::block_in_place(move || { + #[cfg(unix)] + { + use nix::errno::Errno; + use nix::sys::signal::kill; + use nix::unistd::Pid; + + match kill(Pid::from_raw(pid as i32), None) { + Result::Ok(_) => Ok(true), // Process exists + Err(Errno::ESRCH) => Ok(false), // No process + Err(Errno::EPERM) => Ok(true), // Process exists but we don't have permission to send a signal + Err(e) => { + bail!("unexpected error when checking process existence: {}", e) + } + } + } + + #[cfg(windows)] + { + use windows::Win32::Foundation::{CloseHandle, HANDLE}; + use windows::Win32::System::Threading::{ + OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, + }; + + unsafe { + let handle: HANDLE = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, false, pid); + if handle.is_invalid() { + Ok(false) + } else { + CloseHandle(handle); + Ok(true) + } + } + } + }) +} + +fn spawn_orphan_process(program: &str, args: &[&str]) -> Result<()> { + // Double-fork + exec in order to daemonize the process. + #[cfg(target_family = "unix")] + { + use nix::{ + sys::wait::{waitpid, WaitStatus}, + unistd::{fork, ForkResult}, + }; + use std::os::unix::process::CommandExt; + + match unsafe { fork() }.context("daemon first fork failed")? { + ForkResult::Parent { child } => { + // Ensure that the child process spawned successfully + match waitpid(child, None).context("waitpid failed")? { + WaitStatus::Exited(_, 0) => Ok(()), + WaitStatus::Exited(_, status) => { + bail!("Child process exited with status {}", status) + } + _ => bail!("Unexpected wait status for child process"), + } + } + ForkResult::Child => { + // Child process + match unsafe { fork() } { + Result::Ok(ForkResult::Parent { .. }) => { + // Exit the intermediate child + std::process::exit(0); + } + Result::Ok(ForkResult::Child) => { + // Exit immediately on fail in order to not leak process + let err = Command::new(program) + .args(args) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .exec(); + eprintln!("exec failed: {err:?}"); + std::process::exit(1); + } + Err(err) => { + // Exit immediately in order to not leak child process. + // + // The first fork doesn't need to exit on error since it + eprintln!("daemon second fork failed: {err:?}"); + std::process::exit(1); + } + } + } + } + } + + #[cfg(target_os = "windows")] + { + use std::os::windows::process::CommandExt; + + // Windows implementation remains the same + Command::new(program) + .args(args) + .creation_flags( + winapi::um::winbase::CREATE_NEW_PROCESS_GROUP + | winapi::um::winbase::DETACHED_PROCESS, + ) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()?; + Ok(()) + } +}