Skip to content

Commit

Permalink
feat(hydroflow): add cross_singleton operator (#1373)
Browse files Browse the repository at this point in the history
This operator is necessary to eliminate performance bottlenecks in Paxos
where cross-products result in unnecessary cloning and lack of
short-circuit behavior results in values being cloned out of the
internal state of `reduce_keyed`.
  • Loading branch information
shadaj authored Aug 6, 2024
1 parent bb081d3 commit bd793e2
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
source: hydroflow/tests/surface_cross_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) cross_singleton()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter([1, 2, 3])", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) source_stream(single_rx)", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) for_each(|x| egress_tx.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n3v1 -> n1v1 [label="input"]
n2v1 -> n3v1
n4v1 -> n6v1
n1v1 -> n5v1
n6v1 -> n1v1 [label="single", color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n4v1
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 1"
n2v1
n3v1
n1v1
n5v1
subgraph "cluster_sg_2v1_var_join" {
label="var join"
n1v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
source: hydroflow/tests/surface_cross_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>cross_singleton()</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter([1, 2, 3])</code>"/]:::pullClass
3v1[\"(3v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>source_stream(single_rx)</code>"/]:::pullClass
5v1[/"(5v1) <code>for_each(|x| egress_tx.send(x).unwrap())</code>"\]:::pushClass
6v1["(6v1) <code>handoff</code>"]:::otherClass
3v1-->|input|1v1
2v1-->3v1
4v1-->6v1
1v1-->5v1
6v1--x|single|1v1; linkStyle 4 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
4v1
end
subgraph sg_2v1 ["sg_2v1 stratum 1"]
2v1
3v1
1v1
5v1
subgraph sg_2v1_var_join ["var <tt>join</tt>"]
1v1
end
end

28 changes: 28 additions & 0 deletions hydroflow/tests/surface_cross_singleton.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_basic() {
let (single_tx, single_rx) = hydroflow::util::unbounded_channel::<()>();
let (egress_tx, mut egress_rx) = hydroflow::util::unbounded_channel();

let mut df = hydroflow_syntax! {
join = cross_singleton();
source_iter([1, 2, 3]) -> persist::<'static>() -> [input]join;
source_stream(single_rx) -> [single]join;

join -> for_each(|x| egress_tx.send(x).unwrap());
};
assert_graphvis_snapshots!(df);

df.run_available();
let out: Vec<_> = collect_ready(&mut egress_rx);
assert_eq!(out, []);

single_tx.send(()).unwrap();
df.run_available();

let out: Vec<_> = collect_ready(&mut egress_rx);
assert_eq!(out, vec![(1, ()), (2, ()), (3, ())]);
}
79 changes: 79 additions & 0 deletions hydroflow_lang/src/graph/ops/cross_singleton.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use quote::{quote_spanned, ToTokens};
use syn::parse_quote;

use super::{
DelayType, OperatorCategory, OperatorConstraints,
OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
};
use crate::graph::PortIndexValue;

/// > 2 input streams, 1 output stream, no arguments.
///
/// Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, emitting
/// ignoring everything after the first element. This operator blocks on the singleton input, and
/// then joins it with all the elements in the other stream if an element is present. This operator
/// is useful when a singleton input must be used to transform elements of a stream, since unlike
/// cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional
/// branches, since the operator short circuits if the singleton input produces no values.
///
/// There are two inputs to `cross_singleton`, they are `input` and `single`.
/// `input` is the input data flow, and `single` is the singleton input.
///
/// ```hydroflow
/// join = cross_singleton();
///
/// source_iter([1, 2, 3]) -> [input]join;
/// source_iter([0]) -> [single]join;
///
/// join -> assert_eq([(1, 0), (2, 0), (3, 0)]);
/// ```
pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints {
name: "cross_singleton",
categories: &[OperatorCategory::MultiIn],
persistence_args: RANGE_0,
type_args: RANGE_0,
hard_range_inn: &(2..=2),
soft_range_inn: &(2..=2),
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })),
ports_out: None,
input_delaytype_fn: |idx| match idx {
PortIndexValue::Path(path) if "single" == path.to_token_stream().to_string() => {
Some(DelayType::Stratum)
}
_else => None,
},
flow_prop_fn: None,
write_fn: |wc @ &WriteContextArgs {
ident,
op_span,
inputs,
is_pull,
..
},
_diagnostics| {
assert!(is_pull);

let input = &inputs[0];
let singleton = &inputs[1];
let s_taken_ident = wc.make_ident("singleton_taken");
let singleton_value_ident = wc.make_ident("singleton_value");

let write_iterator = quote_spanned! {op_span=>
let mut #s_taken_ident = #singleton;
let #ident = #s_taken_ident.next().map(|#singleton_value_ident| {
#input.map(move |x| (x, ::std::clone::Clone::clone(&#singleton_value_ident)))
}).into_iter().flatten();
};

Ok(OperatorWriteOutput {
write_prologue: Default::default(),
write_iterator,
..Default::default()
})
},
};
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ declare_ops![
cast::CAST,
cross_join::CROSS_JOIN,
cross_join_multiset::CROSS_JOIN_MULTISET,
cross_singleton::CROSS_SINGLETON,
demux::DEMUX,
demux_enum::DEMUX_ENUM,
dest_file::DEST_FILE,
Expand Down

0 comments on commit bd793e2

Please sign in to comment.