From 09d6d44eafc866881e73719813fe9edeb49ca2a6 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 13 Aug 2024 11:06:45 -0700 Subject: [PATCH] refactor(hydroflow_plus)!: start rearranging stages of flow compilation to prepare for trybuild approach (#1358) --- .../hydroflow_plus/quickstart/clusters.mdx | 4 +- hydroflow_plus/src/builder.rs | 72 +++++++++++++------ hydroflow_plus/src/cycle.rs | 7 +- hydroflow_plus/src/persist_pullup.rs | 8 +-- hydroflow_plus/src/profiler.rs | 4 +- hydroflow_plus/src/properties.rs | 2 +- hydroflow_plus/src/stream.rs | 13 ++-- .../examples/perf_compute_pi.rs | 3 +- hydroflow_plus_test/src/cluster/compute_pi.rs | 13 ++-- .../src/cluster/many_to_many.rs | 6 +- hydroflow_plus_test/src/cluster/map_reduce.rs | 8 +-- .../src/cluster/simple_cluster.rs | 6 +- .../src/distributed/first_ten.rs | 6 +- .../src/distributed/networked.rs | 6 +- hydroflow_plus_test/src/local/chat_app.rs | 2 +- hydroflow_plus_test/src/local/compute_pi.rs | 2 +- hydroflow_plus_test/src/local/count_elems.rs | 2 +- hydroflow_plus_test/src/local/first_ten.rs | 2 +- .../src/local/graph_reachability.rs | 2 +- hydroflow_plus_test/src/local/negation.rs | 4 +- hydroflow_plus_test/src/local/teed_join.rs | 4 +- template/hydroflow_plus/flow/src/first_ten.rs | 2 +- .../flow/src/first_ten_distributed.rs | 6 +- 23 files changed, 103 insertions(+), 81 deletions(-) diff --git a/docs/docs/hydroflow_plus/quickstart/clusters.mdx b/docs/docs/hydroflow_plus/quickstart/clusters.mdx index 5fc47c7a5d1..3407305b4d8 100644 --- a/docs/docs/hydroflow_plus/quickstart/clusters.mdx +++ b/docs/docs/hydroflow_plus/quickstart/clusters.mdx @@ -60,8 +60,8 @@ pub fn broadcast_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { broadcast(&flow, &cli, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } ``` diff --git a/hydroflow_plus/src/builder.rs b/hydroflow_plus/src/builder.rs index fe658fc1e54..e40bb086d62 100644 --- a/hydroflow_plus/src/builder.rs +++ b/hydroflow_plus/src/builder.rs @@ -7,7 +7,7 @@ use std::time::Duration; use hydroflow::bytes::Bytes; use hydroflow::futures::stream::Stream as FuturesStream; use hydroflow::lattices::collections::MapMapValues; -use hydroflow_lang::graph::eliminate_extra_unions_tees; +use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph}; use proc_macro2::Span; use stageleft::*; use syn::parse_quote; @@ -19,8 +19,13 @@ use crate::location::{ use crate::stream::{Async, Windowed}; use crate::{HfCompiled, HfCycle, RuntimeContext, Stream}; +/// Tracks the leaves of the dataflow IR. This is referenced by +/// `Stream` and `HfCycle` to build the IR. The inner option will +/// be set to `None` when this builder is finalized. +pub type FlowLeaves = Rc>>>; + pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> { - ir_leaves: Rc>>, + ir_leaves: FlowLeaves, nodes: RefCell>, clusters: RefCell>, cycle_ids: RefCell>, @@ -33,6 +38,10 @@ pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> { next_node_id: RefCell, + /// Tracks whether this flow has been finalized; it is an error to + /// drop without finalizing. + finalized: bool, + /// 'a on a FlowBuilder is used to ensure that staged code does not /// capture more data that it is allowed to; 'a is generated at the /// entrypoint of the staged code and we keep it invariant here @@ -40,6 +49,14 @@ pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> { _phantom: PhantomData<&'a mut &'a ()>, } +impl<'a, D: LocalDeploy<'a> + ?Sized> Drop for FlowBuilder<'a, D> { + fn drop(&mut self) { + if !self.finalized { + panic!("Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."); + } + } +} + impl<'a, D: LocalDeploy<'a>> QuotedContext for FlowBuilder<'a, D> { fn create() -> Self { FlowBuilder::new() @@ -50,26 +67,39 @@ impl<'a, D: LocalDeploy<'a>> FlowBuilder<'a, D> { #[allow(clippy::new_without_default)] pub fn new() -> FlowBuilder<'a, D> { FlowBuilder { - ir_leaves: Rc::new(RefCell::new(Vec::new())), + ir_leaves: Rc::new(RefCell::new(Some(Vec::new()))), nodes: RefCell::new(Vec::new()), clusters: RefCell::new(Vec::new()), cycle_ids: RefCell::new(HashMap::new()), meta: RefCell::new(Default::default()), next_node_id: RefCell::new(0), + finalized: false, _phantom: PhantomData, } } - pub fn extract(self) -> BuiltFlow<'a, D> { + pub fn finalize(mut self) -> BuiltFlow<'a, D> { + self.finalized = true; BuiltFlow { - ir: self.ir_leaves.borrow().clone(), - nodes: self.nodes.into_inner(), - clusters: self.clusters.into_inner(), + ir: self.ir_leaves.borrow_mut().take().unwrap(), + nodes: self.nodes.replace(vec![]), + clusters: self.clusters.replace(vec![]), _phantom: PhantomData, } } - pub fn ir_leaves(&self) -> &Rc>> { + pub fn with_default_optimize(self) -> BuiltFlow<'a, D> { + self.finalize().with_default_optimize() + } + + pub fn optimize_with( + self, + f: impl FnOnce(Vec) -> Vec, + ) -> BuiltFlow<'a, D> { + self.finalize().optimize_with(f) + } + + pub fn ir_leaves(&self) -> &FlowLeaves { &self.ir_leaves } @@ -313,7 +343,7 @@ impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> { } } -fn build_inner<'a, D: LocalDeploy<'a>>(me: BuiltFlow<'a, D>) -> HfCompiled<'a, D::GraphId> { +fn build_inner<'a, D: LocalDeploy<'a>>(me: BuiltFlow<'a, D>) -> BTreeMap { let mut builders = BTreeMap::new(); let mut built_tees = HashMap::new(); let mut next_stmt_id = 0; @@ -321,26 +351,22 @@ fn build_inner<'a, D: LocalDeploy<'a>>(me: BuiltFlow<'a, D>) -> HfCompiled<'a, D leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id); } - HfCompiled { - hydroflow_ir: builders.map_values(|v| { - let (mut flat_graph, _, _) = v.build(); - eliminate_extra_unions_tees(&mut flat_graph); - flat_graph - }), - _phantom: PhantomData, - } + builders.map_values(|v| { + let (mut flat_graph, _, _) = v.build(); + eliminate_extra_unions_tees(&mut flat_graph); + flat_graph + }) } impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> { - pub fn no_optimize(self) -> HfCompiled<'a, D::GraphId> { - build_inner(self) + pub fn compile(self) -> HfCompiled<'a, D::GraphId> { + HfCompiled { + hydroflow_ir: build_inner(self), + _phantom: PhantomData, + } } pub fn with_default_optimize(self) -> BuiltFlow<'a, D> { self.optimize_with(super::persist_pullup::persist_pullup) } - - pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> { - self.with_default_optimize().no_optimize() - } } diff --git a/hydroflow_plus/src/cycle.rs b/hydroflow_plus/src/cycle.rs index 29e1329d86b..d0fdd321114 100644 --- a/hydroflow_plus/src/cycle.rs +++ b/hydroflow_plus/src/cycle.rs @@ -1,7 +1,6 @@ -use std::cell::RefCell; use std::marker::PhantomData; -use std::rc::Rc; +use crate::builder::FlowLeaves; use crate::ir::HfPlusLeaf; use crate::location::Location; use crate::Stream; @@ -13,7 +12,7 @@ use crate::Stream; pub struct HfCycle<'a, T, W, N: Location + Clone> { pub(crate) ident: syn::Ident, pub(crate) node: N, - pub(crate) ir_leaves: Rc>>, + pub(crate) ir_leaves: FlowLeaves, pub(crate) _phantom: PhantomData<(&'a mut &'a (), T, W)>, } @@ -21,7 +20,7 @@ impl<'a, T, W, N: Location + Clone> HfCycle<'a, T, W, N> { pub fn complete(self, stream: Stream<'a, T, W, N>) { let ident = self.ident; - self.ir_leaves.borrow_mut().push(HfPlusLeaf::CycleSink { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { ident, location_id: self.node.id(), input: Box::new(stream.ir_node.into_inner()), diff --git a/hydroflow_plus/src/persist_pullup.rs b/hydroflow_plus/src/persist_pullup.rs index 2b531a8ae21..529ae5307b8 100644 --- a/hydroflow_plus/src/persist_pullup.rs +++ b/hydroflow_plus/src/persist_pullup.rs @@ -36,14 +36,14 @@ mod tests { .map(q!(|v| v + 1)) .for_each(q!(|n| println!("{}", n))); - let built = flow.extract(); + let built = flow.finalize(); insta::assert_debug_snapshot!(built.ir()); let optimized = built.optimize_with(super::persist_pullup); insta::assert_debug_snapshot!(optimized.ir()); - for (id, graph) in optimized.no_optimize().hydroflow_ir() { + for (id, graph) in optimized.compile().hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_display_snapshot!(graph.surface_syntax_string()); }); @@ -64,7 +64,7 @@ mod tests { before_tee.for_each(q!(|n| println!("{}", n))); - let built = flow.extract(); + let built = flow.finalize(); insta::assert_debug_snapshot!(built.ir()); @@ -72,7 +72,7 @@ mod tests { insta::assert_debug_snapshot!(optimized.ir()); - for (id, graph) in optimized.no_optimize().hydroflow_ir() { + for (id, graph) in optimized.compile().hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_display_snapshot!(graph.surface_syntax_string()); }); diff --git a/hydroflow_plus/src/profiler.rs b/hydroflow_plus/src/profiler.rs index ef5396038eb..94fa78fd970 100644 --- a/hydroflow_plus/src/profiler.rs +++ b/hydroflow_plus/src/profiler.rs @@ -93,13 +93,13 @@ mod tests { .for_each(q!(|n| println!("{}", n))); let runtime_context = flow.runtime_context(); - let built = flow.extract(); + let built = flow.finalize(); insta::assert_debug_snapshot!(&built.ir); // Print mermaid // let mut mermaid_config = WriteConfig {op_text_no_imports: true, ..Default::default()}; - // for (_, ir) in built.clone().optimize_default().hydroflow_ir() { + // for (_, ir) in built.clone().with_default_optimize().compile().hydroflow_ir() { // println!("{}", ir.to_mermaid(&mermaid_config)); // } diff --git a/hydroflow_plus/src/properties.rs b/hydroflow_plus/src/properties.rs index 540537b882a..c16a9c7d5a4 100644 --- a/hydroflow_plus/src/properties.rs +++ b/hydroflow_plus/src/properties.rs @@ -102,7 +102,7 @@ mod tests { .for_each(q!(|(string, count)| println!("{}: {}", string, count))); let built = flow - .extract() + .finalize() .optimize_with(|ir| properties_optimize(ir, &database)) .with_default_optimize(); diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 8b2089b6901..18877c73e15 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -14,6 +14,7 @@ use serde::Serialize; use stageleft::{q, IntoQuotedMut, Quoted}; use syn::parse_quote; +use crate::builder::FlowLeaves; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{Cluster, HfSend, Location}; @@ -42,18 +43,14 @@ pub struct Windowed {} pub struct Stream<'a, T, W, N: Location + Clone> { node: N, - ir_leaves: Rc>>, + ir_leaves: FlowLeaves, pub(crate) ir_node: RefCell, _phantom: PhantomData<(&'a mut &'a (), T, W)>, } impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { - pub(crate) fn new( - node: N, - ir_leaves: Rc>>, - ir_node: HfPlusNode, - ) -> Self { + pub(crate) fn new(node: N, ir_leaves: FlowLeaves, ir_node: HfPlusNode) -> Self { Stream { node, ir_leaves, @@ -218,14 +215,14 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { } pub fn for_each(self, f: impl IntoQuotedMut<'a, F>) { - self.ir_leaves.borrow_mut().push(HfPlusLeaf::ForEach { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::ForEach { input: Box::new(self.ir_node.into_inner()), f: f.splice().into(), }); } pub fn dest_sink + 'a>(self, sink: impl Quoted<'a, S>) { - self.ir_leaves.borrow_mut().push(HfPlusLeaf::DestSink { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::DestSink { sink: sink.splice().into(), input: Box::new(self.ir_node.into_inner()), }); diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index 01567ffa41e..20ed1d1a756 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -89,8 +89,7 @@ async fn main() { // Uncomment below, change .bin("counter_compute_pi") in order to track cardinality per operation // let runtime_context = builder.runtime_context(); - // dbg!(builder.extract() - // .with_default_optimize() + // dbg!(builder.with_default_optimize() // .optimize_with(|ir| profiling(ir, runtime_context, RuntimeData::new("FAKE"), RuntimeData::new("FAKE"))) // .ir()); diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 85fc6fc9828..d0e2c796e9a 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -59,8 +59,8 @@ pub fn compute_pi_runtime<'a>( batch_size: RuntimeData<&'a usize>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = compute_pi(&flow, &cli, &cli, batch_size); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -74,9 +74,8 @@ pub fn cardinality_compute_pi_runtime<'a>( ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = compute_pi(&flow, &cli, &cli, batch_size); let runtime_context = flow.runtime_context(); - flow.extract() - .optimize_with(|ir| profiling(ir, runtime_context, counters, counter_queue)) - .no_optimize() + flow.optimize_with(|ir| profiling(ir, runtime_context, counters, counter_queue)) + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -94,11 +93,11 @@ mod tests { &RuntimeData::new("FAKE"), RuntimeData::new("FAKE"), ); - let built = builder.extract(); + let built = builder.finalize(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built.optimize_default().hydroflow_ir() { + for (id, ir) in built.with_default_optimize().compile().hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_display_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/many_to_many.rs b/hydroflow_plus_test/src/cluster/many_to_many.rs index fc5b31e279c..039d6ec28f1 100644 --- a/hydroflow_plus_test/src/cluster/many_to_many.rs +++ b/hydroflow_plus_test/src/cluster/many_to_many.rs @@ -25,8 +25,8 @@ pub fn many_to_many_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = many_to_many(&flow, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -59,7 +59,7 @@ mod tests { }), ); - insta::assert_debug_snapshot!(builder.extract().ir()); + insta::assert_debug_snapshot!(builder.finalize().ir()); let mut deployment = deployment.into_inner(); diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index 8169a141fd7..e6be4f69687 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -44,8 +44,8 @@ pub fn map_reduce_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = map_reduce(&flow, &cli, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -62,11 +62,11 @@ mod tests { &RuntimeData::new("FAKE"), &RuntimeData::new("FAKE"), ); - let built = builder.extract(); + let built = builder.finalize(); insta::assert_debug_snapshot!(built.ir()); - for (id, ir) in built.optimize_default().hydroflow_ir() { + for (id, ir) in built.with_default_optimize().compile().hydroflow_ir() { insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, { insta::assert_display_snapshot!(ir.surface_syntax_string()); }); diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index 8c533c504ee..345c61bbc18 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -36,8 +36,8 @@ pub fn simple_cluster_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = simple_cluster(&flow, &cli, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -79,7 +79,7 @@ mod tests { }), ); - insta::assert_debug_snapshot!(builder.extract().ir()); + insta::assert_debug_snapshot!(builder.finalize().ir()); let mut deployment = deployment.into_inner(); diff --git a/hydroflow_plus_test/src/distributed/first_ten.rs b/hydroflow_plus_test/src/distributed/first_ten.rs index f4ba715c9bd..a324119ea38 100644 --- a/hydroflow_plus_test/src/distributed/first_ten.rs +++ b/hydroflow_plus_test/src/distributed/first_ten.rs @@ -32,8 +32,8 @@ pub fn first_ten_distributed_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = first_ten_distributed(&flow, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -61,7 +61,7 @@ mod tests { ); // if we drop this, we drop the references to the deployment nodes - let built = builder.extract(); + let built = builder.finalize(); insta::assert_debug_snapshot!(built.ir()); diff --git a/hydroflow_plus_test/src/distributed/networked.rs b/hydroflow_plus_test/src/distributed/networked.rs index 8247516225b..19693d2c039 100644 --- a/hydroflow_plus_test/src/distributed/networked.rs +++ b/hydroflow_plus_test/src/distributed/networked.rs @@ -50,8 +50,8 @@ pub fn networked_basic_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = networked_basic(&flow, &cli, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -92,7 +92,7 @@ mod tests { }), ); - insta::assert_debug_snapshot!(builder.extract().ir()); + insta::assert_debug_snapshot!(builder.finalize().ir()); let mut deployment = deployment.into_inner(); diff --git a/hydroflow_plus_test/src/local/chat_app.rs b/hydroflow_plus_test/src/local/chat_app.rs index 53a9864945d..8a4cbcbe2de 100644 --- a/hydroflow_plus_test/src/local/chat_app.rs +++ b/hydroflow_plus_test/src/local/chat_app.rs @@ -33,7 +33,7 @@ pub fn chat_app<'a>( output.send(t).unwrap(); })); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::runtime] diff --git a/hydroflow_plus_test/src/local/compute_pi.rs b/hydroflow_plus_test/src/local/compute_pi.rs index c04220bdb16..95de0466bab 100644 --- a/hydroflow_plus_test/src/local/compute_pi.rs +++ b/hydroflow_plus_test/src/local/compute_pi.rs @@ -49,5 +49,5 @@ pub fn compute_pi_runtime<'a>( batch_size: RuntimeData<&'a usize>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = compute_pi(&flow, &(), batch_size); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } diff --git a/hydroflow_plus_test/src/local/count_elems.rs b/hydroflow_plus_test/src/local/count_elems.rs index 2df45eb50c2..a949be645c9 100644 --- a/hydroflow_plus_test/src/local/count_elems.rs +++ b/hydroflow_plus_test/src/local/count_elems.rs @@ -21,7 +21,7 @@ pub fn count_elems<'a, T: 'a>( output.send(v).unwrap(); })); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::runtime] diff --git a/hydroflow_plus_test/src/local/first_ten.rs b/hydroflow_plus_test/src/local/first_ten.rs index ddcf92c7fdf..2ca7ecb4fdb 100644 --- a/hydroflow_plus_test/src/local/first_ten.rs +++ b/hydroflow_plus_test/src/local/first_ten.rs @@ -15,7 +15,7 @@ pub fn first_ten_runtime<'a>( flow: FlowBuilder<'a, SingleProcessGraph>, ) -> impl Quoted<'a, Hydroflow<'a>> { first_ten(&flow, &()); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::runtime] diff --git a/hydroflow_plus_test/src/local/graph_reachability.rs b/hydroflow_plus_test/src/local/graph_reachability.rs index 7e6758722db..1030711ee89 100644 --- a/hydroflow_plus_test/src/local/graph_reachability.rs +++ b/hydroflow_plus_test/src/local/graph_reachability.rs @@ -29,7 +29,7 @@ pub fn graph_reachability<'a>( reached_out.send(v).unwrap(); })); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::runtime] diff --git a/hydroflow_plus_test/src/local/negation.rs b/hydroflow_plus_test/src/local/negation.rs index c9d80287a1f..9fc10979bdb 100644 --- a/hydroflow_plus_test/src/local/negation.rs +++ b/hydroflow_plus_test/src/local/negation.rs @@ -25,7 +25,7 @@ pub fn test_difference<'a>( output.send(v).unwrap(); })); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::entry] @@ -52,7 +52,7 @@ pub fn test_anti_join<'a>( output.send(v.0).unwrap(); })); - flow.extract().optimize_default() + flow.with_default_optimize().compile() } #[stageleft::runtime] diff --git a/hydroflow_plus_test/src/local/teed_join.rs b/hydroflow_plus_test/src/local/teed_join.rs index 488e5a4883e..3540b4d4b3f 100644 --- a/hydroflow_plus_test/src/local/teed_join.rs +++ b/hydroflow_plus_test/src/local/teed_join.rs @@ -36,8 +36,8 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( output.send(v).unwrap(); })); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(subgraph_id) } diff --git a/template/hydroflow_plus/flow/src/first_ten.rs b/template/hydroflow_plus/flow/src/first_ten.rs index 84479efa324..a00bf0e17f0 100644 --- a/template/hydroflow_plus/flow/src/first_ten.rs +++ b/template/hydroflow_plus/flow/src/first_ten.rs @@ -18,5 +18,5 @@ pub fn first_ten_runtime<'a>( flow: FlowBuilder<'a, SingleProcessGraph>, ) -> impl Quoted<'a, Hydroflow<'a>> { first_ten(&flow, &()); // &() for a single process graph. - flow.extract().optimize_default() // : impl Quoted<'a, Hydroflow<'a>> + flow.with_default_optimize().compile() // : impl Quoted<'a, Hydroflow<'a>> } diff --git a/template/hydroflow_plus/flow/src/first_ten_distributed.rs b/template/hydroflow_plus/flow/src/first_ten_distributed.rs index 41167c99a52..0b06e993366 100644 --- a/template/hydroflow_plus/flow/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/flow/src/first_ten_distributed.rs @@ -25,8 +25,8 @@ pub fn first_ten_distributed_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = first_ten_distributed(&flow, &cli); - flow.extract() - .optimize_default() + flow.with_default_optimize() + .compile() .with_dynamic_id(q!(cli.meta.subgraph_id)) } @@ -55,6 +55,8 @@ mod tests { }), ); + let _built = flow.with_default_optimize(); + deployment.deploy().await.unwrap(); let second_process_stdout = second_process.stdout().await;