diff --git a/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap new file mode 100644 index 00000000000..65ef0b0e745 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap @@ -0,0 +1,67 @@ +--- +source: hydroflow/tests/surface_fold.rs +expression: "df.meta_graph().unwrap().to_dot(& Default :: default())" +--- +digraph { + node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled]; + edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"]; + n1v1 [label="(n1v1) source_iter(Vec::::new())", shape=invhouse, fillcolor="#88aaff"] + n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l || 0,\l |old: &mut usize, _: usize| {\l *old += 1;\l },\l)\l", shape=invhouse, fillcolor="#88aaff"] + n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"] + n4v1 [label="(n4v1) for_each(|_| {})", shape=house, fillcolor="#ffff88"] + n5v1 [label="(n5v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] + n6v1 [label="(n6v1) cross_join_multiset()", shape=invhouse, fillcolor="#88aaff"] + n7v1 [label="(n7v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] + n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n2v1 -> n3v1 + n1v1 -> n8v1 + n3v1 -> n4v1 + n3v1 -> n9v1 + n5v1 -> n10v1 + n6v1 -> n7v1 + n8v1 -> n2v1 [color=red] + n9v1 -> n6v1 [label="1"] + n10v1 -> n6v1 [label="0"] + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n1v1 + subgraph "cluster_sg_1v1_var_teed_fold" { + label="var teed_fold" + n1v1 + } + } + subgraph "cluster n2v1" { + fillcolor="#dddddd" + style=filled + label = "sg_2v1\nstratum 1" + n2v1 + n3v1 + n4v1 + subgraph "cluster_sg_2v1_var_teed_fold" { + label="var teed_fold" + n2v1 + n3v1 + } + } + subgraph "cluster n3v1" { + fillcolor="#dddddd" + style=filled + label = "sg_3v1\nstratum 1" + n6v1 + n7v1 + subgraph "cluster_sg_3v1_var_join_node" { + label="var join_node" + n6v1 + } + } + subgraph "cluster n4v1" { + fillcolor="#dddddd" + style=filled + label = "sg_4v1\nstratum 0" + n5v1 + } +} diff --git a/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap new file mode 100644 index 00000000000..9d60d59a378 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap @@ -0,0 +1,54 @@ +--- +source: hydroflow/tests/surface_fold.rs +expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())" +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) source_iter(Vec::<usize>::new())"/]:::pullClass +2v1[\"
(2v1)
fold::<
'tick,
>(
|| 0,
|old: &mut usize, _: usize| {
*old += 1;
},
)
"/]:::pullClass +3v1[/"(3v1) tee()"\]:::pushClass +4v1[/"(4v1) for_each(|_| {})"\]:::pushClass +5v1[\"(5v1) source_stream(items_recv)"/]:::pullClass +6v1[\"(6v1) cross_join_multiset()"/]:::pullClass +7v1[/"(7v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass +8v1["(8v1) handoff"]:::otherClass +9v1["(9v1) handoff"]:::otherClass +10v1["(10v1) handoff"]:::otherClass +2v1-->3v1 +1v1-->8v1 +3v1-->4v1 +3v1-->9v1 +5v1-->10v1 +6v1-->7v1 +8v1--x2v1; linkStyle 6 stroke:red +9v1-->|1|6v1 +10v1-->|0|6v1 +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1 + subgraph sg_1v1_var_teed_fold ["var teed_fold"] + 1v1 + end +end +subgraph sg_2v1 ["sg_2v1 stratum 1"] + 2v1 + 3v1 + 4v1 + subgraph sg_2v1_var_teed_fold ["var teed_fold"] + 2v1 + 3v1 + end +end +subgraph sg_3v1 ["sg_3v1 stratum 1"] + 6v1 + 7v1 + subgraph sg_3v1_var_join_node ["var join_node"] + 6v1 + end +end +subgraph sg_4v1 ["sg_4v1 stratum 0"] + 5v1 +end diff --git a/hydroflow/tests/surface_fold.rs b/hydroflow/tests/surface_fold.rs index 11d206c57dc..3c00610e324 100644 --- a/hydroflow/tests/surface_fold.rs +++ b/hydroflow/tests/surface_fold.rs @@ -97,6 +97,49 @@ pub fn test_fold_static() { df.run_available(); // Should return quickly and not hang } +#[multiplatform_test] +pub fn test_fold_static_join() { + let (items_send, items_recv) = hydroflow::util::unbounded_channel::(); + let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); + + let mut df = hydroflow::hydroflow_syntax! { + teed_fold = source_iter(Vec::::new()) + -> fold::<'tick>(|| 0, |old: &mut usize, _: usize| { *old += 1; }) + -> tee(); + teed_fold -> for_each(|_| {}); + teed_fold -> [1]join_node; + + source_stream(items_recv) -> [0]join_node; + + join_node = cross_join_multiset(); + join_node -> for_each(|v| result_send.send(v).unwrap()); + }; + assert_graphvis_snapshots!(df); + + assert_eq!( + (TickInstant::new(0), 0), + (df.current_tick(), df.current_stratum()) + ); + + items_send.send(0).unwrap(); + df.run_available(); + + assert_eq!( + (TickInstant::new(1), 0), + (df.current_tick(), df.current_stratum()) + ); + assert_eq!(&[(0, 0)], &*collect_ready::, _>(&mut result_recv)); + + items_send.send(1).unwrap(); + df.run_available(); + + assert_eq!( + (TickInstant::new(2), 0), + (df.current_tick(), df.current_stratum()) + ); + assert_eq!(&[(1, 0)], &*collect_ready::, _>(&mut result_recv)); +} + #[multiplatform_test] pub fn test_fold_flatten() { // test pull diff --git a/hydroflow/tests/surface_state_scheduling.rs b/hydroflow/tests/surface_state_scheduling.rs index 7b1eed27e05..e89eb296305 100644 --- a/hydroflow/tests/surface_state_scheduling.rs +++ b/hydroflow/tests/surface_state_scheduling.rs @@ -62,7 +62,7 @@ pub fn test_fold_tick() { (df.current_tick(), df.current_stratum()) ); - assert_eq!(&[1], &*collect_ready::, _>(&mut out_recv)); + assert_eq!(&[1, 0, 0], &*collect_ready::, _>(&mut out_recv)); df.run_available(); // Should return quickly and not hang } diff --git a/hydroflow_lang/src/graph/ops/fold.rs b/hydroflow_lang/src/graph/ops/fold.rs index 747b239a195..0824033b336 100644 --- a/hydroflow_lang/src/graph/ops/fold.rs +++ b/hydroflow_lang/src/graph/ops/fold.rs @@ -1,8 +1,8 @@ use quote::quote_spanned; use super::{ - DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, - OperatorInstance, OperatorWriteOutput, Persistence, WriteContextArgs, RANGE_0, RANGE_1, + DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, + OperatorWriteOutput, Persistence, WriteContextArgs, RANGE_0, RANGE_1, }; use crate::diagnostic::{Diagnostic, Level}; @@ -144,12 +144,8 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { }; } }; - let write_iterator_after = if Persistence::Static == persistence { - quote_spanned! {op_span=> - #context.schedule_subgraph(#context.current_subgraph(), false); - } - } else { - Default::default() + let write_iterator_after = quote_spanned! {op_span=> + #context.schedule_subgraph(#context.current_subgraph(), false); }; Ok(OperatorWriteOutput {