Skip to content

Commit

Permalink
feat(hydroflow_lang): track which ops have singleton state (#1091)
Browse files Browse the repository at this point in the history
Add a nice error message for referencing a non-singleton op
  • Loading branch information
MingweiSamuel committed Apr 2, 2024
1 parent 45a29bd commit 2857625
Show file tree
Hide file tree
Showing 81 changed files with 220 additions and 31 deletions.
1 change: 1 addition & 0 deletions hydroflow/tests/compile-fail/surface_singleton_nostate.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// Reference an operator that doesn't have singleton state.
pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
Expand Down
23 changes: 4 additions & 19 deletions hydroflow/tests/compile-fail/surface_singleton_nostate.stderr
Original file line number Diff line number Diff line change
@@ -1,20 +1,5 @@
error[E0425]: cannot find value `singleton_op_2v1` in this scope
--> tests/compile-fail/surface_singleton_nostate.rs:5:16
error: Cannot reference operator `null`. Only operators with singleton state can be referenced.
--> tests/compile-fail/surface_singleton_nostate.rs:6:41
|
5 | -> filter(|value| value <= #my_ref.as_reveal_ref())
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope

error[E0282]: type annotations needed
--> tests/compile-fail/surface_singleton_nostate.rs:2:18
|
2 | let mut df = hydroflow::hydroflow_syntax! {
| __________________^
3 | | my_ref = source_iter(15..=25) -> null();
4 | | source_iter(10..=30)
5 | | -> filter(|value| value <= #my_ref.as_reveal_ref())
6 | | -> null();
7 | |
8 | | };
| |_____^ cannot infer type
|
= note: this error originates in the macro `hydroflow::hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)
6 | -> filter(|value| value <= #my_ref.as_reveal_ref())
| ^^^^^^
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ error: Cannot find referenced name `unknown`; name was never assigned.
|
5 | -> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref())
| ^^^^^^^

error: Cannot reference operator `null`. Only operators with singleton state can be referenced.
--> tests/compile-fail/surface_singleton_nostate_undefined.rs:5:41
|
5 | -> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref())
| ^^^^^^
3 changes: 1 addition & 2 deletions hydroflow/tests/compile-fail/surface_singleton_undefined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
source_iter(10..=30)
-> filter(|value| value <= #unknown.as_reveal_ref())
-> null();

-> null();
};
df.run_available();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ error: Cannot find referenced name `unknown`; name was never assigned.
|
5 | -> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref())
| ^^^^^^^

error: Cannot reference operator `null`. Only operators with singleton state can be referenced.
--> tests/compile-fail/surface_singleton_undefined_nostate.rs:5:78
|
5 | -> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref())
| ^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter_delta(15..=25)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) map(Max::new)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) state_ref::<Max<_>>()", shape=house, fillcolor="#ffff88"]
n1v1 -> n2v1 [color=darkgreen, style=dashed]
n2v1 -> n3v1 [color=darkgreen, style=dashed]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1
n2v1
n3v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n1v1
n2v1
}
subgraph "cluster_sg_1v1_var_sum_of_stream2" {
label="var sum_of_stream2"
n3v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter_delta(15..=25)</code>"/]:::pullClass
2v1[\"(2v1) <code>map(Max::new)</code>"/]:::pullClass
3v1[/"(3v1) <code>state_ref::&lt;Max&lt;_&gt;&gt;()</code>"\]:::pushClass
1v1-.->2v1; linkStyle 0 stroke:#060
2v1-.->3v1; linkStyle 1 stroke:#060
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1
2v1
3v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
1v1
2v1
end
subgraph sg_1v1_var_sum_of_stream2 ["var <tt>sum_of_stream2</tt>"]
3v1
end
end

16 changes: 14 additions & 2 deletions hydroflow/tests/surface_singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use lattices::Max;
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_state_ref_tick() {
pub fn test_state_ref() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(10..=30);
stream2 = source_iter_delta(15..=25) -> map(Max::new);
Expand Down Expand Up @@ -33,7 +33,19 @@ pub fn test_state_ref_tick() {
}

#[multiplatform_test]
pub fn test_fold_tick() {
pub fn test_state_ref_unused() {
let mut df = hydroflow::hydroflow_syntax! {
stream2 = source_iter_delta(15..=25) -> map(Max::new);
sum_of_stream2 = stream2 -> state_ref::<Max<_>>();
};

assert_graphvis_snapshots!(df);

df.run_available(); // Should return quickly and not hang
}

#[multiplatform_test]
pub fn test_fold_zip() {
let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(10..=30);
stream2 = source_iter_delta(15..=25) -> map(Max::new);
Expand Down
36 changes: 34 additions & 2 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;

use itertools::Itertools;
use proc_macro2::Span;
use quote::ToTokens;
use syn::spanned::Spanned;
use syn::{Error, Ident, ItemUse};

use super::ops::find_op_op_constraints;
use super::{GraphEdgeId, GraphNode, GraphNodeId, HydroflowGraph, PortIndexValue};
use crate::diagnostic::{Diagnostic, Level};
use crate::graph::ops::{PortListSpec, RangeTrait};
Expand Down Expand Up @@ -583,10 +583,12 @@ impl FlatGraphBuilder {
for (node_id, node) in self.flat_graph.nodes() {
match node {
GraphNode::Operator(operator) => {
let Some(op_constraints) = find_op_op_constraints(operator) else {
let Some(op_inst) = self.flat_graph.node_op_inst(node_id) else {
// Error already emitted by `insert_node_op_insts_all`.
continue;
};
let op_constraints = op_inst.op_constraints;

// Check number of args
if op_constraints.num_args != operator.args.len() {
self.diagnostics.push(Diagnostic::spanned(
Expand Down Expand Up @@ -811,6 +813,36 @@ impl FlatGraphBuilder {
}
}
}

// Check that singleton references actually reference *stateful* operators.
{
let singletons_resolved =
self.flat_graph.node_singleton_references(node_id);
for (singleton_node_id, singleton_ident) in
singletons_resolved.zip_eq(&*operator.singletons_referenced)
{
let &Some(singleton_node_id) = singleton_node_id else {
// Error already emitted by `connect_operator_links`, "Cannot find referenced name...".
continue;
};
let Some(ref_op_inst) = self.flat_graph.node_op_inst(singleton_node_id)
else {
// Error already emitted by `insert_node_op_insts_all`.
continue;
};
let ref_op_constraints = ref_op_inst.op_constraints;
if !ref_op_constraints.has_singleton_output {
self.diagnostics.push(Diagnostic::spanned(
singleton_ident.span(),
Level::Error,
&format!(
"Cannot reference operator `{}`. Only operators with singleton state can be referenced.",
ref_op_constraints.name,
),
));
}
}
}
}
GraphNode::Handoff { .. } => todo!("Node::Handoff"),
GraphNode::ModuleBoundary { .. } => {
Expand Down
31 changes: 25 additions & 6 deletions hydroflow_lang/src/graph/hydroflow_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,19 @@ impl HydroflowGraph {
self.node_singleton_references
.insert(node_id, singletons_referenced)
}

/// Gets the singletons referenced by a node. Returns an empty iterator for non-operators and
/// operators that do not reference singletons.
pub fn node_singleton_references(
&self,
node_id: GraphNodeId,
) -> std::slice::Iter<'_, Option<GraphNodeId>> {
if let Some(singletons_referenced) = self.node_singleton_references.get(node_id) {
singletons_referenced.iter()
} else {
[].iter()
}
}
}

/// Module methods.
Expand Down Expand Up @@ -794,13 +807,13 @@ impl HydroflowGraph {

/// Resolve the singletons via [`Self::node_singleton_references`] for the given `node_id`.
fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<Ident> {
self.node_singleton_references.get(node_id)
.into_iter() // `None` becomes empty iter.
.flatten()
self.node_singleton_references(node_id)
.map(|singleton_node_id| {
// TODO(mingwei): this `expect` should be caught in error checking
self.node_as_singleton_ident(
singleton_node_id.expect("Expected singleton to be resolved but was not, this is a Hydroflow bug."),
singleton_node_id.expect(
"Expected singleton to be resolved but was not, this is a Hydroflow bug.",
),
span,
)
})
Expand Down Expand Up @@ -932,6 +945,7 @@ impl HydroflowGraph {

let op_span = node.span();
let op_name = op_inst.op_constraints.name;
// TODO(mingwei): Just use `op_inst.op_constraints`?
let op_constraints = OPERATORS
.iter()
.find(|op| op_name == op.name)
Expand Down Expand Up @@ -1008,8 +1022,13 @@ impl HydroflowGraph {

let is_pull = idx < pull_to_push_idx;

let singleton_output_ident =
&self.node_as_singleton_ident(node_id, op_span);
let singleton_output_ident = &if op_constraints.has_singleton_output {
self.node_as_singleton_ident(node_id, op_span)
} else {
// This ident *should* go unused.
Ident::new(&format!("{}_has_no_singleton_output", op_name), op_span)
};

let singletons_resolved =
self.helper_resolve_singletons(node_id, op_span);
let arguments = &postprocess_singletons(
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub const _LATTICE_FOLD_BATCH: OperatorConstraints = OperatorConstraints {
soft_range_out: RANGE_1,
num_args: 0,
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| PortListSpec::Fixed(parse_quote! { input, signal })),
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::MonotoneAccum),
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/_lattice_join_fused_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub const _LATTICE_JOIN_FUSED_JOIN: OperatorConstraints = OperatorConstraints {
persistence_args: &(0..=2),
type_args: &(2..=2),
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::MonotoneAccum),
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/anti_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints {
persistence_args: &(0..=2),
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })),
ports_out: None,
input_delaytype_fn: |idx| match idx {
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/anti_join_multiset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub const ANTI_JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
persistence_args: &(0..=2),
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { pos, neg })),
ports_out: None,
input_delaytype_fn: |idx| match idx {
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/assert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub const ASSERT: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/assert_eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub const ASSERT_EQ: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub const CROSS_JOIN: OperatorConstraints = OperatorConstraints {
persistence_args: &(0..=2),
type_args: &(0..=1),
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/cross_join_multiset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
persistence_args: &(0..=2),
type_args: &(0..=1),
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/defer_signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const DEFER_SIGNAL: OperatorConstraints = OperatorConstraints {
soft_range_out: RANGE_1,
num_args: 0,
is_external_input: false,
has_singleton_output: false,
ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { input, signal })),
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/defer_tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub const DEFER_TICK: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Tick),
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/defer_tick_lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const DEFER_TICK_LAZY: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::TickLazy),
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub const DEMUX: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: Some(|| PortListSpec::Variadic),
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/demux_enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub const DEMUX_ENUM: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_1,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: Some(|| PortListSpec::Variadic),
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/dest_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub const DEST_FILE: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/dest_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
Expand Down
Loading

0 comments on commit 2857625

Please sign in to comment.