Skip to content

Commit

Permalink
feat(hydroflow_plus): add operators necessary for Paxos / PBFT (#1376)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Aug 7, 2024
1 parent c12b249 commit eaf497b
Show file tree
Hide file tree
Showing 30 changed files with 176 additions and 73 deletions.
88 changes: 84 additions & 4 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub enum HfPlusNode {

Union(Box<HfPlusNode>, Box<HfPlusNode>),
CrossProduct(Box<HfPlusNode>, Box<HfPlusNode>),
CrossSingleton(Box<HfPlusNode>, Box<HfPlusNode>),
Join(Box<HfPlusNode>, Box<HfPlusNode>),
Difference(Box<HfPlusNode>, Box<HfPlusNode>),
AntiJoin(Box<HfPlusNode>, Box<HfPlusNode>),
Expand All @@ -203,6 +204,8 @@ pub enum HfPlusNode {
f: DebugExpr,
input: Box<HfPlusNode>,
},

DeferTick(Box<HfPlusNode>),
Enumerate(Box<HfPlusNode>),
Inspect {
f: DebugExpr,
Expand All @@ -211,6 +214,7 @@ pub enum HfPlusNode {

Unique(Box<HfPlusNode>),

Sort(Box<HfPlusNode>),
Fold {
init: DebugExpr,
acc: DebugExpr,
Expand Down Expand Up @@ -298,6 +302,10 @@ impl HfPlusNode {
Box::new(transform(*left, seen_tees)),
Box::new(transform(*right, seen_tees)),
),
HfPlusNode::CrossSingleton(left, right) => HfPlusNode::CrossSingleton(
Box::new(transform(*left, seen_tees)),
Box::new(transform(*right, seen_tees)),
),
HfPlusNode::Join(left, right) => HfPlusNode::Join(
Box::new(transform(*left, seen_tees)),
Box::new(transform(*right, seen_tees)),
Expand Down Expand Up @@ -327,6 +335,10 @@ impl HfPlusNode {
f,
input: Box::new(transform(*input, seen_tees)),
},
HfPlusNode::Sort(input) => HfPlusNode::Sort(Box::new(transform(*input, seen_tees))),
HfPlusNode::DeferTick(input) => {
HfPlusNode::DeferTick(Box::new(transform(*input, seen_tees)))
}
HfPlusNode::Enumerate(input) => {
HfPlusNode::Enumerate(Box::new(transform(*input, seen_tees)))
}
Expand Down Expand Up @@ -527,11 +539,44 @@ impl HfPlusNode {
(union_ident, left_location_id)
}

HfPlusNode::CrossSingleton(left, right) => {
let (left_ident, left_location_id) =
left.emit(graph_builders, built_tees, next_stmt_id);
let (right_ident, right_location_id) =
right.emit(graph_builders, built_tees, next_stmt_id);

assert_eq!(
left_location_id, right_location_id,
"cross_singleton inputs must be in the same location"
);

let union_id = *next_stmt_id;
*next_stmt_id += 1;

let cross_ident =
syn::Ident::new(&format!("stream_{}", union_id), Span::call_site());

let builder = graph_builders.entry(left_location_id).or_default();
builder.add_statement(parse_quote! {
#cross_ident = cross_singleton();
});

builder.add_statement(parse_quote! {
#left_ident -> [input]#cross_ident;
});

builder.add_statement(parse_quote! {
#right_ident -> [single]#cross_ident;
});

(cross_ident, left_location_id)
}

HfPlusNode::CrossProduct(..) | HfPlusNode::Join(..) => {
let operator: syn::Ident = if matches!(self, HfPlusNode::CrossProduct(..)) {
parse_quote!(cross_join)
parse_quote!(cross_join_multiset)
} else {
parse_quote!(join)
parse_quote!(join_multiset)
};

let (HfPlusNode::CrossProduct(left, right) | HfPlusNode::Join(left, right)) = self
Expand Down Expand Up @@ -605,9 +650,9 @@ impl HfPlusNode {

HfPlusNode::Difference(..) | HfPlusNode::AntiJoin(..) => {
let operator: syn::Ident = if matches!(self, HfPlusNode::Difference(..)) {
parse_quote!(difference)
parse_quote!(difference_multiset)
} else {
parse_quote!(anti_join)
parse_quote!(anti_join_multiset)
};

let (HfPlusNode::Difference(left, right) | HfPlusNode::AntiJoin(left, right)) =
Expand Down Expand Up @@ -732,6 +777,41 @@ impl HfPlusNode {
(filter_map_ident, input_location_id)
}

HfPlusNode::Sort(input) => {
let (input_ident, input_location_id) =
input.emit(graph_builders, built_tees, next_stmt_id);

let sort_id = *next_stmt_id;
*next_stmt_id += 1;

let sort_ident = syn::Ident::new(&format!("stream_{}", sort_id), Span::call_site());

let builder = graph_builders.entry(input_location_id).or_default();
builder.add_statement(parse_quote! {
#sort_ident = #input_ident -> sort();
});

(sort_ident, input_location_id)
}

HfPlusNode::DeferTick(input) => {
let (input_ident, input_location_id) =
input.emit(graph_builders, built_tees, next_stmt_id);

let defer_tick_id = *next_stmt_id;
*next_stmt_id += 1;

let defer_tick_ident =
syn::Ident::new(&format!("stream_{}", defer_tick_id), Span::call_site());

let builder = graph_builders.entry(input_location_id).or_default();
builder.add_statement(parse_quote! {
#defer_tick_ident = #input_ident -> defer_tick_lazy();
});

(defer_tick_ident, input_location_id)
}

HfPlusNode::Enumerate(input) => {
let (input_ident, input_location_id) =
input.emit(graph_builders, built_tees, next_stmt_id);
Expand Down
55 changes: 50 additions & 5 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,35 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
)
}

pub fn cross_singleton<O>(self, other: Stream<'a, O, Windowed, N>) -> Stream<'a, (T, O), W, N>
where
O: Clone,
{
if self.node.id() != other.node.id() {
panic!("cross_singleton must be called on streams on the same node");
}

Stream::new(
self.node,
self.ir_leaves,
HfPlusNode::CrossSingleton(
Box::new(self.ir_node.into_inner()),
Box::new(other.ir_node.into_inner()),
),
)
}

/// Allow this stream through if the other stream has elements, otherwise the output is empty.
pub fn continue_if<U>(self, signal: Stream<'a, U, Windowed, N>) -> Stream<'a, T, W, N> {
self.cross_singleton(signal.map(q!(|_u| ())))
.map(q!(|(d, _signal)| d))
}

/// Allow this stream through if the other stream is empty, otherwise the output is empty.
pub fn continue_unless<U>(self, other: Stream<'a, U, Windowed, N>) -> Stream<'a, T, W, N> {
self.continue_if(other.count().filter(q!(|c| *c == 0)))
}

// TODO(shadaj): should allow for differing windows, using strongest one
pub fn cross_product<O>(self, other: Stream<'a, O, W, N>) -> Stream<'a, (T, O), W, N>
where
Expand Down Expand Up @@ -252,6 +281,17 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> {
)
}

pub fn sort(self) -> Stream<'a, T, Windowed, N>
where
T: Ord,
{
Stream::new(
self.node,
self.ir_leaves,
HfPlusNode::Sort(Box::new(self.ir_node.into_inner())),
)
}

pub fn count(self) -> Stream<'a, usize, Windowed, N> {
self.fold(q!(|| 0usize), q!(|count, _| *count += 1))
}
Expand All @@ -264,6 +304,14 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> {
)
}

pub fn defer_tick(self) -> Stream<'a, T, Windowed, N> {
Stream::new(
self.node,
self.ir_leaves,
HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())),
)
}

pub fn unique(self) -> Stream<'a, T, Windowed, N>
where
T: Eq + Hash,
Expand Down Expand Up @@ -296,10 +344,7 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> {
pub fn sample_every(
self,
duration: impl Quoted<'a, std::time::Duration> + Copy + 'a,
) -> Stream<'a, T, Windowed, N>
where
T: Clone,
{
) -> Stream<'a, T, Windowed, N> {
let interval = duration.splice();

let samples = Stream::<'a, hydroflow::tokio::time::Instant, Windowed, N>::new(
Expand All @@ -311,7 +356,7 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> {
},
);

self.cross_product(samples).map(q!(|(a, _)| a))
self.continue_if(samples)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ expression: built.ir()
ForEach {
f: { use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } },
input: Map {
f: { use hydroflow_plus :: __staged :: stream :: * ; | (a , _) | a },
input: CrossProduct(
f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d },
input: CrossSingleton(
Reduce {
f: { use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } },
input: Persist(
Expand Down Expand Up @@ -61,11 +61,14 @@ expression: built.ir()
},
),
},
Source {
source: Interval(
{ use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) },
),
location_id: 1,
Map {
f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () },
input: Source {
source: Interval(
{ use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) },
),
location_id: 1,
},
},
),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ expression: ir.surface_syntax_string()
3v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b });
4v1 = reduce :: < 'static > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } });
5v1 = source_interval ({ use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) });
6v1 = cross_join :: < 'tick , 'tick > ();
7v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | (a , _) | a });
8v1 = for_each ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } });
6v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () });
7v1 = cross_singleton ();
8v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d });
9v1 = for_each ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } });

1v1 -> 2v1;
2v1 -> 3v1;
3v1 -> 4v1;
4v1 -> 6v1;
5v1 -> 6v1;
4v1 -> 7v1;
6v1 -> 7v1;
7v1 -> 8v1;

8v1 -> 9v1;
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ digraph {
n1v1 [label="(n1v1) source_stream(users_stream)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(messages)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) map({\l use crate::__staged::local::chat_app::*;\l |s| s.to_uppercase()\l})\l", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) cross_join::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) cross_join_multiset::<'static, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) for_each({\l use crate::__staged::local::chat_app::*;\l let output = output;\l |t| {\l output.send(t).unwrap();\l }\l})\l", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
n1v1 -> n4v1 [label="0"]
Expand Down Expand Up @@ -41,4 +41,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_stream(users_stream)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(messages)</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>map({<br> use crate::__staged::local::chat_app::*;<br> |s| s.to_uppercase()<br>})</code>"/]:::pullClass
4v1[\"(4v1) <code>cross_join::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>cross_join_multiset::&lt;'static, 'tick&gt;()</code>"/]:::pullClass
5v1[/"<div style=text-align:center>(5v1)</div> <code>for_each({<br> use crate::__staged::local::chat_app::*;<br> let output = output;<br> |t| {<br> output.send(t).unwrap();<br> }<br>})</code>"\]:::pushClass
2v1-->3v1
1v1-->|0|4v1
Expand All @@ -36,4 +36,3 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
4v1
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ digraph {
n1v1 [label="(n1v1) source_stream(users_stream)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_stream(messages)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) map({\l use crate::__staged::local::chat_app::*;\l |s| s.to_uppercase()\l})\l", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) cross_join::<'static, 'static>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) cross_join_multiset::<'static, 'static>()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) multiset_delta()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each({\l use crate::__staged::local::chat_app::*;\l let output = output;\l |t| {\l output.send(t).unwrap();\l }\l})\l", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
Expand Down Expand Up @@ -48,4 +48,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_stream(users_stream)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_stream(messages)</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>map({<br> use crate::__staged::local::chat_app::*;<br> |s| s.to_uppercase()<br>})</code>"/]:::pullClass
4v1[\"(4v1) <code>cross_join::&lt;'static, 'static&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>cross_join_multiset::&lt;'static, 'static&gt;()</code>"/]:::pullClass
5v1[\"(5v1) <code>multiset_delta()</code>"/]:::pullClass
6v1[/"<div style=text-align:center>(6v1)</div> <code>for_each({<br> use crate::__staged::local::chat_app::*;<br> let output = output;<br> |t| {<br> output.send(t).unwrap();<br> }<br>})</code>"\]:::pushClass
2v1-->3v1
Expand Down Expand Up @@ -42,4 +42,3 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
5v1
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ digraph {
n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) map({\l use crate::__staged::local::graph_reachability::*;\l |r| (r, ())\l})\l", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) source_stream(edges)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) join::<'tick, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) join_multiset::<'tick, 'tick>()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) map({\l use crate::__staged::local::graph_reachability::*;\l |(_from, (_, to))| to\l})\l", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) unique::<'tick>()", shape=house, fillcolor="#ffff88"]
n9v1 [label="(n9v1) for_each({\l use crate::__staged::local::graph_reachability::*;\l let reached_out = reached_out;\l |v| {\l reached_out.send(v).unwrap();\l }\l})\l", shape=house, fillcolor="#ffff88"]
Expand Down Expand Up @@ -73,4 +73,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ linkStyle default stroke:#aaa
3v1[/"(3v1) <code>tee()</code>"\]:::pushClass
4v1[/"<div style=text-align:center>(4v1)</div> <code>map({<br> use crate::__staged::local::graph_reachability::*;<br> |r| (r, ())<br>})</code>"\]:::pushClass
5v1[\"(5v1) <code>source_stream(edges)</code>"/]:::pullClass
6v1[\"(6v1) <code>join::&lt;'tick, 'tick&gt;()</code>"/]:::pullClass
6v1[\"(6v1) <code>join_multiset::&lt;'tick, 'tick&gt;()</code>"/]:::pullClass
7v1[\"<div style=text-align:center>(7v1)</div> <code>map({<br> use crate::__staged::local::graph_reachability::*;<br> |(_from, (_, to))| to<br>})</code>"/]:::pullClass
8v1[/"(8v1) <code>unique::&lt;'tick&gt;()</code>"\]:::pushClass
9v1[/"<div style=text-align:center>(9v1)</div> <code>for_each({<br> use crate::__staged::local::graph_reachability::*;<br> let reached_out = reached_out;<br> |v| {<br> reached_out.send(v).unwrap();<br> }<br>})</code>"\]:::pushClass
Expand Down Expand Up @@ -64,4 +64,3 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
8v1
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ digraph {
n2v1 [label="(n2v1) map({\l use crate::__staged::local::negation::*;\l |v| (v, v)\l})\l", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) source_iter({\l use crate::__staged::local::negation::*;\l 3..6\l})\l", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) anti_join::<'tick, 'static>()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) anti_join_multiset::<'tick, 'static>()", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each({\l use crate::__staged::local::negation::*;\l let output = output;\l |v: (u32, u32)| {\l output.send(v.0).unwrap();\l }\l})\l", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n1v1 -> n2v1
Expand Down Expand Up @@ -55,4 +55,3 @@ digraph {
}
}
}

Loading

0 comments on commit eaf497b

Please sign in to comment.