From a0bad712fd1ec3d8c6d9046db633e4fa6299f3b5 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Fri, 2 Aug 2024 23:28:46 -0700 Subject: [PATCH] feat(hydroflow): add `cross_singleton` operator This operator is necessary to eliminate performance bottlenecks in Paxos where cross-products result in unnecessary cloning and lack of short-circuit behavior results in values being cloned out of the internal state of `reduce_keyed`. --- ...e_cross_singleton__basic@graphvis_dot.snap | 39 +++++++++ ...oss_singleton__basic@graphvis_mermaid.snap | 34 ++++++++ hydroflow/tests/surface_cross_singleton.rs | 28 +++++++ .../src/graph/ops/cross_singleton.rs | 79 +++++++++++++++++++ hydroflow_lang/src/graph/ops/mod.rs | 1 + 5 files changed, 181 insertions(+) create mode 100644 hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_dot.snap create mode 100644 hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_mermaid.snap create mode 100644 hydroflow/tests/surface_cross_singleton.rs create mode 100644 hydroflow_lang/src/graph/ops/cross_singleton.rs diff --git a/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_dot.snap new file mode 100644 index 000000000000..0916dc2fd4ce --- /dev/null +++ b/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_dot.snap @@ -0,0 +1,39 @@ +--- +source: hydroflow/tests/surface_cross_singleton.rs +expression: "df.meta_graph().unwrap().to_dot(& Default :: default())" +--- +digraph { + node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled]; + edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"]; + n1v1 [label="(n1v1) cross_singleton()", shape=invhouse, fillcolor="#88aaff"] + n2v1 [label="(n2v1) source_iter([1, 2, 3])", shape=invhouse, fillcolor="#88aaff"] + n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"] + n4v1 [label="(n4v1) source_stream(single_rx)", shape=invhouse, fillcolor="#88aaff"] + n5v1 [label="(n5v1) for_each(|x| egress_tx.send(x).unwrap())", shape=house, fillcolor="#ffff88"] + n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n3v1 -> n1v1 [label="input"] + n2v1 -> n3v1 + n4v1 -> n6v1 + n1v1 -> n5v1 + n6v1 -> n1v1 [label="single", color=red] + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n4v1 + } + subgraph "cluster n2v1" { + fillcolor="#dddddd" + style=filled + label = "sg_2v1\nstratum 1" + n2v1 + n3v1 + n1v1 + n5v1 + subgraph "cluster_sg_2v1_var_join" { + label="var join" + n1v1 + } + } +} + diff --git a/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_mermaid.snap new file mode 100644 index 000000000000..44a19c6ea3d3 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_cross_singleton__basic@graphvis_mermaid.snap @@ -0,0 +1,34 @@ +--- +source: hydroflow/tests/surface_cross_singleton.rs +expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())" +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) cross_singleton()"/]:::pullClass +2v1[\"(2v1) source_iter([1, 2, 3])"/]:::pullClass +3v1[\"(3v1) persist::<'static>()"/]:::pullClass +4v1[\"(4v1) source_stream(single_rx)"/]:::pullClass +5v1[/"(5v1) for_each(|x| egress_tx.send(x).unwrap())"\]:::pushClass +6v1["(6v1) handoff"]:::otherClass +3v1-->|input|1v1 +2v1-->3v1 +4v1-->6v1 +1v1-->5v1 +6v1--x|single|1v1; linkStyle 4 stroke:red +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 4v1 +end +subgraph sg_2v1 ["sg_2v1 stratum 1"] + 2v1 + 3v1 + 1v1 + 5v1 + subgraph sg_2v1_var_join ["var join"] + 1v1 + end +end + diff --git a/hydroflow/tests/surface_cross_singleton.rs b/hydroflow/tests/surface_cross_singleton.rs new file mode 100644 index 000000000000..5dedaa432655 --- /dev/null +++ b/hydroflow/tests/surface_cross_singleton.rs @@ -0,0 +1,28 @@ +use hydroflow::util::collect_ready; +use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax}; +use multiplatform_test::multiplatform_test; + +#[multiplatform_test] +pub fn test_basic() { + let (single_tx, single_rx) = hydroflow::util::unbounded_channel::<()>(); + let (egress_tx, mut egress_rx) = hydroflow::util::unbounded_channel(); + + let mut df = hydroflow_syntax! { + join = cross_singleton(); + source_iter([1, 2, 3]) -> persist::<'static>() -> [input]join; + source_stream(single_rx) -> [single]join; + + join -> for_each(|x| egress_tx.send(x).unwrap()); + }; + assert_graphvis_snapshots!(df); + + df.run_available(); + let out: Vec<_> = collect_ready(&mut egress_rx); + assert_eq!(out, []); + + single_tx.send(()).unwrap(); + df.run_available(); + + let out: Vec<_> = collect_ready(&mut egress_rx); + assert_eq!(out, vec![(1, ()), (2, ()), (3, ())]); +} diff --git a/hydroflow_lang/src/graph/ops/cross_singleton.rs b/hydroflow_lang/src/graph/ops/cross_singleton.rs new file mode 100644 index 000000000000..83a760a6b727 --- /dev/null +++ b/hydroflow_lang/src/graph/ops/cross_singleton.rs @@ -0,0 +1,79 @@ +use quote::{quote_spanned, ToTokens}; +use syn::parse_quote; + +use super::{ + DelayType, OperatorCategory, OperatorConstraints, + OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1, +}; +use crate::graph::PortIndexValue; + +/// > 2 input streams, 1 output stream, no arguments. +/// +/// Operates like cross-join, but when one of the inputs is a "singleton"-like stream, emitting +/// at most one input. This operator blocks on the singleton input, and then joins it +/// with all the elements in the other stream if an element is present. This operator is useful +/// when a singleton input must be used to transform elements of a stream, since unlike cross-product +/// it avoids cloning the stream of inputs. It is also useful for creating conditional branches, +/// since the operator short circuits if the singleton input produces no values. +/// +/// There are two inputs to `cross_singleton`, they are `input` and `single`. +/// `input` is the input data flow, and `single` is the singleton input. +/// +/// ```hydroflow +/// join = cross_singleton(); +/// +/// source_iter([1, 2, 3]) -> [input]join; +/// source_iter([0]) -> [single]join; +/// +/// join -> assert_eq([(1, 0), (2, 0), (3, 0)]); +/// ``` +pub const CROSS_SINGLETON: OperatorConstraints = OperatorConstraints { + name: "cross_singleton", + categories: &[OperatorCategory::MultiIn], + persistence_args: RANGE_0, + type_args: RANGE_0, + hard_range_inn: &(2..=2), + soft_range_inn: &(2..=2), + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 0, + is_external_input: false, + has_singleton_output: false, + ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, single })), + ports_out: None, + input_delaytype_fn: |idx| match idx { + PortIndexValue::Path(path) if "single" == path.to_token_stream().to_string() => { + Some(DelayType::Stratum) + } + _else => None, + }, + flow_prop_fn: None, + write_fn: |wc @ &WriteContextArgs { + ident, + op_span, + inputs, + is_pull, + .. + }, + _diagnostics| { + assert!(is_pull); + + let input = &inputs[0]; + let singleton = &inputs[1]; + let s_taken_ident = wc.make_ident("singleton_taken"); + let singleton_value_ident = wc.make_ident("singleton_value"); + + let write_iterator = quote_spanned! {op_span=> + let mut #s_taken_ident = #singleton; + let #ident = #s_taken_ident.next().map(|#singleton_value_ident| { + #input.map(move |x| (x, ::std::clone::Clone::clone(&#singleton_value_ident))) + }).into_iter().flatten(); + }; + + Ok(OperatorWriteOutput { + write_prologue: Default::default(), + write_iterator, + ..Default::default() + }) + }, +}; diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 7f15392bcd9c..a4a7f47782a8 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -334,6 +334,7 @@ declare_ops![ cast::CAST, cross_join::CROSS_JOIN, cross_join_multiset::CROSS_JOIN_MULTISET, + cross_singleton::CROSS_SINGLETON, demux::DEMUX, demux_enum::DEMUX_ENUM, dest_file::DEST_FILE,