Skip to content

Commit

Permalink
refactor(hydro_deploy)!: Deployment.stop() for graceful shutdown in…
Browse files Browse the repository at this point in the history
…cluding updated `perf` profile downloading (#1370)

* `perf` profile downloading moved from the `drop()` impl to `async fn
stop()`
* download perf data via stdout
* update async-ssh2-lite to 0.5 to cleanup tokio compat issues

WIP for #1365

BREAKING CHANGE: to get `perf` data, calling `.stop().await` is now
required.
  • Loading branch information
MingweiSamuel committed Aug 2, 2024
1 parent 77246e7 commit a214786
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 156 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions hydro_deploy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "Hydro Deploy"
anyhow = { version = "1.0.69", features = [ "backtrace" ] }
async-process = "1.6.0"
async-recursion = "1.0"
async-ssh2-lite = { version = "0.4.2", features = [ "tokio" ] }
async-ssh2-lite = { version = "0.5.0", features = [ "tokio" ] }
async-trait = "0.1.64"
buildstructor = "0.5.4"
bytes = "1.1.0"
Expand All @@ -31,4 +31,4 @@ shell-escape = "0.1.5"
tempfile = "3.3.0"
tokio = { version = "1.16", features = [ "full" ] }
tokio-stream = { version = "0.1.15", default-features = false }
tokio-util = { version = "0.7.7", features=[ "compat" ] }
tokio-util = { version = "0.7.7" }
35 changes: 34 additions & 1 deletion hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use futures::{FutureExt, StreamExt, TryStreamExt};
use tokio::sync::RwLock;

use super::gcp::GcpNetwork;
Expand Down Expand Up @@ -41,6 +42,21 @@ impl Deployment {
self.add_service(|id| CustomService::new(id, on, external_ports))
}

/// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
// TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
self.deploy().await?;
self.start().await?;
trigger.await;
self.stop().await?;
Ok(())
}

/// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
pub async fn run_ctrl_c(&mut self) -> Result<()> {
self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
}

pub async fn deploy(&mut self) -> Result<()> {
self.services.retain(|weak| weak.strong_count() > 0);

Expand Down Expand Up @@ -119,6 +135,23 @@ impl Deployment {
.await?;
Ok(())
}

pub async fn stop(&mut self) -> Result<()> {
self.services.retain(|weak| weak.strong_count() > 0);

progress::ProgressTracker::with_group("stop", None, || {
let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
|service: Arc<RwLock<dyn Service>>| async move {
service.write().await.stop().await?;
Ok(()) as Result<()>
},
);

futures::future::try_join_all(all_services_stop)
})
.await?;
Ok(())
}
}

impl Deployment {
Expand Down
27 changes: 20 additions & 7 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,10 @@ impl Service for HydroflowCrateService {
.unwrap();

let start_ack_line = ProgressTracker::leaf(
"waiting for ack start".to_string(),
self.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id))
+ " / waiting for ack start",
tokio::time::timeout(Duration::from_secs(60), stdout_receiver),
)
.await??;
Expand All @@ -300,13 +303,23 @@ impl Service for HydroflowCrateService {
}

async fn stop(&mut self) -> Result<()> {
self.launched_binary
.as_ref()
.unwrap()
.stdin()
.send("stop\n".to_string())?;
let launched_binary = self.launched_binary.as_mut().unwrap();
launched_binary.stdin().send("stop\n".to_string())?;

self.launched_binary.as_mut().unwrap().wait().await;
let timeout_result = ProgressTracker::leaf(
self.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id))
+ " / waiting for exit",
tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()),
)
.await;
match timeout_result {
Err(_timeout) => {} // `wait()` timed out, but stop will force quit.
Ok(Err(unexpected_error)) => return Err(unexpected_error), // `wait()` errored.
Ok(Ok(_exit_status)) => {}
}
launched_binary.stop().await?;

Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ pub trait LaunchedBinary: Send + Sync {

fn exit_code(&self) -> Option<i32>;

async fn wait(&mut self) -> Option<i32>;
/// Wait for the process to stop on its own. Returns the exit code.
async fn wait(&mut self) -> Result<i32>;
/// If the process is still running, force stop it. Then run post-run tasks.
async fn stop(&mut self) -> Result<()>;
}

#[async_trait]
Expand Down
26 changes: 17 additions & 9 deletions hydro_deploy/core/src/localhost/launched_binary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::sync::{Arc, Mutex};

use anyhow::Result;
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncBufReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -110,16 +112,22 @@ impl LaunchedBinary for LaunchedLocalhostBinary {
.try_status()
.ok()
.flatten()
.and_then(|c| {
#[cfg(unix)]
return c.code().or(c.signal());
#[cfg(not(unix))]
return c.code();
})
.map(exit_code)
}

async fn wait(&mut self) -> Option<i32> {
let _ = self.child.get_mut().unwrap().status().await;
self.exit_code()
async fn wait(&mut self) -> Result<i32> {
Ok(exit_code(self.child.get_mut().unwrap().status().await?))
}

async fn stop(&mut self) -> Result<()> {
self.child.get_mut().unwrap().kill()?;
Ok(())
}
}

fn exit_code(c: ExitStatus) -> i32 {
#[cfg(unix)]
return c.code().or(c.signal()).unwrap();
#[cfg(not(unix))]
return c.code().unwrap();
}
Loading

0 comments on commit a214786

Please sign in to comment.