From f6fc52d43acb35f1e7162285173fa034bb158d2e Mon Sep 17 00:00:00 2001 From: NathanFlurry Date: Wed, 18 Sep 2024 06:23:17 +0000 Subject: [PATCH] fix(toolchain): handle progress manager signals correctly on windows (#430) --- Cargo.lock | 2 +- packages/process-runner/Cargo.toml | 1 + packages/process-runner/src/main.rs | 65 +-- packages/toolchain/Cargo.toml | 3 +- .../toolchain/src/util/process_manager.rs | 413 ++++++++++-------- packages/toolchain/tests/process_manager.rs | 207 +++------ 6 files changed, 328 insertions(+), 363 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1cb2d19..24e74b97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1912,6 +1912,7 @@ dependencies = [ name = "rivet-process-runner" version = "0.1.0" dependencies = [ + "ctrlc", "lazy_static", "nix 0.29.0", "rivet-process-runner-shared", @@ -1944,7 +1945,6 @@ dependencies = [ "config", "console", "const_format", - "ctrlc", "dirs", "fs_extra", "futures-util", diff --git a/packages/process-runner/Cargo.toml b/packages/process-runner/Cargo.toml index fb03ddd9..e9c5d9ef 100644 --- a/packages/process-runner/Cargo.toml +++ b/packages/process-runner/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" name = "rivet-process-runner" [dependencies] +ctrlc = { version = "3.4.5", features = ["termination"] } lazy_static = "1.5.0" rivet-process-runner-shared = { path = "../process-runner-shared" } signal-hook = { version = "0.3.17", features = ["iterator"], default-features = false } diff --git a/packages/process-runner/src/main.rs b/packages/process-runner/src/main.rs index a7927350..030fc09d 100644 --- a/packages/process-runner/src/main.rs +++ b/packages/process-runner/src/main.rs @@ -66,8 +66,10 @@ fn run_process( command: &str, command_args: &[String], ) -> Result { - // Set up signal handling - setup_signal_handling()?; + // Listen for SIGTERM (Unix) and SIGBREAK (Windows) + ctrlc::set_handler(move || { + HAS_RECEIVED_SIGTERM.store(true, Ordering::Relaxed); + }).unwrap(); // Assert that the data directory exists if !Path::new(data_dir).is_dir() { @@ -95,8 +97,11 @@ fn run_process( #[cfg(target_os = "windows")] { use std::os::windows::process::CommandExt; - use windows::Win32::System::Threading::CREATE_NO_WINDOW; - cmd.creation_flags(CREATE_NO_WINDOW.0); + + // CREATE_NEW_PROCESS_GROUP detaches from this process. This only + // accepts CTRL_BREAK. + use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP; + cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0); } let mut child = cmd.spawn().map_err(ManagerError::CommandExecutionError)?; @@ -114,12 +119,17 @@ fn run_process( "", )?; terminate_child(&mut child)?; - } - match child.try_wait() { - Ok(Some(status)) => break status.code().unwrap_or(1), - Ok(None) => {} - Err(e) => return Err(ManagerError::CommandExecutionError(e)), + match child.wait() { + Ok(status) => break status.code(), + Err(e) => return Err(ManagerError::CommandExecutionError(e)), + } + } else { + match child.try_wait() { + Ok(Some(status)) => break status.code(), + Ok(None) => {} + Err(e) => return Err(ManagerError::CommandExecutionError(e)), + } } std::thread::sleep(Duration::from_millis(100)); @@ -127,9 +137,9 @@ fn run_process( // Write exit code to file let exit_code_path = Path::new(data_dir).join(shared::paths::CHILD_EXIT_CODE); - write_to_file(&exit_code_path, &exit_code.to_string())?; + write_to_file(&exit_code_path, &exit_code.map_or_else(||"unknown".to_string(), |x| x.to_string()))?; - Ok(exit_code) + Ok(exit_code.unwrap_or(1)) } /// Write & flush a string to a file. @@ -140,35 +150,6 @@ fn write_to_file(path: &Path, content: &str) -> Result<(), ManagerError> { Ok(()) } -#[cfg(unix)] -fn setup_signal_handling() -> Result<(), ManagerError> { - signal_hook::flag::register(signal_hook::consts::SIGTERM, HAS_RECEIVED_SIGTERM.clone()) - .map(|_| ()) - .map_err(ManagerError::RegisterSignalHookError) -} - -#[cfg(windows)] -fn setup_signal_handling() -> Result<(), ManagerError> { - use windows::Win32::Foundation::BOOL; - use windows::Win32::System::Console::SetConsoleCtrlHandler; - - unsafe { - if !SetConsoleCtrlHandler(Some(ctrl_handler), BOOL::from(true)).as_bool() { - return Err(ManagerError::RegisterSignalHookError(std::io::Error::new( - std::io::ErrorKind::Other, - "Failed to set console control handler", - ))); - } - } - - unsafe extern "system" fn ctrl_handler(_: u32) -> BOOL { - HAS_RECEIVED_SIGTERM.store(true, Ordering::Relaxed); - BOOL::from(true) - } - - Ok(()) -} - #[cfg(unix)] fn terminate_child(child: &mut Child) -> Result<(), ManagerError> { use nix::{ @@ -182,10 +163,10 @@ fn terminate_child(child: &mut Child) -> Result<(), ManagerError> { #[cfg(windows)] fn terminate_child(child: &mut Child) -> Result<(), ManagerError> { - use windows::Win32::System::Console::{GenerateConsoleCtrlEvent, CTRL_C_EVENT}; + use windows::Win32::System::Console::{GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT}; unsafe { - if !GenerateConsoleCtrlEvent(CTRL_C_EVENT, child.id() as u32).as_bool() { + if !GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, child.id() as u32).as_bool() { return Err(ManagerError::SignalError( "Failed to generate console control event".to_string(), )); diff --git a/packages/toolchain/Cargo.toml b/packages/toolchain/Cargo.toml index fa764d66..2db2c04d 100644 --- a/packages/toolchain/Cargo.toml +++ b/packages/toolchain/Cargo.toml @@ -16,7 +16,6 @@ async-posthog = { git = "https://github.com/rivet-gg/posthog-rs.git", rev = "ef4 async-stream = "0.3.3" config = { git = "https://github.com/rivet-gg/config-rs", rev = "0f3c89b4770276e8db72ce962974a9a72c59c97a", default-features = false, features = ["json", "async"] } console = "0.15" -ctrlc = { version = "3.2", features = ["termination"] } dirs = "5.0" fs_extra = "1.2.0" futures-util = "0.3" @@ -66,7 +65,7 @@ nix = { version = "0.27", default-features = false, features = ["user", "signal" signal-hook = { version = "0.3.17", default-features = false } [target.'cfg(windows)'.dependencies] -windows = { version = "0.48", features = ["Win32_Foundation", "Win32_System_Threading"] } +windows = { version = "0.48", features = ["Win32_Foundation", "Win32_Security", "Win32_System_Diagnostics", "Win32_System_Diagnostics_ToolHelp", "Win32_System_Threading", "Win32_System_Console"] } [dev-dependencies] assert_cmd = "2.0" diff --git a/packages/toolchain/src/util/process_manager.rs b/packages/toolchain/src/util/process_manager.rs index fccd69c3..61775c0a 100644 --- a/packages/toolchain/src/util/process_manager.rs +++ b/packages/toolchain/src/util/process_manager.rs @@ -13,6 +13,14 @@ use uuid::Uuid; use crate::{config, util::task::TaskCtx}; +const LOG_POLL_INTERVAL: Duration = Duration::from_millis(100); + +#[cfg(unix)] +type PID = i32; + +#[cfg(not(unix))] +type PID = u32; + #[derive(Serialize, Deserialize, PartialEq, Eq)] pub enum StartMode { StartOrHook, @@ -96,10 +104,10 @@ impl ProcessManager { let process_data_dir = process_data_dir(process_id, &base_data_dir)?; std::fs::create_dir_all(&process_data_dir)?; - // Spawn orphan + // Spawn process let process_runner_path = rivet_process_runner_embed::get_executable(&base_data_dir)?; - spawn_orphaned_process( + spawn_process( process_runner_path, process_data_dir, &command_opts.current_dir, @@ -173,7 +181,9 @@ impl ProcessManager { return Ok(exit_code); } } - x => bail!("process state should be exited, got: {x:?}"), + x => { + bail!("process state should be exited, got: {x:?}") + } } } ProcessState::Exited { exit_code, error } => { @@ -250,7 +260,7 @@ enum ProcessState { Starting, /// Currently running. - Running { pid: i32 }, + Running { pid: PID }, /// Process exited. Exited { @@ -274,49 +284,67 @@ async fn get_process_state(process_id: Uuid, base_data_dir: &PathBuf) -> Result< let exit_code_path = process_data_dir.join(shared::paths::CHILD_EXIT_CODE); let error_path = process_data_dir.join(shared::paths::RUNNER_ERROR); let pid_path = process_data_dir.join(shared::paths::RUNNER_PID); - if exit_code_path.exists() { - let exit_code_str = tokio::fs::read_to_string(exit_code_path).await?; - let exit_code: i32 = exit_code_str.trim().parse()?; - Ok(ProcessState::Exited { - exit_code: Some(exit_code), - error: None, - }) - } else if error_path.exists() { + + if error_path.exists() { // Read the runner error let error = tokio::fs::read_to_string(error_path).await?; - Ok(ProcessState::Exited { - exit_code: None, - error: Some(error), - }) - } else if pid_path.exists() { + if error.is_empty() { + return Ok(ProcessState::Exited { + exit_code: None, + error: Some(error), + }); + } + } + + if exit_code_path.exists() { + let exit_code_str = tokio::fs::read_to_string(exit_code_path).await?; + if !exit_code_str.is_empty() { + if exit_code_str == "unknown" { + return Ok(ProcessState::Exited { + exit_code: None, + error: None, + }); + } else { + let exit_code: i32 = exit_code_str.trim().parse()?; + return Ok(ProcessState::Exited { + exit_code: Some(exit_code), + error: None, + }); + } + } + } + + if pid_path.exists() { // Read the PID from the file let pid_str = tokio::fs::read_to_string(pid_path).await?; - let pid: i32 = pid_str.trim().parse()?; - assert!(pid > 0); + if !pid_str.is_empty() { + let pid: PID = pid_str.trim().parse()?; + assert!(pid > 0); - if is_pid_running(pid).await? { - // Process is currently running - Ok(ProcessState::Running { pid }) - } else { - // Process did not successfully write exit code to file system. - // - // This happens when the process manager does not exit gracefully. For example, on a - // system restart or force kill process. - // - // There is a rare race condition when: - // - process_state started - // - attempts to read exit code - // - process crashes - // - arrives here and pid no longer is running - Ok(ProcessState::Exited { - exit_code: None, - error: None, - }) + if is_pid_running(pid)? { + // Process is currently running + return Ok(ProcessState::Running { pid }); + } else { + // Process did not successfully write exit code to file system. + // + // This happens when the process manager does not exit gracefully. For example, on a + // system restart or force kill process. + // + // There is a rare race condition when: + // - process_state started + // - attempts to read exit code + // - process crashes + // - arrives here and pid no longer is running + return Ok(ProcessState::Exited { + exit_code: None, + error: None, + }); + } } - } else { - // If process does not have a PID yet, it's starting - Ok(ProcessState::Starting) } + + // If process does not have a PID yet, it's starting + Ok(ProcessState::Starting) } /// Kills a process. @@ -353,72 +381,161 @@ async fn kill_process( match tokio::task::block_in_place(|| kill(Pid::from_raw(pid), Signal::SIGTERM)) { Result::Ok(_) => {} Err(Errno::ESRCH) => return Ok(false), - Err(e) => return Err(anyhow::anyhow!("Failed to send SIGTERM: {}", e)), + Err(e) => bail!("Failed to send SIGTERM: {}", e), } - // Poll for process exit - let start = Instant::now(); - while start.elapsed() < kill_grace { - sleep(Duration::from_millis(100)).await; - if !is_pid_running(pid).await? { - return Ok(true); - } - } + todo!("switch to wait for process"); + // // Poll for process exit + // let start = Instant::now(); + // while start.elapsed() < kill_grace { + // sleep(Duration::from_millis(100)).await; + // if !is_pid_running(pid).await? { + // return Ok(true); + // } + // } - // Send SIGKILL if process hasn't exited - match tokio::task::block_in_place(|| kill(Pid::from_raw(pid), Signal::SIGKILL)) { - Result::Ok(_) => Ok(true), - Err(Errno::ESRCH) => Ok(true), // Assume process was already killed by SIGTERM in race condition - Err(e) => Err(anyhow::anyhow!("Failed to send SIGKILL: {}", e)), - } + terminate_process_tree(pid); + // // TODO: Send to entire tree + // // Send SIGKILL if process hasn't exited + // match tokio::task::block_in_place(|| kill(Pid::from_raw(pid), Signal::SIGKILL)) { + // Result::Ok(_) => Ok(true), + // Err(Errno::ESRCH) => Ok(true), // Assume process was already killed by SIGTERM in race condition + // Err(e) => bail!("Failed to send SIGKILL: {}", e), + // } + + Ok(true) } #[cfg(windows)] { - use tokio::time::{sleep, Duration, Instant}; - use windows::Win32::Foundation::CloseHandle; - use windows::Win32::Foundation::HANDLE; - use windows::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE}; + use windows::Win32::{ + Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0, WAIT_TIMEOUT}, + System::{ + Console::{GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT}, + Threading::{OpenProcess, WaitForSingleObject, PROCESS_SYNCHRONIZE}, + }, + }; unsafe { - // Open the process - let process_handle: HANDLE = OpenProcess(PROCESS_TERMINATE, false, pid as u32)?; - if process_handle.is_invalid() { - return Ok(false); - } - // Attempt to terminate the process gracefully - if TerminateProcess(process_handle, 0).as_bool() { - // Poll for process exit - let start = Instant::now(); - while start.elapsed() < kill_grace { - sleep(Duration::from_millis(100)).await; - if !is_pid_running(pid).await? { + if GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid as u32).as_bool() { + // Open the process + let process_handle: HANDLE = OpenProcess(PROCESS_SYNCHRONIZE, false, pid as u32)?; + if process_handle.is_invalid() { + return Ok(true); + } + + // Wait for process exit + match tokio::task::block_in_place(|| { + WaitForSingleObject(process_handle, kill_grace.as_millis() as u32) + }) { + WAIT_OBJECT_0 => { CloseHandle(process_handle); - return Ok(true); + Ok(true) + } + WAIT_TIMEOUT => { + CloseHandle(process_handle); + + // Process didn't exit within grace period, force terminate process & all children + terminate_process_tree(pid); + + // HACK: Sleep to allow process to finish terminating + tokio::time::sleep(Duration::from_secs(1)).await; + + Ok(true) + } + err => { + CloseHandle(process_handle); + bail!("WaitForSingleObject failed: {err:?}") } } + } else { + bail!("failed to terminate process") + } + } + } +} + +#[cfg(unix)] +fn terminate_process_tree(pid: PID) { + todo!() +} - // Force terminate if process hasn't exited - if TerminateProcess(process_handle, 1).as_bool() { - CloseHandle(process_handle); - Ok(true) +#[cfg(windows)] +fn terminate_process_tree(pid: PID) { + use windows::Win32::{ + Foundation::WAIT_OBJECT_0, + System::{ + Diagnostics::ToolHelp::{ + CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, + TH32CS_SNAPPROCESS, + }, + Threading::{ + OpenProcess, TerminateProcess, WaitForSingleObject, PROCESS_SYNCHRONIZE, + PROCESS_TERMINATE, + }, + }, + }; + + unsafe { + // Gather child PIDs + let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0).unwrap(); + let mut entry = PROCESSENTRY32::default(); + entry.dwSize = std::mem::size_of::() as u32; + let mut child_pids = Vec::new(); + if Process32First(snapshot, &mut entry).as_bool() { + loop { + if entry.th32ParentProcessID == pid { + // eprintln!("Child exec file: {}", String::from_utf8_lossy(&entry.szExeFile)); + child_pids.push(entry.th32ProcessID) + } + + if !Process32Next(snapshot, &mut entry).as_bool() { + break; + } + } + } + + // Kill this process before killing children in order to prevent the + // parent from doing anything more to the child processes + let handle = match OpenProcess(PROCESS_TERMINATE, false, pid) { + Result::Ok(handle) => { + if !handle.is_invalid() { + // Kill process + if !TerminateProcess(handle, 1).as_bool() { + eprintln!("failed to kill process"); + } else { + eprintln!("terminated process"); + } } else { - CloseHandle(process_handle); - Err(anyhow::anyhow!("Failed to terminate process")) + eprintln!("handle invalid: {pid}"); } + } + Err(_) => { + eprintln!("failed to open process, likely already stopped"); + } + }; + + loop { + if is_pid_running(pid).unwrap() { + eprintln!("pid still running: {pid}"); + std::thread::sleep(Duration::from_millis(500)); } else { - CloseHandle(process_handle); - Err(anyhow::anyhow!("Failed to initiate process termination")) + break; } } + + // Recursively kill child processes immediately + for pid in child_pids { + terminate_process_tree(pid); + } } } /// Checks if a PID is running in a cross-platform way. /// /// Should only be called by `process_state`. -async fn is_pid_running(pid: i32) -> Result { +fn is_pid_running(pid: PID) -> Result { tokio::task::block_in_place(move || { #[cfg(unix)] { @@ -438,9 +555,9 @@ async fn is_pid_running(pid: i32) -> Result { #[cfg(windows)] { - use windows::Win32::Foundation::{CloseHandle, HANDLE}; + use windows::Win32::Foundation::{CloseHandle, HANDLE, STILL_ACTIVE}; use windows::Win32::System::Threading::{ - OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, + GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, }; unsafe { @@ -451,11 +568,14 @@ async fn is_pid_running(pid: i32) -> Result { }; if handle.is_invalid() { - Ok(false) - } else { - CloseHandle(handle); - Ok(true) + return Ok(false); } + + let mut exit_code = 0u32; + let success = GetExitCodeProcess(handle, &mut exit_code as *mut u32); + CloseHandle(handle); + + return Ok(success.as_bool() && exit_code == STILL_ACTIVE.0 as u32); } } }) @@ -464,7 +584,7 @@ async fn is_pid_running(pid: i32) -> Result { /// Wait for a PID to exit. /// /// Should only be called by `ProcessManager::start`. -async fn wait_pid_exit(pid: i32) -> Result<()> { +async fn wait_pid_exit(pid: PID) -> Result<()> { // Wait for the process to exit in a cross-platform way #[cfg(unix)] { @@ -505,7 +625,7 @@ async fn wait_pid_exit(pid: i32) -> Result<()> { let handle = unsafe { match OpenProcess(PROCESS_SYNCHRONIZE, false, pid as u32) { Result::Ok(handle) => handle, - Err(_) => return Err(anyhow!("Failed to open process handle")), + Err(_) => bail!("Failed to open process handle"), } }; ensure!(!handle.is_invalid(), "failed to open process handle"); @@ -521,13 +641,14 @@ async fn wait_pid_exit(pid: i32) -> Result<()> { .await??; } + // HACK: Add grace period to allow logs to finish reading + tokio::time::sleep(LOG_POLL_INTERVAL + Duration::from_millis(50)).await; + Ok(()) } -/// Spawns an orphaned process. -/// -/// This allows us to run processes that will stay running even after the parent exits. -fn spawn_orphaned_process( +// TODO: Make this spawn orphans +fn spawn_process( process_runner_path: PathBuf, process_data_dir: PathBuf, current_dir: &str, @@ -539,95 +660,35 @@ fn spawn_orphaned_process( let mut runner_args = vec![process_data_dir.to_str().unwrap(), current_dir, program]; runner_args.extend(args.iter().map(|&s| s)); - #[cfg(target_family = "unix")] - { - // Spawn child - // - // Calling `.wait()` is required in order to remove zombie processes after complete - let mut child = Command::new(&process_runner_path) - .args(&runner_args) - .envs(envs.iter().cloned()) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn()?; - tokio::task::spawn_blocking(move || child.wait().expect("child.wait")); - - Ok(()) - - // // TODO: This works in unit tests, but doesn't play nice with forking an entire engine. - // We need to make this fork process runnerunner. - // use nix::{ - // sys::wait::{waitpid, WaitStatus}, - // unistd::{fork, ForkResult}, - // }; - // use std::os::unix::process::CommandExt; - // - // match unsafe { fork() }.context("process first fork failed")? { - // ForkResult::Parent { child } => { - // // Ensure that the child process spawned successfully - // match waitpid(child, None).context("waitpid failed")? { - // WaitStatus::Exited(_, 0) => Ok(()), - // WaitStatus::Exited(_, status) => { - // bail!("Child process exited with status {}", status) - // } - // _ => bail!("Unexpected wait status for child process"), - // } - // } - // ForkResult::Child => { - // // Child process - // match unsafe { fork() } { - // Result::Ok(ForkResult::Parent { .. }) => { - // // Exit the intermediate child - // std::process::exit(0); - // } - // Result::Ok(ForkResult::Child) => { - // // Exit immediately on fail in order to not leak process - // let err = Command::new(&process_runner_path) - // .args(&runner_args) - // .envs(envs.iter().cloned()) - // .stdin(Stdio::null()) - // .stdout(Stdio::null()) - // .stderr(Stdio::null()) - // .exec(); - // eprintln!("exec failed: {err:?}"); - // std::process::exit(1); - // } - // Err(err) => { - // // Exit immediately in order to not leak child process. - // // - // // The first fork doesn't need to exit on error since it - // eprintln!("process second fork failed: {err:?}"); - // std::process::exit(1); - // } - // } - // } - // } - } + // Spawn child + // + // Calling `.wait()` is required in order to remove zombie processes after complete + let mut cmd = Command::new(&process_runner_path); + cmd.args(&runner_args) + .envs(envs.iter().cloned()) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); #[cfg(target_os = "windows")] { use std::os::windows::process::CommandExt; use windows::Win32::System::Threading::{ - CREATE_NEW_PROCESS_GROUP, CREATE_NO_WINDOW, DETACHED_PROCESS, + CREATE_BREAKAWAY_FROM_JOB, CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP, + CREATE_NO_WINDOW, DETACHED_PROCESS, }; - - // Spawn process - // - // Calling `.wait()` is required in order to remove zombie processes after complete - let mut child = Command::new(&process_runner_path) - .args(&runner_args) - .envs(envs.iter().cloned()) - // .creation_flags(CREATE_NEW_PROCESS_GROUP.0 | DETACHED_PROCESS.0 | CREATE_NO_WINDOW.0) - // .creation_flags(CREATE_NO_WINDOW.0) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn()?; - tokio::task::spawn_blocking(move || child.wait().expect("child.wait")); - - Ok(()) + cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0); + // cmd.creation_flags(DETACHED_PROCESS.0 | CREATE_NO_WINDOW.0); + // cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0 | CREATE_NO_WINDOW.0); + // cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0 | DETACHED_PROCESS.0); + // cmd.creation_flags(CREATE_NO_WINDOW.0); } + + let mut child = cmd.spawn()?; + + tokio::task::spawn_blocking(move || child.wait().expect("child.wait")); + + Ok(()) } /// Reads a log file and streams lines as they're received. @@ -640,7 +701,7 @@ async fn tail_logs(path: PathBuf, task: TaskCtx, stream_name: &'static str) -> R match reader.read_line(&mut buffer).await { Result::Ok(0) => { // Reached EOF, wait a bit before checking for new content - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + tokio::time::sleep(LOG_POLL_INTERVAL).await; continue; } Result::Ok(_) => { diff --git a/packages/toolchain/tests/process_manager.rs b/packages/toolchain/tests/process_manager.rs index e612fb6e..706bea2f 100644 --- a/packages/toolchain/tests/process_manager.rs +++ b/packages/toolchain/tests/process_manager.rs @@ -1,16 +1,10 @@ +use std::time::Duration; + use anyhow::*; -use rivet_process_runner_shared as shared; use rivet_toolchain::util::{ process_manager::*, task::{TaskCtx, TaskCtxInner, TaskEvent}, }; -use serde::{Deserialize, Serialize}; -use std::{ - future::Future, - process::{Command, Stdio}, - time::{Duration, Instant}, -}; -use tokio::fs::File; use tokio::{ sync::{broadcast, mpsc}, time::sleep, @@ -43,6 +37,13 @@ fn create_task_ctx() -> (TaskCtx, mpsc::UnboundedReceiver) { (task, log_rx) } +fn build_deno_cmd(script: &str) -> (String, Vec) { + ( + "deno".to_string(), + vec!["eval".to_string(), "--quiet".to_string(), script.to_string()] + ) +} + #[tokio::test(flavor = "multi_thread")] async fn test_process_manager_lifecycle() -> Result<()> { let (process_manager, temp_dir) = setup_test_environment().await?; @@ -51,40 +52,17 @@ async fn test_process_manager_lifecycle() -> Result<()> { let (task, mut log_rx) = create_task_ctx(); // Build command - #[cfg(windows)] - let (command, args) = ( - "powershell".to_string(), - vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - r#" - $env:ENV_VAR - Write-Host 'Hello from stdout' - Write-Error 'Error message' - Start-Sleep -Seconds 2 - Write-Host 'Exiting now' - exit 42 - "# - .to_string(), - ], - ); + let script = r#" + console.log(`ENV_VAR: ${Deno.env.get("ENV_VAR")}`); + console.log("stdout test"); + console.error("stderr test"); + await new Promise(resolve => setTimeout(resolve, 2000)); + console.log("exiting now"); + Deno.exit(42); + "#; + + let (command, args) = build_deno_cmd(script); - #[cfg(not(windows))] - let (command, args) = ( - "sh".to_string(), - vec![ - "-c".to_string(), - r#" - echo "ENV_VAR: $ENV_VAR" - echo 'Hello from stdout' - echo 'Error message' >&2 - sleep 2 - echo 'Exiting now' - exit 42 - "# - .to_string(), - ], - ); let envs = vec![("ENV_VAR".to_string(), "test_value".to_string())]; let current_dir = std::env::current_dir()?.to_string_lossy().to_string(); let base_data_dir = temp_dir.path().to_path_buf(); @@ -120,10 +98,10 @@ async fn test_process_manager_lifecycle() -> Result<()> { while let Some(event) = log_rx.recv().await { match event { TaskEvent::Log(log) => { - if log.contains("[stdout]") { - stdout_logs.push(log); - } else if log.contains("[stderr]") { - stderr_logs.push(log); + if let Some(log) = log.strip_prefix("[stdout] ") { + stdout_logs.push(log.to_string()); + } else if let Some(log) = log.strip_prefix("[stderr] ") { + stderr_logs.push(log.to_string()); } } TaskEvent::Result { .. } => break, @@ -141,41 +119,24 @@ async fn test_process_manager_lifecycle() -> Result<()> { assert_eq!(exit_code, Some(42)); // Verify logs - #[cfg(windows)] - { - assert!(stdout_logs.iter().any(|log| log.contains("test_value"))); - } - #[cfg(not(windows))] - { - assert!(stdout_logs - .iter() - .any(|log| log.contains("ENV_VAR: test_value"))); - } - assert!(stdout_logs - .iter() - .any(|log| log.contains("Hello from stdout"))); - assert!(stdout_logs.iter().any(|log| log.contains("Exiting now"))); - assert!(stderr_logs.iter().any(|log| log.contains("Error message"))); + assert_eq!(stdout_logs, vec![ + "ENV_VAR: test_value", + "stdout test", + "exiting now", + ]); + assert_eq!(stderr_logs, vec![ + "stderr test", + ]); // Restart the process - #[cfg(windows)] - let (command, args) = ( - "powershell".to_string(), - vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - "Write-Host 'Restarted process'; Start-Sleep -Seconds 2; exit 0".to_string(), - ], - ); + let script = r#" + console.log("Restarted process"); + await new Promise(resolve => setTimeout(resolve, 2000)); + Deno.exit(0); + "#; + + let (command, args) = build_deno_cmd(script); - #[cfg(not(windows))] - let (command, args) = ( - "sh".to_string(), - vec![ - "-c".to_string(), - "echo 'Restarted process'; sleep 2; exit 0".to_string(), - ], - ); let envs = Vec::new(); let current_dir = std::env::current_dir()?.to_string_lossy().to_string(); let base_data_dir = temp_dir.path().to_path_buf(); @@ -231,39 +192,20 @@ async fn test_process_manager_stop_graceful() -> Result<()> { let (task, _log_rx) = create_task_ctx(); // Start a long-running process with custom exit code on SIGTERM - #[cfg(windows)] - let (command, args) = ( - "powershell".to_string(), - vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - r#" - $script:exitCode = 42 - $handler = { - Write-Host "Exiting with code $script:exitCode" - exit $script:exitCode - } - $null = Register-EngineEvent -SourceIdentifier PowerShell.Exiting -Action $handler - Write-Host 'Starting long process' - while ($true) { Start-Sleep -Seconds 1 } - "# - .to_string(), - ], - ); + let script = r#" + const signal = Deno.build.os === "windows" ? "SIGBREAK" : "SIGTERM"; + Deno.addSignalListener(signal, () => { + console.log("Exiting with code 42"); + Deno.exit(42); + }); + console.log("Starting long process"); + while (true) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + "#; + + let (command, args) = build_deno_cmd(script); - #[cfg(not(windows))] - let (command, args) = ( - "sh".to_string(), - vec![ - "-c".to_string(), - r#" - trap 'echo "Exiting with code 42"; exit 42' TERM - echo 'Starting long process' - tail -f /dev/null & wait - "# - .to_string(), - ], - ); let envs = Vec::new(); let current_dir = std::env::current_dir()?.to_string_lossy().to_string(); let base_data_dir = temp_dir.path().to_path_buf(); @@ -338,39 +280,19 @@ async fn test_process_manager_stop_timeout() -> Result<()> { let (task, _log_rx) = create_task_ctx(); // Start a process that ignores SIGTERM - #[cfg(windows)] - let (command, args) = ( - "powershell".to_string(), - vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - r#" - $handler = { - Write-Host "Caught term, ignoring" - } - $null = Register-EngineEvent -SourceIdentifier PowerShell.Exiting -Action $handler - Write-Host 'Starting process that ignores SIGTERM' - while ($true) { Start-Sleep -Seconds 1 } - "# - .to_string(), - ], - ); + let script = r#" + const signal = Deno.build.os === "windows" ? "SIGBREAK" : "SIGTERM"; + Deno.addSignalListener(signal, () => { + console.log("Caught term, ignoring"); + }); + console.log("Starting process that ignores SIGTERM"); + while (true) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + "#; + + let (command, args) = build_deno_cmd(script); - #[cfg(not(windows))] - let (command, args) = ( - "sh".to_string(), - vec![ - "-c".to_string(), - r#" - trap 'echo "Caught term, ignoring"' TERM - echo 'Starting process that ignores SIGTERM' - while true; do - sleep 1 - done - "# - .to_string(), - ], - ); let envs = Vec::new(); let current_dir = std::env::current_dir()?.to_string_lossy().to_string(); let base_data_dir = temp_dir.path().to_path_buf(); @@ -440,7 +362,8 @@ async fn test_process_manager_stop_timeout() -> Result<()> { // Wait for the process to finish and get the exit code let exit_code = handle.await??; - // Verify exit code (should be None due to SIGKILL) + // Verify exit code. This is None on Unix bc SIGKILL terminates immediately. + // This is Some(1) on Windows bc TerminateProcess exits with 1. assert_eq!(exit_code, None, "Unexpected exit code"); Ok(())