Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jun 23, 2020
1 parent db9112c commit 851f839
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 47 deletions.
30 changes: 29 additions & 1 deletion amadeus-core/src/par_pipe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use either::Either;
use futures::Stream;
use std::{
cmp::Ordering, future::Future, hash::Hash, iter, ops::FnMut, pin::Pin, task::{Context, Poll}
cmp::Ordering, future::Future, hash::Hash, iter, ops::{DerefMut, FnMut}, pin::Pin, task::{Context, Poll}
};

use crate::{pool::ProcessSend, sink::Sink};
Expand All @@ -25,6 +25,34 @@ pub trait PipeTaskAsync<Source> {
) -> Poll<()>;
}

impl<P, Source> PipeTaskAsync<Source> for Pin<P>
where
P: DerefMut + Unpin,
P::Target: PipeTaskAsync<Source>,
{
type Item = <P::Target as PipeTaskAsync<Source>>::Item;

fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Source>>,
sink: Pin<&mut impl Sink<Item = Self::Item>>,
) -> Poll<()> {
self.get_mut().as_mut().poll_run(cx, stream, sink)
}
}
impl<T: ?Sized, Source> PipeTaskAsync<Source> for &mut T
where
T: PipeTaskAsync<Source> + Unpin,
{
type Item = T::Item;

fn poll_run(
mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Source>>,
sink: Pin<&mut impl Sink<Item = Self::Item>>,
) -> Poll<()> {
Pin::new(&mut **self).poll_run(cx, stream, sink)
}
}

impl_par_dist_rename! {
#[must_use]
pub trait ParallelPipe<Source> {
Expand Down
37 changes: 36 additions & 1 deletion amadeus-core/src/par_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod tuple;

use futures::Stream;
use std::{
pin::Pin, task::{Context, Poll}
ops::DerefMut, pin::Pin, task::{Context, Poll}
};

use crate::pool::ProcessSend;
Expand Down Expand Up @@ -52,6 +52,41 @@ pub trait ReducerProcessSend: ReducerSend<Output = <Self as ReducerProcessSend>:
type Output: ProcessSend + 'static;
}

impl<P> ReducerAsync for Pin<P>
where
P: DerefMut + Unpin,
P::Target: ReducerAsync,
{
type Item = <P::Target as ReducerAsync>::Item;
type Output = <P::Target as ReducerAsync>::Output;

fn poll_forward(
self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>,
) -> Poll<()> {
self.get_mut().as_mut().poll_forward(cx, stream)
}
fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.get_mut().as_mut().poll_output(cx)
}
}
impl<T: ?Sized> ReducerAsync for &mut T
where
T: ReducerAsync + Unpin,
{
type Item = T::Item;
type Output = T::Output;

fn poll_forward(
mut self: Pin<&mut Self>, cx: &mut Context,
stream: Pin<&mut impl Stream<Item = Self::Item>>,
) -> Poll<()> {
Pin::new(&mut **self).poll_forward(cx, stream)
}
fn poll_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut **self).poll_output(cx)
}
}

pub trait Factory {
type Item;

Expand Down
18 changes: 9 additions & 9 deletions amadeus-core/src/par_sink/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,43 @@ use crate::pool::ProcessSend;

mod macros {
#[macro_export]
macro_rules! folder_dist_sink {
macro_rules! folder_par_sink {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) {
fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) {
let init_a = $init_a;
let init_b = $init_b;
(
$self.i,
FolderSyncReducerFactory::new(init_a),
FolderSyncReducerFactory::new(init_b.clone()),
FolderSyncReducer::new(init_b),
)
}
};
}
#[macro_export]
macro_rules! folder_par_sink {
macro_rules! folder_dist_sink {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer>::Output, $folder_b>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) {
fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) {
let init_a = $init_a;
let init_b = $init_b;
(
$self.i,
FolderSyncReducerFactory::new(init_a),
FolderSyncReducerFactory::new(init_b.clone()),
FolderSyncReducer::new(init_b),
)
}
Expand Down
41 changes: 17 additions & 24 deletions amadeus-core/src/par_sink/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,38 @@ pub struct Pipe<A, B> {
b: B,
}

impl_par_dist! {
impl<A: ParallelPipe<Source>, B: ParallelPipe<A::Item>, Source> ParallelPipe<Source> for Pipe<A, B> {
type Item = B::Item;
type Task = JoinTask<A::Task, B::Task>;

fn task(&self) -> Self::Task {
let a = self.a.task();
let b = self.b.task();
JoinTask { a, b }
}
}
}

impl<A: ParallelPipe<Source>, B: ParallelSink<A::Item>, Source> ParallelSink<Source>
for Pipe<A, B>
{
type Output = B::Output;
type Pipe = Join<A, B::Pipe>;
type Pipe = Pipe<A, B::Pipe>;
type ReduceAFactory = B::ReduceAFactory;
type ReduceA = B::ReduceA;
type ReduceC = B::ReduceC;

fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC) {
let (a, b, c) = self.b.reducers();
(Join::new(self.a, a), b, c)
(Pipe::new(self.a, a), b, c)
}
}
impl<A: DistributedPipe<Source>, B: DistributedSink<A::Item>, Source> DistributedSink<Source>
for Pipe<A, B>
{
type Output = B::Output;
type Pipe = Join<A, B::Pipe>;
type Pipe = Pipe<A, B::Pipe>;
type ReduceAFactory = B::ReduceAFactory;
type ReduceBFactory = B::ReduceBFactory;
type ReduceA = B::ReduceA;
Expand All @@ -52,27 +65,7 @@ impl<A: DistributedPipe<Source>, B: DistributedSink<A::Item>, Source> Distribute
Self::ReduceC,
) {
let (a, b, c, d) = self.b.reducers();
(Join::new(self.a, a), b, c, d)
}
}

#[derive(new)]
#[must_use]
pub struct Join<A, B> {
a: A,
b: B,
}

impl_par_dist! {
impl<A: ParallelPipe<Source>, B: ParallelPipe<A::Item>, Source> ParallelPipe<Source> for Join<A, B> {
type Item = B::Item;
type Task = JoinTask<A::Task, B::Task>;

fn task(&self) -> Self::Task {
let a = self.a.task();
let b = self.b.task();
JoinTask { a, b }
}
(Pipe::new(self.a, a), b, c, d)
}
}

Expand Down
28 changes: 27 additions & 1 deletion amadeus-core/src/par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde_closure::*;
use std::{
cmp::Ordering, collections::HashMap, future::Future, hash::Hash, iter, marker::PhantomData, ops::FnMut, pin::Pin, task::{Context, Poll}, vec
cmp::Ordering, collections::HashMap, future::Future, hash::Hash, iter, marker::PhantomData, ops::{DerefMut, FnMut}, pin::Pin, task::{Context, Poll}, vec
};

use crate::{
Expand Down Expand Up @@ -48,6 +48,32 @@ pub trait StreamTaskAsync {
) -> Poll<()>;
}

impl<P> StreamTaskAsync for Pin<P>
where
P: DerefMut + Unpin,
P::Target: StreamTaskAsync,
{
type Item = <P::Target as StreamTaskAsync>::Item;

fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Item = Self::Item>>,
) -> Poll<()> {
self.get_mut().as_mut().poll_run(cx, sink)
}
}
impl<T: ?Sized> StreamTaskAsync for &mut T
where
T: StreamTaskAsync + Unpin,
{
type Item = T::Item;

fn poll_run(
mut self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Item = Self::Item>>,
) -> Poll<()> {
Pin::new(&mut **self).poll_run(cx, sink)
}
}

#[async_trait(?Send)]
#[must_use]
pub trait DistributedStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_stream/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod workaround {

#[doc(hidden)]
impl Identity {
pub fn pipe<S, A>(self, sink: S) -> Pipe<Self, S> {
pub fn pipe<S>(self, sink: S) -> Pipe<Self, S> {
Pipe::new(self, sink)
}

Expand Down
68 changes: 61 additions & 7 deletions amadeus-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ fn impl_struct(
let field_names2 = &field_names;

let num_fields = field_names.len();
let n = &(0usize..num_fields).collect::<Vec<_>>();

// The field names specified via `#[amadeus(rename = "foo")]`, falling back to struct
// field names
Expand Down Expand Up @@ -459,9 +460,9 @@ fn impl_struct(
#postgres_includes
#serde_includes
pub use ::amadeus_core::util::Wrapper;
pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_}};
pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}};
pub use #amadeus_path::data::Data;
pub use ::std::{boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, hash::{Hash, Hasher}, marker::{Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, vec, vec::Vec, option::Option::{self, Some, None}, iter::Iterator};
pub use ::std::{boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, hash::{Hash, Hasher}, marker::{PhantomData, Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, vec, vec::Vec, option::Option::{self, Some, None}, iter::Iterator};
}

#parquet_derives
Expand Down Expand Up @@ -562,21 +563,74 @@ fn impl_struct(
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69")
}
#[inline(always)]
fn serialize_a<S>(&self, _serializer: S) -> __::Result<S::Ok, S::Error>
fn serialize_a<S>(&self, serializer: S) -> __::Result<S::Ok, S::Error>
where
S: __::Serializer_,
for<'a> __::Wrapper<'a, #name #ty_generics>: __::Serialize_,
{
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69")
struct Wrap<'a,T,U>(&'a T,__::PhantomData<fn()->U>);
impl<'a,T,U> __::Serialize_ for Wrap<'a,T,U> where T: __::ListVec<U> + 'a, U: __::CoreData + __::Serialize_ {
fn serialize<S>(&self, serializer: S) -> __::Result<S::Ok, S::Error>
where
S: __::Serializer_,
{
self.0.serialize_a(serializer)
}
}
let mut tuple = serializer.serialize_tuple(1 + #num_fields)?;
__::SerializeTuple::serialize_element(&mut tuple, &self.__len)?;
#(__::SerializeTuple::serialize_element(&mut tuple, &Wrap(&self.#field_names1,__::PhantomData))?;)*
__::SerializeTuple::end(tuple)
}
#[inline(always)]
fn deserialize_a<'de, D>(_deserializer: D) -> __::Result<Self, D::Error>
fn deserialize_a<'de, D>(deserializer: D) -> __::Result<Self, D::Error>
where
D: __::Deserializer_<'de>,
for<'a> __::Wrapper<'a, #name #ty_generics>: __::Deserialize_<'de>,
Self: __::Sized,
{
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/69")
struct TupleVisitor;
impl<'de> __::Visitor<'de> for TupleVisitor {
type Value = #vec_name #ty_generics;

fn expecting(&self, formatter: &mut __::fmt::Formatter) -> __::fmt::Result {
formatter.write_str(concat!("a tuple of size ", #num_fields))
}
#[inline]
#[allow(non_snake_case)]
fn visit_seq<A>(self, mut seq: A) -> __::Result<Self::Value, A::Error>
where
A: __::SeqAccess<'de>,
{
struct Wrap<T,U>(T, __::PhantomData<fn()->U>);
impl<'de,T,U> __::Deserialize_<'de> for Wrap<T,U> where T: __::ListVec<U>, U: __::CoreData + __::Deserialize_<'de> {
fn deserialize<D>(deserializer: D) -> __::Result<Self, D::Error>
where
D: __::Deserializer_<'de>,
{
Ok(Wrap(T::deserialize_a(deserializer)?, __::PhantomData))
}
}
let __len = match seq.next_element()? {
__::Some(value) => value,
__::None => return __::Err(__::SerdeError::invalid_length(0, &self)),
};
#(
let #field_names1 = match seq.next_element::<Wrap<_,_>>()? {
__::Some(value) => value,
__::None => return __::Err(__::SerdeError::invalid_length(1 + #n, &self)),
}.0;
)*

__::Ok(
#vec_name {
#(#field_names1,)*
__len,
}
)
}
}
__::Deserializer_::deserialize_tuple(deserializer, #num_fields, TupleVisitor)
}
fn fmt_a(&self, fmt: &mut __::fmt::Formatter) -> __::Result<(), __::fmt::Error>
where
Expand Down Expand Up @@ -626,7 +680,7 @@ fn impl_struct(
}

#[automatically_derived]
impl #impl_generics __::From<#name #ty_generics> for __::Value where #where_clause_with_data {
impl #impl_generics __::From<#name #ty_generics> for __::Value #where_clause_with_data {
fn from(value: #name #ty_generics) -> Self {
__::Value::Group(__::Group::new(__::vec![
#(__::Into::into(value.#field_names1),)*
Expand Down
4 changes: 3 additions & 1 deletion amadeus-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ pub use self::{
};

pub mod __internal {
pub use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
pub use serde::{
de::{Deserializer, Error, SeqAccess, Visitor}, ser::{SerializeTuple, Serializer}, Deserialize, Serialize
};
}

/// This trait lets one downcast a generic type like [`Value`] to a specific type like
Expand Down
Loading

0 comments on commit 851f839

Please sign in to comment.