Skip to content

Commit

Permalink
fix(hydro_deploy): only instantiate Localhost once (#1403)
Browse files Browse the repository at this point in the history
Fixes #841
  • Loading branch information
shadaj committed Aug 22, 2024
1 parent 3850b47 commit 63b528f
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 23 deletions.
5 changes: 2 additions & 3 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ use hydroflow_plus_deploy::TrybuildHost;
#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let (leader, workers) = flow::broadcast::broadcast(&builder);

flow.with_default_optimize()
.with_process(&leader, TrybuildHost::new(localhost.clone))
.with_process(&leader, TrybuildHost::new(deployment.Localhost()))
.with_cluster(&workers, (0..2)
.map(|idx| TrybuildHost::new(localhost.clone()))
.map(|idx| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>()
)
.deploy(&mut deployment);
Expand Down
25 changes: 21 additions & 4 deletions hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,41 @@ use super::{
};
use crate::{AzureHost, ServiceBuilder};

#[derive(Default)]
pub struct Deployment {
pub hosts: Vec<Weak<dyn Host>>,
pub services: Vec<Weak<RwLock<dyn Service>>>,
pub resource_pool: ResourcePool,
localhost_host: Option<Arc<LocalhostHost>>,
last_resource_result: Option<Arc<ResourceResult>>,
next_host_id: usize,
next_service_id: usize,
}

impl Default for Deployment {
fn default() -> Self {
Self::new()
}
}

impl Deployment {
pub fn new() -> Self {
Self::default()
let mut ret = Self {
hosts: Vec::new(),
services: Vec::new(),
resource_pool: ResourcePool::default(),
localhost_host: None,
last_resource_result: None,
next_host_id: 0,
next_service_id: 0,
};

ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
ret
}

#[allow(non_snake_case)]
pub fn Localhost(&mut self) -> Arc<LocalhostHost> {
self.add_host(LocalhostHost::new)
pub fn Localhost(&self) -> Arc<LocalhostHost> {
self.localhost_host.clone().unwrap()
}

#[allow(non_snake_case)]
Expand Down
4 changes: 1 addition & 3 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,8 @@ mod tests {
async fn test_crate_panic() {
let mut deployment = deployment::Deployment::new();

let localhost = deployment.Localhost();

let service = deployment.add_service(
HydroflowCrate::new("../hydro_cli_examples", localhost.clone())
HydroflowCrate::new("../hydro_cli_examples", deployment.Localhost())
.example("panic_program")
.profile("dev"),
);
Expand Down
4 changes: 2 additions & 2 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ impl Deployment {
#[new]
fn new() -> Self {
Deployment {
underlying: Arc::new(RwLock::new(core::Deployment::default())),
underlying: Arc::new(RwLock::new(core::Deployment::new())),
}
}

#[allow(non_snake_case)]
fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let arc = self.underlying.blocking_write().Localhost();
let arc = self.underlying.blocking_read().Localhost();

Ok(Py::new(
py,
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus_test/src/cluster/many_to_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ mod tests {
#[tokio::test]
async fn many_to_many() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let cluster = super::many_to_many(&builder);
Expand All @@ -30,7 +29,7 @@ mod tests {
.with_cluster(
&cluster,
(0..2)
.map(|_| TrybuildHost::new(localhost.clone()))
.map(|_| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>(),
)
.deploy(&mut deployment);
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/src/cluster/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ mod tests {
#[tokio::test]
async fn simple_cluster() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let (node, cluster) = super::simple_cluster(&builder);
Expand All @@ -42,11 +41,11 @@ mod tests {
insta::assert_debug_snapshot!(built.ir());

let nodes = built
.with_process(&node, TrybuildHost::new(localhost.clone()))
.with_process(&node, TrybuildHost::new(deployment.Localhost()))
.with_cluster(
&cluster,
(0..2)
.map(|_| TrybuildHost::new(localhost.clone()))
.map(|_| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>(),
)
.deploy(&mut deployment);
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/src/distributed/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ mod tests {
#[tokio::test]
async fn first_ten_distributed() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let (first_node, second_node) = super::first_ten_distributed(&builder);
Expand All @@ -42,8 +41,8 @@ mod tests {

// if we drop this, we drop the references to the deployment nodes
let nodes = built
.with_process(&first_node, TrybuildHost::new(localhost.clone()))
.with_process(&second_node, TrybuildHost::new(localhost.clone()))
.with_process(&first_node, TrybuildHost::new(deployment.Localhost()))
.with_process(&second_node, TrybuildHost::new(deployment.Localhost()))
.deploy(&mut deployment);

deployment.deploy().await.unwrap();
Expand Down
5 changes: 2 additions & 3 deletions template/hydroflow_plus/examples/first_ten_distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use hydroflow_plus_deploy::TrybuildHost;
#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let flow = hydroflow_plus::FlowBuilder::new();
let (p1, p2) = hydroflow_plus_template::first_ten_distributed::first_ten_distributed(&flow);

let _nodes = flow
.with_default_optimize()
.with_process(&p1, TrybuildHost::new(localhost.clone()))
.with_process(&p2, TrybuildHost::new(localhost.clone()))
.with_process(&p1, TrybuildHost::new(deployment.Localhost()))
.with_process(&p2, TrybuildHost::new(deployment.Localhost()))
.deploy(&mut deployment);

deployment.run_ctrl_c().await.unwrap();
Expand Down

0 comments on commit 63b528f

Please sign in to comment.