Skip to content

Commit

Permalink
make reducers async
Browse files (browse the repository at this point in the history)
  • Loading branch information
alecmocatta committed May 22, 2020
1 parent 9c5d139 commit 6d97618
Show file tree
Hide file tree
Showing 28 changed files with 928 additions and 529 deletions.
35 changes: 17 additions & 18 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,23 @@ impl Source for Cloudfront {
.map_err(AwsError::from)
.map(|res| {
let body = res.body.unwrap().into_blocking_read();
BufReader::new(MultiGzDecoder::new(
Box::new(body) as Box<dyn io::Read + Send>
))
.lines()
.filter(|x: &Result<String, io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
})
.map(|x: Result<String, io::Error>| {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
})
BufReader::new(MultiGzDecoder::new(body))
.lines()
.filter(|x: &Result<String, io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
})
.map(|x: Result<String, io::Error>| {
if let Ok(x) = x {
println!("got row");
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
})
}),
))
}))
Expand Down
257 changes: 167 additions & 90 deletions amadeus-core/src/dist_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ mod update;

use ::sum::*;
use async_trait::async_trait;
use core::{
future::Future, pin::Pin, task::{Context, Poll}
};
use either::Either;
use futures::{pin_mut, ready, stream::StreamExt, Stream};
use futures::{pin_mut, ready, stream, stream::StreamExt, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde_closure::*;
use std::{cmp::Ordering, hash::Hash, iter, marker::PhantomData, ops::FnMut, vec};
use std::{
cmp::Ordering, future::Future, hash::Hash, iter, marker::PhantomData, ops::FnMut, pin::Pin, task::{Context, Poll}, vec
};

use crate::{
into_dist_iter::IntoDistributedIterator, pool::{ProcessPool, ProcessSend}, sink::{Sink, SinkBuffer, SinkMap}, util::type_coerce
Expand Down Expand Up @@ -176,45 +175,104 @@ pub trait DistributedIterator {
.map(|tasks| {
let reduce1 = reduce1factory.make();
pool.spawn(FnOnce!(move || -> <R1 as ReducerA>::Output {
futures::executor::block_on(async {
for task in tasks {
let task = task.into_async();
pin_mut!(task);
if !futures::future::poll_fn(|cx| {
task.as_mut().poll_run(
cx,
&mut SinkBuffer::new(
&mut None,
|_cx: &mut Context, item: &mut Option<_>| {
Poll::Ready(reduce1.push(item.take().unwrap()))
},
),
)
})
.await
{
break;
futures::executor::block_on(futures::future::poll_fn(|cx| {

// `Connect` pairs a source consumer (field 0) with a multi-consumer (field 1)
// and exposes the pair as a single `ConsumerAsync`. Both fields are marked
// `#[pin]`, so `project()` hands each of them out as a pinned reference.
// NOTE(review): this looks like a local copy of `ConnectConsumerAsync<A, B>`
// defined further down in this file — confirm whether both are needed.
#[pin_project]
struct Connect<A, B>(#[pin] A, #[pin] B);
impl<A, B> ConsumerAsync for Connect<A, B>
where
A: ConsumerAsync,
B: ConsumerMultiAsync<A::Item>,
{
// The combined consumer yields whatever `B` yields.
type Item = B::Item;
// Polls `A`, giving it a sink (`Proxy`) that routes the stream it is handed
// into `B::poll_run`, which in turn writes to the caller-supplied `sink`.
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Self::Item>>,
) -> Poll<()> {
// `Proxy` bundles the downstream sink (field 0, pinned in place) with a
// pinned borrow of `B` (field 1) so the pair can itself act as a `Sink`
// for the items that `A` produces.
#[pin_project]
struct Proxy<'a, I, B>(#[pin] I, Pin<&'a mut B>);
impl<'a, I, B, Item> Sink<Item> for Proxy<'a, I, B>
where
I: Sink<B::Item>,
B: ConsumerMultiAsync<Item>,
{
// Forwards `stream` by delegating to `B::poll_run`, passing the wrapped
// sink as the destination for `B`'s output.
fn poll_forward(
self: Pin<&mut Self>, cx: &mut Context,
stream: Pin<&mut impl Stream<Item = Item>>,
) -> Poll<()> {
let self_ = self.project();
// Field 1 is not `#[pin]`, so it projects to `&mut Pin<&mut B>`;
// `as_mut()` reborrows it as a `Pin<&mut B>` for this call.
self_.1.as_mut().poll_run(cx, stream, self_.0)
}
}
let self_ = self.project();
// Wrap the caller's sink together with the pinned `B`.
let sink = Proxy(sink, self_.1);
// `poll_run` takes its sink as `Pin<&mut _>`, so pin the proxy on the stack.
pin_mut!(sink);
self_.0.poll_run(cx, sink)
}
}
reduce1.ret()
})

// let task = tasks.into_iter().next().unwrap().into_async();
// pin_mut!(task);

todo!()

}))
// #[pin_project]
// struct Connect<I, R>(I, #[pin] R);
// impl<I,R,C> Future for Connect<I,R> where I: Iterator<Item=C>, R: Reducer, C: Consumer<Item=R::Item> {
// type Output = R::Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// let self_ = self.project();
// // struct Connect
// let task = self_.0.next().unwrap().into_async();
// pin_mut!(task);
// let stream = stream::empty();
// pin_mut!(stream);
// self_.1.push(cx, stream)
// }
// }
// futures::executor::block_on(Connect(tasks.into_iter(), reduce1))

// async {
// for task in tasks {
// let task = task.into_async();
// pin_mut!(task);
// let mut state = None;
// let sink = SinkBuffer::new(
// &mut state,
// |cx: &mut Context, item: &mut Option<_>| {
// Poll::Ready(if let Some(item) = item.take() {
// reduce1.push(cx, item)
// })
// },
// );
// pin_mut!(sink);
// if !futures::future::poll_fn(|cx| {
// task.as_mut().poll_run(cx, sink.as_mut())
// })
// .await
// {
// break;
// }
// }
// })
}))
})
.collect::<futures::stream::FuturesUnordered<_>>();
let mut more = true;
let mut panicked = None;
while let Some(res) = handles.next().await {
match res {
Ok(res) => {
more = more && reduce2.push(res);
}
Err(e) => panicked = Some(e),
}
}
if let Some(err) = panicked {
panic!("Amadeus: task '<unnamed>' panicked at '{}'", err)
}
reduce2.ret()
todo!();
// let mut more = true;
// let mut panicked = None;
// while let Some(res) = handles.next().await {
// match res {
// Ok(res) => {
// more = more && reduce2.push(res);
// }
// Err(e) => panicked = Some(e),
// }
// }
// if let Some(err) = panicked {
// panic!("Amadeus: task '<unnamed>' panicked at '{}'", err)
// }
// reduce2.ret()
}

#[doc(hidden)]
Expand All @@ -240,39 +298,53 @@ pub trait DistributedIterator {
.map(|task| ConnectConsumer(task, self.1.task()))
}
}
#[pin_project]
#[derive(Serialize, Deserialize)]
#[serde(
bound(serialize = "A: Serialize, B: Serialize"),
bound(deserialize = "A: Deserialize<'de>, B: Deserialize<'de>")
)]
struct ConnectConsumer<A, B>(#[pin] A, #[pin] B);
impl<A: Consumer, B> Consumer for ConnectConsumer<A, B>
struct ConnectConsumer<A, B>(A, B);
impl<A, B> Consumer for ConnectConsumer<A, B>
where
A: Consumer,
B: ConsumerMulti<A::Item>,
{
type Item = B::Item;
type Async = ConnectConsumer<A::Async, B::Async>;
type Async = ConnectConsumerAsync<A::Async, B::Async>;
fn into_async(self) -> Self::Async {
ConnectConsumer(self.0.into_async(), self.1.into_async())
ConnectConsumerAsync(self.0.into_async(), self.1.into_async())
}
}
impl<A: ConsumerAsync, B> ConsumerAsync for ConnectConsumer<A, B>
#[pin_project]
struct ConnectConsumerAsync<A, B>(#[pin] A, #[pin] B);
impl<A, B> ConsumerAsync for ConnectConsumerAsync<A, B>
where
A: ConsumerAsync,
B: ConsumerMultiAsync<A::Item>,
{
type Item = B::Item;
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, sink: &mut impl Sink<Self::Item>,
) -> Poll<bool> {
self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Self::Item>>,
) -> Poll<()> {
// `Proxy` bundles a downstream sink (field 0, `#[pin]`) with a pinned borrow
// of a multi-consumer `B` (field 1) so that the pair can be used as a `Sink`
// for items of type `Item`.
#[pin_project]
struct Proxy<'a, I, B>(#[pin] I, Pin<&'a mut B>);
impl<'a, I, B, Item> Sink<Item> for Proxy<'a, I, B>
where
I: Sink<B::Item>,
B: ConsumerMultiAsync<Item>,
{
// Forwards `stream` by delegating to `B::poll_run`, passing the wrapped sink
// as the destination for `B`'s output.
fn poll_forward(
self: Pin<&mut Self>, cx: &mut Context,
stream: Pin<&mut impl Stream<Item = Item>>,
) -> Poll<()> {
let self_ = self.project();
// Field 1 is not `#[pin]`, so it projects to `&mut Pin<&mut B>`;
// `as_mut()` reborrows it as a `Pin<&mut B>` for this call.
self_.1.as_mut().poll_run(cx, stream, self_.0)
}
}
let self_ = self.project();
let mut a = self_.1;
self_.0.poll_run(
cx,
&mut SinkBuffer::new(&mut None, |cx: &mut Context, item: &mut Option<_>| {
a.as_mut().poll_run(cx, item.take(), sink)
}),
)
let sink = Proxy(sink, self_.1);
pin_mut!(sink);
self_.0.poll_run(cx, sink)
}
}

Expand Down Expand Up @@ -316,62 +388,66 @@ pub trait DistributedIterator {
.map(|task| ConnectConsumer(task, self.1.task(), self.2.task(), PhantomData))
}
}
#[pin_project]
#[derive(Serialize, Deserialize)]
#[serde(
bound(serialize = "A: Serialize, B: Serialize, C: Serialize"),
bound(deserialize = "A: Deserialize<'de>, B: Deserialize<'de>, C: Deserialize<'de>")
)]
struct ConnectConsumer<A, B, C, RefAItem>(
#[pin] A,
#[pin] B,
#[pin] C,
PhantomData<fn(RefAItem)>,
);
impl<A: Consumer, B, C, RefAItem> Consumer for ConnectConsumer<A, B, C, RefAItem>
struct ConnectConsumer<A, B, C, RefAItem>(A, B, C, PhantomData<fn(RefAItem)>);
impl<A, B, C, RefAItem> Consumer for ConnectConsumer<A, B, C, RefAItem>
where
A: Consumer,
B: ConsumerMulti<A::Item>,
C: ConsumerMulti<RefAItem>,
{
type Item = Sum2<B::Item, C::Item>;
type Async = ConnectConsumer<A::Async, B::Async, C::Async, RefAItem>;
type Async = ConnectConsumerAsync<A::Async, B::Async, C::Async, RefAItem, A::Item>;
fn into_async(self) -> Self::Async {
ConnectConsumer(
ConnectConsumerAsync(
self.0.into_async(),
self.1.into_async(),
self.2.into_async(),
None,
PhantomData,
)
}
}
impl<A: ConsumerAsync, B, C, RefAItem> ConsumerAsync for ConnectConsumer<A, B, C, RefAItem>
#[pin_project]
struct ConnectConsumerAsync<A, B, C, RefAItem, T>(
#[pin] A,
#[pin] B,
#[pin] C,
Option<Option<T>>,
PhantomData<fn(RefAItem)>,
);
impl<A, B, C, RefAItem> ConsumerAsync for ConnectConsumerAsync<A, B, C, RefAItem, A::Item>
where
A: ConsumerAsync,
B: ConsumerMultiAsync<A::Item>,
C: ConsumerMultiAsync<RefAItem>,
{
type Item = Sum2<B::Item, C::Item>;
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, mut sink: &mut impl Sink<Self::Item>,
) -> Poll<bool> {
self: Pin<&mut Self>, cx: &mut Context, mut sink: Pin<&mut impl Sink<Self::Item>>,
) -> Poll<()> {
let self_ = self.project();
let mut a = self_.1;
let mut b = self_.2;
self_.0.poll_run(
cx,
&mut SinkBuffer::new(&mut None, |cx: &mut Context, item: &mut Option<_>| {
let a_ = ready!(b.as_mut().poll_run(
cx,
type_coerce(item.as_ref()),
&mut SinkMap::new(&mut sink, |item| Sum2::B(type_coerce(item)))
));
let b_ = ready!(a.as_mut().poll_run(
cx,
item.take(),
&mut SinkMap::new(&mut sink, Sum2::A)
));
Poll::Ready(a_ | b_)
}),
)
let sink = SinkBuffer::new(self_.3, |cx: &mut Context, item: &mut Option<_>| {
let stream = stream::iter(iter::once(type_coerce(item.as_ref())));
pin_mut!(stream);
let sink_ = SinkMap::new(&mut sink, |item| Sum2::B(type_coerce(item)));
pin_mut!(sink_);
ready!(b.as_mut().poll_run(cx, stream, sink_));
let stream = stream::iter(iter::from_fn(|| item.take()));
pin_mut!(stream);
let sink_ = SinkMap::new(&mut sink, Sum2::A);
pin_mut!(sink_);
ready!(a.as_mut().poll_run(cx, stream, sink_));
Poll::Ready(())
});
pin_mut!(sink);
self_.0.poll_run(cx, sink)
}
}

Expand Down Expand Up @@ -875,22 +951,23 @@ pub trait ConsumerMulti<Source> {
pub trait ConsumerAsync {
type Item;
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, sink: &mut impl Sink<Self::Item>,
) -> Poll<bool>;
self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Self::Item>>,
) -> Poll<()>;
}
pub trait ConsumerMultiAsync<Source> {
type Item;
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, source: Option<Source>,
sink: &mut impl Sink<Self::Item>,
) -> Poll<bool>;
self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Source>>,
sink: Pin<&mut impl Sink<Self::Item>>,
) -> Poll<()>;
}

pub trait Reducer {
type Item;
type Output;
fn push(&mut self, item: Self::Item) -> bool;
fn ret(self) -> Self::Output;
fn push(
self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>,
) -> Poll<Self::Output>;
}
pub trait ReducerA: Reducer<Output = <Self as ReducerA>::Output> + ProcessSend {
type Output: ProcessSend;
Expand Down
Loading

0 comments on commit 6d97618

Please sign in to comment.