Skip to content

Commit

Permalink
Merge pull request #60 from constellation-rs/cleanup
Browse files Browse the repository at this point in the history
Parallel streams
  • Loading branch information
mergify[bot] authored Jun 19, 2020
2 parents ce5cdd4 + 7770863 commit ef5af91
Show file tree
Hide file tree
Showing 98 changed files with 6,718 additions and 4,539 deletions.
46 changes: 40 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ amadeus-parquet = { version = "=0.2.0", path = "amadeus-parquet", optional = tru
amadeus-postgres = { version = "=0.2.0", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.2.0", path = "amadeus-serde", optional = true }
constellation-rs = { version = "0.1", default-features = false, optional = true }
derive-new = "0.5"
futures = "0.3"
num_cpus = "1.13"
pin-project = "0.4"
Expand All @@ -57,52 +58,85 @@ tokio = { version = "0.2", features = ["rt-threaded", "rt-util"] }
[dev-dependencies]
either = { version = "1.5", features = ["serde"] }
rand = "0.7"
streaming_algorithms = "0.1"
tokio = { version = "0.2", features = ["macros"] }
streaming_algorithms = "0.2"
tokio = { version = "0.2", features = ["macros", "time"] }

[[example]]
name = "common_crawl"
name = "cloudfront_logs"
required-features = ["aws"]

[[example]]
name = "commoncrawl"
required-features = ["commoncrawl"]

[[example]]
name = "commoncrawl_dist"
required-features = ["commoncrawl", "constellation"]

[[test]]
name = "into_dist_stream"
name = "into_par_stream_dist"
harness = false

[[test]]
name = "panic"
name = "panic_dist"
harness = false

[[test]]
name = "threads"
name = "threads_dist"
harness = false

[[test]]
name = "cloudfront"
required-features = ["aws"]

[[test]]
name = "cloudfront_dist"
harness = false
required-features = ["aws"]

[[test]]
name = "commoncrawl"
required-features = ["commoncrawl"]

[[test]]
name = "commoncrawl_dist"
harness = false
required-features = ["commoncrawl"]

[[test]]
name = "parquet"
required-features = ["parquet"]

[[test]]
name = "parquet_dist"
harness = false
required-features = ["parquet"]

[[test]]
name = "csv"
required-features = ["csv"]

[[test]]
name = "csv_dist"
harness = false
required-features = ["csv"]

[[test]]
name = "json"
required-features = ["json"]

[[test]]
name = "json_dist"
harness = false
required-features = ["json"]

[[test]]
name = "postgres"
required-features = ["postgres"]
test = false # TODO set up postgres on CI

[[test]]
name = "postgres_dist"
harness = false
required-features = ["postgres"]
test = false # TODO set up postgres on CI
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</p>

<p align="center">
<a href="https://docs.rs/amadeus/0.2.0">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
<a href="https://docs.rs/amadeus/0.2.0/amadeus/">📖 Docs</a> | <a href="https://constellation.rs/amadeus">🌐 Home</a> | <a href="https://constellation.zulipchat.com/#narrow/stream/213231-amadeus">💬 Chat</a>
</p>

## Amadeus provides:
Expand Down
1 change: 1 addition & 0 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ tokio = "0.2"
url = { version = "2.1", features = ["serde"] }

# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
11 changes: 10 additions & 1 deletion amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use amadeus_core::{
dist_stream::DistributedStream, into_dist_stream::IntoDistributedStream, util::ResultExpandIter, Source
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source
};
use amadeus_types::{DateTime, IpAddr, Url};

Expand Down Expand Up @@ -54,11 +54,20 @@ impl Source for Cloudfront {
type Item = CloudfrontRow;
type Error = AwsError;

#[cfg(not(feature = "doc"))]
type ParStream =
impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type ParStream =
DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
#[cfg(not(feature = "doc"))]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
DistParStream::new(self.dist_stream())
}
#[allow(clippy::let_and_return)]
fn dist_stream(self) -> Self::DistStream {
let Self {
Expand Down
9 changes: 4 additions & 5 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ impl Directory for S3Directory {
let file_name = path.pop().unwrap();
skip = skip
&& path.len() >= current_path.depth()
&& path
&& current_path.iter().eq(path
.iter()
.take(current_path.depth())
.copied()
.eq(current_path.iter());
.copied());
if skip {
return false;
}
Expand Down Expand Up @@ -232,7 +231,7 @@ impl Page for S3Page {
self.inner.len
}
fn set_len(&self, _len: u64) -> Result<(), Self::Error> {
unimplemented!()
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
let self_ = S3Page {
Expand Down Expand Up @@ -281,6 +280,6 @@ impl Page for S3Page {
})
}
fn write(&self, _offset: u64, _buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
unimplemented!()
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
}
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl From<io::Error> for AwsError {
}
impl<E> From<RusotoError<E>> for AwsError
where
AwsError: From<E>,
E: Into<AwsError>,
{
fn from(err: RusotoError<E>) -> Self {
match err {
Expand Down
1 change: 1 addition & 0 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ serde_closure = "0.2"
url = { version = "2.1", features = ["serde"] }

# dependency of reqwest/native-tls; ensure it's vendored to simplify cross-compilation
[target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
11 changes: 10 additions & 1 deletion amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_closure::*;
use std::{io, time};

use amadeus_core::{
dist_stream::DistributedStream, into_dist_stream::IntoDistributedStream, Source
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::DistParStream, Source
};
use amadeus_types::Webpage;

Expand Down Expand Up @@ -58,11 +58,20 @@ impl Source for CommonCrawl {
type Item = Webpage<'static>;
type Error = io::Error;

#[cfg(not(feature = "doc"))]
type ParStream =
impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type ParStream =
DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
#[cfg(not(feature = "doc"))]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
DistParStream::new(self.dist_stream())
}
#[allow(clippy::let_and_return)]
fn dist_stream(self) -> Self::DistStream {
let ret = self
Expand Down
5 changes: 4 additions & 1 deletion amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ doc = []

[dependencies]
async-trait = "0.1"
derive-new = "0.5"
educe = "0.4"
either = { version = "1.5", features = ["serde"] }
futures = "0.3"
owned_chars = "0.3"
Expand All @@ -31,6 +33,7 @@ rand = "0.7"
replace_with = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.2"
streaming_algorithms = "0.1"
streaming_algorithms = "0.2"
sum = { version = "0.1", features = ["serde"] }
walkdir = "2.2"
widestring = "0.4"
Loading

0 comments on commit ef5af91

Please sign in to comment.