From 0f8789828b8de241fbe8688f1581979233c7c845 Mon Sep 17 00:00:00 2001 From: David Chu Date: Wed, 19 Jun 2024 01:29:48 +0000 Subject: [PATCH] feat(hydro_deploy): Perf works over SSH --- hydro_deploy/core/src/hydroflow_crate/mod.rs | 7 +- .../core/src/hydroflow_crate/perf_options.rs | 7 ++ .../core/src/hydroflow_crate/service.rs | 5 +- hydro_deploy/core/src/lib.rs | 4 +- hydro_deploy/core/src/localhost/mod.rs | 19 ++++-- hydro_deploy/core/src/ssh.rs | 64 ++++++++++++++++--- hydro_deploy/core/src/terraform.rs | 1 + hydro_deploy/hydro_cli/src/lib.rs | 4 +- .../examples/perf_compute_pi.rs | 18 ++++-- 9 files changed, 102 insertions(+), 27 deletions(-) create mode 100644 hydro_deploy/core/src/hydroflow_crate/perf_options.rs diff --git a/hydro_deploy/core/src/hydroflow_crate/mod.rs b/hydro_deploy/core/src/hydroflow_crate/mod.rs index 0e3c861bb909..188f5e6b686f 100644 --- a/hydro_deploy/core/src/hydroflow_crate/mod.rs +++ b/hydro_deploy/core/src/hydroflow_crate/mod.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use std::sync::Arc; +use perf_options::PerfOptions; use tokio::sync::RwLock; use super::Host; @@ -12,6 +13,8 @@ pub mod ports; pub mod service; pub use service::*; +pub mod perf_options; + #[derive(PartialEq)] pub enum CrateTarget { Default, @@ -26,7 +29,7 @@ pub struct HydroflowCrate { target: CrateTarget, on: Arc>, profile: Option, - perf: Option, /* If a path is provided, run perf to get CPU time and output to that path.perf.data */ + perf: Option, args: Vec, display_name: Option, } @@ -80,7 +83,7 @@ impl HydroflowCrate { self } - pub fn perf(mut self, perf: impl Into) -> Self { + pub fn perf(mut self, perf: impl Into) -> Self { if self.perf.is_some() { panic!("perf path already set"); } diff --git a/hydro_deploy/core/src/hydroflow_crate/perf_options.rs b/hydro_deploy/core/src/hydroflow_crate/perf_options.rs new file mode 100644 index 000000000000..2f7125183699 --- /dev/null +++ b/hydro_deploy/core/src/hydroflow_crate/perf_options.rs @@ -0,0 +1,7 @@ +use std::path::PathBuf; + +#[derive(Clone)] +pub struct PerfOptions { + pub output_file: PathBuf, + pub frequency: u32, +} diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 71531265b89e..10d810afbcd9 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -12,6 +12,7 @@ use serde::Serialize; use tokio::sync::RwLock; use super::build::{build_crate_memoized, BuildError, BuildOutput, BuildParams}; +use super::perf_options::PerfOptions; use super::ports::{self, HydroflowPortConfig, HydroflowSink, SourcePath}; use crate::progress::ProgressTracker; use crate::{ @@ -22,7 +23,7 @@ pub struct HydroflowCrateService { id: usize, pub(super) on: Arc>, build_params: BuildParams, - perf: Option, + perf: Option, args: Option>, display_id: Option, external_ports: Vec, @@ -55,7 +56,7 @@ impl HydroflowCrateService { bin: Option, example: Option, profile: Option, - perf: Option, + perf: Option, features: Option>, args: Option>, display_id: Option, diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index c512328111b5..6fadddba8995 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use async_channel::{Receiver, Sender}; use async_trait::async_trait; use hydroflow_cli_integration::ServerBindConfig; +use hydroflow_crate::perf_options::PerfOptions; pub mod deployment; pub use deployment::Deployment; @@ -101,7 +101,7 @@ pub trait LaunchedHost: Send + Sync { id: String, binary: &BuildOutput, args: &[String], - perf: Option, + perf: Option, ) -> Result>; async fn forward_port(&self, addr: &SocketAddr) -> Result; diff --git a/hydro_deploy/core/src/localhost/mod.rs b/hydro_deploy/core/src/localhost/mod.rs index ec27b1901375..6ce0d8f68795 100644 --- a/hydro_deploy/core/src/localhost/mod.rs +++ b/hydro_deploy/core/src/localhost/mod.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::path::PathBuf; use std::sync::Arc; use anyhow::{Context, Result}; @@ -13,6 +12,7 @@ use super::{ ResourceResult, ServerStrategy, }; use crate::hydroflow_crate::build::BuildOutput; +use crate::hydroflow_crate::perf_options::PerfOptions; pub mod launched_binary; pub use launched_binary::*; @@ -156,15 +156,22 @@ impl LaunchedHost for LaunchedLocalhost { id: String, binary: &BuildOutput, args: &[String], - perf: Option, + perf: Option, ) -> Result> { let mut command = if let Some(perf) = perf { println!("Profiling binary with perf"); let mut tmp = Command::new("perf"); - tmp.args(["record", "-F", "5", "--call-graph", "dwarf,64000", "-o"]) - .arg(&perf) - .arg(&binary.bin_path) - .args(args); + tmp.args([ + "record", + "-F", + &perf.frequency.to_string(), + "--call-graph", + "dwarf,64000", + "-o", + ]) + .arg(&perf.output_file) + .arg(&binary.bin_path) + .args(args); tmp } else { let mut tmp = Command::new(&binary.bin_path); diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index 7a58d8b131f4..162b5b6376de 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::fs; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -11,7 +12,7 @@ use async_ssh2_lite::ssh2::ErrorCode; use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration}; use async_trait::async_trait; use futures::io::BufReader; -use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt}; +use futures::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt}; use hydroflow_cli_integration::ServerBindConfig; use nanoid::nanoid; use tokio::net::{TcpListener, TcpStream}; @@ -21,6 +22,7 @@ use super::progress::ProgressTracker; use super::util::async_retry; use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy}; use crate::hydroflow_crate::build::BuildOutput; +use crate::hydroflow_crate::perf_options::PerfOptions; use crate::util::prioritized_broadcast; struct LaunchedSshBinary { @@ -31,6 +33,7 @@ struct LaunchedSshBinary { stdout_receivers: Arc>>>, stdout_cli_receivers: Arc>>>, stderr_receivers: Arc>>>, + perf: Option, } #[async_trait] @@ -87,9 +90,40 @@ impl Drop for LaunchedSshBinary { let session = self.session.take().unwrap(); std::thread::scope(|s| { s.spawn(|| { - let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); runtime - .block_on(session.disconnect(None, "", None)) + .block_on(async move { + self.channel.write_all(&[b'\x03']).await.unwrap(); + self.channel.send_eof().await?; + self.channel.wait_eof().await?; + self.channel.wait_close().await?; + // Copy perf file down + if let Some(perf) = &self.perf { + let output_file = perf.output_file.to_str().unwrap(); + let sftp = session.sftp().await?; + + // download perf.data + let mut perf_data = sftp.open(&PathBuf::from(output_file)).await?; + let mut downloaded_perf_data = + tokio::fs::File::create(output_file).await?; + let data_size = perf_data.stat().await?.size.unwrap(); + let mut read_buf = vec![0; 128 * 1024]; + let mut index = 0; + while index < data_size { + let bytes_read = perf_data.read(&mut read_buf).await?; + use tokio::io::AsyncWriteExt; + downloaded_perf_data + .write_all(read_buf[0..bytes_read].as_ref()) + .await?; + index += bytes_read as u64; + } + } + + session.disconnect(None, "", None).await + }) .unwrap(); }) .join() @@ -276,7 +310,7 @@ impl LaunchedHost for T { id: String, binary: &BuildOutput, args: &[String], - perf: Option, + perf: Option, ) -> Result> { let session = self.open_ssh_session().await?; @@ -301,17 +335,30 @@ impl LaunchedHost for T { Duration::from_secs(1), ) .await?; + // Launch with perf if specified, also copy local binary to expected place for perf report to work + let perf_wrapper = if let Some(perf) = perf.clone() { + // Copy local binary to {output_file}.bins/home/{user}/hydro-{unique_name} + let output_file = perf.output_file.to_str().unwrap(); + let local_binary = PathBuf::from(format!( + "{output_file}.bins/home/{user}/hydro-{unique_name}" + )); + fs::create_dir_all(local_binary.parent().unwrap()).unwrap(); + fs::write(local_binary, &binary.bin_data).unwrap(); + + // Attach perf to the command + let frequency = perf.frequency; + format!("perf record -F {frequency} --call-graph dwarf,64000 -o {output_file} ") + } else { + "".to_string() + }; let binary_path_string = binary_path.to_str().unwrap(); let args_string = args .iter() .map(|s| shell_escape::unix::escape(Cow::from(s))) .fold("".to_string(), |acc, v| format!("{acc} {v}")); channel - .exec(&format!("{binary_path_string}{args_string}")) + .exec(&format!("{perf_wrapper}{binary_path_string}{args_string}")) .await?; - if perf.is_some() { - todo!("Profiling on remote machines is not (yet) supported"); - } anyhow::Ok(channel) }, @@ -348,6 +395,7 @@ impl LaunchedHost for T { stdout_cli_receivers, stdout_receivers, stderr_receivers, + perf, })) } diff --git a/hydro_deploy/core/src/terraform.rs b/hydro_deploy/core/src/terraform.rs index 342b4181e435..3d75124f0a6f 100644 --- a/hydro_deploy/core/src/terraform.rs +++ b/hydro_deploy/core/src/terraform.rs @@ -79,6 +79,7 @@ impl Drop for TerraformPool { #[derive(Serialize, Deserialize)] pub struct TerraformBatch { pub terraform: TerraformConfig, + #[serde(skip_serializing_if = "HashMap::is_empty")] pub provider: HashMap, #[serde(skip_serializing_if = "HashMap::is_empty")] pub data: HashMap>, diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 58de94bac89c..6198a46a95f2 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -5,7 +5,6 @@ use core::hydroflow_crate::ports::HydroflowSource; use std::cell::OnceCell; use std::collections::HashMap; use std::ops::DerefMut; -use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, OnceLock}; @@ -266,7 +265,6 @@ impl Deployment { bin: Option, example: Option, profile: Option, - perf: Option, features: Option>, args: Option>, display_id: Option, @@ -280,7 +278,7 @@ impl Deployment { bin, example, profile, - perf, + None, // Python API doesn't support perf features, args, display_id, diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index ed6ee798c095..d8db6ec96769 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -1,7 +1,9 @@ use std::cell::RefCell; +use std::path::PathBuf; use std::sync::Arc; use hydro_deploy::gcp::GcpNetwork; +use hydro_deploy::hydroflow_crate::perf_options::PerfOptions; use hydro_deploy::{Deployment, Host, HydroflowCrate}; use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec}; use stageleft::RuntimeData; @@ -24,8 +26,8 @@ async fn main() { deployment.GcpComputeEngineHost( &project, "e2-micro", - "debian-cloud/debian-11", - "us-west1-a", + "perf-image", + "us-central1-a", network.clone(), None, ) @@ -46,11 +48,15 @@ async fn main() { &DeployProcessSpec::new(|| { let mut deployment = deployment.borrow_mut(); let host = create_host(&mut deployment); + let perf_options = PerfOptions { + output_file: PathBuf::from("leader.perf.data"), + frequency: 5, + }; deployment.add_service( HydroflowCrate::new(".", host.clone()) .bin("compute_pi") .profile(profile) - .perf("leader.perf.data") + .perf(perf_options) .display_name("leader"), ) }), @@ -59,11 +65,15 @@ async fn main() { (0..8) .map(|idx| { let host = create_host(&mut deployment); + let perf_options = PerfOptions { + output_file: PathBuf::from(format!("cluster{}.perf.data", idx)), + frequency: 5, + }; deployment.add_service( HydroflowCrate::new(".", host.clone()) .bin("compute_pi") .profile(profile) - .perf(format!("cluster{}.perf.data", idx)) + .perf(perf_options) .display_name(format!("cluster/{}", idx)), ) })