Skip to content

Commit

Permalink
refactor(hydroflow_plus)!: defer network instantiation until after fi…
Browse files Browse the repository at this point in the history
…nalizing IR
  • Loading branch information
shadaj committed Aug 13, 2024
1 parent 5b3d2c8 commit 58321d4
Show file tree
Hide file tree
Showing 33 changed files with 590 additions and 301 deletions.
134 changes: 89 additions & 45 deletions hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use hydro_deploy::hydroflow_crate::ports::{
};
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{Deployment, Host};
use hydroflow_plus::lang::graph::HydroflowGraph;
use hydroflow_plus::location::{
Cluster, ClusterSpec, Deploy, HfSendManyToMany, HfSendManyToOne, HfSendOneToMany,
HfSendOneToOne, Location, ProcessSpec,
Expand All @@ -23,6 +24,7 @@ pub struct HydroDeploy {}

impl<'a> Deploy<'a> for HydroDeploy {
type ClusterId = u32;
type InstantiateEnv = Deployment;
type Process = DeployNode;
type Cluster = DeployCluster;
type Meta = HashMap<usize, Vec<u32>>;
Expand Down Expand Up @@ -68,12 +70,13 @@ pub trait DeployCrateWrapper {
pub struct DeployNode {
id: usize,
next_port: Rc<RefCell<usize>>,
underlying: Arc<RwLock<HydroflowCrateService>>,
node_fn: Rc<CrateBuilder<'static>>,
underlying: Rc<RefCell<Option<Arc<RwLock<HydroflowCrateService>>>>>,
}

impl DeployCrateWrapper for DeployNode {
fn underlying(&self) -> Arc<RwLock<HydroflowCrateService>> {
self.underlying.clone()
self.underlying.borrow().as_ref().unwrap().clone()
}
}

Expand All @@ -99,7 +102,7 @@ impl DeployPort<DeployCluster> {
on: &Arc<impl Host + 'static>,
) -> Vec<CustomClientPort> {
let mut out = vec![];
for member in &self.node.members {
for member in self.node.members() {
out.push(member.create_sender(&self.port, deployment, on).await);
}

Expand All @@ -110,6 +113,7 @@ impl DeployPort<DeployCluster> {
impl Location for DeployNode {
type Port = DeployPort<Self>;
type Meta = HashMap<usize, Vec<u32>>;
type InstantiateEnv = Deployment;

fn id(&self) -> usize {
self.id
Expand All @@ -126,13 +130,23 @@ impl Location for DeployNode {
}

fn update_meta(&mut self, meta: &Self::Meta) {
let mut n = self.underlying.try_write().unwrap();
let underlying_node = self.underlying.borrow();
let mut n = underlying_node.as_ref().unwrap().try_write().unwrap();
n.update_meta(HydroflowPlusMeta {
clusters: meta.clone(),
cluster_id: None,
subgraph_id: self.id,
});
}

fn instantiate(
&self,
env: &mut Self::InstantiateEnv,
_meta: &mut Self::Meta,
_graph: HydroflowGraph,
) {
*self.underlying.borrow_mut() = Some((self.node_fn.as_ref())(env));
}
}

#[derive(Clone)]
Expand All @@ -146,16 +160,26 @@ impl DeployCrateWrapper for DeployClusterNode {
}
}

type CreateClusterFn = dyn Fn(&mut Deployment) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'static;

#[derive(Clone)]
pub struct DeployCluster {
id: usize,
next_port: Rc<RefCell<usize>>,
pub members: Vec<DeployClusterNode>,
cluster_fn: Rc<RefCell<Box<CreateClusterFn>>>,
members: Rc<RefCell<Vec<DeployClusterNode>>>,
}

impl DeployCluster {
pub fn members(&self) -> Vec<DeployClusterNode> {
self.members.borrow().clone()
}
}

impl Location for DeployCluster {
type Port = DeployPort<Self>;
type Meta = HashMap<usize, Vec<u32>>;
type InstantiateEnv = Deployment;

fn id(&self) -> usize {
self.id
Expand All @@ -171,15 +195,29 @@ impl Location for DeployCluster {
}
}

fn instantiate(
&self,
_env: &mut Self::InstantiateEnv,
meta: &mut Self::Meta,
_graph: HydroflowGraph,
) {
let cluster_nodes = (self.cluster_fn.borrow_mut())(_env);
meta.insert(self.id, (0..(cluster_nodes.len() as u32)).collect());
*self.members.borrow_mut() = cluster_nodes
.into_iter()
.map(|n| DeployClusterNode { underlying: n })
.collect();
}

fn update_meta(&mut self, meta: &Self::Meta) {
self.members.iter().enumerate().for_each(|(cluster_id, n)| {
let mut n = n.underlying.try_write().unwrap();
n.update_meta(&HydroflowPlusMeta {
for (cluster_id, node) in self.members.borrow().iter().enumerate() {
let mut n = node.underlying.try_write().unwrap();
n.update_meta(HydroflowPlusMeta {
clusters: meta.clone(),
cluster_id: Some(cluster_id as u32),
subgraph_id: self.id,
});
});
}
}
}

Expand All @@ -202,17 +240,19 @@ impl HfSendOneToOne<DeployNode> for DeployNode {
source_port: &DeployPort<DeployNode>,
recipient_port: &DeployPort<DeployNode>,
) {
let source_port = self
.underlying
let self_underlying_borrow = self.underlying.borrow();
let self_underlying = self_underlying_borrow.as_ref().unwrap();
let source_port = self_underlying
.try_read()
.unwrap()
.get_port(source_port.port.clone(), &self.underlying);
.get_port(source_port.port.clone(), self_underlying);

let recipient_port = other
.underlying
let other_underlying_borrow = other.underlying.borrow();
let other_underlying = other_underlying_borrow.as_ref().unwrap();
let recipient_port = other_underlying
.try_read()
.unwrap()
.get_port(recipient_port.port.clone(), &other.underlying);
.get_port(recipient_port.port.clone(), other_underlying);

source_port.send_to(&recipient_port);
}
Expand All @@ -233,14 +273,15 @@ impl HfSendManyToOne<DeployNode, u32> for DeployCluster {
source_port: &DeployPort<DeployCluster>,
recipient_port: &DeployPort<DeployNode>,
) {
let recipient_port = other
.underlying
let other_underlying_borrow = other.underlying.borrow();
let other_underlying = other_underlying_borrow.as_ref().unwrap();
let recipient_port = other_underlying
.try_read()
.unwrap()
.get_port(recipient_port.port.clone(), &other.underlying)
.get_port(recipient_port.port.clone(), other_underlying)
.merge();

for (i, node) in self.members.iter().enumerate() {
for (i, node) in self.members.borrow().iter().enumerate() {
let source_port = node
.underlying
.try_read()
Expand Down Expand Up @@ -271,15 +312,17 @@ impl HfSendOneToMany<DeployCluster, u32> for DeployNode {
source_port: &DeployPort<DeployNode>,
recipient_port: &DeployPort<DeployCluster>,
) {
let source_port = self
.underlying
let self_underlying_borrow = self.underlying.borrow();
let self_underlying = self_underlying_borrow.as_ref().unwrap();
let source_port = self_underlying
.try_read()
.unwrap()
.get_port(source_port.port.clone(), &self.underlying);
.get_port(source_port.port.clone(), self_underlying);

let recipient_port = DemuxSink {
demux: other
.members
.borrow()
.iter()
.enumerate()
.map(|(id, c)| {
Expand Down Expand Up @@ -315,7 +358,7 @@ impl HfSendManyToMany<DeployCluster, u32> for DeployCluster {
source_port: &DeployPort<DeployCluster>,
recipient_port: &DeployPort<DeployCluster>,
) {
for (i, sender) in self.members.iter().enumerate() {
for (i, sender) in self.members.borrow().iter().enumerate() {
let source_port = sender
.underlying
.try_read()
Expand All @@ -325,6 +368,7 @@ impl HfSendManyToMany<DeployCluster, u32> for DeployCluster {
let recipient_port = DemuxSink {
demux: other
.members
.borrow()
.iter()
.enumerate()
.map(|(id, c)| {
Expand Down Expand Up @@ -360,48 +404,48 @@ impl HfSendManyToMany<DeployCluster, u32> for DeployCluster {
}
}

type CrateBuilder<'a> = dyn FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a;
type CrateBuilder<'a> = dyn Fn(&mut Deployment) -> Arc<RwLock<HydroflowCrateService>> + 'a;

pub struct DeployProcessSpec<'a>(RefCell<Box<CrateBuilder<'a>>>);
pub struct DeployProcessSpec(Rc<CrateBuilder<'static>>);

impl<'a> DeployProcessSpec<'a> {
pub fn new<F: FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
impl DeployProcessSpec {
pub fn new<F: Fn(&mut Deployment) -> Arc<RwLock<HydroflowCrateService>> + 'static>(
f: F,
) -> Self {
Self(Rc::new(f))
}
}

impl<'a: 'b, 'b> ProcessSpec<'a, HydroDeploy> for DeployProcessSpec<'b> {
fn build(&self, id: usize, _meta: &mut HashMap<usize, Vec<u32>>) -> DeployNode {
impl<'a> ProcessSpec<'a, HydroDeploy> for DeployProcessSpec {
fn build(&self, id: usize) -> DeployNode {
DeployNode {
id,
next_port: Rc::new(RefCell::new(0)),
underlying: (self.0.borrow_mut())(),
node_fn: self.0.clone(),
underlying: Rc::new(RefCell::new(None)),
}
}
}

type ClusterSpecFn<'a> = dyn FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a;
type ClusterSpecFn = dyn Fn(&mut Deployment) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'static;

pub struct DeployClusterSpec<'a>(RefCell<Box<ClusterSpecFn<'a>>>);
pub struct DeployClusterSpec(Rc<RefCell<Box<ClusterSpecFn>>>);

impl<'a> DeployClusterSpec<'a> {
pub fn new<F: FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
impl DeployClusterSpec {
pub fn new<F: Fn(&mut Deployment) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'static>(
f: F,
) -> Self {
Self(Rc::new(RefCell::new(Box::new(f))))
}
}

impl<'a: 'b, 'b> ClusterSpec<'a, HydroDeploy> for DeployClusterSpec<'b> {
fn build(&self, id: usize, meta: &mut HashMap<usize, Vec<u32>>) -> DeployCluster {
let cluster_nodes = (self.0.borrow_mut())();
meta.insert(id, (0..(cluster_nodes.len() as u32)).collect());

impl<'a> ClusterSpec<'a, HydroDeploy> for DeployClusterSpec {
fn build(&self, id: usize) -> DeployCluster {
DeployCluster {
id,
next_port: Rc::new(RefCell::new(0)),
members: cluster_nodes
.into_iter()
.map(|u| DeployClusterNode { underlying: u })
.collect(),
cluster_fn: self.0.clone(),
members: Rc::new(RefCell::new(vec![])),
}
}
}
26 changes: 24 additions & 2 deletions hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cell::RefCell;
use std::rc::Rc;

use hydroflow_plus::lang::graph::HydroflowGraph;
use hydroflow_plus::location::{
Cluster, ClusterSpec, Deploy, HfSendManyToMany, HfSendManyToOne, HfSendOneToMany,
HfSendOneToOne, Location, ProcessSpec,
Expand All @@ -16,6 +17,7 @@ pub struct CLIRuntime {}

impl<'a> Deploy<'a> for CLIRuntime {
type ClusterId = u32;
type InstantiateEnv = ();
type Process = CLIRuntimeNode<'a>;
type Cluster = CLIRuntimeCluster<'a>;
type Meta = ();
Expand All @@ -34,6 +36,7 @@ pub struct CLIRuntimeNode<'a> {
impl<'a> Location for CLIRuntimeNode<'a> {
type Port = String;
type Meta = ();
type InstantiateEnv = ();

fn id(&self) -> usize {
self.id
Expand All @@ -46,6 +49,15 @@ impl<'a> Location for CLIRuntimeNode<'a> {
}

fn update_meta(&mut self, _meta: &Self::Meta) {}

fn instantiate(
&self,
_env: &mut Self::InstantiateEnv,
_meta: &mut Self::Meta,
_graph: HydroflowGraph,
) {
panic!(".deploy() cannot be called on a CLIRuntimeNode");
}
}

#[derive(Clone)]
Expand All @@ -58,6 +70,7 @@ pub struct CLIRuntimeCluster<'a> {
impl<'a> Location for CLIRuntimeCluster<'a> {
type Port = String;
type Meta = ();
type InstantiateEnv = ();

fn id(&self) -> usize {
self.id
Expand All @@ -70,6 +83,15 @@ impl<'a> Location for CLIRuntimeCluster<'a> {
}

fn update_meta(&mut self, _meta: &Self::Meta) {}

fn instantiate(
&self,
_env: &mut Self::InstantiateEnv,
_meta: &mut Self::Meta,
_graph: HydroflowGraph,
) {
panic!(".deploy() cannot be called on a CLIRuntimeCluster");
}
}

impl<'a> Cluster<'a> for CLIRuntimeCluster<'a> {
Expand Down Expand Up @@ -209,7 +231,7 @@ impl<'a> HfSendManyToMany<CLIRuntimeCluster<'a>, u32> for CLIRuntimeCluster<'a>
}

impl<'cli> ProcessSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<HydroflowPlusMeta>> {
fn build(&self, id: usize, _meta: &mut ()) -> CLIRuntimeNode<'cli> {
fn build(&self, id: usize) -> CLIRuntimeNode<'cli> {
CLIRuntimeNode {
id,
next_port: Rc::new(RefCell::new(0)),
Expand All @@ -219,7 +241,7 @@ impl<'cli> ProcessSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<Hydroflo
}

impl<'cli> ClusterSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<HydroflowPlusMeta>> {
fn build(&self, id: usize, _meta: &mut ()) -> CLIRuntimeCluster<'cli> {
fn build(&self, id: usize) -> CLIRuntimeCluster<'cli> {
CLIRuntimeCluster {
id,
next_port: Rc::new(RefCell::new(0)),
Expand Down
Loading

0 comments on commit 58321d4

Please sign in to comment.