diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index e5585a7709e0..969c26ed6a19 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -154,7 +154,11 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { } // TODO(shadaj): should allow for differing windows, using strongest one - pub fn cross_product(self, other: Stream<'a, O, W, N>) -> Stream<'a, (T, O), W, N> { + pub fn cross_product(self, other: Stream<'a, O, W, N>) -> Stream<'a, (T, O), W, N> + where + T: Clone, + O: Clone, + { if self.node.id() != other.node.id() { panic!("cross_product must be called on streams on the same node"); } @@ -292,7 +296,10 @@ impl<'a, T, N: Location + Clone> Stream<'a, T, Windowed, N> { pub fn sample_every( self, duration: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream<'a, T, Windowed, N> { + ) -> Stream<'a, T, Windowed, N> + where + T: Clone, + { let interval = duration.splice(); let samples = Stream::<'a, hydroflow::tokio::time::Instant, Windowed, N>::new( @@ -532,25 +539,16 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { ) -> Stream<'a, N::Out, Async, N2> where N: HfSend = (N2::Id, T)>, - T: Serialize + DeserializeOwned, + T: Clone + Serialize + DeserializeOwned, N2::Id: Clone, { - let ids_spliced = other.ids().splice(); + let ids = other.ids(); - let other_ids = Stream::<'a, &N2::Id, Windowed, N>::new( - self.node.clone(), - self.ir_leaves.clone(), - HfPlusNode::Source { - source: HfPlusSource::Iter(ids_spliced.into()), - location_id: self.node.id(), - }, - ) - .cloned() - .all_ticks(); - - other_ids - .cross_product(self.assume_windowed()) - .send_bincode(other) + self.flat_map(q!(|b| ids.iter().map(move |id| ( + ::std::clone::Clone::clone(id), + ::std::clone::Clone::clone(&b) + )))) + .send_bincode(other) } pub fn broadcast_bincode_interleaved + Clone, Tag, V>( @@ -559,7 +557,7 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { ) -> Stream<'a, T, Async, N2> where N: HfSend = (N2::Id, T), Out = (Tag, T)>, - T: Serialize + DeserializeOwned, + T: Clone + Serialize + DeserializeOwned, N2::Id: Clone, { self.broadcast_bincode(other).map(q!(|(_, b)| b)) @@ -572,23 +570,15 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { where N: HfSend = (N2::Id, T)>, N2::Id: Clone, + T: Clone, { - let ids_spliced = other.ids().splice(); - - let other_ids = Stream::<'a, &N2::Id, Windowed, N>::new( - self.node.clone(), - self.ir_leaves.clone(), - HfPlusNode::Source { - source: HfPlusSource::Iter(ids_spliced.into()), - location_id: self.node.id(), - }, - ) - .cloned() - .all_ticks(); + let ids = other.ids(); - other_ids - .cross_product(self.assume_windowed()) - .send_bytes(other) + self.flat_map(q!(|b| ids.iter().map(move |id| ( + ::std::clone::Clone::clone(id), + ::std::clone::Clone::clone(&b) + )))) + .send_bytes(other) } pub fn broadcast_bytes_interleaved + Clone, Tag, V>( @@ -598,6 +588,7 @@ impl<'a, T, W, N: Location + Clone> Stream<'a, T, W, N> { where N: HfSend = (N2::Id, T), Out = (Tag, Bytes)>, N2::Id: Clone, + T: Clone, { self.broadcast_bytes(other).map(q!(|(_, b)| b)) } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap index 708bb1dbbe74..565142fe6968 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap @@ -29,25 +29,15 @@ expression: builder.extract().ir() }, ), ), - input: CrossProduct( - Persist( - Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | d | d . clone () }, - input: Source { - source: Iter( - { use hydroflow_plus_cli_integration :: __staged :: deploy :: * ; panic ! () }, - ), - location_id: 0, - }, - }, - ), - Source { + input: FlatMap { + f: { use hydroflow_plus :: __staged :: stream :: * ; let ids = { use hydroflow_plus_cli_integration :: __staged :: deploy :: * ; panic ! () } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }, + input: Source { source: Iter( { use crate :: __staged :: cluster :: many_to_many :: * ; 0 .. 2 }, ), location_id: 0, }, - ), + }, }, }, ]