Skip to content

Commit

Permalink
fix(hydroflow_plus): add Clone bounds to cross_join and simplify …
Browse files Browse the repository at this point in the history
…broadcast logic (#1375)

Summary:

Test Plan:
---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with
[ReviewStack](https://reviewstack.dev/hydro-project/hydroflow/pull/1375).
* #1376
* __->__ #1375
  • Loading branch information
shadaj committed Aug 7, 2024
1 parent bd793e2 commit c12b249
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 48 deletions.
59 changes: 25 additions & 34 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
}

// TODO(shadaj): should allow for differing windows, using strongest one
pub fn cross_product<O>(self, other: Stream<'a, O, W, N>) -> Stream<'a, (T, O), W, N> {
pub fn cross_product<O>(self, other: Stream<'a, O, W, N>) -> Stream<'a, (T, O), W, N>
where
T: Clone,
O: Clone,
{
if self.node.id() != other.node.id() {
panic!("cross_product must be called on streams on the same node");
}
Expand Down Expand Up @@ -292,7 +296,10 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> {
pub fn sample_every(
self,
duration: impl Quoted<'a, std::time::Duration> + Copy + 'a,
) -> Stream<'a, T, Windowed, N> {
) -> Stream<'a, T, Windowed, N>
where
T: Clone,
{
let interval = duration.splice();

let samples = Stream::<'a, hydroflow::tokio::time::Instant, Windowed, N>::new(
Expand Down Expand Up @@ -532,25 +539,16 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
) -> Stream<'a, N::Out<T>, Async, N2>
where
N: HfSend<N2, V, In<T> = (N2::Id, T)>,
T: Serialize + DeserializeOwned,
T: Clone + Serialize + DeserializeOwned,
N2::Id: Clone,
{
let ids_spliced = other.ids().splice();
let ids = other.ids();

let other_ids = Stream::<'a, &N2::Id, Windowed, N>::new(
self.node.clone(),
self.ir_leaves.clone(),
HfPlusNode::Source {
source: HfPlusSource::Iter(ids_spliced.into()),
location_id: self.node.id(),
},
)
.cloned()
.all_ticks();

other_ids
.cross_product(self.assume_windowed())
.send_bincode(other)
self.flat_map(q!(|b| ids.iter().map(move |id| (
::std::clone::Clone::clone(id),
::std::clone::Clone::clone(&b)
))))
.send_bincode(other)
}

pub fn broadcast_bincode_interleaved<N2: Location + Cluster<'a> + Clone, Tag, V>(
Expand All @@ -559,7 +557,7 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
) -> Stream<'a, T, Async, N2>
where
N: HfSend<N2, V, In<T> = (N2::Id, T), Out<T> = (Tag, T)>,
T: Serialize + DeserializeOwned,
T: Clone + Serialize + DeserializeOwned,
N2::Id: Clone,
{
self.broadcast_bincode(other).map(q!(|(_, b)| b))
Expand All @@ -572,23 +570,15 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
where
N: HfSend<N2, V, In<Bytes> = (N2::Id, T)>,
N2::Id: Clone,
T: Clone,
{
let ids_spliced = other.ids().splice();

let other_ids = Stream::<'a, &N2::Id, Windowed, N>::new(
self.node.clone(),
self.ir_leaves.clone(),
HfPlusNode::Source {
source: HfPlusSource::Iter(ids_spliced.into()),
location_id: self.node.id(),
},
)
.cloned()
.all_ticks();
let ids = other.ids();

other_ids
.cross_product(self.assume_windowed())
.send_bytes(other)
self.flat_map(q!(|b| ids.iter().map(move |id| (
::std::clone::Clone::clone(id),
::std::clone::Clone::clone(&b)
))))
.send_bytes(other)
}

pub fn broadcast_bytes_interleaved<N2: Location + Cluster<'a> + Clone, Tag, V>(
Expand All @@ -598,6 +588,7 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> {
where
N: HfSend<N2, V, In<Bytes> = (N2::Id, T), Out<Bytes> = (Tag, Bytes)>,
N2::Id: Clone,
T: Clone,
{
self.broadcast_bytes(other).map(q!(|(_, b)| b))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,15 @@ expression: builder.extract().ir()
},
),
),
input: CrossProduct(
Persist(
Map {
f: { use hydroflow_plus :: __staged :: stream :: * ; | d | d . clone () },
input: Source {
source: Iter(
{ use hydroflow_plus_cli_integration :: __staged :: deploy :: * ; panic ! () },
),
location_id: 0,
},
},
),
Source {
input: FlatMap {
f: { use hydroflow_plus :: __staged :: stream :: * ; let ids = { use hydroflow_plus_cli_integration :: __staged :: deploy :: * ; panic ! () } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) },
input: Source {
source: Iter(
{ use crate :: __staged :: cluster :: many_to_many :: * ; 0 .. 2 },
),
location_id: 0,
},
),
},
},
},
]

0 comments on commit c12b249

Please sign in to comment.