Skip to content

Commit

Permalink
feat(hydroflow_lang): enable singleton reference usage in all operato…
Browse files Browse the repository at this point in the history
…rs (#1133)

also cleans up local imports
  • Loading branch information
MingweiSamuel committed Apr 5, 2024
1 parent f9b26b6 commit 8cb29fa
Show file tree
Hide file tree
Showing 77 changed files with 254 additions and 610 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

48 changes: 31 additions & 17 deletions hydroflow_lang/src/graph/hydroflow_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::diagnostic::{Diagnostic, Level};
use crate::graph::ops::null_write_iterator_fn;
use crate::graph::MODULE_BOUNDARY_NODE_STR;
use crate::pretty_span::{PrettyRowCol, PrettySpan};
use crate::process_singletons::postprocess_singletons;

/// A graph representing a Hydroflow dataflow graph (with or without subgraph partitioning,
/// stratification, and handoff insertion). This is a "meta" graph used for generating Rust source
Expand Down Expand Up @@ -311,7 +312,7 @@ impl HydroflowGraph {
output_ports,
singletons_referenced: operator.singletons_referenced.clone(),
generics,
arguments: operator.args.clone(),
arguments_pre: operator.args.clone(),
arguments_raw: operator.args_raw.clone(),
},
));
Expand Down Expand Up @@ -359,7 +360,7 @@ impl HydroflowGraph {
output_ports: vec![output_port],
singletons_referenced: operator.singletons_referenced.clone(),
generics,
arguments: operator.args.clone(),
arguments_pre: operator.args.clone(),
arguments_raw: operator.args_raw.clone(),
})
};
Expand Down Expand Up @@ -786,6 +787,26 @@ impl HydroflowGraph {
Ident::new(&*format!("edge_{:?}", edge_id.data()), span)
}

/// For per-node singleton references. Helper to generate a deterministic `Ident` for the given node.
fn node_as_singleton_ident(&self, node_id: GraphNodeId, span: Span) -> Ident {
Ident::new(&format!("singleton_op_{:?}", node_id.data()), span)
}

/// Resolve the singletons via [`Self::node_singleton_references`] for the given `node_id`.
fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<Ident> {
self.node_singleton_references.get(node_id)
.into_iter() // `None` becomes empty iter.
.flatten()
.map(|singleton_node_id| {
// TODO(mingwei): this `expect` should be caught in error checking
self.node_as_singleton_ident(
singleton_node_id.expect("Expected singleton to be resolved but was not, this is a Hydroflow bug."),
span,
)
})
.collect::<Vec<_>>()
}

/// Returns each subgraph's receive and send handoffs.
/// `Map<GraphSubgraphId, (recv handoffs, send handoffs)>`
fn helper_collect_subgraph_handoffs(
Expand Down Expand Up @@ -987,21 +1008,14 @@ impl HydroflowGraph {

let is_pull = idx < pull_to_push_idx;

// TODO(mingwei): make this a helper method.
let singleton_output_ident =
&Ident::new(&format!("singleton_op_{:?}", node_id.data()), op_span);

let singletons_resolved = &*self.node_singleton_references.get(node_id)
.into_iter() // `None` becomes empty iter.
.flatten()
.map(|singleton_node_id| {
// TODO(mingwei): make this a helper method.
Ident::new(&format!(
"singleton_op_{:?}",
singleton_node_id.expect("TODO(mingwei): Failed to resolve singleton").data()
), op_span)
})
.collect::<Vec<_>>();
&self.node_as_singleton_ident(node_id, op_span);
let singletons_resolved =
self.helper_resolve_singletons(node_id, op_span);
let arguments = &postprocess_singletons(
op_inst.arguments_raw.clone(),
singletons_resolved,
);

let context_args = WriteContextArgs {
root,
Expand All @@ -1027,7 +1041,7 @@ impl HydroflowGraph {
singleton_output_ident,
op_name,
op_inst,
singletons_resolved,
arguments,
flow_props_in: &*flow_props_in,
};

Expand Down
5 changes: 4 additions & 1 deletion hydroflow_lang/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ pub struct OperatorInstance {
pub generics: OpInstGenerics,
/// Arguments provided by the user into the operator as arguments.
/// I.e. the `a, b, c` in `-> my_op(a, b, c) -> `.
pub arguments: Punctuated<Expr, Token![,]>,
///
/// These arguments do not include singleton postprocessing codegen. Instead use
/// [`ops::WriteContextArgs::arguments`].
pub arguments_pre: Punctuated<Expr, Token![,]>,
/// Unparsed arguments, for singleton parsing.
pub arguments_raw: TokenStream,
}
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_lang/src/graph/ops/_lattice_fold_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use quote::{quote_spanned, ToTokens};
use syn::parse_quote;

use super::{
DelayType, OperatorCategory, OperatorConstraints, OperatorWriteOutput, PortListSpec,
WriteContextArgs, LATTICE_FOLD_REDUCE_FLOW_PROP_FN, RANGE_0, RANGE_1,
DelayType, GraphEdgeType, OpInstGenerics, OperatorCategory, OperatorConstraints,
OperatorInstance, OperatorWriteOutput, PortListSpec, WriteContextArgs,
LATTICE_FOLD_REDUCE_FLOW_PROP_FN, RANGE_0, RANGE_1,
};
use crate::graph::{GraphEdgeType, OpInstGenerics, OperatorInstance};

/// > 2 input streams, 1 output stream, no arguments.
///
Expand Down
Loading

0 comments on commit 8cb29fa

Please sign in to comment.