From e0da6b26bf9f7215594b18043f99fe8cfb6442ce Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 13 Jul 2020 20:40:20 +0100 Subject: [PATCH] switch from feature = "nightly" to autodetection due to rust-lang/cargo#3494 --- Cargo.toml | 7 +- amadeus-aws/Cargo.toml | 7 +- amadeus-aws/src/build.rs | 1 + amadeus-aws/src/cloudfront.rs | 10 +- amadeus-aws/src/lib.rs | 2 +- amadeus-commoncrawl/Cargo.toml | 7 +- amadeus-commoncrawl/src/build.rs | 1 + amadeus-commoncrawl/src/lib.rs | 12 +- amadeus-core/Cargo.toml | 8 +- amadeus-core/src/build.rs | 1 + amadeus-core/src/lib.rs | 2 +- amadeus-core/src/par_pipe.rs | 2 +- amadeus-core/src/par_stream.rs | 2 +- amadeus-core/src/par_stream/identity.rs | 2 +- amadeus-core/src/pool.rs | 4 +- amadeus-parquet/Cargo.toml | 8 +- amadeus-parquet/src/build.rs | 1 + amadeus-parquet/src/internal/record/impls.rs | 58 +- amadeus-parquet/src/internal/util/io.rs | 8 +- amadeus-parquet/src/lib.rs | 561 ++++++++++--------- amadeus-postgres/Cargo.toml | 7 +- amadeus-postgres/src/build.rs | 1 + amadeus-postgres/src/lib.rs | 12 +- amadeus-serde/Cargo.toml | 7 +- amadeus-serde/src/build.rs | 1 + amadeus-serde/src/csv.rs | 10 +- amadeus-serde/src/json.rs | 10 +- amadeus-serde/src/lib.rs | 2 +- amadeus-types/Cargo.toml | 4 + amadeus-types/src/build.rs | 1 + azure-pipelines.yml | 8 +- src/build.rs | 12 + src/lib.rs | 5 +- src/pool.rs | 4 +- src/pool/process.rs | 2 +- 35 files changed, 419 insertions(+), 371 deletions(-) create mode 120000 amadeus-aws/src/build.rs create mode 120000 amadeus-commoncrawl/src/build.rs create mode 120000 amadeus-core/src/build.rs create mode 120000 amadeus-parquet/src/build.rs create mode 120000 amadeus-postgres/src/build.rs create mode 120000 amadeus-serde/src/build.rs create mode 120000 amadeus-types/src/build.rs create mode 100644 src/build.rs diff --git a/Cargo.toml b/Cargo.toml index 10eea974..47ac0781 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } @@ -30,10 +31,9 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"] postgres = ["amadeus-postgres", "amadeus-derive/postgres"] csv = ["amadeus-serde", "amadeus-derive/serde"] json = ["amadeus-serde", "amadeus-derive/serde"] -nightly = ["amadeus-core/nightly"] [package.metadata.docs.rs] -features = ["nightly", "constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"] +features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"] [dependencies] amadeus-core = { version = "=0.3.1", path = "amadeus-core" } @@ -63,6 +63,9 @@ serde_json = "1.0" streaming_algorithms = "0.2" tokio = { version = "0.2", features = ["macros", "time"] } +[build-dependencies] +rustversion = "1.0" + [patch.crates-io] vec-utils = { version = "*", git = "https://github.com/alecmocatta/vec-utils", branch = "stable" } streaming_algorithms = { version = "*", git = "https://github.com/alecmocatta/streaming_algorithms", branch = "stable" } diff --git a/amadeus-aws/Cargo.toml b/amadeus-aws/Cargo.toml index b5869097..6c832772 100644 --- a/amadeus-aws/Cargo.toml +++ b/amadeus-aws/Cargo.toml @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -nightly = [] - [dependencies] amadeus-core = { version = "=0.3.1", path = "../amadeus-core" } amadeus-types = { version = "=0.3.1", path = "../amadeus-types" } @@ -43,3 +41,6 @@ vec-utils = "0.2" # dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation [target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies] openssl = { version = "0.10", features = ["vendored"] } + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-aws/src/build.rs b/amadeus-aws/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-aws/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-aws/src/cloudfront.rs b/amadeus-aws/src/cloudfront.rs index 4810c58a..6cb4ddeb 100644 --- a/amadeus-aws/src/cloudfront.rs +++ b/amadeus-aws/src/cloudfront.rs @@ -53,9 +53,9 @@ impl Cloudfront { } } -#[cfg(not(feature = "nightly"))] +#[cfg(not(nightly))] type Output = std::pin::Pin> + Send>>; -#[cfg(feature = "nightly")] +#[cfg(nightly)] type Output = impl Stream> + Send; FnMutNamed! { @@ -103,7 +103,7 @@ FnMutNamed! { } .flatten_stream() .map(|x: Result, _>| x.and_then(identity)); - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] let ret = ret.boxed(); ret } @@ -114,13 +114,13 @@ impl Source for Cloudfront { type Error = AwsError; type ParStream = DistParStream; - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] #[allow(clippy::type_complexity)] type DistStream = amadeus_core::par_stream::FlatMap< amadeus_core::into_par_stream::IterDistStream>, Closure, >; - #[cfg(feature = "nightly")] + #[cfg(nightly)] type DistStream = impl DistributedStream>; fn par_stream(self) -> Self::ParStream { diff --git a/amadeus-aws/src/lib.rs b/amadeus-aws/src/lib.rs index bcde2748..10fabbe3 100644 --- a/amadeus-aws/src/lib.rs +++ b/amadeus-aws/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.1")] -#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, diff --git a/amadeus-commoncrawl/Cargo.toml b/amadeus-commoncrawl/Cargo.toml index 66df4e77..b1a35df1 100644 --- a/amadeus-commoncrawl/Cargo.toml +++ b/amadeus-commoncrawl/Cargo.toml @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -nightly = [] - [dependencies] amadeus-core = { version = "=0.3.1", path = "../amadeus-core" } amadeus-types = { version = "=0.3.1", path = "../amadeus-types" } @@ -37,3 +35,6 @@ url = { version = "2.1", features = ["serde"] } # dependency of reqwest/native-tls; ensure it's vendored to simplify cross-compilation [target.'cfg(not(any(target_os = "windows", target_os = "macos", target_os = "ios")))'.dependencies] openssl = { version = "0.10", features = ["vendored"] } + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-commoncrawl/src/build.rs b/amadeus-commoncrawl/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-commoncrawl/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-commoncrawl/src/lib.rs b/amadeus-commoncrawl/src/lib.rs index 7eafdd94..df59699d 100644 --- a/amadeus-commoncrawl/src/lib.rs +++ b/amadeus-commoncrawl/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.1")] -#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -80,9 +80,9 @@ impl CommonCrawl { } } -#[cfg(not(feature = "nightly"))] +#[cfg(not(nightly))] type Output = std::pin::Pin, io::Error>> + Send>>; -#[cfg(feature = "nightly")] +#[cfg(nightly)] type Output = impl Stream, io::Error>> + Send; FnMutNamed! { @@ -99,7 +99,7 @@ FnMutNamed! { WarcParser::new(body) } .flatten_stream(); - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] let ret = ret.boxed(); ret } @@ -110,13 +110,13 @@ impl Source for CommonCrawl { type Error = io::Error; type ParStream = DistParStream; - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] #[allow(clippy::type_complexity)] type DistStream = amadeus_core::par_stream::FlatMap< amadeus_core::into_par_stream::IterDistStream>, Closure, >; - #[cfg(feature = "nightly")] + #[cfg(nightly)] type DistStream = impl DistributedStream>; fn par_stream(self) -> Self::ParStream { diff --git a/amadeus-core/Cargo.toml b/amadeus-core/Cargo.toml index 020f0bd9..e31e54ad 100644 --- a/amadeus-core/Cargo.toml +++ b/amadeus-core/Cargo.toml @@ -13,15 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -# uses Fn*() sugar (better docs), and SIMD for streaming_algorithms -nightly = ["streaming_algorithms/nightly"] - [dependencies] async-trait = "0.1" derive-new = "0.5" @@ -39,3 +36,6 @@ streaming_algorithms = "0.2" sum = { version = "0.1", features = ["futures", "serde"] } walkdir = "2.2" widestring = "0.4" + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-core/src/build.rs b/amadeus-core/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-core/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index 48acbf4d..f8c5e07d 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/). #![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.1")] -#![cfg_attr(feature = "nightly", feature(unboxed_closures))] +#![cfg_attr(nightly, feature(unboxed_closures))] #![recursion_limit = "25600"] #![warn( // missing_copy_implementations, diff --git a/amadeus-core/src/par_pipe.rs b/amadeus-core/src/par_pipe.rs index ffd3f1bd..3638e23b 100644 --- a/amadeus-core/src/par_pipe.rs +++ b/amadeus-core/src/par_pipe.rs @@ -288,4 +288,4 @@ macro_rules! pipe { } pipe!(ParallelPipe ParallelSink FromParallelStream Send ops assert_parallel_pipe assert_parallel_sink); -pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(feature = "nightly"), serde_closure::desugar)); +pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(nightly), serde_closure::desugar)); diff --git a/amadeus-core/src/par_stream.rs b/amadeus-core/src/par_stream.rs index 290117c9..da7ad6dc 100644 --- a/amadeus-core/src/par_stream.rs +++ b/amadeus-core/src/par_stream.rs @@ -482,7 +482,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel } }); -stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(feature = "nightly"), serde_closure::desugar) { +stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(nightly), serde_closure::desugar) { async fn reduce( mut self, pool: &P, reduce_a_factory: R1, reduce_b_factory: R2, reduce_c: R3, ) -> B diff --git a/amadeus-core/src/par_stream/identity.rs b/amadeus-core/src/par_stream/identity.rs index 6e277e49..86344ab7 100644 --- a/amadeus-core/src/par_stream/identity.rs +++ b/amadeus-core/src/par_stream/identity.rs @@ -29,7 +29,7 @@ impl_par_dist! { mod workaround { use super::*; - #[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] + #[cfg_attr(not(nightly), serde_closure::desugar)] #[doc(hidden)] impl Identity { pub fn pipe(self, sink: S) -> Pipe { diff --git a/amadeus-core/src/pool.rs b/amadeus-core/src/pool.rs index 7107e2b2..23f1b5d0 100644 --- a/amadeus-core/src/pool.rs +++ b/amadeus-core/src/pool.rs @@ -10,7 +10,7 @@ impl ProcessSend for T where T: Send + Serialize + for<'de> Deseriali type Result = std::result::Result>; -#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] +#[cfg_attr(not(nightly), serde_closure::desugar)] pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin { type ThreadPool: ThreadPool + 'static; @@ -31,7 +31,7 @@ pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin { T: Send + 'static; } -#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] +#[cfg_attr(not(nightly), serde_closure::desugar)] impl ProcessPool for &P where P: ProcessPool, diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index 3b6b2576..9d10f79a 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -nightly = [] - [dependencies] amadeus-core = { version = "=0.3.1", path = "../amadeus-core" } amadeus-types = { version = "=0.3.1", path = "../amadeus-types" } @@ -36,6 +34,7 @@ linked-hash-map = "0.5" lz4 = "1.23" num-bigint = "0.3" quick-error = "1.2.2" +rustversion = "1.0" serde = { version = "1.0", features = ["derive"] } serde_closure = "0.3" snap = "1.0" @@ -46,6 +45,9 @@ zstd = "0.4" [dev-dependencies] rand = "0.7" +[build-dependencies] +rustversion = "1.0" + [[test]] name = "derive" test = false diff --git a/amadeus-parquet/src/build.rs b/amadeus-parquet/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-parquet/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-parquet/src/internal/record/impls.rs b/amadeus-parquet/src/internal/record/impls.rs index c2d74b27..67867d1d 100644 --- a/amadeus-parquet/src/internal/record/impls.rs +++ b/amadeus-parquet/src/internal/record/impls.rs @@ -181,32 +181,32 @@ macro_rules! array { } } - // Specialize the implementation to avoid passing a potentially large array around + // TODO: Specialize the implementation to avoid passing a potentially large array around // on the stack. - impl ParquetData for Box<[u8; $i]> { - type Schema = FixedByteArraySchema<[u8; $i]>; - type Reader = BoxFixedLenByteArrayReader<[u8; $i]>; - - fn parse( - schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option, - ) -> Result<(String, Self::Schema)> { - <[u8; $i]>::parse(schema, predicate, repetition) - } - - fn reader( - _schema: &Self::Schema, path: &mut Vec, def_level: i16, rep_level: i16, - paths: &mut HashMap, batch_size: usize, - ) -> Self::Reader { - let col_path = ColumnPath::new(path.to_vec()); - let col_reader = paths.remove(&col_path).unwrap(); - BoxFixedLenByteArrayReader::<[u8; $i]> { - column: TypedTripletIter::::new( - def_level, rep_level, col_reader, batch_size, - ), - marker: PhantomData, - } - } - } + // impl ParquetData for Box<[u8; $i]> { + // type Schema = FixedByteArraySchema<[u8; $i]>; + // type Reader = BoxFixedLenByteArrayReader<[u8; $i]>; + + // fn parse( + // schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option, + // ) -> Result<(String, Self::Schema)> { + // <[u8; $i]>::parse(schema, predicate, repetition) + // } + + // fn reader( + // _schema: &Self::Schema, path: &mut Vec, def_level: i16, rep_level: i16, + // paths: &mut HashMap, batch_size: usize, + // ) -> Self::Reader { + // let col_path = ColumnPath::new(path.to_vec()); + // let col_reader = paths.remove(&col_path).unwrap(); + // BoxFixedLenByteArrayReader::<[u8; $i]> { + // column: TypedTripletIter::::new( + // def_level, rep_level, col_reader, batch_size, + // ), + // marker: PhantomData, + // } + // } + // } )*}; } amadeus_types::array!(array); @@ -219,18 +219,18 @@ impl ParquetData for Box where T: ParquetData, { - default type Schema = BoxSchema; - default type Reader = BoxReader; + type Schema = BoxSchema; + type Reader = BoxReader; type Predicate = T::Predicate; - default fn parse( + fn parse( schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option, ) -> Result<(String, Self::Schema)> { T::parse(schema, predicate, repetition) .map(|(name, schema)| (name, type_coerce(BoxSchema(schema)))) } - default fn reader( + fn reader( schema: &Self::Schema, path: &mut Vec, def_level: i16, rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { diff --git a/amadeus-parquet/src/internal/util/io.rs b/amadeus-parquet/src/internal/util/io.rs index 0498a24b..e1922637 100644 --- a/amadeus-parquet/src/internal/util/io.rs +++ b/amadeus-parquet/src/internal/util/io.rs @@ -16,7 +16,7 @@ // under the License. use std::{ - cell::RefCell, cmp, fs::File, io::{self, BufWriter, Cursor, Initializer, Read, Seek, SeekFrom, Write}, rc::Rc + cell::RefCell, cmp, fs::File, io::{self, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, rc::Rc }; use crate::internal::file::reader::ParquetReader; @@ -57,7 +57,8 @@ impl Read for BufReader { self.offset += read as u64; Ok(read) } - unsafe fn initializer(&self) -> Initializer { + #[cfg(nightly)] + unsafe fn initializer(&self) -> std::io::Initializer { self.inner.initializer() } } @@ -68,6 +69,9 @@ impl Seek for BufReader { SeekFrom::Current(n) => n, SeekFrom::End(n) => self.len as i64 + n - self.offset as i64, }; + #[cfg(not(nightly))] + let _ = self.inner.seek(SeekFrom::Current(offset))?; + #[cfg(nightly)] self.inner.seek_relative(offset)?; self.offset = (self.offset as i64 + offset) as u64; Ok(self.offset) diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index 170d5063..e8c4cb1c 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -7,11 +7,11 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-parquet/0.3.1")] -#![feature(bufreader_seek_relative)] -#![feature(read_initializer)] -#![feature(specialization)] -#![feature(type_alias_impl_trait)] -#![cfg_attr(test, feature(test))] +#![cfg_attr(nightly, feature(bufreader_seek_relative))] +#![cfg_attr(nightly, feature(read_initializer))] +#![cfg_attr(nightly, feature(specialization))] +#![cfg_attr(nightly, feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(test))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -35,332 +35,341 @@ )] // #![deny(unsafe_code)] -#[cfg(test)] +#[cfg(nightly)] extern crate test; +#[cfg(nightly)] mod internal; -use async_trait::async_trait; -use educe::Educe; -use futures::{pin_mut, stream, AsyncReadExt, FutureExt, StreamExt}; -use internal::{ - errors::ParquetError as InternalParquetError, file::reader::{FileReader, ParquetReader, SerializedFileReader} -}; -use serde::{Deserialize, Serialize}; -use serde_closure::*; -use std::{ - error, fmt::{self, Debug, Display}, io::Cursor, marker::PhantomData, ops::FnMut -}; +#[cfg(nightly)] +mod wrap { + use super::internal; + use async_trait::async_trait; + use educe::Educe; + use futures::{pin_mut, stream, AsyncReadExt, FutureExt, StreamExt}; + use internal::{ + errors::ParquetError as InternalParquetError, file::reader::{FileReader, ParquetReader, SerializedFileReader} + }; + use serde::{Deserialize, Serialize}; + use serde_closure::*; + use std::{ + error, fmt::{self, Debug, Display}, io::Cursor, marker::PhantomData, ops::FnMut + }; -use amadeus_core::{ - file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source -}; + use amadeus_core::{ + file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source + }; -pub use internal::record::ParquetData; + pub use internal::record::ParquetData; -#[doc(hidden)] -pub mod derive { - pub use super::{ - internal::{ - basic::Repetition, column::reader::ColumnReader, errors::{ParquetError, Result as ParquetResult}, record::{DisplaySchemaGroup, Reader, Schema as ParquetSchema}, schema::types::{ColumnPath, Type} - }, ParquetData - }; -} + #[doc(hidden)] + pub mod derive { + pub use super::{ + internal::{ + basic::Repetition, column::reader::ColumnReader, errors::{ParquetError, Result as ParquetResult}, record::{DisplaySchemaGroup, Reader, Schema as ParquetSchema}, schema::types::{ColumnPath, Type} + }, ParquetData + }; + } -#[derive(Educe)] -#[educe(Clone, Debug)] -pub struct Parquet -where - File: amadeus_core::file::File, - Row: ParquetData, -{ - partitions: Vec, - marker: PhantomData Row>, -} -impl Parquet -where - F: File, - Row: ParquetData + 'static, -{ - pub async fn new(file: F) -> Result::Error> { - Ok(Self { - partitions: file.partitions().await.map_err(ParquetError::File)?, - marker: PhantomData, - }) + #[derive(Educe)] + #[educe(Clone, Debug)] + pub struct Parquet + where + File: amadeus_core::file::File, + Row: ParquetData, + { + partitions: Vec, + marker: PhantomData Row>, } -} -impl Source for Parquet -where - F: File, - Row: ParquetData + 'static, -{ - type Item = Row; - #[allow(clippy::type_complexity)] - type Error = ParquetError< - ::Error, - <::Partition as Partition>::Error, - <<::Partition as Partition>::Page as Page>::Error, - >; + impl Parquet + where + F: File, + Row: ParquetData + 'static, + { + pub async fn new(file: F) -> Result::Error> { + Ok(Self { + partitions: file.partitions().await.map_err(ParquetError::File)?, + marker: PhantomData, + }) + } + } + impl Source for Parquet + where + F: File, + Row: ParquetData + 'static, + { + type Item = Row; + #[allow(clippy::type_complexity)] + type Error = ParquetError< + ::Error, + <::Partition as Partition>::Error, + <<::Partition as Partition>::Page as Page>::Error, + >; - #[cfg(not(doc))] - type ParStream = - impl amadeus_core::par_stream::ParallelStream>; - #[cfg(doc)] - type ParStream = - DistParStream>>; - #[cfg(not(doc))] - type DistStream = impl DistributedStream>; - #[cfg(doc)] - type DistStream = amadeus_core::util::ImplDistributedStream>; + #[cfg(not(doc))] + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; + #[cfg(doc)] + type ParStream = DistParStream< + amadeus_core::util::ImplDistributedStream>, + >; + #[cfg(not(doc))] + type DistStream = impl DistributedStream>; + #[cfg(doc)] + type DistStream = + amadeus_core::util::ImplDistributedStream>; - fn par_stream(self) -> Self::ParStream { - DistParStream::new(self.dist_stream()) - } - #[allow(clippy::let_and_return)] - fn dist_stream(self) -> Self::DistStream { - let ret = self - .partitions - .into_dist_stream() - .flat_map(FnMut!(|partition: F::Partition| async move { - Ok(stream::iter( - partition - .pages() - .await - .map_err(ParquetError::Partition)? - .into_iter(), - ) - .flat_map(|page| { - async move { - let mut buf = Vec::with_capacity(10 * 1024 * 1024); - let reader = Page::reader(page); - pin_mut!(reader); - let buf = PassError::new( - reader.read_to_end(&mut buf).await.map(|_| Cursor::new(buf)), - ); + fn par_stream(self) -> Self::ParStream { + DistParStream::new(self.dist_stream()) + } + #[allow(clippy::let_and_return)] + fn dist_stream(self) -> Self::DistStream { + let ret = + self.partitions + .into_dist_stream() + .flat_map(FnMut!(|partition: F::Partition| async move { Ok(stream::iter( - SerializedFileReader::new(buf)?.get_row_iter::(None)?, - )) + partition + .pages() + .await + .map_err(ParquetError::Partition)? + .into_iter(), + ) + .flat_map(|page| { + async move { + let mut buf = Vec::with_capacity(10 * 1024 * 1024); + let reader = Page::reader(page); + pin_mut!(reader); + let buf = PassError::new( + reader.read_to_end(&mut buf).await.map(|_| Cursor::new(buf)), + ); + Ok(stream::iter( + SerializedFileReader::new(buf)?.get_row_iter::(None)?, + )) + } + .map(ResultExpandIter::new) + .flatten_stream() + }) + .map(|row: Result, Self::Error>| Ok(row??))) } .map(ResultExpandIter::new) .flatten_stream() - }) - .map(|row: Result, Self::Error>| Ok(row??))) - } - .map(ResultExpandIter::new) - .flatten_stream() - .map(|row: Result, Self::Error>| Ok(row??)))); - #[cfg(doc)] - let ret = amadeus_core::util::ImplDistributedStream::new(ret); - ret + .map(|row: Result, Self::Error>| Ok(row??)))); + #[cfg(doc)] + let ret = amadeus_core::util::ImplDistributedStream::new(ret); + ret + } } -} -// impl

ParquetReader for amadeus_core::file::Reader

-// where -// P: Page, -// { -// fn len(&self) -> u64 { -// self.len() -// } -// } + // impl

ParquetReader for amadeus_core::file::Reader

+ // where + // P: Page, + // { + // fn len(&self) -> u64 { + // self.len() + // } + // } -#[derive(Serialize, Deserialize)] -pub struct ParquetDirectory { - directory: D, -} -impl ParquetDirectory { - pub fn new(directory: D) -> Self { - Self { directory } + #[derive(Serialize, Deserialize)] + pub struct ParquetDirectory { + directory: D, } -} -#[async_trait(?Send)] -impl File for ParquetDirectory -where - D: Directory, - D::Partition: Debug, -{ - type Partition = D::Partition; - type Error = D::Error; + impl ParquetDirectory { + pub fn new(directory: D) -> Self { + Self { directory } + } + } + #[async_trait(?Send)] + impl File for ParquetDirectory + where + D: Directory, + D::Partition: Debug, + { + type Partition = D::Partition; + type Error = D::Error; - async fn partitions(self) -> Result, Self::Error> { - self.partitions_filter(|_| true).await + async fn partitions(self) -> Result, Self::Error> { + self.partitions_filter(|_| true).await + } } -} -#[async_trait(?Send)] -impl Directory for ParquetDirectory -where - D: Directory, - D::Partition: Debug, -{ - async fn partitions_filter( - self, mut f: F, - ) -> Result::Partition>, ::Error> + #[async_trait(?Send)] + impl Directory for ParquetDirectory where - F: FnMut(&PathBuf) -> bool, + D: Directory, + D::Partition: Debug, { - // "Logic" interpreted from https://github.com/apache/arrow/blob/927cfeff875e557e28649891ea20ca38cb9d1536/python/pyarrow/parquet.py#L705-L829 - // and https://github.com/apache/spark/blob/5a7403623d0525c23ab8ae575e9d1383e3e10635/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L348-L359 - // and https://github.com/apache/spark/blob/5a7403623d0525c23ab8ae575e9d1383e3e10635/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala - self.directory - .partitions_filter(|path| { - let skip; - if !path.is_file() { - let dir_name = path.last().unwrap().to_string_lossy(); + async fn partitions_filter( + self, mut f: F, + ) -> Result::Partition>, ::Error> + where + F: FnMut(&PathBuf) -> bool, + { + // "Logic" interpreted from https://github.com/apache/arrow/blob/927cfeff875e557e28649891ea20ca38cb9d1536/python/pyarrow/parquet.py#L705-L829 + // and https://github.com/apache/spark/blob/5a7403623d0525c23ab8ae575e9d1383e3e10635/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L348-L359 + // and https://github.com/apache/spark/blob/5a7403623d0525c23ab8ae575e9d1383e3e10635/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala + self.directory + .partitions_filter(|path| { + let skip; + if !path.is_file() { + let dir_name = path.last().unwrap().to_string_lossy(); - skip = dir_name.starts_with('.') // Hidden files - || (dir_name.starts_with('_') && !dir_name.contains('=')) // ARROW-1079: Filter out "private" directories starting with underscore; - } else { - let file_name = path.file_name().unwrap().to_string_lossy(); - let extension = file_name.rfind('.').map(|offset| &file_name[offset + 1..]); - skip = file_name.starts_with('.') // Hidden files - || file_name == "_metadata" || file_name == "_common_metadata" // Summary metadata - || file_name == "_SUCCESS" // Spark success marker - || extension == Some("_COPYING_") // File copy in progress; TODO: Should we error on this? - || extension == Some("crc") // Checksums - || file_name.ends_with("_$folder$"); // This is created by Apache tools on S3 - } - !skip && f(path) - }) - .await + skip = dir_name.starts_with('.') // Hidden files + || (dir_name.starts_with('_') && !dir_name.contains('=')) // ARROW-1079: Filter out "private" directories starting with underscore; + } else { + let file_name = path.file_name().unwrap().to_string_lossy(); + let extension = file_name.rfind('.').map(|offset| &file_name[offset + 1..]); + skip = file_name.starts_with('.') // Hidden files + || file_name == "_metadata" || file_name == "_common_metadata" // Summary metadata + || file_name == "_SUCCESS" // Spark success marker + || extension == Some("_COPYING_") // File copy in progress; TODO: Should we error on this? + || extension == Some("crc") // Checksums + || file_name.ends_with("_$folder$"); // This is created by Apache tools on S3 + } + !skip && f(path) + }) + .await + } } -} -mod misc_serde { - use super::internal; - use internal::errors::ParquetError; - use serde::{Deserialize, Deserializer, Serialize, Serializer}; + mod misc_serde { + use super::internal; + use internal::errors::ParquetError; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; - pub struct Serde(T); + pub struct Serde(T); - impl Serialize for Serde<&ParquetError> { - fn serialize(&self, serializer: S) -> Result + impl Serialize for Serde<&ParquetError> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + <(usize, &str)>::serialize( + &match self.0 { + ParquetError::General(message) => (0, message), + ParquetError::EOF(message) => (1, message), + _ => unimplemented!(), + }, + serializer, + ) + } + } + impl<'de> Deserialize<'de> for Serde { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + <(usize, String)>::deserialize(deserializer) + .map(|(kind, message)| match kind { + 1 => ParquetError::EOF(message), + _ => ParquetError::General(message), + }) + .map(Self) + } + } + + pub fn serialize(t: &T, serializer: S) -> Result where + for<'a> Serde<&'a T>: Serialize, S: Serializer, { - <(usize, &str)>::serialize( - &match self.0 { - ParquetError::General(message) => (0, message), - ParquetError::EOF(message) => (1, message), - _ => unimplemented!(), - }, - serializer, - ) + Serde(t).serialize(serializer) } - } - impl<'de> Deserialize<'de> for Serde { - fn deserialize(deserializer: D) -> Result + pub fn deserialize<'de, T, D>(deserializer: D) -> Result where + Serde: Deserialize<'de>, D: Deserializer<'de>, { - <(usize, String)>::deserialize(deserializer) - .map(|(kind, message)| match kind { - 1 => ParquetError::EOF(message), - _ => ParquetError::General(message), - }) - .map(Self) + Serde::::deserialize(deserializer).map(|x| x.0) } } - pub fn serialize(t: &T, serializer: S) -> Result + #[derive(Serialize, Deserialize, Debug)] + pub enum ParquetError { + File(A), + Partition(B), + Page(C), + Parquet(#[serde(with = "misc_serde")] InternalParquetError), + } + impl PartialEq for ParquetError where - for<'a> Serde<&'a T>: Serialize, - S: Serializer, + A: PartialEq, + B: PartialEq, + C: PartialEq, { - Serde(t).serialize(serializer) + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::File(a), Self::File(b)) => a == b, + (Self::Partition(a), Self::Partition(b)) => a == b, + (Self::Page(a), Self::Page(b)) => a == b, + (Self::Parquet(a), Self::Parquet(b)) => a == b, + _ => false, + } + } } - pub fn deserialize<'de, T, D>(deserializer: D) -> Result + impl error::Error for ParquetError where - Serde: Deserialize<'de>, - D: Deserializer<'de>, + A: error::Error, + B: error::Error, + C: error::Error, { - Serde::::deserialize(deserializer).map(|x| x.0) } -} - -#[derive(Serialize, Deserialize, Debug)] -pub enum ParquetError { - File(A), - Partition(B), - Page(C), - Parquet(#[serde(with = "misc_serde")] InternalParquetError), -} -impl PartialEq for ParquetError -where - A: PartialEq, - B: PartialEq, - C: PartialEq, -{ - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::File(a), Self::File(b)) => a == b, - (Self::Partition(a), Self::Partition(b)) => a == b, - (Self::Page(a), Self::Page(b)) => a == b, - (Self::Parquet(a), Self::Parquet(b)) => a == b, - _ => false, + impl Display for ParquetError + where + A: Display, + B: Display, + C: Display, + { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::File(err) => Display::fmt(err, f), + Self::Partition(err) => Display::fmt(err, f), + Self::Page(err) => Display::fmt(err, f), + Self::Parquet(err) => Display::fmt(err, f), + } } } -} -impl error::Error for ParquetError -where - A: error::Error, - B: error::Error, - C: error::Error, -{ -} -impl Display for ParquetError -where - A: Display, - B: Display, - C: Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::File(err) => Display::fmt(err, f), - Self::Partition(err) => Display::fmt(err, f), - Self::Page(err) => Display::fmt(err, f), - Self::Parquet(err) => Display::fmt(err, f), + impl From for ParquetError { + fn from(err: InternalParquetError) -> Self { + Self::Parquet(err) } } -} -impl From for ParquetError { - fn from(err: InternalParquetError) -> Self { - Self::Parquet(err) - } -} -use std::io; + use std::io; -impl ParquetReader for PassError>> { - fn len(&self) -> u64 { - self.0.as_ref().unwrap().get_ref().len() as u64 + impl ParquetReader for PassError>> { + fn len(&self) -> u64 { + self.0.as_ref().unwrap().get_ref().len() as u64 + } } -} -struct PassError(Result>); -impl PassError { - fn new(r: Result) -> Self { - Self(r.map_err(Some)) + struct PassError(Result>); + impl PassError { + fn new(r: Result) -> Self { + Self(r.map_err(Some)) + } } -} -impl io::Read for PassError -where - R: io::Read, -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match &mut self.0 { - Ok(r) => r.read(buf), - Err(r) => Err(r.take().unwrap()), + impl io::Read for PassError + where + R: io::Read, + { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match &mut self.0 { + Ok(r) => r.read(buf), + Err(r) => Err(r.take().unwrap()), + } } } -} -impl io::Seek for PassError -where - R: io::Seek, -{ - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - match &mut self.0 { - Ok(r) => r.seek(pos), - Err(r) => Err(r.take().unwrap()), + impl io::Seek for PassError + where + R: io::Seek, + { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + match &mut self.0 { + Ok(r) => r.seek(pos), + Err(r) => Err(r.take().unwrap()), + } } } } +#[cfg(nightly)] +pub use wrap::*; diff --git a/amadeus-postgres/Cargo.toml b/amadeus-postgres/Cargo.toml index a0590dae..e32b2d60 100644 --- a/amadeus-postgres/Cargo.toml +++ b/amadeus-postgres/Cargo.toml @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -nightly = [] - [dependencies] amadeus-core = { version = "=0.3.1", path = "../amadeus-core" } amadeus-types = { version = "=0.3.1", path = "../amadeus-types" } @@ -34,3 +32,6 @@ serde = { version = "1.0", features = ["derive"] } serde_closure = "0.3" sum = "0.1" tokio = "0.2" + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-postgres/src/build.rs b/amadeus-postgres/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-postgres/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-postgres/src/lib.rs b/amadeus-postgres/src/lib.rs index 86c42ca3..b11ed694 100644 --- a/amadeus-postgres/src/lib.rs +++ b/amadeus-postgres/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-postgres/0.3.1")] -#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -212,9 +212,9 @@ where } } -#[cfg(not(feature = "nightly"))] +#[cfg(not(nightly))] type Output = Pin> + Send>>; -#[cfg(feature = "nightly")] +#[cfg(nightly)] type Output = impl Stream> + Send; FnMutNamed! { @@ -261,7 +261,7 @@ FnMutNamed! { }) } .flatten_stream(); - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] let ret = ret.boxed(); ret } @@ -275,7 +275,7 @@ where type Error = PostgresError; type ParStream = DistParStream; - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] #[allow(clippy::type_complexity)] type DistStream = amadeus_core::par_stream::FlatMap< amadeus_core::into_par_stream::IterDistStream< @@ -283,7 +283,7 @@ where >, Closure, >; - #[cfg(feature = "nightly")] + #[cfg(nightly)] type DistStream = impl DistributedStream>; fn par_stream(self) -> Self::ParStream { diff --git a/amadeus-serde/Cargo.toml b/amadeus-serde/Cargo.toml index 4d2c2355..6f180ab1 100644 --- a/amadeus-serde/Cargo.toml +++ b/amadeus-serde/Cargo.toml @@ -13,14 +13,12 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } maintenance = { status = "actively-developed" } -[features] -nightly = [] - [dependencies] amadeus-core = { version = "=0.3.1", path = "../amadeus-core" } amadeus-types = { version = "=0.3.1", path = "../amadeus-types" } @@ -35,3 +33,6 @@ serde_closure = "0.3" serde_json = "1.0" sum = { version = "0.1", features = ["serde"] } vec-utils = "0.2" + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-serde/src/build.rs b/amadeus-serde/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-serde/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/amadeus-serde/src/csv.rs b/amadeus-serde/src/csv.rs index cf88f040..b74995af 100644 --- a/amadeus-serde/src/csv.rs +++ b/amadeus-serde/src/csv.rs @@ -80,9 +80,9 @@ where } type Error = CsvError::Error, <

::Page as Page>::Error>; -#[cfg(not(feature = "nightly"))] +#[cfg(not(nightly))] type Output = std::pin::Pin>>>>; -#[cfg(feature = "nightly")] +#[cfg(nightly)] type Output = impl Stream>>; FnMutNamed! { @@ -128,7 +128,7 @@ FnMutNamed! { .map(ResultExpandIter::new) .flatten_stream() .map(|row: Result>, Error>| Ok(row??)); - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] let ret = ret.boxed_local(); ret } @@ -148,13 +148,13 @@ where >; type ParStream = DistParStream; - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] #[allow(clippy::type_complexity)] type DistStream = amadeus_core::par_stream::FlatMap< amadeus_core::into_par_stream::IterDistStream>, Closure, >; - #[cfg(feature = "nightly")] + #[cfg(nightly)] type DistStream = impl DistributedStream>; fn par_stream(self) -> Self::ParStream { diff --git a/amadeus-serde/src/json.rs b/amadeus-serde/src/json.rs index 31415cff..161176c2 100644 --- a/amadeus-serde/src/json.rs +++ b/amadeus-serde/src/json.rs @@ -39,9 +39,9 @@ where } type Error = JsonError::Error, <

::Page as Page>::Error>; -#[cfg(not(feature = "nightly"))] +#[cfg(not(nightly))] type Output = std::pin::Pin>>>>; -#[cfg(feature = "nightly")] +#[cfg(nightly)] type Output = impl Stream>>; FnMutNamed! { @@ -82,7 +82,7 @@ FnMutNamed! { .map(ResultExpandIter::new) .flatten_stream() .map(|row: Result>, Error>| Ok(row??)); - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] let ret = ret.boxed_local(); ret } @@ -102,13 +102,13 @@ where >; type ParStream = DistParStream; - #[cfg(not(feature = "nightly"))] + #[cfg(not(nightly))] #[allow(clippy::type_complexity)] type DistStream = amadeus_core::par_stream::FlatMap< amadeus_core::into_par_stream::IterDistStream>, Closure, >; - #[cfg(feature = "nightly")] + #[cfg(nightly)] type DistStream = impl DistributedStream>; fn par_stream(self) -> Self::ParStream { diff --git a/amadeus-serde/src/lib.rs b/amadeus-serde/src/lib.rs index effeb0cf..b474e596 100644 --- a/amadeus-serde/src/lib.rs +++ b/amadeus-serde/src/lib.rs @@ -7,7 +7,7 @@ //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). #![doc(html_root_url = "https://docs.rs/amadeus-serde/0.3.1")] -#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] +#![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, // missing_debug_implementations, diff --git a/amadeus-types/Cargo.toml b/amadeus-types/Cargo.toml index 19658990..bcd5ecc3 100644 --- a/amadeus-types/Cargo.toml +++ b/amadeus-types/Cargo.toml @@ -13,6 +13,7 @@ homepage = "https://github.com/alecmocatta/amadeus" documentation = "https://docs.rs/amadeus" readme = "README.md" edition = "2018" +build = "src/build.rs" [badges] azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" } @@ -29,3 +30,6 @@ ordered-float = "2.0" serde = { version = "1.0", features = ["derive"] } url = { version = "2.1", features = ["serde"] } vec-utils = "0.2" + +[build-dependencies] +rustversion = "1.0" diff --git a/amadeus-types/src/build.rs b/amadeus-types/src/build.rs new file mode 120000 index 00000000..8203cf84 --- /dev/null +++ b/amadeus-types/src/build.rs @@ -0,0 +1 @@ +../../src/build.rs \ No newline at end of file diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 70427ca1..ed38d534 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -22,9 +22,9 @@ jobs: rust_toolchain: nightly rust_lint_toolchain: nightly-2020-07-12 rust_flags: '' - rust_features_clippy: ';nightly;aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json;aws commoncrawl parquet postgres csv json nightly amadeus-aws/nightly amadeus-commoncrawl/nightly amadeus-parquet/nightly amadeus-postgres/nightly amadeus-serde/nightly' - rust_features: 'aws commoncrawl parquet postgres csv json' - rust_doc_features: 'aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json' + rust_features: 'constellation aws commoncrawl parquet postgres csv json' + rust_doc_features: 'constellation aws commoncrawl parquet postgres csv json' rust_target_check: '' rust_target_build: '' rust_target_run: '' @@ -46,7 +46,7 @@ jobs: rust_toolchain: nightly rust_lint_toolchain: nightly-2020-07-12 rust_flags: '' - rust_features_clippy: ';nightly;aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json' rust_features: 'aws commoncrawl parquet postgres csv json' rust_doc_features: 'aws commoncrawl parquet postgres csv json' rust_target_check: '' diff --git a/src/build.rs b/src/build.rs new file mode 100644 index 00000000..4464998f --- /dev/null +++ b/src/build.rs @@ -0,0 +1,12 @@ +fn main() { + println!("cargo:rerun-if-changed=build.rs"); + + nightly(); +} + +#[rustversion::nightly] +fn nightly() { + println!("cargo:rustc-cfg=nightly"); +} +#[rustversion::not(nightly)] +fn nightly() {} diff --git a/src/lib.rs b/src/lib.rs index 376b84aa..73dccc1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/constellation-rs/amadeus/master/logo.svg?sanitize=true" )] -#![cfg_attr(feature = "nightly", feature(unboxed_closures))] +#![cfg_attr(nightly, feature(unboxed_closures))] #![warn( // missing_copy_implementations, // missing_debug_implementations, @@ -29,6 +29,9 @@ )] #![deny(unsafe_code)] +#[cfg(all(not(nightly), feature = "parquet"))] +compile_error!("The Amadeus Parquet connector currently requires nightly"); + #[cfg(all( feature = "aws", feature = "parquet", diff --git a/src/pool.rs b/src/pool.rs index e7926428..775ca43d 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -18,7 +18,7 @@ use amadeus_core::pool::{ type Result = std::result::Result>; #[cfg(feature = "constellation")] -#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] +#[cfg_attr(not(nightly), serde_closure::desugar)] impl ProcessPoolTrait for ProcessPool { type ThreadPool = ThreadPool; @@ -35,7 +35,7 @@ impl ProcessPoolTrait for ProcessPool { } } -#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] +#[cfg_attr(not(nightly), serde_closure::desugar)] impl ProcessPoolTrait for ThreadPool { type ThreadPool = Self; diff --git a/src/pool/process.rs b/src/pool/process.rs index a43911ce..343884d0 100644 --- a/src/pool/process.rs +++ b/src/pool/process.rs @@ -259,7 +259,7 @@ impl Drop for ProcessPoolInner { #[derive(Debug)] pub struct ProcessPool(Arc); -#[cfg_attr(not(feature = "nightly"), serde_closure::desugar)] +#[cfg_attr(not(nightly), serde_closure::desugar)] impl ProcessPool { pub fn new( processes: Option, tasks_per_core: Option, resources: Resources,