Skip to content

Commit

Permalink
feat(hydro_deploy): Perf works over SSH
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Jun 19, 2024
1 parent f91c300 commit 439c0ff
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 26 deletions.
7 changes: 5 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::path::PathBuf;
use std::sync::Arc;

use perf_options::PerfOptions;
use tokio::sync::RwLock;

use super::Host;
Expand All @@ -12,6 +13,8 @@ pub mod ports;
pub mod service;
pub use service::*;

pub mod perf_options;

#[derive(PartialEq)]
pub enum CrateTarget {
Default,
Expand All @@ -26,7 +29,7 @@ pub struct HydroflowCrate {
target: CrateTarget,
on: Arc<RwLock<dyn Host>>,
profile: Option<String>,
perf: Option<PathBuf>, /* If a path is provided, run perf to get CPU time and output to that path.perf.data */
perf: Option<PerfOptions>,
args: Vec<String>,
display_name: Option<String>,
}
Expand Down Expand Up @@ -80,7 +83,7 @@ impl HydroflowCrate {
self
}

pub fn perf(mut self, perf: impl Into<PathBuf>) -> Self {
pub fn perf(mut self, perf: impl Into<PerfOptions>) -> Self {
if self.perf.is_some() {
panic!("perf path already set");
}
Expand Down
7 changes: 7 additions & 0 deletions hydro_deploy/core/src/hydroflow_crate/perf_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use std::path::PathBuf;

#[derive(Clone)]
pub struct PerfOptions {
pub output_file: PathBuf,
pub frequency: u32,
}
5 changes: 3 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::Serialize;
use tokio::sync::RwLock;

use super::build::{build_crate, BuildError, BuiltCrate};
use super::perf_options::PerfOptions;
use super::ports::{self, HydroflowPortConfig, HydroflowSink, SourcePath};
use crate::progress::ProgressTracker;
use crate::{
Expand All @@ -26,7 +27,7 @@ pub struct HydroflowCrateService {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: Option<PathBuf>,
perf: Option<PerfOptions>,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand Down Expand Up @@ -63,7 +64,7 @@ impl HydroflowCrateService {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: Option<PathBuf>,
perf: Option<PerfOptions>,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand Down
3 changes: 2 additions & 1 deletion hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Result;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use hydroflow_cli_integration::ServerBindConfig;
use hydroflow_crate::perf_options::PerfOptions;
use tokio::sync::RwLock;

pub mod deployment;
Expand Down Expand Up @@ -100,7 +101,7 @@ pub trait LaunchedHost: Send + Sync {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: Option<PathBuf>,
perf: Option<PerfOptions>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>>;

async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
Expand Down
18 changes: 13 additions & 5 deletions hydro_deploy/core/src/localhost/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::{
ClientStrategy, Host, HostTargetType, LaunchedBinary, LaunchedHost, ResourceBatch,
ResourceResult, ServerStrategy,
};
use crate::hydroflow_crate::perf_options::PerfOptions;

pub mod launched_binary;
pub use launched_binary::*;
Expand Down Expand Up @@ -156,15 +157,22 @@ impl LaunchedHost for LaunchedLocalhost {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: Option<PathBuf>,
perf: Option<PerfOptions>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let mut command = if let Some(perf) = perf {
println!("Profiling binary with perf");
let mut tmp = Command::new("perf");
tmp.args(["record", "-F", "5", "--call-graph", "dwarf,64000", "-o"])
.arg(&perf)
.arg(&binary.2)
.args(args);
tmp.args([
"record",
"-F",
&perf.frequency.to_string(),
"--call-graph",
"dwarf,64000",
"-o",
])
.arg(&perf.output_file)
.arg(&binary.2)
.args(args);
tmp
} else {
let mut tmp = Command::new(&binary.2);
Expand Down
69 changes: 61 additions & 8 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -11,7 +12,7 @@ use async_ssh2_lite::ssh2::ErrorCode;
use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration};
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use futures::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt};
use hydroflow_cli_integration::ServerBindConfig;
use nanoid::nanoid;
use tokio::net::{TcpListener, TcpStream};
Expand All @@ -20,6 +21,7 @@ use tokio::sync::RwLock;
use super::progress::ProgressTracker;
use super::util::async_retry;
use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy};
use crate::hydroflow_crate::perf_options::PerfOptions;
use crate::util::prioritized_broadcast;

struct LaunchedSSHBinary {
Expand All @@ -30,6 +32,7 @@ struct LaunchedSSHBinary {
stdout_receivers: Arc<RwLock<Vec<Sender<String>>>>,
stdout_cli_receivers: Arc<RwLock<Option<tokio::sync::oneshot::Sender<String>>>>,
stderr_receivers: Arc<RwLock<Vec<Sender<String>>>>,
perf: Option<PerfOptions>,
}

#[async_trait]
Expand Down Expand Up @@ -86,9 +89,45 @@ impl Drop for LaunchedSSHBinary {
let session = self.session.take().unwrap();
std::thread::scope(|s| {
s.spawn(|| {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()
.unwrap();
runtime
.block_on(session.disconnect(None, "", None))
.block_on(async move {
self.channel.write_all(&[b'\x03']).await.unwrap();
self.channel.send_eof().await?;
self.channel.wait_eof().await?;
self.channel.wait_close().await?;
// Copy perf file down
if let Some(perf) = &self.perf {
let output_file = perf.output_file.to_str().unwrap();
let sftp = session.sftp().await?;

async move {
// download perf.data
let mut perf_data = sftp.open(&PathBuf::from(output_file)).await?;
let mut downloaded_perf_data =
tokio::fs::File::create(output_file).await?;
let data_size = perf_data.stat().await?.size.unwrap();
let mut read_buf = vec![0; 128 * 1024];
let mut index = 0;
while index < data_size {
let bytes_read = perf_data.read(&mut read_buf).await?;
use tokio::io::AsyncWriteExt;
downloaded_perf_data
.write_all(read_buf[0..bytes_read].as_ref())
.await?;
index += bytes_read as u64;
}
anyhow::Ok(())
}
.await
.unwrap();
}

session.disconnect(None, "", None).await
})
.unwrap();
})
.join()
Expand Down Expand Up @@ -273,7 +312,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
perf: Option<PathBuf>,
perf: Option<PerfOptions>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let session = self.open_ssh_session().await?;

Expand All @@ -298,17 +337,30 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
Duration::from_secs(1),
)
.await?;
// Launch with perf if specified, also copy local binary to expected place for perf report to work
let perf_wrapper = if let Some(perf) = perf.clone() {
// Copy local binary to {output_file}.bins/home/{user}/hydro-{unique_name}
let output_file = perf.output_file.to_str().unwrap();
let local_binary = PathBuf::from(format!(
"{output_file}.bins/home/{user}/hydro-{unique_name}"
));
fs::create_dir_all(local_binary.parent().unwrap()).unwrap();
fs::write(local_binary, &binary.1).unwrap();

// Attach perf to the command
let frequency = perf.frequency;
format!("perf record -F {frequency} --call-graph dwarf,64000 -o {output_file} ")
} else {
"".to_string()
};
let binary_path_string = binary_path.to_str().unwrap();
let args_string = args
.iter()
.map(|s| shell_escape::unix::escape(Cow::from(s)))
.fold("".to_string(), |acc, v| format!("{acc} {v}"));
channel
.exec(&format!("{binary_path_string}{args_string}"))
.exec(&format!("{perf_wrapper}{binary_path_string}{args_string}"))
.await?;
if perf.is_some() {
todo!("Perf profiling on remote machines is not supported");
}

anyhow::Ok(channel)
},
Expand Down Expand Up @@ -345,6 +397,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
stdout_cli_receivers,
stdout_receivers,
stderr_receivers,
perf,
})))
}

Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/core/src/terraform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl Drop for TerraformPool {
#[derive(Serialize, Deserialize)]
pub struct TerraformBatch {
pub terraform: TerraformConfig,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub provider: HashMap<String, serde_json::Value>,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub data: HashMap<String, HashMap<String, serde_json::Value>>,
Expand Down
4 changes: 1 addition & 3 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use core::hydroflow_crate::ports::HydroflowSource;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;

Expand Down Expand Up @@ -266,7 +265,6 @@ impl Deployment {
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
perf: Option<PathBuf>,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
Expand All @@ -280,7 +278,7 @@ impl Deployment {
bin,
example,
profile,
perf,
None, // Python API doesn't support perf
features,
args,
display_id,
Expand Down
20 changes: 15 additions & 5 deletions hydroflow_plus_test/examples/perf_compute_pi.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::Arc;

use hydro_deploy::gcp::GCPNetwork;
use hydro_deploy::hydroflow_crate::perf_options::PerfOptions;
use hydro_deploy::{Deployment, Host, HydroflowCrate};
use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec};
use stageleft::RuntimeData;
Expand All @@ -24,8 +26,8 @@ async fn main() {
deployment.GCPComputeEngineHost(
&project,
"e2-micro",
"debian-cloud/debian-11",
"us-west1-a",
"perf-image",
"us-central1-a",
network.clone(),
None,
)
Expand All @@ -46,24 +48,32 @@ async fn main() {
&DeployProcessSpec::new(|| {
let mut deployment = deployment.borrow_mut();
let host = create_host(&mut deployment);
let perf_options = PerfOptions {
output_file: PathBuf::from("leader.perf.data"),
frequency: 5,
};
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("compute_pi")
.profile(profile)
.perf("leader.perf.data")
.perf(perf_options)
.display_name("leader"),
)
}),
&DeployClusterSpec::new(|| {
let mut deployment = deployment.borrow_mut();
(0..8)
(0..1)
.map(|idx| {
let host = create_host(&mut deployment);
let perf_options = PerfOptions {
output_file: PathBuf::from(format!("cluster{}.perf.data", idx)),
frequency: 5,
};
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("compute_pi")
.profile(profile)
.perf(format!("cluster{}.perf.data", idx))
.perf(perf_options)
.display_name(format!("cluster/{}", idx)),
)
})
Expand Down

0 comments on commit 439c0ff

Please sign in to comment.