Skip to content

Commit

Permalink
fix(hydroflow): fold<'tick> should always emit a value (#1407)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Aug 21, 2024
1 parent 46a8a2c commit 75dd4fb
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
source: hydroflow/tests/surface_fold.rs
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter(Vec::<usize>::new())", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l || 0,\l |old: &mut usize, _: usize| {\l *old += 1;\l },\l)\l", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) for_each(|_| {})", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) cross_join_multiset()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1
n1v1 -> n8v1
n3v1 -> n4v1
n3v1 -> n9v1
n5v1 -> n10v1
n6v1 -> n7v1
n8v1 -> n2v1 [color=red]
n9v1 -> n6v1 [label="1"]
n10v1 -> n6v1 [label="0"]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
subgraph "cluster_sg_1v1_var_teed_fold" {
label="var teed_fold"
n1v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 1"
n2v1
n3v1
n4v1
subgraph "cluster_sg_2v1_var_teed_fold" {
label="var teed_fold"
n2v1
n3v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 1"
n6v1
n7v1
subgraph "cluster_sg_3v1_var_join_node" {
label="var join_node"
n6v1
}
}
subgraph "cluster n4v1" {
fillcolor="#dddddd"
style=filled
label = "sg_4v1\nstratum 0"
n5v1
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
---
source: hydroflow/tests/surface_fold.rs
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(Vec::&lt;usize&gt;::new())</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>fold::&lt;<br> 'tick,<br>&gt;(<br> || 0,<br> |old: &amp;mut usize, _: usize| {<br> *old += 1;<br> },<br>)</code>"/]:::pullClass
3v1[/"(3v1) <code>tee()</code>"\]:::pushClass
4v1[/"(4v1) <code>for_each(|_| {})</code>"\]:::pushClass
5v1[\"(5v1) <code>source_stream(items_recv)</code>"/]:::pullClass
6v1[\"(6v1) <code>cross_join_multiset()</code>"/]:::pullClass
7v1[/"(7v1) <code>for_each(|v| result_send.send(v).unwrap())</code>"\]:::pushClass
8v1["(8v1) <code>handoff</code>"]:::otherClass
9v1["(9v1) <code>handoff</code>"]:::otherClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-->3v1
1v1-->8v1
3v1-->4v1
3v1-->9v1
5v1-->10v1
6v1-->7v1
8v1--x2v1; linkStyle 6 stroke:red
9v1-->|1|6v1
10v1-->|0|6v1
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
subgraph sg_1v1_var_teed_fold ["var <tt>teed_fold</tt>"]
1v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 1"]
2v1
3v1
4v1
subgraph sg_2v1_var_teed_fold ["var <tt>teed_fold</tt>"]
2v1
3v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
6v1
7v1
subgraph sg_3v1_var_join_node ["var <tt>join_node</tt>"]
6v1
end
end
subgraph sg_4v1 ["sg_4v1 stratum 0"]
5v1
end
43 changes: 43 additions & 0 deletions hydroflow/tests/surface_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,49 @@ pub fn test_fold_static() {
df.run_available(); // Should return quickly and not hang
}

#[multiplatform_test]
pub fn test_fold_static_join() {
let (items_send, items_recv) = hydroflow::util::unbounded_channel::<usize>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

let mut df = hydroflow::hydroflow_syntax! {
teed_fold = source_iter(Vec::<usize>::new())
-> fold::<'tick>(|| 0, |old: &mut usize, _: usize| { *old += 1; })
-> tee();
teed_fold -> for_each(|_| {});
teed_fold -> [1]join_node;

source_stream(items_recv) -> [0]join_node;

join_node = cross_join_multiset();
join_node -> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);

assert_eq!(
(TickInstant::new(0), 0),
(df.current_tick(), df.current_stratum())
);

items_send.send(0).unwrap();
df.run_available();

assert_eq!(
(TickInstant::new(1), 0),
(df.current_tick(), df.current_stratum())
);
assert_eq!(&[(0, 0)], &*collect_ready::<Vec<_>, _>(&mut result_recv));

items_send.send(1).unwrap();
df.run_available();

assert_eq!(
(TickInstant::new(2), 0),
(df.current_tick(), df.current_stratum())
);
assert_eq!(&[(1, 0)], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}

#[multiplatform_test]
pub fn test_fold_flatten() {
// test pull
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/tests/surface_state_scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn test_fold_tick() {
(df.current_tick(), df.current_stratum())
);

assert_eq!(&[1], &*collect_ready::<Vec<_>, _>(&mut out_recv));
assert_eq!(&[1, 0, 0], &*collect_ready::<Vec<_>, _>(&mut out_recv));

df.run_available(); // Should return quickly and not hang
}
Expand Down
12 changes: 4 additions & 8 deletions hydroflow_lang/src/graph/ops/fold.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use quote::quote_spanned;

use super::{
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints,
OperatorInstance, OperatorWriteOutput, Persistence, WriteContextArgs, RANGE_0, RANGE_1,
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, Persistence, WriteContextArgs, RANGE_0, RANGE_1,
};
use crate::diagnostic::{Diagnostic, Level};

Expand Down Expand Up @@ -144,12 +144,8 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
};
}
};
let write_iterator_after = if Persistence::Static == persistence {
quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), false);
}
} else {
Default::default()
let write_iterator_after = quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), false);
};

Ok(OperatorWriteOutput {
Expand Down

0 comments on commit 75dd4fb

Please sign in to comment.