diff --git a/amadeus-core/src/par_pipe.rs b/amadeus-core/src/par_pipe.rs index a87e7d21..d0c50c88 100644 --- a/amadeus-core/src/par_pipe.rs +++ b/amadeus-core/src/par_pipe.rs @@ -1,7 +1,7 @@ use either::Either; use futures::Stream; use std::{ - cmp::Ordering, future::Future, hash::Hash, iter, ops::FnMut, pin::Pin, task::{Context, Poll} + cmp::Ordering, future::Future, hash::Hash, iter, ops::{DerefMut, FnMut}, pin::Pin, task::{Context, Poll} }; use crate::{pool::ProcessSend, sink::Sink}; @@ -25,6 +25,34 @@ pub trait PipeTaskAsync { ) -> Poll<()>; } +impl PipeTaskAsync for Pin

+where + P: DerefMut + Unpin, + P::Target: PipeTaskAsync, +{ + type Item = >::Item; + + fn poll_run( + self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + sink: Pin<&mut impl Sink>, + ) -> Poll<()> { + self.get_mut().as_mut().poll_run(cx, stream, sink) + } +} +impl PipeTaskAsync for &mut T +where + T: PipeTaskAsync + Unpin, +{ + type Item = T::Item; + + fn poll_run( + mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + sink: Pin<&mut impl Sink>, + ) -> Poll<()> { + Pin::new(&mut **self).poll_run(cx, stream, sink) + } +} + impl_par_dist_rename! { #[must_use] pub trait ParallelPipe { diff --git a/amadeus-core/src/par_sink.rs b/amadeus-core/src/par_sink.rs index f25fe2a0..420f9b5c 100644 --- a/amadeus-core/src/par_sink.rs +++ b/amadeus-core/src/par_sink.rs @@ -16,7 +16,7 @@ mod tuple; use futures::Stream; use std::{ - pin::Pin, task::{Context, Poll} + ops::DerefMut, pin::Pin, task::{Context, Poll} }; use crate::pool::ProcessSend; @@ -52,6 +52,41 @@ pub trait ReducerProcessSend: ReducerSend: type Output: ProcessSend + 'static; } +impl

ReducerAsync for Pin

+where + P: DerefMut + Unpin, + P::Target: ReducerAsync, +{ + type Item = ::Item; + type Output = ::Output; + + fn poll_forward( + self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + ) -> Poll<()> { + self.get_mut().as_mut().poll_forward(cx, stream) + } + fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.get_mut().as_mut().poll_output(cx) + } +} +impl ReducerAsync for &mut T +where + T: ReducerAsync + Unpin, +{ + type Item = T::Item; + type Output = T::Output; + + fn poll_forward( + mut self: Pin<&mut Self>, cx: &mut Context, + stream: Pin<&mut impl Stream>, + ) -> Poll<()> { + Pin::new(&mut **self).poll_forward(cx, stream) + } + fn poll_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::new(&mut **self).poll_output(cx) + } +} + pub trait Factory { type Item; diff --git a/amadeus-core/src/par_sink/folder.rs b/amadeus-core/src/par_sink/folder.rs index 4b141d93..20515984 100644 --- a/amadeus-core/src/par_sink/folder.rs +++ b/amadeus-core/src/par_sink/folder.rs @@ -14,43 +14,43 @@ use crate::pool::ProcessSend; mod macros { #[macro_export] - macro_rules! folder_dist_sink { + macro_rules! folder_par_sink { ($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => { type Output = ::Output; type Pipe = I; type ReduceAFactory = FolderSyncReducerFactory; - type ReduceBFactory = FolderSyncReducerFactory<::Output, $folder_b>; type ReduceA = FolderSyncReducer; - type ReduceB = FolderSyncReducer<::Output, $folder_b>; - type ReduceC = FolderSyncReducer<::Output, $folder_b>; + type ReduceC = FolderSyncReducer<::Output, $folder_b>; - fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) { + fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) { let init_a = $init_a; let init_b = $init_b; ( $self.i, FolderSyncReducerFactory::new(init_a), - FolderSyncReducerFactory::new(init_b.clone()), FolderSyncReducer::new(init_b), ) } }; } #[macro_export] - macro_rules! folder_par_sink { + macro_rules! folder_dist_sink { ($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => { type Output = ::Output; type Pipe = I; type ReduceAFactory = FolderSyncReducerFactory; + type ReduceBFactory = FolderSyncReducerFactory<::Output, $folder_b>; type ReduceA = FolderSyncReducer; - type ReduceC = FolderSyncReducer<::Output, $folder_b>; + type ReduceB = FolderSyncReducer<::Output, $folder_b>; + type ReduceC = FolderSyncReducer<::Output, $folder_b>; - fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) { + fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) { let init_a = $init_a; let init_b = $init_b; ( $self.i, FolderSyncReducerFactory::new(init_a), + FolderSyncReducerFactory::new(init_b.clone()), FolderSyncReducer::new(init_b), ) } diff --git a/amadeus-core/src/par_sink/pipe.rs b/amadeus-core/src/par_sink/pipe.rs index 857850ac..6bb797d5 100644 --- a/amadeus-core/src/par_sink/pipe.rs +++ b/amadeus-core/src/par_sink/pipe.rs @@ -18,25 +18,38 @@ pub struct Pipe { b: B, } +impl_par_dist! { + impl, B: ParallelPipe, Source> ParallelPipe for Pipe { + type Item = B::Item; + type Task = JoinTask; + + fn task(&self) -> Self::Task { + let a = self.a.task(); + let b = self.b.task(); + JoinTask { a, b } + } + } +} + impl, B: ParallelSink, Source> ParallelSink for Pipe { type Output = B::Output; - type Pipe = Join; + type Pipe = Pipe; type ReduceAFactory = B::ReduceAFactory; type ReduceA = B::ReduceA; type ReduceC = B::ReduceC; fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC) { let (a, b, c) = self.b.reducers(); - (Join::new(self.a, a), b, c) + (Pipe::new(self.a, a), b, c) } } impl, B: DistributedSink, Source> DistributedSink for Pipe { type Output = B::Output; - type Pipe = Join; + type Pipe = Pipe; type ReduceAFactory = B::ReduceAFactory; type ReduceBFactory = B::ReduceBFactory; type ReduceA = B::ReduceA; @@ -52,27 +65,7 @@ impl, B: DistributedSink, Source> Distribute Self::ReduceC, ) { let (a, b, c, d) = self.b.reducers(); - (Join::new(self.a, a), b, c, d) - } -} - -#[derive(new)] -#[must_use] -pub struct Join { - a: A, - b: B, -} - -impl_par_dist! { - impl, B: ParallelPipe, Source> ParallelPipe for Join { - type Item = B::Item; - type Task = JoinTask; - - fn task(&self) -> Self::Task { - let a = self.a.task(); - let b = self.b.task(); - JoinTask { a, b } - } + (Pipe::new(self.a, a), b, c, d) } } diff --git a/amadeus-core/src/par_stream.rs b/amadeus-core/src/par_stream.rs index 1f561828..4dab072a 100644 --- a/amadeus-core/src/par_stream.rs +++ b/amadeus-core/src/par_stream.rs @@ -19,7 +19,7 @@ use pin_project::pin_project; use serde::{Deserialize, Serialize}; use serde_closure::*; use std::{ - cmp::Ordering, collections::HashMap, future::Future, hash::Hash, iter, marker::PhantomData, ops::FnMut, pin::Pin, task::{Context, Poll}, vec + cmp::Ordering, collections::HashMap, future::Future, hash::Hash, iter, marker::PhantomData, ops::{DerefMut, FnMut}, pin::Pin, task::{Context, Poll}, vec }; use crate::{ @@ -48,6 +48,32 @@ pub trait StreamTaskAsync { ) -> Poll<()>; } +impl

StreamTaskAsync for Pin

+where + P: DerefMut + Unpin, + P::Target: StreamTaskAsync, +{ + type Item = ::Item; + + fn poll_run( + self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink>, + ) -> Poll<()> { + self.get_mut().as_mut().poll_run(cx, sink) + } +} +impl StreamTaskAsync for &mut T +where + T: StreamTaskAsync + Unpin, +{ + type Item = T::Item; + + fn poll_run( + mut self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink>, + ) -> Poll<()> { + Pin::new(&mut **self).poll_run(cx, sink) + } +} + #[async_trait(?Send)] #[must_use] pub trait DistributedStream { diff --git a/amadeus-core/src/par_stream/identity.rs b/amadeus-core/src/par_stream/identity.rs index d80981df..77cd2b76 100644 --- a/amadeus-core/src/par_stream/identity.rs +++ b/amadeus-core/src/par_stream/identity.rs @@ -30,7 +30,7 @@ mod workaround { #[doc(hidden)] impl Identity { - pub fn pipe(self, sink: S) -> Pipe { + pub fn pipe(self, sink: S) -> Pipe { Pipe::new(self, sink) } diff --git a/amadeus-derive/src/lib.rs b/amadeus-derive/src/lib.rs index eaeb29df..d0b2e50d 100644 --- a/amadeus-derive/src/lib.rs +++ b/amadeus-derive/src/lib.rs @@ -190,6 +190,7 @@ fn impl_struct( let field_names2 = &field_names; let num_fields = field_names.len(); + let n = &(0usize..num_fields).collect::>(); // The field names specified via `#[amadeus(rename = "foo")]`, falling back to struct // field names @@ -459,9 +460,9 @@ fn impl_struct( #postgres_includes #serde_includes pub use ::amadeus_core::util::Wrapper; - pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_}}; + pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}}; pub use #amadeus_path::data::Data; - pub use ::std::{boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, hash::{Hash, Hasher}, marker::{Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, vec, vec::Vec, option::Option::{self, Some, None}, iter::Iterator}; + pub use ::std::{boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, hash::{Hash, Hasher}, marker::{PhantomData, Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, vec, vec::Vec, option::Option::{self, Some, None}, iter::Iterator}; } #parquet_derives @@ -562,21 +563,74 @@ fn impl_struct( todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69") } #[inline(always)] - fn serialize_a(&self, _serializer: S) -> __::Result + fn serialize_a(&self, serializer: S) -> __::Result where S: __::Serializer_, for<'a> __::Wrapper<'a, #name #ty_generics>: __::Serialize_, { - todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69") + struct Wrap<'a,T,U>(&'a T,__::PhantomDataU>); + impl<'a,T,U> __::Serialize_ for Wrap<'a,T,U> where T: __::ListVec + 'a, U: __::CoreData + __::Serialize_ { + fn serialize(&self, serializer: S) -> __::Result + where + S: __::Serializer_, + { + self.0.serialize_a(serializer) + } + } + let mut tuple = serializer.serialize_tuple(1 + #num_fields)?; + __::SerializeTuple::serialize_element(&mut tuple, &self.__len)?; + #(__::SerializeTuple::serialize_element(&mut tuple, &Wrap(&self.#field_names1,__::PhantomData))?;)* + __::SerializeTuple::end(tuple) } #[inline(always)] - fn deserialize_a<'de, D>(_deserializer: D) -> __::Result + fn deserialize_a<'de, D>(deserializer: D) -> __::Result where D: __::Deserializer_<'de>, for<'a> __::Wrapper<'a, #name #ty_generics>: __::Deserialize_<'de>, Self: __::Sized, { - todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69") + struct TupleVisitor; + impl<'de> __::Visitor<'de> for TupleVisitor { + type Value = #vec_name #ty_generics; + + fn expecting(&self, formatter: &mut __::fmt::Formatter) -> __::fmt::Result { + formatter.write_str(concat!("a tuple of size ", #num_fields)) + } + #[inline] + #[allow(non_snake_case)] + fn visit_seq(self, mut seq: A) -> __::Result + where + A: __::SeqAccess<'de>, + { + struct Wrap(T, __::PhantomDataU>); + impl<'de,T,U> __::Deserialize_<'de> for Wrap where T: __::ListVec, U: __::CoreData + __::Deserialize_<'de> { + fn deserialize(deserializer: D) -> __::Result + where + D: __::Deserializer_<'de>, + { + Ok(Wrap(T::deserialize_a(deserializer)?, __::PhantomData)) + } + } + let __len = match seq.next_element()? { + __::Some(value) => value, + __::None => return __::Err(__::SerdeError::invalid_length(0, &self)), + }; + #( + let #field_names1 = match seq.next_element::>()? { + __::Some(value) => value, + __::None => return __::Err(__::SerdeError::invalid_length(1 + #n, &self)), + }.0; + )* + + __::Ok( + #vec_name { + #(#field_names1,)* + __len, + } + ) + } + } + __::Deserializer_::deserialize_tuple(deserializer, #num_fields, TupleVisitor) } fn fmt_a(&self, fmt: &mut __::fmt::Formatter) -> __::Result<(), __::fmt::Error> where @@ -626,7 +680,7 @@ fn impl_struct( } #[automatically_derived] - impl #impl_generics __::From<#name #ty_generics> for __::Value where #where_clause_with_data { + impl #impl_generics __::From<#name #ty_generics> for __::Value #where_clause_with_data { fn from(value: #name #ty_generics) -> Self { __::Value::Group(__::Group::new(__::vec![ #(__::Into::into(value.#field_names1),)* diff --git a/amadeus-types/src/lib.rs b/amadeus-types/src/lib.rs index 0d575b52..cf54be73 100644 --- a/amadeus-types/src/lib.rs +++ b/amadeus-types/src/lib.rs @@ -72,7 +72,9 @@ pub use self::{ }; pub mod __internal { - pub use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize}; + pub use serde::{ + de::{Deserializer, Error, SeqAccess, Visitor}, ser::{SerializeTuple, Serializer}, Deserialize, Serialize + }; } /// This trait lets one downcast a generic type like [`Value`] to a specific type like diff --git a/tests/cloudfront.rs b/tests/cloudfront.rs index 4937da5e..0392a996 100644 --- a/tests/cloudfront.rs +++ b/tests/cloudfront.rs @@ -42,7 +42,10 @@ async fn cloudfront() { assert_eq!(list.len(), count); for _el in list {} - let count3 = rows.par_stream().pipe(pool, Identity.count()).await; + let count3 = rows + .par_stream() + .pipe(pool, Identity.pipe(Identity.count())) + .await; assert_eq!(count3, 207_928); println!("in {:?}", start.elapsed().unwrap()); diff --git a/tests/cloudfront_dist.rs b/tests/cloudfront_dist.rs index e77cc26a..81eeed3d 100644 --- a/tests/cloudfront_dist.rs +++ b/tests/cloudfront_dist.rs @@ -69,7 +69,10 @@ async fn run(pool: &P) -> Duration { assert_eq!(list.len(), count); for _el in list {} - let count3 = rows.dist_stream().pipe(pool, Identity.count()).await; + let count3 = rows + .dist_stream() + .pipe(pool, Identity.pipe(Identity.count())) + .await; assert_eq!(count3, 207_928); start.elapsed().unwrap()