diff --git a/hydroflow/tests/compile-fail/surface_singleton_nostate.rs b/hydroflow/tests/compile-fail/surface_singleton_nostate.rs index d054776ba42..929840d6cde 100644 --- a/hydroflow/tests/compile-fail/surface_singleton_nostate.rs +++ b/hydroflow/tests/compile-fail/surface_singleton_nostate.rs @@ -1,3 +1,4 @@ +/// Reference an operator that doesn't have singleton state. pub fn main() { let mut df = hydroflow::hydroflow_syntax! { my_ref = source_iter(15..=25) -> null(); diff --git a/hydroflow/tests/compile-fail/surface_singleton_nostate.stderr b/hydroflow/tests/compile-fail/surface_singleton_nostate.stderr index cabf25fd27a..b956bd54aeb 100644 --- a/hydroflow/tests/compile-fail/surface_singleton_nostate.stderr +++ b/hydroflow/tests/compile-fail/surface_singleton_nostate.stderr @@ -1,20 +1,5 @@ -error[E0425]: cannot find value `singleton_op_2v1` in this scope - --> tests/compile-fail/surface_singleton_nostate.rs:5:16 +error: Cannot reference operator `null`. Only operators with singleton state can be referenced. + --> tests/compile-fail/surface_singleton_nostate.rs:6:41 | -5 | -> filter(|value| value <= #my_ref.as_reveal_ref()) - | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope - -error[E0282]: type annotations needed - --> tests/compile-fail/surface_singleton_nostate.rs:2:18 - | -2 | let mut df = hydroflow::hydroflow_syntax! { - | __________________^ -3 | | my_ref = source_iter(15..=25) -> null(); -4 | | source_iter(10..=30) -5 | | -> filter(|value| value <= #my_ref.as_reveal_ref()) -6 | | -> null(); -7 | | -8 | | }; - | |_____^ cannot infer type - | - = note: this error originates in the macro `hydroflow::hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info) +6 | -> filter(|value| value <= #my_ref.as_reveal_ref()) + | ^^^^^^ diff --git a/hydroflow/tests/compile-fail/surface_singleton_nostate_undefined.stderr b/hydroflow/tests/compile-fail/surface_singleton_nostate_undefined.stderr index 05557611c6e..d349a668b60 100644 --- a/hydroflow/tests/compile-fail/surface_singleton_nostate_undefined.stderr +++ b/hydroflow/tests/compile-fail/surface_singleton_nostate_undefined.stderr @@ -3,3 +3,9 @@ error: Cannot find referenced name `unknown`; name was never assigned. | 5 | -> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref()) | ^^^^^^^ + +error: Cannot reference operator `null`. Only operators with singleton state can be referenced. + --> tests/compile-fail/surface_singleton_nostate_undefined.rs:5:41 + | +5 | -> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref()) + | ^^^^^^ diff --git a/hydroflow/tests/compile-fail/surface_singleton_undefined.rs b/hydroflow/tests/compile-fail/surface_singleton_undefined.rs index 2e0966f4697..a9bd226cc5e 100644 --- a/hydroflow/tests/compile-fail/surface_singleton_undefined.rs +++ b/hydroflow/tests/compile-fail/surface_singleton_undefined.rs @@ -2,8 +2,7 @@ pub fn main() { let mut df = hydroflow::hydroflow_syntax! { source_iter(10..=30) -> filter(|value| value <= #unknown.as_reveal_ref()) - -> null(); - + -> null(); }; df.run_available(); } diff --git a/hydroflow/tests/compile-fail/surface_singleton_undefined_nostate.stderr b/hydroflow/tests/compile-fail/surface_singleton_undefined_nostate.stderr index c85b0627407..f9ea96bd65c 100644 --- a/hydroflow/tests/compile-fail/surface_singleton_undefined_nostate.stderr +++ b/hydroflow/tests/compile-fail/surface_singleton_undefined_nostate.stderr @@ -3,3 +3,9 @@ error: Cannot find referenced name `unknown`; name was never assigned. | 5 | -> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref()) | ^^^^^^^ + +error: Cannot reference operator `null`. Only operators with singleton state can be referenced. + --> tests/compile-fail/surface_singleton_undefined_nostate.rs:5:78 + | +5 | -> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref()) + | ^^^^^^ diff --git a/hydroflow/tests/snapshots/surface_singleton__fold_tick@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_singleton__fold_zip@graphvis_dot.snap similarity index 100% rename from hydroflow/tests/snapshots/surface_singleton__fold_tick@graphvis_dot.snap rename to hydroflow/tests/snapshots/surface_singleton__fold_zip@graphvis_dot.snap diff --git a/hydroflow/tests/snapshots/surface_singleton__fold_tick@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_singleton__fold_zip@graphvis_mermaid.snap similarity index 100% rename from hydroflow/tests/snapshots/surface_singleton__fold_tick@graphvis_mermaid.snap rename to hydroflow/tests/snapshots/surface_singleton__fold_zip@graphvis_mermaid.snap diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_tick@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_dot.snap similarity index 100% rename from hydroflow/tests/snapshots/surface_singleton__state_ref_tick@graphvis_dot.snap rename to hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_dot.snap diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_tick@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_mermaid.snap similarity index 100% rename from hydroflow/tests/snapshots/surface_singleton__state_ref_tick@graphvis_mermaid.snap rename to hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_mermaid.snap diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap new file mode 100644 index 00000000000..f7c950e3ece --- /dev/null +++ b/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap @@ -0,0 +1,31 @@ +--- +source: hydroflow/tests/surface_singleton.rs +expression: "df.meta_graph().unwrap().to_dot(&Default::default())" +--- +digraph { + node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled]; + edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"]; + n1v1 [label="(n1v1) source_iter_delta(15..=25)", shape=invhouse, fillcolor="#88aaff"] + n2v1 [label="(n2v1) map(Max::new)", shape=invhouse, fillcolor="#88aaff"] + n3v1 [label="(n3v1) state_ref::>()", shape=house, fillcolor="#ffff88"] + n1v1 -> n2v1 [color=darkgreen, style=dashed] + n2v1 -> n3v1 [color=darkgreen, style=dashed] + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n1v1 + n2v1 + n3v1 + subgraph "cluster_sg_1v1_var_stream2" { + label="var stream2" + n1v1 + n2v1 + } + subgraph "cluster_sg_1v1_var_sum_of_stream2" { + label="var sum_of_stream2" + n3v1 + } + } +} + diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap new file mode 100644 index 00000000000..666a51e16ec --- /dev/null +++ b/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap @@ -0,0 +1,28 @@ +--- +source: hydroflow/tests/surface_singleton.rs +expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())" +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) source_iter_delta(15..=25)"/]:::pullClass +2v1[\"(2v1) map(Max::new)"/]:::pullClass +3v1[/"(3v1) state_ref::<Max<_>>()"\]:::pushClass +1v1-.->2v1; linkStyle 0 stroke:#060 +2v1-.->3v1; linkStyle 1 stroke:#060 +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1 + 2v1 + 3v1 + subgraph sg_1v1_var_stream2 ["var stream2"] + 1v1 + 2v1 + end + subgraph sg_1v1_var_sum_of_stream2 ["var sum_of_stream2"] + 3v1 + end +end + diff --git a/hydroflow/tests/surface_singleton.rs b/hydroflow/tests/surface_singleton.rs index 1886be0aa5f..8d915163f65 100644 --- a/hydroflow/tests/surface_singleton.rs +++ b/hydroflow/tests/surface_singleton.rs @@ -3,7 +3,7 @@ use lattices::Max; use multiplatform_test::multiplatform_test; #[multiplatform_test] -pub fn test_state_ref_tick() { +pub fn test_state_ref() { let mut df = hydroflow::hydroflow_syntax! { stream1 = source_iter(10..=30); stream2 = source_iter_delta(15..=25) -> map(Max::new); @@ -33,7 +33,19 @@ pub fn test_state_ref_tick() { } #[multiplatform_test] -pub fn test_fold_tick() { +pub fn test_state_ref_unused() { + let mut df = hydroflow::hydroflow_syntax! { + stream2 = source_iter_delta(15..=25) -> map(Max::new); + sum_of_stream2 = stream2 -> state_ref::>(); + }; + + assert_graphvis_snapshots!(df); + + df.run_available(); // Should return quickly and not hang +} + +#[multiplatform_test] +pub fn test_fold_zip() { let mut df = hydroflow::hydroflow_syntax! { stream1 = source_iter(10..=30); stream2 = source_iter_delta(15..=25) -> map(Max::new); diff --git a/hydroflow_lang/src/graph/flat_graph_builder.rs b/hydroflow_lang/src/graph/flat_graph_builder.rs index da9be0a9287..a0b47e796c1 100644 --- a/hydroflow_lang/src/graph/flat_graph_builder.rs +++ b/hydroflow_lang/src/graph/flat_graph_builder.rs @@ -5,12 +5,12 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::path::PathBuf; +use itertools::Itertools; use proc_macro2::Span; use quote::ToTokens; use syn::spanned::Spanned; use syn::{Error, Ident, ItemUse}; -use super::ops::find_op_op_constraints; use super::{GraphEdgeId, GraphNode, GraphNodeId, HydroflowGraph, PortIndexValue}; use crate::diagnostic::{Diagnostic, Level}; use crate::graph::ops::{PortListSpec, RangeTrait}; @@ -583,10 +583,12 @@ impl FlatGraphBuilder { for (node_id, node) in self.flat_graph.nodes() { match node { GraphNode::Operator(operator) => { - let Some(op_constraints) = find_op_op_constraints(operator) else { + let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else { // Error already emitted by `insert_node_op_insts_all`. continue; }; + let op_constraints = op_inst.op_constraints; + // Check number of args if op_constraints.num_args != operator.args.len() { self.diagnostics.push(Diagnostic::spanned( @@ -811,6 +813,36 @@ impl FlatGraphBuilder { } } } + + // Check that singleton references actually reference *stateful* operators. + { + let singletons_resolved = + self.flat_graph.node_singleton_references(node_id); + for (singleton_node_id, singleton_ident) in + singletons_resolved.zip_eq(&*operator.singletons_referenced) + { + let &Some(singleton_node_id) = singleton_node_id else { + // Error already emitted by `connect_operator_links`, "Cannot find referenced name...". + continue; + }; + let Some(ref_op_inst) = self.flat_graph.node_op_inst(singleton_node_id) + else { + // Error already emitted by `insert_node_op_insts_all`. + continue; + }; + let ref_op_constraints = ref_op_inst.op_constraints; + if !ref_op_constraints.has_singleton_output { + self.diagnostics.push(Diagnostic::spanned( + singleton_ident.span(), + Level::Error, + &format!( + "Cannot reference operator `{}`. Only operators with singleton state can be referenced.", + ref_op_constraints.name, + ), + )); + } + } + } } GraphNode::Handoff { .. } => todo!("Node::Handoff"), GraphNode::ModuleBoundary { .. } => { diff --git a/hydroflow_lang/src/graph/hydroflow_graph.rs b/hydroflow_lang/src/graph/hydroflow_graph.rs index 79644d09505..b5a72d6d230 100644 --- a/hydroflow_lang/src/graph/hydroflow_graph.rs +++ b/hydroflow_lang/src/graph/hydroflow_graph.rs @@ -484,6 +484,19 @@ impl HydroflowGraph { self.node_singleton_references .insert(node_id, singletons_referenced) } + + /// Gets the singletons referenced by a node. Returns an empty iterator for non-operators and + /// operators that do not reference singletons. + pub fn node_singleton_references( + &self, + node_id: GraphNodeId, + ) -> std::slice::Iter<'_, Option> { + if let Some(singletons_referenced) = self.node_singleton_references.get(node_id) { + singletons_referenced.iter() + } else { + [].iter() + } + } } /// Module methods. @@ -794,13 +807,13 @@ impl HydroflowGraph { /// Resolve the singletons via [`Self::node_singleton_references`] for the given `node_id`. fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec { - self.node_singleton_references.get(node_id) - .into_iter() // `None` becomes empty iter. - .flatten() + self.node_singleton_references(node_id) .map(|singleton_node_id| { // TODO(mingwei): this `expect` should be caught in error checking self.node_as_singleton_ident( - singleton_node_id.expect("Expected singleton to be resolved but was not, this is a Hydroflow bug."), + singleton_node_id.expect( + "Expected singleton to be resolved but was not, this is a Hydroflow bug.", + ), span, ) }) @@ -932,6 +945,7 @@ impl HydroflowGraph { let op_span = node.span(); let op_name = op_inst.op_constraints.name; + // TODO(mingwei): Just use `op_inst.op_constraints`? let op_constraints = OPERATORS .iter() .find(|op| op_name == op.name) @@ -1008,8 +1022,13 @@ impl HydroflowGraph { let is_pull = idx < pull_to_push_idx; - let singleton_output_ident = - &self.node_as_singleton_ident(node_id, op_span); + let singleton_output_ident = &if op_constraints.has_singleton_output { + self.node_as_singleton_ident(node_id, op_span) + } else { + // This ident *should* go unused. + Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span) + }; + let singletons_resolved = self.helper_resolve_singletons(node_id, op_span); let arguments = &postprocess_singletons( diff --git a/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs b/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs index a198af69eeb..13e0ef089c2 100644 --- a/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs +++ b/hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs @@ -42,6 +42,7 @@ pub const _LATTICE_FOLD_BATCH: OperatorConstraints = OperatorConstraints { soft_range_out: RANGE_1, num_args: 0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| PortListSpec::Fixed(parse_quote! { input, signal })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs b/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs index 2139ced00ad..d0ea1c5d5a6 100644 --- a/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs +++ b/hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs @@ -85,6 +85,7 @@ pub const _LATTICE_JOIN_FUSED_JOIN: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: &(2..=2), is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/anti_join.rs b/hydroflow_lang/src/graph/ops/anti_join.rs index b68ef6ea7a5..c366f604b07 100644 --- a/hydroflow_lang/src/graph/ops/anti_join.rs +++ b/hydroflow_lang/src/graph/ops/anti_join.rs @@ -33,6 +33,7 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs index 750740e2267..b804231e582 100644 --- a/hydroflow_lang/src/graph/ops/anti_join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/anti_join_multiset.rs @@ -35,6 +35,7 @@ pub const ANTI_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/assert.rs b/hydroflow_lang/src/graph/ops/assert.rs index 7b008819a5c..b458beb5429 100644 --- a/hydroflow_lang/src/graph/ops/assert.rs +++ b/hydroflow_lang/src/graph/ops/assert.rs @@ -25,6 +25,7 @@ pub const ASSERT: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/assert_eq.rs b/hydroflow_lang/src/graph/ops/assert_eq.rs index e803c01c665..dd046094189 100644 --- a/hydroflow_lang/src/graph/ops/assert_eq.rs +++ b/hydroflow_lang/src/graph/ops/assert_eq.rs @@ -36,6 +36,7 @@ pub const ASSERT_EQ: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/cross_join.rs b/hydroflow_lang/src/graph/ops/cross_join.rs index 496652ceb31..1acd75eea13 100644 --- a/hydroflow_lang/src/graph/ops/cross_join.rs +++ b/hydroflow_lang/src/graph/ops/cross_join.rs @@ -47,6 +47,7 @@ pub const CROSS_JOIN: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/cross_join_multiset.rs b/hydroflow_lang/src/graph/ops/cross_join_multiset.rs index 9c860cb8336..0ea45856fb2 100644 --- a/hydroflow_lang/src/graph/ops/cross_join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/cross_join_multiset.rs @@ -33,6 +33,7 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/defer_signal.rs b/hydroflow_lang/src/graph/ops/defer_signal.rs index 3763d9a9c98..4fa16712ae0 100644 --- a/hydroflow_lang/src/graph/ops/defer_signal.rs +++ b/hydroflow_lang/src/graph/ops/defer_signal.rs @@ -33,6 +33,7 @@ pub const DEFER_SIGNAL: OperatorConstraints = OperatorConstraints { soft_range_out: RANGE_1, num_args: 0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, signal })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/defer_tick.rs b/hydroflow_lang/src/graph/ops/defer_tick.rs index e587d9330dc..90dca52ed06 100644 --- a/hydroflow_lang/src/graph/ops/defer_tick.rs +++ b/hydroflow_lang/src/graph/ops/defer_tick.rs @@ -68,6 +68,7 @@ pub const DEFER_TICK: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Tick), diff --git a/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs b/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs index 04114af6413..d0f39a2aebd 100644 --- a/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs +++ b/hydroflow_lang/src/graph/ops/defer_tick_lazy.rs @@ -16,6 +16,7 @@ pub const DEFER_TICK_LAZY: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::TickLazy), diff --git a/hydroflow_lang/src/graph/ops/demux.rs b/hydroflow_lang/src/graph/ops/demux.rs index 0de23b7a4e3..7bd61865d6d 100644 --- a/hydroflow_lang/src/graph/ops/demux.rs +++ b/hydroflow_lang/src/graph/ops/demux.rs @@ -52,6 +52,7 @@ pub const DEMUX: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/demux_enum.rs b/hydroflow_lang/src/graph/ops/demux_enum.rs index 6450493d890..5281eea6937 100644 --- a/hydroflow_lang/src/graph/ops/demux_enum.rs +++ b/hydroflow_lang/src/graph/ops/demux_enum.rs @@ -47,6 +47,7 @@ pub const DEMUX_ENUM: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_1, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_file.rs b/hydroflow_lang/src/graph/ops/dest_file.rs index 2908a6908c1..63874a13119 100644 --- a/hydroflow_lang/src/graph/ops/dest_file.rs +++ b/hydroflow_lang/src/graph/ops/dest_file.rs @@ -31,6 +31,7 @@ pub const DEST_FILE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_sink.rs b/hydroflow_lang/src/graph/ops/dest_sink.rs index 3a742275544..f11e18203d2 100644 --- a/hydroflow_lang/src/graph/ops/dest_sink.rs +++ b/hydroflow_lang/src/graph/ops/dest_sink.rs @@ -92,6 +92,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/dest_sink_serde.rs b/hydroflow_lang/src/graph/ops/dest_sink_serde.rs index 36535012758..4a6dd0d53b9 100644 --- a/hydroflow_lang/src/graph/ops/dest_sink_serde.rs +++ b/hydroflow_lang/src/graph/ops/dest_sink_serde.rs @@ -34,6 +34,7 @@ pub const DEST_SINK_SERDE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/difference.rs b/hydroflow_lang/src/graph/ops/difference.rs index 6d6e98bd994..cd03110b984 100644 --- a/hydroflow_lang/src/graph/ops/difference.rs +++ b/hydroflow_lang/src/graph/ops/difference.rs @@ -33,6 +33,7 @@ pub const DIFFERENCE: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/difference_multiset.rs b/hydroflow_lang/src/graph/ops/difference_multiset.rs index fef9c0be7dc..baaa54dcfea 100644 --- a/hydroflow_lang/src/graph/ops/difference_multiset.rs +++ b/hydroflow_lang/src/graph/ops/difference_multiset.rs @@ -34,6 +34,7 @@ pub const DIFFERENCE_MULTISET: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/enumerate.rs b/hydroflow_lang/src/graph/ops/enumerate.rs index ad6a063755c..4b66dd9fa13 100644 --- a/hydroflow_lang/src/graph/ops/enumerate.rs +++ b/hydroflow_lang/src/graph/ops/enumerate.rs @@ -31,6 +31,7 @@ pub const ENUMERATE: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/filter.rs b/hydroflow_lang/src/graph/ops/filter.rs index 3ddba03658d..727524d462a 100644 --- a/hydroflow_lang/src/graph/ops/filter.rs +++ b/hydroflow_lang/src/graph/ops/filter.rs @@ -29,6 +29,7 @@ pub const FILTER: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/filter_map.rs b/hydroflow_lang/src/graph/ops/filter_map.rs index 12ed55aa949..268aef95875 100644 --- a/hydroflow_lang/src/graph/ops/filter_map.rs +++ b/hydroflow_lang/src/graph/ops/filter_map.rs @@ -27,6 +27,7 @@ pub const FILTER_MAP: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/flat_map.rs b/hydroflow_lang/src/graph/ops/flat_map.rs index 3e7815f595f..70d5d58f177 100644 --- a/hydroflow_lang/src/graph/ops/flat_map.rs +++ b/hydroflow_lang/src/graph/ops/flat_map.rs @@ -31,6 +31,7 @@ pub const FLAT_MAP: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/flatten.rs b/hydroflow_lang/src/graph/ops/flatten.rs index 0c20208d66f..0a6e16d07d7 100644 --- a/hydroflow_lang/src/graph/ops/flatten.rs +++ b/hydroflow_lang/src/graph/ops/flatten.rs @@ -26,6 +26,7 @@ pub const FLATTEN: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/fold.rs b/hydroflow_lang/src/graph/ops/fold.rs index 2b531a72f2c..2addf4d0956 100644 --- a/hydroflow_lang/src/graph/ops/fold.rs +++ b/hydroflow_lang/src/graph/ops/fold.rs @@ -41,6 +41,7 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/fold_keyed.rs b/hydroflow_lang/src/graph/ops/fold_keyed.rs index fbfdd5145e6..a0626eafe8c 100644 --- a/hydroflow_lang/src/graph/ops/fold_keyed.rs +++ b/hydroflow_lang/src/graph/ops/fold_keyed.rs @@ -76,6 +76,7 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: &(0..=2), is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/for_each.rs b/hydroflow_lang/src/graph/ops/for_each.rs index e8137cf7ef4..d22c3e85334 100644 --- a/hydroflow_lang/src/graph/ops/for_each.rs +++ b/hydroflow_lang/src/graph/ops/for_each.rs @@ -29,6 +29,7 @@ pub const FOR_EACH: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/identity.rs b/hydroflow_lang/src/graph/ops/identity.rs index 6d045c77121..1f0d4742747 100644 --- a/hydroflow_lang/src/graph/ops/identity.rs +++ b/hydroflow_lang/src/graph/ops/identity.rs @@ -32,6 +32,7 @@ pub const IDENTITY: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/initialize.rs b/hydroflow_lang/src/graph/ops/initialize.rs index 4c5ebeefbf8..fb3c4ca838b 100644 --- a/hydroflow_lang/src/graph/ops/initialize.rs +++ b/hydroflow_lang/src/graph/ops/initialize.rs @@ -25,6 +25,7 @@ pub const INITIALIZE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/inspect.rs b/hydroflow_lang/src/graph/ops/inspect.rs index 975ae7a6d91..6d10aa67ace 100644 --- a/hydroflow_lang/src/graph/ops/inspect.rs +++ b/hydroflow_lang/src/graph/ops/inspect.rs @@ -30,6 +30,7 @@ pub const INSPECT: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/join.rs b/hydroflow_lang/src/graph/ops/join.rs index 2cacfe53683..f1b3a993b61 100644 --- a/hydroflow_lang/src/graph/ops/join.rs +++ b/hydroflow_lang/src/graph/ops/join.rs @@ -90,6 +90,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/join_fused.rs b/hydroflow_lang/src/graph/ops/join_fused.rs index f5249fa9283..bcda095e3d9 100644 --- a/hydroflow_lang/src/graph/ops/join_fused.rs +++ b/hydroflow_lang/src/graph/ops/join_fused.rs @@ -99,6 +99,7 @@ pub const JOIN_FUSED: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs index 1a0f349788c..1d1cddfad54 100644 --- a/hydroflow_lang/src/graph/ops/join_fused_lhs.rs +++ b/hydroflow_lang/src/graph/ops/join_fused_lhs.rs @@ -36,6 +36,7 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/join_fused_rhs.rs b/hydroflow_lang/src/graph/ops/join_fused_rhs.rs index be027d76469..7cce355f62e 100644 --- a/hydroflow_lang/src/graph/ops/join_fused_rhs.rs +++ b/hydroflow_lang/src/graph/ops/join_fused_rhs.rs @@ -20,6 +20,7 @@ pub const JOIN_FUSED_RHS: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |idx| match idx { diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs index e41dea9f78d..9e8fd79ab15 100644 --- a/hydroflow_lang/src/graph/ops/join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/join_multiset.rs @@ -36,6 +36,7 @@ pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=2), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs index b9b832edeb9..3b4510e6f9a 100644 --- a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs +++ b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs @@ -18,6 +18,7 @@ pub const LATTICE_BIMORPHISM: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| { super::PortListSpec::Fixed(parse_quote! { items_0, items_1, state_0, state_1 }) }), diff --git a/hydroflow_lang/src/graph/ops/lattice_fold.rs b/hydroflow_lang/src/graph/ops/lattice_fold.rs index 3975a396b82..ba79297551d 100644 --- a/hydroflow_lang/src/graph/ops/lattice_fold.rs +++ b/hydroflow_lang/src/graph/ops/lattice_fold.rs @@ -38,6 +38,7 @@ pub const LATTICE_FOLD: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/lattice_reduce.rs b/hydroflow_lang/src/graph/ops/lattice_reduce.rs index 8dc873d3f6d..3d5702ec5aa 100644 --- a/hydroflow_lang/src/graph/ops/lattice_reduce.rs +++ b/hydroflow_lang/src/graph/ops/lattice_reduce.rs @@ -39,6 +39,7 @@ pub const LATTICE_REDUCE: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::MonotoneAccum), diff --git a/hydroflow_lang/src/graph/ops/map.rs b/hydroflow_lang/src/graph/ops/map.rs index 8868dd799a5..efd1ce66858 100644 --- a/hydroflow_lang/src/graph/ops/map.rs +++ b/hydroflow_lang/src/graph/ops/map.rs @@ -30,6 +30,7 @@ pub const MAP: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index da34e59ee9c..98f7c81069c 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -67,6 +67,10 @@ pub struct OperatorConstraints { /// If this operator receives external inputs and therefore must be in /// stratum 0. pub is_external_input: bool, + /// If this operator has a singleton reference output. For stateful operators. + /// If true, [`WriteContextArgs::singleton_output_ident`] will be set to a meaningful value in + /// the [`Self::write_fn`] invocation. + pub has_singleton_output: bool, /// What named or numbered input ports to expect? pub ports_inn: Option PortListSpec>, diff --git a/hydroflow_lang/src/graph/ops/multiset_delta.rs b/hydroflow_lang/src/graph/ops/multiset_delta.rs index b03872ffbc1..cf6e38a8f60 100644 --- a/hydroflow_lang/src/graph/ops/multiset_delta.rs +++ b/hydroflow_lang/src/graph/ops/multiset_delta.rs @@ -41,6 +41,7 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/next_stratum.rs b/hydroflow_lang/src/graph/ops/next_stratum.rs index 79744f889b9..f67de299cee 100644 --- a/hydroflow_lang/src/graph/ops/next_stratum.rs +++ b/hydroflow_lang/src/graph/ops/next_stratum.rs @@ -16,6 +16,7 @@ pub const NEXT_STRATUM: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/null.rs b/hydroflow_lang/src/graph/ops/null.rs index d52ea62e91a..a9e23552d56 100644 --- a/hydroflow_lang/src/graph/ops/null.rs +++ b/hydroflow_lang/src/graph/ops/null.rs @@ -25,6 +25,7 @@ pub const NULL: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/partition.rs b/hydroflow_lang/src/graph/ops/partition.rs index 68e1f9f7e40..232f5ae9c77 100644 --- a/hydroflow_lang/src/graph/ops/partition.rs +++ b/hydroflow_lang/src/graph/ops/partition.rs @@ -65,6 +65,7 @@ pub const PARTITION: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: Some(|| PortListSpec::Variadic), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/persist.rs b/hydroflow_lang/src/graph/ops/persist.rs index c48861ec116..4d34ec60463 100644 --- a/hydroflow_lang/src/graph/ops/persist.rs +++ b/hydroflow_lang/src/graph/ops/persist.rs @@ -46,6 +46,7 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/persist_mut.rs b/hydroflow_lang/src/graph/ops/persist_mut.rs index 14ef117dd7c..95faab8314b 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut.rs @@ -33,6 +33,7 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs index c550b8da49b..779ed51e621 100644 --- a/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs +++ b/hydroflow_lang/src/graph/ops/persist_mut_keyed.rs @@ -33,6 +33,7 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/py_udf.rs b/hydroflow_lang/src/graph/ops/py_udf.rs index f70816e20e6..e71ac0ca40b 100644 --- a/hydroflow_lang/src/graph/ops/py_udf.rs +++ b/hydroflow_lang/src/graph/ops/py_udf.rs @@ -61,6 +61,7 @@ pub const PY_UDF: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/reduce.rs b/hydroflow_lang/src/graph/ops/reduce.rs index c3983db8056..37ae3eabbdb 100644 --- a/hydroflow_lang/src/graph/ops/reduce.rs +++ b/hydroflow_lang/src/graph/ops/reduce.rs @@ -40,6 +40,7 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/reduce_keyed.rs b/hydroflow_lang/src/graph/ops/reduce_keyed.rs index 00e63a777ac..47c00250b73 100644 --- a/hydroflow_lang/src/graph/ops/reduce_keyed.rs +++ b/hydroflow_lang/src/graph/ops/reduce_keyed.rs @@ -66,6 +66,7 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: &(0..=2), is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/sort.rs b/hydroflow_lang/src/graph/ops/sort.rs index ee3fb64579c..3980d17577c 100644 --- a/hydroflow_lang/src/graph/ops/sort.rs +++ b/hydroflow_lang/src/graph/ops/sort.rs @@ -26,6 +26,7 @@ pub const SORT: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/sort_by_key.rs b/hydroflow_lang/src/graph/ops/sort_by_key.rs index bc6a3f69747..016f018cc08 100644 --- a/hydroflow_lang/src/graph/ops/sort_by_key.rs +++ b/hydroflow_lang/src/graph/ops/sort_by_key.rs @@ -26,6 +26,7 @@ pub const SORT_BY_KEY: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum), diff --git a/hydroflow_lang/src/graph/ops/source_file.rs b/hydroflow_lang/src/graph/ops/source_file.rs index 2645b97e8ae..b9751cd41ca 100644 --- a/hydroflow_lang/src/graph/ops/source_file.rs +++ b/hydroflow_lang/src/graph/ops/source_file.rs @@ -29,6 +29,7 @@ pub const SOURCE_FILE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: &(0..=1), is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_interval.rs b/hydroflow_lang/src/graph/ops/source_interval.rs index 804c703189a..c1f026dd434 100644 --- a/hydroflow_lang/src/graph/ops/source_interval.rs +++ b/hydroflow_lang/src/graph/ops/source_interval.rs @@ -53,6 +53,7 @@ pub const SOURCE_INTERVAL: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_iter.rs b/hydroflow_lang/src/graph/ops/source_iter.rs index 6bf75c88b61..27f0683093e 100644 --- a/hydroflow_lang/src/graph/ops/source_iter.rs +++ b/hydroflow_lang/src/graph/ops/source_iter.rs @@ -28,6 +28,7 @@ pub const SOURCE_ITER: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_json.rs b/hydroflow_lang/src/graph/ops/source_json.rs index a1659baca80..6d10c6e5ecd 100644 --- a/hydroflow_lang/src/graph/ops/source_json.rs +++ b/hydroflow_lang/src/graph/ops/source_json.rs @@ -26,6 +26,7 @@ pub const SOURCE_JSON: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: &(0..=1), is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stdin.rs b/hydroflow_lang/src/graph/ops/source_stdin.rs index f63963cc133..b7eec4ec66f 100644 --- a/hydroflow_lang/src/graph/ops/source_stdin.rs +++ b/hydroflow_lang/src/graph/ops/source_stdin.rs @@ -28,6 +28,7 @@ pub const SOURCE_STDIN: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stream.rs b/hydroflow_lang/src/graph/ops/source_stream.rs index fa581b87223..70dcf5c33bf 100644 --- a/hydroflow_lang/src/graph/ops/source_stream.rs +++ b/hydroflow_lang/src/graph/ops/source_stream.rs @@ -35,6 +35,7 @@ pub const SOURCE_STREAM: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/source_stream_serde.rs b/hydroflow_lang/src/graph/ops/source_stream_serde.rs index 7949eeb985f..e9ab5fbd79a 100644 --- a/hydroflow_lang/src/graph/ops/source_stream_serde.rs +++ b/hydroflow_lang/src/graph/ops/source_stream_serde.rs @@ -35,6 +35,7 @@ pub const SOURCE_STREAM_SERDE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: true, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/spin.rs b/hydroflow_lang/src/graph/ops/spin.rs index 30435cd5717..f41f5e35daa 100644 --- a/hydroflow_lang/src/graph/ops/spin.rs +++ b/hydroflow_lang/src/graph/ops/spin.rs @@ -28,6 +28,7 @@ pub const SPIN: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/state.rs b/hydroflow_lang/src/graph/ops/state.rs index 30d7fe5eac7..93e44f8374c 100644 --- a/hydroflow_lang/src/graph/ops/state.rs +++ b/hydroflow_lang/src/graph/ops/state.rs @@ -44,6 +44,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, // TODO(mingwei)? type_args: &(0..=1), is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: Some(|| PortListSpec::Fixed(parse_quote! { items, state })), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/state_ref.rs b/hydroflow_lang/src/graph/ops/state_ref.rs index 5952814cdcc..94c75a91bf2 100644 --- a/hydroflow_lang/src/graph/ops/state_ref.rs +++ b/hydroflow_lang/src/graph/ops/state_ref.rs @@ -18,6 +18,7 @@ pub const STATE_REF: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, // TODO(mingwei)? type_args: &(0..=1), is_external_input: false, + has_singleton_output: true, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/tee.rs b/hydroflow_lang/src/graph/ops/tee.rs index d65dfd5b1e5..bb92323aaa8 100644 --- a/hydroflow_lang/src/graph/ops/tee.rs +++ b/hydroflow_lang/src/graph/ops/tee.rs @@ -27,6 +27,7 @@ pub const TEE: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/union.rs b/hydroflow_lang/src/graph/ops/union.rs index 18f2d5fad98..170c6ea075d 100644 --- a/hydroflow_lang/src/graph/ops/union.rs +++ b/hydroflow_lang/src/graph/ops/union.rs @@ -33,6 +33,7 @@ pub const UNION: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/unique.rs b/hydroflow_lang/src/graph/ops/unique.rs index 364a965f55b..dd7d81c5a24 100644 --- a/hydroflow_lang/src/graph/ops/unique.rs +++ b/hydroflow_lang/src/graph/ops/unique.rs @@ -53,6 +53,7 @@ pub const UNIQUE: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/unzip.rs b/hydroflow_lang/src/graph/ops/unzip.rs index 2eaf9eab6d2..c9f1033e36b 100644 --- a/hydroflow_lang/src/graph/ops/unzip.rs +++ b/hydroflow_lang/src/graph/ops/unzip.rs @@ -27,6 +27,7 @@ pub const UNZIP: OperatorConstraints = OperatorConstraints { persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: None, ports_out: Some(|| super::PortListSpec::Fixed(parse_quote!(0, 1))), input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/zip.rs b/hydroflow_lang/src/graph/ops/zip.rs index f73fa84eedc..b34bef175b6 100644 --- a/hydroflow_lang/src/graph/ops/zip.rs +++ b/hydroflow_lang/src/graph/ops/zip.rs @@ -29,6 +29,7 @@ pub const ZIP: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, diff --git a/hydroflow_lang/src/graph/ops/zip_longest.rs b/hydroflow_lang/src/graph/ops/zip_longest.rs index 84f9a67fb2a..a7ec9fa10e4 100644 --- a/hydroflow_lang/src/graph/ops/zip_longest.rs +++ b/hydroflow_lang/src/graph/ops/zip_longest.rs @@ -34,6 +34,7 @@ pub const ZIP_LONGEST: OperatorConstraints = OperatorConstraints { persistence_args: &(0..=1), type_args: RANGE_0, is_external_input: false, + has_singleton_output: false, ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| Some(DelayType::Stratum),