Skip to content

Commit

Permalink
refactor(hydroflow_plus)!: start rearranging stages of flow compilati…
Browse files Browse the repository at this point in the history
…on to prepare for trybuild approach (#1358)
  • Loading branch information
shadaj committed Aug 13, 2024
1 parent a88a550 commit 09d6d44
Show file tree
Hide file tree
Showing 23 changed files with 103 additions and 81 deletions.
4 changes: 2 additions & 2 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub fn broadcast_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
broadcast(&flow, &cli, &cli);
flow.extract()
.optimize_default()
flow.with_default_optimize()
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}
```
Expand Down
72 changes: 49 additions & 23 deletions hydroflow_plus/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Duration;
use hydroflow::bytes::Bytes;
use hydroflow::futures::stream::Stream as FuturesStream;
use hydroflow::lattices::collections::MapMapValues;
use hydroflow_lang::graph::eliminate_extra_unions_tees;
use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph};
use proc_macro2::Span;
use stageleft::*;
use syn::parse_quote;
Expand All @@ -19,8 +19,13 @@ use crate::location::{
use crate::stream::{Async, Windowed};
use crate::{HfCompiled, HfCycle, RuntimeContext, Stream};

/// Tracks the leaves of the dataflow IR. This is referenced by
/// `Stream` and `HfCycle` to build the IR. The inner option will
/// be set to `None` when this builder is finalized.
pub type FlowLeaves = Rc<RefCell<Option<Vec<HfPlusLeaf>>>>;

pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> {
ir_leaves: Rc<RefCell<Vec<HfPlusLeaf>>>,
ir_leaves: FlowLeaves,
nodes: RefCell<Vec<D::Process>>,
clusters: RefCell<Vec<D::Cluster>>,
cycle_ids: RefCell<HashMap<usize, usize>>,
Expand All @@ -33,13 +38,25 @@ pub struct FlowBuilder<'a, D: LocalDeploy<'a> + ?Sized> {

next_node_id: RefCell<usize>,

/// Tracks whether this flow has been finalized; it is an error to
/// drop without finalizing.
finalized: bool,

/// 'a on a FlowBuilder is used to ensure that staged code does not
/// capture more data that it is allowed to; 'a is generated at the
/// entrypoint of the staged code and we keep it invariant here
/// to enforce the appropriate constraints
_phantom: PhantomData<&'a mut &'a ()>,
}

impl<'a, D: LocalDeploy<'a> + ?Sized> Drop for FlowBuilder<'a, D> {
fn drop(&mut self) {
if !self.finalized {
panic!("Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`.");
}
}
}

impl<'a, D: LocalDeploy<'a>> QuotedContext for FlowBuilder<'a, D> {
fn create() -> Self {
FlowBuilder::new()
Expand All @@ -50,26 +67,39 @@ impl<'a, D: LocalDeploy<'a>> FlowBuilder<'a, D> {
#[allow(clippy::new_without_default)]
pub fn new() -> FlowBuilder<'a, D> {
FlowBuilder {
ir_leaves: Rc::new(RefCell::new(Vec::new())),
ir_leaves: Rc::new(RefCell::new(Some(Vec::new()))),
nodes: RefCell::new(Vec::new()),
clusters: RefCell::new(Vec::new()),
cycle_ids: RefCell::new(HashMap::new()),
meta: RefCell::new(Default::default()),
next_node_id: RefCell::new(0),
finalized: false,
_phantom: PhantomData,
}
}

pub fn extract(self) -> BuiltFlow<'a, D> {
pub fn finalize(mut self) -> BuiltFlow<'a, D> {
self.finalized = true;
BuiltFlow {
ir: self.ir_leaves.borrow().clone(),
nodes: self.nodes.into_inner(),
clusters: self.clusters.into_inner(),
ir: self.ir_leaves.borrow_mut().take().unwrap(),
nodes: self.nodes.replace(vec![]),
clusters: self.clusters.replace(vec![]),
_phantom: PhantomData,
}
}

pub fn ir_leaves(&self) -> &Rc<RefCell<Vec<HfPlusLeaf>>> {
pub fn with_default_optimize(self) -> BuiltFlow<'a, D> {
self.finalize().with_default_optimize()
}

pub fn optimize_with(
self,
f: impl FnOnce(Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf>,
) -> BuiltFlow<'a, D> {
self.finalize().optimize_with(f)
}

pub fn ir_leaves(&self) -> &FlowLeaves {
&self.ir_leaves
}

Expand Down Expand Up @@ -313,34 +343,30 @@ impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> {
}
}

fn build_inner<'a, D: LocalDeploy<'a>>(me: BuiltFlow<'a, D>) -> HfCompiled<'a, D::GraphId> {
fn build_inner<'a, D: LocalDeploy<'a>>(me: BuiltFlow<'a, D>) -> BTreeMap<usize, HydroflowGraph> {
let mut builders = BTreeMap::new();
let mut built_tees = HashMap::new();
let mut next_stmt_id = 0;
for leaf in me.ir {
leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
}

HfCompiled {
hydroflow_ir: builders.map_values(|v| {
let (mut flat_graph, _, _) = v.build();
eliminate_extra_unions_tees(&mut flat_graph);
flat_graph
}),
_phantom: PhantomData,
}
builders.map_values(|v| {
let (mut flat_graph, _, _) = v.build();
eliminate_extra_unions_tees(&mut flat_graph);
flat_graph
})
}

impl<'a, D: LocalDeploy<'a>> BuiltFlow<'a, D> {
pub fn no_optimize(self) -> HfCompiled<'a, D::GraphId> {
build_inner(self)
pub fn compile(self) -> HfCompiled<'a, D::GraphId> {
HfCompiled {
hydroflow_ir: build_inner(self),
_phantom: PhantomData,
}
}

pub fn with_default_optimize(self) -> BuiltFlow<'a, D> {
self.optimize_with(super::persist_pullup::persist_pullup)
}

pub fn optimize_default(self) -> HfCompiled<'a, D::GraphId> {
self.with_default_optimize().no_optimize()
}
}
7 changes: 3 additions & 4 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;

use crate::builder::FlowLeaves;
use crate::ir::HfPlusLeaf;
use crate::location::Location;
use crate::Stream;
Expand All @@ -13,15 +12,15 @@ use crate::Stream;
pub struct HfCycle<'a, T, W, N: Location + Clone> {
pub(crate) ident: syn::Ident,
pub(crate) node: N,
pub(crate) ir_leaves: Rc<RefCell<Vec<HfPlusLeaf>>>,
pub(crate) ir_leaves: FlowLeaves,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), T, W)>,
}

impl<'a, T, W, N: Location + Clone> HfCycle<'a, T, W, N> {
pub fn complete(self, stream: Stream<'a, T, W, N>) {
let ident = self.ident;

self.ir_leaves.borrow_mut().push(HfPlusLeaf::CycleSink {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_id: self.node.id(),
input: Box::new(stream.ir_node.into_inner()),
Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus/src/persist_pullup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ mod tests {
.map(q!(|v| v + 1))
.for_each(q!(|n| println!("{}", n)));

let built = flow.extract();
let built = flow.finalize();

insta::assert_debug_snapshot!(built.ir());

let optimized = built.optimize_with(super::persist_pullup);

insta::assert_debug_snapshot!(optimized.ir());
for (id, graph) in optimized.no_optimize().hydroflow_ir() {
for (id, graph) in optimized.compile().hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(graph.surface_syntax_string());
});
Expand All @@ -64,15 +64,15 @@ mod tests {

before_tee.for_each(q!(|n| println!("{}", n)));

let built = flow.extract();
let built = flow.finalize();

insta::assert_debug_snapshot!(built.ir());

let optimized = built.optimize_with(super::persist_pullup);

insta::assert_debug_snapshot!(optimized.ir());

for (id, graph) in optimized.no_optimize().hydroflow_ir() {
for (id, graph) in optimized.compile().hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(graph.surface_syntax_string());
});
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus/src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ mod tests {
.for_each(q!(|n| println!("{}", n)));

let runtime_context = flow.runtime_context();
let built = flow.extract();
let built = flow.finalize();

insta::assert_debug_snapshot!(&built.ir);

// Print mermaid
// let mut mermaid_config = WriteConfig {op_text_no_imports: true, ..Default::default()};
// for (_, ir) in built.clone().optimize_default().hydroflow_ir() {
// for (_, ir) in built.clone().with_default_optimize().compile().hydroflow_ir() {
// println!("{}", ir.to_mermaid(&mermaid_config));
// }

Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod tests {
.for_each(q!(|(string, count)| println!("{}: {}", string, count)));

let built = flow
.extract()
.finalize()
.optimize_with(|ir| properties_optimize(ir, &database))
.with_default_optimize();

Expand Down
13 changes: 5 additions & 8 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use serde::Serialize;
use stageleft::{q, IntoQuotedMut, Quoted};
use syn::parse_quote;

use crate::builder::FlowLeaves;
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Cluster, HfSend, Location};

Expand Down Expand Up @@ -42,18 +43,14 @@ pub struct Windowed {}
pub struct Stream<'a, T, W, N: Location + Clone> {
node: N,

ir_leaves: Rc<RefCell<Vec<HfPlusLeaf>>>,
ir_leaves: FlowLeaves,
pub(crate) ir_node: RefCell<HfPlusNode>,

_phantom: PhantomData<(&'a mut &'a (), T, W)>,
}

impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
pub(crate) fn new(
node: N,
ir_leaves: Rc<RefCell<Vec<HfPlusLeaf>>>,
ir_node: HfPlusNode,
) -> Self {
pub(crate) fn new(node: N, ir_leaves: FlowLeaves, ir_node: HfPlusNode) -> Self {
Stream {
node,
ir_leaves,
Expand Down Expand Up @@ -218,14 +215,14 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
}

pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F>) {
self.ir_leaves.borrow_mut().push(HfPlusLeaf::ForEach {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::ForEach {
input: Box::new(self.ir_node.into_inner()),
f: f.splice().into(),
});
}

pub fn dest_sink<S: Unpin + Sink<T> + 'a>(self, sink: impl Quoted<'a, S>) {
self.ir_leaves.borrow_mut().push(HfPlusLeaf::DestSink {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::DestSink {
sink: sink.splice().into(),
input: Box::new(self.ir_node.into_inner()),
});
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus_test/examples/perf_compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ async fn main() {

// Uncomment below, change .bin("counter_compute_pi") in order to track cardinality per operation
// let runtime_context = builder.runtime_context();
// dbg!(builder.extract()
// .with_default_optimize()
// dbg!(builder.with_default_optimize()
// .optimize_with(|ir| profiling(ir, runtime_context, RuntimeData::new("FAKE"), RuntimeData::new("FAKE")))
// .ir());

Expand Down
13 changes: 6 additions & 7 deletions hydroflow_plus_test/src/cluster/compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub fn compute_pi_runtime<'a>(
batch_size: RuntimeData<&'a usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(&flow, &cli, &cli, batch_size);
flow.extract()
.optimize_default()
flow.with_default_optimize()
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand All @@ -74,9 +74,8 @@ pub fn cardinality_compute_pi_runtime<'a>(
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = compute_pi(&flow, &cli, &cli, batch_size);
let runtime_context = flow.runtime_context();
flow.extract()
.optimize_with(|ir| profiling(ir, runtime_context, counters, counter_queue))
.no_optimize()
flow.optimize_with(|ir| profiling(ir, runtime_context, counters, counter_queue))
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand All @@ -94,11 +93,11 @@ mod tests {
&RuntimeData::new("FAKE"),
RuntimeData::new("FAKE"),
);
let built = builder.extract();
let built = builder.finalize();

insta::assert_debug_snapshot!(built.ir());

for (id, ir) in built.optimize_default().hydroflow_ir() {
for (id, ir) in built.with_default_optimize().compile().hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(ir.surface_syntax_string());
});
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus_test/src/cluster/many_to_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub fn many_to_many_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = many_to_many(&flow, &cli);
flow.extract()
.optimize_default()
flow.with_default_optimize()
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand Down Expand Up @@ -59,7 +59,7 @@ mod tests {
}),
);

insta::assert_debug_snapshot!(builder.extract().ir());
insta::assert_debug_snapshot!(builder.finalize().ir());

let mut deployment = deployment.into_inner();

Expand Down
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/cluster/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub fn map_reduce_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = map_reduce(&flow, &cli, &cli);
flow.extract()
.optimize_default()
flow.with_default_optimize()
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand All @@ -62,11 +62,11 @@ mod tests {
&RuntimeData::new("FAKE"),
&RuntimeData::new("FAKE"),
);
let built = builder.extract();
let built = builder.finalize();

insta::assert_debug_snapshot!(built.ir());

for (id, ir) in built.optimize_default().hydroflow_ir() {
for (id, ir) in built.with_default_optimize().compile().hydroflow_ir() {
insta::with_settings!({snapshot_suffix => format!("surface_graph_{id}")}, {
insta::assert_display_snapshot!(ir.surface_syntax_string());
});
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus_test/src/cluster/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub fn simple_cluster_runtime<'a>(
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = simple_cluster(&flow, &cli, &cli);
flow.extract()
.optimize_default()
flow.with_default_optimize()
.compile()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}

Expand Down Expand Up @@ -79,7 +79,7 @@ mod tests {
}),
);

insta::assert_debug_snapshot!(builder.extract().ir());
insta::assert_debug_snapshot!(builder.finalize().ir());

let mut deployment = deployment.into_inner();

Expand Down
Loading

0 comments on commit 09d6d44

Please sign in to comment.