From 09a45a3c59100cc0566868a0e081336c63e69888 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Dec 2022 01:18:23 +0100 Subject: [PATCH 01/12] chore: update datafusion dependencies --- .github/workflows/build.yml | 10 ++++---- python/Cargo.toml | 4 +-- rust/Cargo.toml | 26 +++++++++++-------- rust/README.md | 19 +++++--------- rust/src/delta_arrow.rs | 38 ++++++++++------------------ rust/src/delta_datafusion.rs | 33 +++++++++++++++++------- rust/src/lib.rs | 37 ++++++++++++++++++--------- rust/src/operations/load.rs | 2 +- rust/src/operations/mod.rs | 16 ++++++------ rust/src/operations/write.rs | 15 ++++++++++- rust/src/storage/mod.rs | 4 +-- rust/src/writer/stats.rs | 2 +- rust/tests/datafusion_test.rs | 10 ++++---- rust/tests/integration_datafusion.rs | 2 +- 14 files changed, 124 insertions(+), 94 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8f23287773..83de3b09c3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -43,11 +43,11 @@ jobs: override: true - uses: Swatinem/rust-cache@v1 - name: build and lint with clippy - run: cargo clippy --features azure,datafusion-ext,s3,gcs,glue + run: cargo clippy --features azure,datafusion,s3,gcs,glue - name: Spot-check build for rustls features run: cargo clippy --features s3-rustls - name: Check docs - run: cargo doc --features azure,datafusion-ext,s3,gcs,glue + run: cargo doc --features azure,datafusion,s3,gcs,glue test: strategy: @@ -68,7 +68,7 @@ jobs: override: true - uses: Swatinem/rust-cache@v1 - name: Run tests - run: cargo test --verbose --features datafusion-ext,azure + run: cargo test --verbose --features datafusion,azure integration_test: name: Integration Tests @@ -107,10 +107,10 @@ jobs: - name: Run tests with default ssl run: | - cargo test --features integration_test,azure,s3,gcs,datafusion-ext + cargo test --features integration_test,azure,s3,gcs,datafusion - name: Run tests with rustls run: | - cargo test --features integration_test,s3-rustls,datafusion-ext + cargo test --features integration_test,s3-rustls,datafusion parquet2_test: runs-on: ubuntu-latest diff --git a/python/Cargo.toml b/python/Cargo.toml index c5aafda258..e48545a447 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -17,7 +17,7 @@ crate-type = ["cdylib"] name = "deltalake._internal" [dependencies] -arrow-schema = { version = "26", features = ["serde"] } +arrow-schema = { version = "28", features = ["serde"] } chrono = "0" env_logger = "0" futures = "0.3" @@ -37,4 +37,4 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["s3", "azure", "glue", "gcs", "python", "datafusion-ext"] +features = ["s3", "azure", "glue", "gcs", "python", "datafusion", "aws-profile"] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1d0f0bf831..7ae4dae395 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -10,7 +10,7 @@ description = "Native Delta Lake implementation in Rust" edition = "2021" [dependencies] -arrow = { version = "26", optional = true } +arrow = { version = "28", optional = true } async-trait = "0.1" bytes = "1" chrono = "0.4.22" @@ -22,10 +22,10 @@ log = "0" libc = ">=0.2.90, <1" num-bigint = "0.4" num-traits = "0.2.15" -object_store = { version = "0.5.2", features = ["aws_profile"] } +object_store = "0.5.2" once_cell = "1.16.0" parking_lot = "0.12" -parquet = { version = "26", features = ["async"], optional = true } +parquet = { version = "28", features = ["async"], optional = true } parquet2 = { version = 
"0.17", optional = true } percent-encoding = "2" serde = { version = "1", features = ["derive"] } @@ -46,10 +46,10 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true rusoto_glue = { version = "0.48", default-features = false, optional = true } # Datafusion -datafusion = { version = "14", optional = true } -datafusion-expr = { version = "14", optional = true } -datafusion-common = { version = "14", optional = true } -datafusion-proto = { version = "14", optional = true } +datafusion = { version = "15", optional = true } +datafusion-expr = { version = "15", optional = true } +datafusion-common = { version = "15", optional = true } +datafusion-proto = { version = "15", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } @@ -77,14 +77,15 @@ glibc_version = { path = "../glibc_version", version = "0.1" } [features] default = ["arrow", "parquet"] -datafusion-ext = [ - "datafusion", +datafusion = [ + "dep:datafusion", "datafusion-expr", "datafusion-common", "datafusion-proto", "arrow", "parquet", ] +datafusion-ext = ["datafusion"] azure = ["object_store/azure"] gcs = ["object_store/gcp"] s3 = [ @@ -105,6 +106,11 @@ s3-rustls = [ ] glue = ["s3", "rusoto_glue"] python = ["arrow/pyarrow"] + +# AWS_PROFILE is experimental in object_store, and comes with a hefty load of dependencies. +# must be used in conjunction with s3 or s3-rustls feature +aws-profile = ["object_store/aws_profile"] + # used only for integration testing integration_test = ["fs_extra", "tempdir"] @@ -114,4 +120,4 @@ harness = false [[example]] name = "basic_operations" -required-features = ["datafusion-ext"] +required-features = ["datafusion"] diff --git a/rust/README.md b/rust/README.md index b20bfda3f8..a7073ddbdd 100644 --- a/rust/README.md +++ b/rust/README.md @@ -1,14 +1,11 @@ -Deltalake -========= +# Deltalake [![crates.io](https://img.shields.io/crates/v/deltalake.svg?style=flat-square)](https://crates.io/crates/deltalake) [![api_doc](https://img.shields.io/badge/doc-api-blue)](https://docs.rs/deltalake) Native Delta Lake implementation in Rust - -Usage ------ +## Usage ### API @@ -17,7 +14,6 @@ let table = deltalake::open_table("./tests/data/simple_table").await.unwrap(); println!("{}", table.get_files()); ``` - ### CLI ```bash @@ -43,20 +39,19 @@ Examples can be run using the `cargo run --example` command. For example: cargo run --example read_delta_table ``` -Optional cargo package features ------------------------ +## Optional cargo package features - `s3` - enable the S3 storage backend to work with Delta Tables in AWS S3. - `s3-rustls` - enable the S3 storage backend but rely on [rustls](https://github.com/ctz/rustls) rather than OpenSSL (`native-tls`). - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue. - `azure` - enable the Azure storage backend to work with Delta Tables in Azure Data Lake Storage Gen2 accounts. - `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage. -- `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). +- `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). 
+- `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
+- `aws-profile` - enable support for aws profile authorization in uderlying object_store crate
 - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
-
-Development
------------
+## Development
 
 To run s3 integration tests from local machine, we use docker-compose to stand up AWS local stack. To spin up the test environment run `docker-compose up` in
diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs
index cdfe1f75c6..4d3ae24be1 100644
--- a/rust/src/delta_arrow.rs
+++ b/rust/src/delta_arrow.rs
@@ -8,8 +8,6 @@ use arrow::datatypes::{
 use arrow::error::ArrowError;
 use lazy_static::lazy_static;
 use regex::Regex;
-use std::collections::BTreeMap;
-use std::collections::HashMap;
 use std::convert::TryFrom;
 
 impl TryFrom<&schema::Schema> for ArrowSchema {
@@ -30,25 +28,18 @@ impl TryFrom<&schema::SchemaField> for ArrowField {
     type Error = ArrowError;
 
     fn try_from(f: &schema::SchemaField) -> Result<Self, ArrowError> {
-        let mut field = ArrowField::new(
+        Ok(ArrowField::new(
             f.get_name(),
             ArrowDataType::try_from(f.get_type())?,
             f.is_nullable(),
-        );
-
-        let metadata: Option<HashMap<String, String>> = Some(f.get_metadata())
-            .filter(|metadata| metadata.is_empty())
-            .map(|metadata| {
-                metadata
-                    .iter()
-                    .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
-                    .collect::<Result<_, serde_json::Error>>()
-                    .map_err(|err| ArrowError::JsonError(err.to_string()))
-            })
-            .transpose()?;
-
-        field.set_metadata(metadata);
-        Ok(field)
+        )
+        .with_metadata(
+            f.get_metadata()
+                .iter()
+                .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
+                .collect::<Result<_, serde_json::Error>>()
+                .map_err(|err| ArrowError::JsonError(err.to_string()))?,
+        ))
     }
 }
 
@@ -111,7 +102,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
                     ))
                 })?;
                 let precision = extract.get(1).and_then(|v| v.as_str().parse::<u8>().ok());
-                let scale = extract.get(2).and_then(|v| v.as_str().parse::<u8>().ok());
+                let scale = extract.get(2).and_then(|v| v.as_str().parse::<i8>().ok());
                 match (precision, scale) {
                     // TODO how do we decide which variant (128 / 256) to use?
                    (Some(p), Some(s)) => Ok(ArrowDataType::Decimal128(p, s)),
@@ -205,12 +196,9 @@ impl TryFrom<&ArrowField> for schema::SchemaField {
             arrow_field.is_nullable(),
             arrow_field
                 .metadata()
-                .as_ref()
-                .map_or_else(HashMap::new, |m| {
-                    m.iter()
-                        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
-                        .collect()
-                }),
+                .iter()
+                .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
+                .collect(),
         ))
     }
 }
diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 3644b25e61..796a06b0df 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
 use arrow::array::ArrayRef;
 use arrow::compute::{cast_with_options, CastOptions};
 use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
+use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use chrono::{DateTime, NaiveDateTime, Utc};
@@ -42,14 +43,15 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::optimizer::utils::conjunction;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig};
 use datafusion::physical_plan::{
     ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
-use datafusion_expr::{Expr, Extension, LogicalPlan};
-use datafusion_proto::logical_plan::{LogicalExtensionCodec, PhysicalExtensionCodec};
+use datafusion_expr::{CreateExternalTable, Expr, Extension, LogicalPlan};
+use datafusion_proto::logical_plan::LogicalExtensionCodec;
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use object_store::{path::Path, ObjectMeta};
 use url::Url;
 
@@ -331,7 +333,7 @@ impl TableProvider for DeltaTable {
     async fn scan(
         &self,
         session: &SessionState,
-        projection: &Option<Vec<usize>>,
+        projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
@@ -382,17 +384,26 @@ impl TableProvider for DeltaTable {
                 .cloned()
                 .collect(),
         ));
-        let parquet_scan = ParquetFormat::default()
+        let table_partition_cols = table_partition_cols
+            .iter()
+            .map(|col| {
+                let data_type = schema.field_with_name(col)?.data_type();
+                Ok((col.clone(), partition_type_wrap(data_type.clone())))
+            })
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        let parquet_scan = ParquetFormat::new(session.config.config_options.clone())
             .create_physical_plan(
                 FileScanConfig {
                     object_store_url: self.storage.object_store_url(),
                     file_schema,
                     file_groups: file_groups.into_values().collect(),
                     statistics: self.datafusion_table_statistics(),
-                    projection: projection.clone(),
+                    projection: projection.cloned(),
                     limit,
                     table_partition_cols,
-                    config_options: Default::default(),
+                    config_options: session.config.config_options.clone(),
+                    output_ordering: Default::default(),
                 },
                 filters,
             )
@@ -848,8 +859,12 @@ pub struct DeltaTableFactory {}
 
 #[async_trait]
 impl TableProviderFactory for DeltaTableFactory {
-    async fn create(&self, url: &str) -> datafusion::error::Result<Arc<dyn TableProvider>> {
-        let provider = open_table(url).await.unwrap();
+    async fn create(
+        &self,
+        _ctx: &SessionState,
+        cmd: &CreateExternalTable,
+    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
+        let provider = open_table(cmd.location.clone()).await.unwrap();
         Ok(Arc::new(provider))
     }
 }
diff --git
a/rust/src/lib.rs b/rust/src/lib.rs index 2d59daa10a..09fa3ca6e4 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -41,8 +41,20 @@ //! or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS //! instead of native TLS implementation. //! - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue. -//! - `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation +//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation //! for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). +//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature. +//! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features +//! are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`. +//! - `aws-profile` - EXPERIMENTAL support for aws profile authorization +//! +//! It is strongly encouraged that users do not use `aws-profile` and instead make use of a credential +//! manager such as [aws-vault] not only to avoid the significant additional dependencies, +//! but also to avoid storing credentials in [plain text on disk] +//! +//! [aws-config]: https://docs.rs/aws-config +//! [aws-vault]: https://github.com/99designs/aws-vault +//! [plain text on disk]: https://99designs.com.au/blog/engineering/aws-vault/ //! //! # Querying Delta Tables with Datafusion //! @@ -64,23 +76,24 @@ //! .await.unwrap(); //! }; //! ``` -//! -//! It's important to note that the DataFusion library is evolving quickly, often with breaking api -//! changes, and this may cause compilation issues as a result. If you are having issues with the most -//! recently released `delta-rs` you can set a specific branch or commit in your `Cargo.toml`. -//! -//! ```toml -//! datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07bc2c754805f536fe1cd873dbe6adfc0a21cbb3" } -//! 
```
 
 #![deny(warnings)]
 #![deny(missing_docs)]
 
 #[cfg(all(feature = "parquet", feature = "parquet2"))]
 compile_error!(
-    "Feature parquet and parquet2 are mutually exclusive and cannot be enabled together"
+    "Features parquet and parquet2 are mutually exclusive and cannot be enabled together"
 );
 
+#[cfg(all(
+    feature = "aws-profile",
+    not(any(feature = "s3", feature = "s3-rustls"))
+))]
+compile_error!("Feature aws-profile must be used together with s3 or s3-rustls feature");
+
+#[cfg(all(feature = "s3", feature = "s3-rustls"))]
+compile_error!("Features s3 and s3-rustls are mutually exclusive and cannot be enabled together");
+
 pub mod action;
 pub mod builder;
 pub mod data_catalog;
@@ -98,7 +111,7 @@ pub mod vacuum;
 pub mod checkpoints;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod delta_arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod delta_datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod operations;
@@ -117,7 +130,7 @@ pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, Object
 // convenience exports for consumers to avoid aligning crate versions
 #[cfg(feature = "arrow")]
 pub use arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub use operations::DeltaOps;
diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs
index 821b11b0f5..70f825f994 100644
--- a/rust/src/operations/load.rs
+++ b/rust/src/operations/load.rs
@@ -87,7 +87,7 @@ impl std::future::IntoFuture for LoadBuilder {
             ctx.state()
                 .runtime_env
                 .register_object_store(scheme, "", store);
-            let scan_plan = table.scan(&ctx.state(), &None, &[], None).await?;
+            let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
             let plan = CoalescePartitionsExec::new(scan_plan);
             let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
             let stream = plan.execute(0, task_ctx)?;
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index db9ae818a1..fb0d79198e 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -14,20 +14,20 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
 pub mod create;
 pub mod transaction;
 
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use self::{load::LoadBuilder, write::WriteBuilder};
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use arrow::record_batch::RecordBatch;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
 
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod load;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod write;
 // TODO the writer module does not actually depend on datafusion,
 // eventually we should consolidate with the record batch writer
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod writer;
 
 /// Maximum supported writer version
@@ -93,14 +93,14 @@ impl DeltaOps {
     }
 
     /// Load data from a DeltaTable
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn load(self) -> LoadBuilder {
         LoadBuilder::default().with_object_store(self.0.object_store())
     }
 
     /// Write data to Delta table
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
         WriteBuilder::default()
diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 823f1d466d..80e5f0079a 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -273,8 +273,21 @@ impl std::future::IntoFuture for WriteBuilder {
                 let schema = batches[0].schema();
 
                 if let Ok(meta) = table.get_metadata() {
+                    // NOTE the schema generated from the delta schema will have the delta field metadata included,
+                    // so we need to compare the field names and datatypes instead.
+                    // TODO update comparison logic, once we have column mappings supported.
                     let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
-                    if schema != curr_schema {
+                    let curr_fields = curr_schema
+                        .fields()
+                        .iter()
+                        .map(|f| (f.name(), f.data_type()))
+                        .collect::<Vec<_>>();
+                    let new_fields = schema
+                        .fields()
+                        .iter()
+                        .map(|f| (f.name(), f.data_type()))
+                        .collect::<Vec<_>>();
+                    if new_fields != curr_fields {
                         return Err(DeltaTableError::Generic(
                             "Updating table schema not yet implemented".to_string(),
                         ));
diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs
index 9aee1203ee..c7531fc880 100644
--- a/rust/src/storage/mod.rs
+++ b/rust/src/storage/mod.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use tokio::io::AsyncWrite;
 
 use crate::get_storage_backend;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -143,7 +143,7 @@ impl DeltaObjectStore {
         self.config.to_uri(&Path::from(""))
     }
 
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     /// generate a unique enough url to identify the store in datafusion.
     pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
         // we are certain, that the URL can be parsed, since
diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs
index e14b7dc333..2ec3c5f8d3 100644
--- a/rust/src/writer/stats.rs
+++ b/rust/src/writer/stats.rs
@@ -38,7 +38,7 @@ pub(crate) fn apply_null_counts(
     array
         .columns()
-        .into_iter()
+        .iter()
         .zip(fields)
         .for_each(|(column, field)| {
             let key = field.name().to_owned();
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 519c5eaacd..732a6fc7a3 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -1,4 +1,4 @@
-#![cfg(feature = "datafusion-ext")]
+#![cfg(feature = "datafusion")]
 
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
@@ -86,7 +86,7 @@ async fn prepare_table(
 #[tokio::test]
 async fn test_datafusion_sql_registration() -> Result<()> {
     let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
-    table_factories.insert("deltatable".to_string(), Arc::new(DeltaTableFactory {}));
+    table_factories.insert("DELTA".to_string(), Arc::new(DeltaTableFactory {}));
     let cfg = RuntimeConfig::new().with_table_factories(table_factories);
     let env = RuntimeEnv::new(cfg).unwrap();
     let ses = SessionConfig::new();
@@ -95,7 +95,7 @@ async fn test_datafusion_sql_registration() -> Result<()> {
     let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
     d.push("tests/data/delta-0.8.0-partitioned");
     let sql = format!(
-        "CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION '{}'",
+        "CREATE EXTERNAL TABLE demo STORED AS DELTA LOCATION '{}'",
         d.to_str().unwrap()
     );
     let _ = ctx
@@ -255,7 +255,7 @@ async fn test_files_scanned() -> Result<()> {
     assert_eq!(table.version(), 2);
 
     let ctx = SessionContext::new();
-    let plan = table.scan(&ctx.state(), &None, &[], None).await?;
+    let plan = table.scan(&ctx.state(), None, &[], None).await?;
     let plan = CoalescePartitionsExec::new(plan.clone());
 
     let
task_ctx = Arc::new(TaskContext::from(&ctx.state())); @@ -270,7 +270,7 @@ async fn test_files_scanned() -> Result<()> { Expr::Literal(ScalarValue::Int32(Some(5))), ); - let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), &None, &[filter], None).await?); + let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?); let task_ctx = Arc::new(TaskContext::from(&ctx.state())); let _result = common::collect(plan.execute(0, task_ctx)?).await?; diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index 53cc18ff7e..3bb75d45c8 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -1,4 +1,4 @@ -#![cfg(all(feature = "integration_test", feature = "datafusion-ext"))] +#![cfg(all(feature = "integration_test", feature = "datafusion"))] use arrow::array::Int64Array; use datafusion::execution::context::SessionContext; From ecb1c87cab3bf9ada5dc6c0368e8edc76cba2dcc Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Dec 2022 02:13:06 +0100 Subject: [PATCH 02/12] test: add test for reading non string partitions --- python/pyproject.toml | 3 +- .../_delta_log/.00000000000000000000.json.crc | Bin 0 -> 24 bytes .../_delta_log/00000000000000000000.json | 6 +++ ...-82d6-d42121d883fd.c000.snappy.parquet.crc | Bin 0 -> 12 bytes ...46f5-82d6-d42121d883fd.c000.snappy.parquet | Bin 0 -> 452 bytes ...-8051-f8b54328ffdb.c000.snappy.parquet.crc | Bin 0 -> 12 bytes ...424a-8051-f8b54328ffdb.c000.snappy.parquet | Bin 0 -> 452 bytes ...-acc4-2a9608499d7c.c000.snappy.parquet.crc | Bin 0 -> 12 bytes ...4fd0-acc4-2a9608499d7c.c000.snappy.parquet | Bin 0 -> 452 bytes rust/tests/datafusion_test.rs | 48 ++++++++++++++++++ 10 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/.part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet.crc create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/.part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet.crc create mode 100644 rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet diff --git a/python/pyproject.toml b/python/pyproject.toml index 5d4be70441..d7bc32f538 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -34,7 +34,8 @@ devel = [ "pytest-timeout", "sphinx<=4.5", "sphinx-rtd-theme", - "toml" + "toml", + "wheel" ] pyspark = [ "pyspark", diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 0000000000000000000000000000000000000000..5042c0fbc207f50828f88e86a16d4e548a90c766 GIT binary patch literal 24 gcmYc;N@ieSU}CswvEEPa6f1w)ra3v6)V4kW09j24_5c6? 
literal 0 HcmV?d00001 diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..2db663806a --- /dev/null +++ b/rust/tests/data/delta-2.2.0-partitioned-types/_delta_log/00000000000000000000.json @@ -0,0 +1,6 @@ +{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}} +{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}} +{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}} +{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}} diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/.part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..4df00298f1e6cfb68f784e1a335f317ea3c38b63 GIT binary patch literal 12 TcmYc;N@ieSU}9)J>2D7J63hb^ literal 0 HcmV?d00001 diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3f09f1d9456c5e52e0d69433be90a12b4a4e083d GIT binary patch literal 452 zcmZWm%Sr<=6uphM4B|#;LI!3r1T8Jp(3$j&8(kD}A;q1DNSda#IDO4b6{-CTf5A`j zo7{TasVg@(_nh2&&LQbv-USqBk#hR^_5G9gC_z}D4#3eV0f2*C9;T+dM8PDJ3)TVl(6S??!a#+g>C`K(=i!^DIe6A}X? 
literal 0 HcmV?d00001 diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..10ec40964bbffe84a556484b2f555460625f597e GIT binary patch literal 452 zcmZWm%Sr<=6upgF2XP}bAp^50K}!oYbS8b{Mi)g~NO313lBQ`b&h#-eRiySu{31WX z5AZX*?bMZ#WZdghfVOOkIaxOB> ztTHHXjWUE5mr*at1JTPgvUNXcQn0tTpZK8wdv&o!FLt-pM=aWa`r`GJy3`oJsy zCXz)cG3sJzQyI_RWMor4)T!q8E0Mm>t>}s*jh7}BRWX{_aF~eh!%9qbw7D`0>D}05 zn%c3Ak(e&SP!B>Jz)f2ff10AZTCX41z;AYr=TThTMJ)Q#4`fg*50;Z~SZuYds+o4J mqg16 literal 0 HcmV?d00001 diff --git a/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet b/rust/tests/data/delta-2.2.0-partitioned-types/c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..a0e02daa50ab83d7eadb5c7112bf7779b854fbf9 GIT binary patch literal 452 zcmZWm%Sr<=6upgA2GNDkgbd801T8Jp(3$j&8(kD}A;q1DNSda#IMc_>RFT@1U*gyJ z4gQU{ow{;!bI-}W=Nyv$<(*G~7AU7*-#?$n9wi75r~`0#N&sLlm&fX!5SQn-U3O67 zGNe7`&>l8`xO#n*%ysn~qzoJ?j@zOST`KJd!F ziDVH Result<()> { Ok(()) } +#[tokio::test] +async fn test_datafusion_partitioned_types() -> Result<()> { + let ctx = SessionContext::new(); + let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types") + .await + .unwrap(); + ctx.register_table("demo", Arc::new(table))?; + + let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?; + + let expected = vec![ + "+----+----+----+", + "| c3 | c1 | c2 |", + "+----+----+----+", + "| 5 | 4 | c |", + "| 6 | 5 | b |", + "| 4 | 6 | a |", + "+----+----+----+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + let expected_schema = ArrowSchema::new(vec![ + ArrowField::new("c3", ArrowDataType::Int32, true), + ArrowField::new( + "c1", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt16), + Box::new(ArrowDataType::Int32), + ), + false, + ), + ArrowField::new( + "c2", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt16), + Box::new(ArrowDataType::Utf8), + ), + false, + ), + ]); + + assert_eq!(Arc::new(expected_schema), batches[0].schema()); + + Ok(()) +} + #[tokio::test] async fn test_datafusion_date_column() -> Result<()> { let ctx = SessionContext::new(); From 3b9d027148c1dc6ee9f41d461cecb4bd769a43a8 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Dec 2022 02:21:34 +0100 Subject: [PATCH 03/12] chore: use chrono without default features --- rust/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 7ae4dae395..fb57beb5ba 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,7 +13,7 @@ edition = "2021" arrow = { version = "28", optional = true } async-trait = "0.1" bytes = "1" -chrono = "0.4.22" +chrono = { version = "0.4.22", default-features = false, features = ["clock"] } cfg-if = "1" errno = "0.2" futures = "0.3" From b1b566e00175ff3d6777a1e38bf8d0ab32df6ca5 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 13 Dec 2022 02:39:52 +0100 Subject: [PATCH 04/12] fix: move profile code behind feature flag --- rust/src/builder/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/src/builder/mod.rs b/rust/src/builder/mod.rs index 05f64efe65..e2cc7b9f6a 100644 --- a/rust/src/builder/mod.rs +++ b/rust/src/builder/mod.rs 
@@ -694,6 +694,7 @@ pub fn get_s3_builder_from_options( } builder = builder.with_region(s3_options.region.name()); + #[cfg(feature = "aws-profile")] if let Some(profile) = &s3_options.profile { builder = builder.with_profile(profile); } From c55878f4d38ba045c223b992e586da9719e6a7b8 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Tue, 13 Dec 2022 17:57:41 +0100 Subject: [PATCH 05/12] Apply suggestions from code review Co-authored-by: R. Tyler Croy --- rust/README.md | 2 +- rust/src/lib.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/README.md b/rust/README.md index a7073ddbdd..dbb09e5171 100644 --- a/rust/README.md +++ b/rust/README.md @@ -48,7 +48,7 @@ cargo run --example read_delta_table - `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage. - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature -- `aws-profile` - enable support for aws profile authorization in uderlying object_store crate +- `aws-profile` - enable support for aws profile authorization in underlying object_store crate - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`. ## Development diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 09fa3ca6e4..31cc80b744 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -46,11 +46,11 @@ //! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature. //! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features //! are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`. -//! - `aws-profile` - EXPERIMENTAL support for aws profile authorization +//! - `aws-profile` - enable support for aws profile authorization in underlying object_store crat //! -//! It is strongly encouraged that users do not use `aws-profile` and instead make use of a credential -//! manager such as [aws-vault] not only to avoid the significant additional dependencies, -//! but also to avoid storing credentials in [plain text on disk] +//! Using `aws-profile` requires that credentials being stored in plain-text on the local disk in `~/.aws/credentials` +//! Consider using a manager such as [aws-vault] to avoid additional dependencies and storing +//! credentials in [plain text on disk] //! //! [aws-config]: https://docs.rs/aws-config //! 
[aws-vault]: https://github.com/99designs/aws-vault From 9b858f540597e60d59045cb9bbdb2608b6efa695 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 16 Dec 2022 18:35:15 +0100 Subject: [PATCH 06/12] chore: remove test added elsewhere --- rust/tests/datafusion_test.rs | 49 +---------------------------------- 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 1870f75b9d..e571b07960 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -96,7 +96,7 @@ async fn test_datafusion_sql_registration() -> Result<()> { let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); d.push("tests/data/delta-0.8.0-partitioned"); let sql = format!( - "CREATE EXTERNAL TABLE demo STORED AS DELTA LOCATION '{}'", + "CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION '{}'", d.to_str().unwrap() ); let _ = ctx @@ -144,53 +144,6 @@ async fn test_datafusion_simple_query_partitioned() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_datafusion_partitioned_types() -> Result<()> { - let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types") - .await - .unwrap(); - ctx.register_table("demo", Arc::new(table))?; - - let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?; - - let expected = vec![ - "+----+----+----+", - "| c3 | c1 | c2 |", - "+----+----+----+", - "| 5 | 4 | c |", - "| 6 | 5 | b |", - "| 4 | 6 | a |", - "+----+----+----+", - ]; - - assert_batches_sorted_eq!(&expected, &batches); - - let expected_schema = ArrowSchema::new(vec![ - ArrowField::new("c3", ArrowDataType::Int32, true), - ArrowField::new( - "c1", - ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt16), - Box::new(ArrowDataType::Int32), - ), - false, - ), - ArrowField::new( - "c2", - ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt16), - Box::new(ArrowDataType::Utf8), - ), - false, - ), - ]); - - assert_eq!(Arc::new(expected_schema), batches[0].schema()); - - Ok(()) -} - #[tokio::test] async fn test_datafusion_date_column() -> Result<()> { let ctx = SessionContext::new(); From 363b6d39e279aaf5606b758ce2d68a35e4a61c64 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 30 Dec 2022 22:28:34 +0100 Subject: [PATCH 07/12] refactor: enable aws-profile for s3 features --- python/Cargo.toml | 2 +- rust/Cargo.toml | 6 ++---- rust/README.md | 1 - 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index e48545a447..02112d4bc7 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -37,4 +37,4 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["s3", "azure", "glue", "gcs", "python", "datafusion", "aws-profile"] +features = ["s3", "azure", "glue", "gcs", "python", "datafusion"] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 521dd64b91..f04bd04e4d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -95,6 +95,7 @@ s3 = [ "rusoto_dynamodb/native-tls", "dynamodb_lock/native-tls", "object_store/aws", + "object_store/aws_profile", ] s3-rustls = [ "rusoto_core/rustls", @@ -103,14 +104,11 @@ s3-rustls = [ "rusoto_dynamodb/rustls", "dynamodb_lock/rustls", "object_store/aws", + "object_store/aws_profile", ] glue = ["s3", "rusoto_glue"] python = ["arrow/pyarrow"] -# AWS_PROFILE is experimental in object_store, and comes with a hefty load of dependencies. 
-# must be used in conjunction with s3 or s3-rustls feature -aws-profile = ["object_store/aws_profile"] - # used only for integration testing integration_test = ["fs_extra", "tempdir"] diff --git a/rust/README.md b/rust/README.md index dbb09e5171..e7e0ff5dcd 100644 --- a/rust/README.md +++ b/rust/README.md @@ -48,7 +48,6 @@ cargo run --example read_delta_table - `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage. - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion). - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature -- `aws-profile` - enable support for aws profile authorization in underlying object_store crate - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`. ## Development From f294b20f7c652dd53fbbdc29c0888b2516c94ee8 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 30 Dec 2022 22:37:04 +0100 Subject: [PATCH 08/12] fix: update missed feature flag --- rust/tests/common/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index 8710c97858..37118d4ad5 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -16,7 +16,7 @@ use tempdir::TempDir; #[cfg(feature = "azure")] pub mod adls; pub mod clock; -#[cfg(feature = "datafusion-ext")] +#[cfg(feature = "datafusion")] pub mod datafusion; #[cfg(any(feature = "s3", feature = "s3-rustls"))] pub mod s3; From 90b9979aead96efbf9509a15daef283499b765ad Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 30 Dec 2022 22:42:09 +0100 Subject: [PATCH 09/12] docs: remove outdated feature docs --- rust/src/lib.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 31cc80b744..4660903f10 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -46,15 +46,6 @@ //! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature. //! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features //! are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`. -//! - `aws-profile` - enable support for aws profile authorization in underlying object_store crat -//! -//! Using `aws-profile` requires that credentials being stored in plain-text on the local disk in `~/.aws/credentials` -//! Consider using a manager such as [aws-vault] to avoid additional dependencies and storing -//! credentials in [plain text on disk] -//! -//! [aws-config]: https://docs.rs/aws-config -//! [aws-vault]: https://github.com/99designs/aws-vault -//! [plain text on disk]: https://99designs.com.au/blog/engineering/aws-vault/ //! //! # Querying Delta Tables with Datafusion //! 
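
[Series note — annotation, not part of any patch] Patch 10 below makes the crate build under `--no-default-features` by giving each parquet-dependent method a cfg-gated twin (see `Add::get_stats` and `DeltaTable::update` in the diff). A minimal, self-contained sketch of that pattern follows; the names `Stats`, `stats_from_json`, and `get_stats` here are illustrative stand-ins, not the delta-rs API:

```rust
/// Illustrative stats container (not the delta-rs `Stats` type).
#[derive(Debug)]
pub struct Stats {
    pub num_records: i64,
}

/// JSON-based fallback that is always compiled.
fn stats_from_json(raw: &str) -> Option<Stats> {
    // A real implementation would deserialize with serde_json; keep the sketch trivial.
    raw.parse().ok().map(|n| Stats { num_records: n })
}

/// With the `parquet` feature enabled, prefer pre-parsed stats and fall back to JSON.
#[cfg(feature = "parquet")]
pub fn get_stats(parsed: Option<Stats>, raw: &str) -> Option<Stats> {
    parsed.or_else(|| stats_from_json(raw))
}

/// Without the feature, only the JSON path is compiled, so the crate still
/// builds when the parquet dependency is absent.
#[cfg(not(feature = "parquet"))]
pub fn get_stats(parsed: Option<Stats>, raw: &str) -> Option<Stats> {
    let _ = parsed; // no parquet-parsed stats exist in this configuration
    stats_from_json(raw)
}

fn main() {
    println!("{:?}", get_stats(None, "42"));
}
```

Defining both variants under one name keeps every call site identical; the feature flags only decide which body is compiled.
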
From a6277a924578417a765ecb50690b36e60f67a3fa Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Fri, 30 Dec 2022 23:35:30 +0100
Subject: [PATCH 10/12] build: allow building with no-default-features

---
 .github/workflows/build.yml |  2 ++
 rust/src/action/mod.rs      |  7 +++++++
 rust/src/delta.rs           | 14 +++++++++++---
 rust/src/lib.rs             |  2 +-
 rust/src/table_state.rs     |  9 +++++++--
 rust/src/time_utils.rs      |  5 +++--
 6 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 83de3b09c3..adb28fec7b 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -48,6 +48,8 @@ jobs:
         run: cargo clippy --features s3-rustls
       - name: Check docs
         run: cargo doc --features azure,datafusion,s3,gcs,glue
+      - name: Check no default features
+        run: cargo check --no-default-features
 
   test:
     strategy:
diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index 3e7b62fffb..8bc09ba8ed 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -233,6 +233,7 @@ impl Add {
     }
 
     /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
         match self.get_stats_parsed() {
             Ok(Some(stats)) => Ok(Some(stats)),
@@ -247,6 +248,12 @@ impl Add {
         }
     }
 
+    /// Get whatever stats are available.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
+        self.get_json_stats()
+    }
+
     /// Returns the serde_json representation of stats contained in the action if present.
     /// Since stats are defined as optional in the protocol, this may be None.
     pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 5a12496b66..478656bc3d 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -92,7 +92,7 @@ pub enum DeltaTableError {
     },
 
     /// Error returned when parsing checkpoint parquet.
-    // #[cfg(feature = "parquet")]
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     #[error("Failed to parse parquet: {}", .source)]
     Parquet {
         /// Parquet error details returned when reading the checkpoint failed.
@@ -676,6 +676,7 @@ impl DeltaTable {
         Ok(())
     }
 
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
        self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;
 
@@ -787,6 +788,7 @@ impl DeltaTable {
     /// Updates the DeltaTable to the most recent state committed to the transaction log by
     /// loading the last checkpoint and incrementally applying each version since.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub async fn update(&mut self) -> Result<(), DeltaTableError> {
         match self.get_last_checkpoint().await {
             Ok(last_check_point) => {
@@ -803,6 +805,12 @@ impl DeltaTable {
         }
     }
 
+    /// Updates the DeltaTable to the most recent state committed to the transaction log.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub async fn update(&mut self) -> Result<(), DeltaTableError> {
+        self.update_incremental().await
+    }
+
     /// Updates the DeltaTable to the latest version by incrementally applying newer versions.
     /// It assumes that the table is already updated to the current version `self.version`.
     pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
@@ -838,8 +846,9 @@
             }
         }
 
-        let mut next_version;
+        let mut next_version = 0;
         // 1. find latest checkpoint below version
+        #[cfg(any(feature = "parquet", feature = "parquet2"))]
         match self.find_latest_check_point_for_version(version).await? {
             Some(check_point) => {
                 self.restore_checkpoint(check_point).await?;
@@ -848,7 +857,6 @@
             None => {
                 // no checkpoint found, clear table state and start from the beginning
                 self.state = DeltaTableState::with_version(0);
-                next_version = 0;
             }
         }
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 4660903f10..580de5eea3 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -106,7 +106,7 @@ pub mod delta_arrow;
 pub mod delta_datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod operations;
-#[cfg(feature = "parquet")]
+#[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod optimize;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod writer;
diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 0d39926655..7b6246f689 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -1,8 +1,8 @@
 //! The module for delta table state.
 
 use super::{
-    ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable,
-    DeltaTableConfig, DeltaTableError, DeltaTableMetaData,
+    ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable,
+    DeltaTableMetaData,
 };
 use crate::action::{self, Action};
 use crate::delta_config;
@@ -15,6 +15,9 @@ use std::collections::HashSet;
 use std::convert::TryFrom;
 use std::io::{BufRead, BufReader, Cursor};
 
+#[cfg(any(feature = "parquet", feature = "parquet2"))]
+use super::{CheckPoint, DeltaTableConfig, DeltaTableError};
+
 /// State snapshot currently held by the Delta Table instance.
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -83,6 +86,7 @@ impl DeltaTableState {
     }
 
     /// Update DeltaTableState with checkpoint data.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub fn process_checkpoint_bytes(
         &mut self,
         data: bytes::Bytes,
@@ -133,6 +137,7 @@
     }
 
     /// Construct a delta table state object from checkpoint.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub async fn from_checkpoint(
         table: &DeltaTable,
         check_point: &CheckPoint,
diff --git a/rust/src/time_utils.rs b/rust/src/time_utils.rs
index f5ef17bc1a..407185b927 100644
--- a/rust/src/time_utils.rs
+++ b/rust/src/time_utils.rs
@@ -1,8 +1,9 @@
 //! Utility functions for converting time formats.
+#![allow(unused)]
 
 #[cfg(feature = "arrow")]
 use arrow::temporal_conversions;
-#[cfg(not(feature = "parquet2"))]
+#[cfg(feature = "parquet")]
 use parquet::basic::TimeUnit;
 #[cfg(feature = "parquet2")]
 use parquet2::schema::types::TimeUnit;
@@ -84,7 +85,7 @@ pub fn timestamp_micros_from_stats_string(s: &str) -> Result<i64, chrono::format::ParseError> {
 }
 
-#[cfg(not(feature = "parquet2"))]
+#[cfg(feature = "parquet")]
 /// Convert the timestamp to a ISO-8601 style format suitable for JSON statistics.
 pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option<String> {
     let dt = match time_unit {
         TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n),
From 3eaa2bd34eb7419aa1757c12a37719ce939ba9e6 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Fri, 30 Dec 2022 23:39:27 +0100
Subject: [PATCH 11/12] fix: fmt

---
 rust/src/table_state.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 7b6246f689..7dcc776099 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -1,8 +1,7 @@
 //! The module for delta table state.
use super::{ - ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, - DeltaTableMetaData, + ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableMetaData, }; use crate::action::{self, Action}; use crate::delta_config; From 26c1153a2752cfe89de1fa97adc1a2d46c9753e3 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 30 Dec 2022 23:42:34 +0100 Subject: [PATCH 12/12] fix: remove outdated feature flags --- rust/src/builder/mod.rs | 1 - rust/src/lib.rs | 6 ------ 2 files changed, 7 deletions(-) diff --git a/rust/src/builder/mod.rs b/rust/src/builder/mod.rs index 139ff9108f..e3778fdf30 100644 --- a/rust/src/builder/mod.rs +++ b/rust/src/builder/mod.rs @@ -696,7 +696,6 @@ pub fn get_s3_builder_from_options( } builder = builder.with_region(s3_options.region.name()); - #[cfg(feature = "aws-profile")] if let Some(profile) = &s3_options.profile { builder = builder.with_profile(profile); } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 580de5eea3..8c99bd4ea2 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -76,12 +76,6 @@ compile_error!( "Features parquet and parquet2 are mutually exclusive and cannot be enabled together" ); -#[cfg(all( - feature = "aws-profile", - not(any(feature = "s3", feature = "s3-rustls")) -))] -compile_error!("Feature aws-profile must be used together with s3 or s3-rustls feature"); - #[cfg(all(feature = "s3", feature = "s3-rustls"))] compile_error!("Features s3 and s3-rustls are mutually exclusive and cannot be enabled together");
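
[Series note — annotation, not part of any patch] With the series applied, the SQL registration path exercised in `datafusion_test.rs` can be used roughly as sketched below. This is a sketch against DataFusion 15: the import paths (in particular the location of `TableProviderFactory`) are stated from memory and may need adjusting, and the table location is a placeholder.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use deltalake::delta_datafusion::DeltaTableFactory;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register the Delta factory under the name that follows `STORED AS`.
    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
    table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));

    let env = RuntimeEnv::new(RuntimeConfig::new().with_table_factories(table_factories))?;
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(env));

    // The factory turns the location into a Delta `TableProvider`.
    ctx.sql("CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION './tests/data/delta-0.8.0-partitioned'")
        .await?;
    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
    println!("{} record batches", batches.len());
    Ok(())
}
```

Whichever name is chosen, the key registered in `table_factories` must match the token after `STORED AS`; the sketch uses `DELTATABLE` for both.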