feat: clean up dependencies and feature flags #1014

Merged · 14 commits · Dec 30, 2022
12 changes: 7 additions & 5 deletions .github/workflows/build.yml
@@ -43,11 +43,13 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v1
- name: build and lint with clippy
run: cargo clippy --features azure,datafusion-ext,s3,gcs,glue
run: cargo clippy --features azure,datafusion,s3,gcs,glue
- name: Spot-check build for rustls features
run: cargo clippy --features s3-rustls
- name: Check docs
run: cargo doc --features azure,datafusion-ext,s3,gcs,glue
run: cargo doc --features azure,datafusion,s3,gcs,glue
- name: Check no default features
run: cargo check --no-default-features

test:
strategy:
@@ -68,7 +70,7 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v1
- name: Run tests
run: cargo test --verbose --features datafusion-ext,azure
run: cargo test --verbose --features datafusion,azure

integration_test:
name: Integration Tests
@@ -107,10 +109,10 @@ jobs:

- name: Run tests with default ssl
run: |
cargo test --features integration_test,azure,s3,gcs,datafusion-ext
cargo test --features integration_test,azure,s3,gcs,datafusion
- name: Run tests with rustls
run: |
cargo test --features integration_test,s3-rustls,datafusion-ext
cargo test --features integration_test,s3-rustls,datafusion

parquet2_test:
runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -37,4 +37,4 @@ features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]
path = "../rust"
version = "0"
features = ["s3", "azure", "glue", "gcs", "python", "datafusion-ext"]
features = ["s3", "azure", "glue", "gcs", "python", "datafusion"]
3 changes: 2 additions & 1 deletion python/pyproject.toml
@@ -34,7 +34,8 @@ devel = [
"pytest-timeout",
"sphinx<=4.5",
"sphinx-rtd-theme",
"toml"
"toml",
"wheel"
]
pyspark = [
"pyspark",
14 changes: 9 additions & 5 deletions rust/Cargo.toml
@@ -13,7 +13,7 @@ edition = "2021"
arrow = { version = "28", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = "0.4.22"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
errno = "0.2"
futures = "0.3"
@@ -22,7 +22,7 @@ log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num-traits = "0.2.15"
object_store = { version = "0.5.2", features = ["aws_profile"] }
object_store = "0.5.2"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "28", features = ["async"], optional = true }
@@ -77,14 +77,15 @@ glibc_version = { path = "../glibc_version", version = "0.1" }

[features]
default = ["arrow", "parquet"]
[Review comment — Member] Should we also add aws-profile to the default feature list? Profile-based auth is a very common practice in production environments.

[Review comment — Collaborator, author] Maybe add it as a default of the s3* features, since those are the only ones where it is useful?

[Review comment — Collaborator] Agreed!

datafusion-ext = [
"datafusion",
datafusion = [
"dep:datafusion",
"datafusion-expr",
"datafusion-common",
"datafusion-proto",
"arrow",
"parquet",
]
datafusion-ext = ["datafusion"]
azure = ["object_store/azure"]
gcs = ["object_store/gcp"]
s3 = [
@@ -94,6 +95,7 @@ s3 = [
"rusoto_dynamodb/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
"object_store/aws_profile",
]
s3-rustls = [
"rusoto_core/rustls",
@@ -102,9 +104,11 @@ s3-rustls = [
"rusoto_dynamodb/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
"object_store/aws_profile",
]
glue = ["s3", "rusoto_glue"]
python = ["arrow/pyarrow"]

# used only for integration testing
integration_test = ["fs_extra", "tempdir"]

@@ -114,4 +118,4 @@ harness = false

[[example]]
name = "basic_operations"
required-features = ["datafusion-ext"]
required-features = ["datafusion"]
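
As context for the `object_store/aws_profile` additions above: with either S3 feature enabled, profile-based credentials should be usable when opening a table. The snippet below is a minimal sketch, not code from this PR; the profile name, the table URI, and the use of `tokio` are assumptions, and exactly how `AWS_PROFILE` is picked up depends on the `object_store` version in use.

```rust
use std::env;

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // With `object_store/aws_profile` enabled, credentials can come from a named
    // profile in ~/.aws/config instead of static keys (profile name is assumed).
    env::set_var("AWS_PROFILE", "dev");

    let table = deltalake::open_table("s3://my-bucket/my-table").await?;
    println!("{:?}", table.get_files());
    Ok(())
}
```
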
18 changes: 6 additions & 12 deletions rust/README.md
@@ -1,14 +1,11 @@
Deltalake
=========
# Deltalake

[![crates.io](https://img.shields.io/crates/v/deltalake.svg?style=flat-square)](https://crates.io/crates/deltalake)
[![api_doc](https://img.shields.io/badge/doc-api-blue)](https://docs.rs/deltalake)

Native Delta Lake implementation in Rust


Usage
-----
## Usage

### API

@@ -17,7 +14,6 @@ let table = deltalake::open_table("./tests/data/simple_table").await.unwrap();
println!("{}", table.get_files());
```


### CLI

```bash
@@ -43,20 +39,18 @@ Examples can be run using the `cargo run --example` command. For example:
cargo run --example read_delta_table
```

Optional cargo package features
-----------------------
## Optional cargo package features

- `s3` - enable the S3 storage backend to work with Delta Tables in AWS S3.
- `s3-rustls` - enable the S3 storage backend but rely on [rustls](https://github.com/ctz/rustls) rather than OpenSSL (`native-tls`).
- `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
- `azure` - enable the Azure storage backend to work with Delta Tables in Azure Data Lake Storage Gen2 accounts.
- `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage.
- `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
- `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
[Review comment — Member] 👍

- `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
- `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.


Development
-----------
## Development

To run s3 integration tests from local machine, we use docker-compose to stand
up AWS local stack. To spin up the test environment run `docker-compose up` in
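
To complement the renamed `datafusion` feature documented above, here is a minimal sketch of querying a Delta table through DataFusion. It assumes the `datafusion` feature is enabled, that `tokio` is available, and it reuses the test-fixture path from the README examples.

```rust
use std::sync::Arc;

use deltalake::datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let table = deltalake::open_table("./tests/data/simple_table").await?;

    // With the `datafusion` feature, DeltaTable implements DataFusion's
    // TableProvider, so it can be registered and queried with SQL directly.
    let ctx = SessionContext::new();
    ctx.register_table("demo", Arc::new(table))?;

    let batches = ctx.sql("SELECT * FROM demo LIMIT 5").await?.collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```
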
7 changes: 7 additions & 0 deletions rust/src/action/mod.rs
@@ -233,6 +233,7 @@ impl Add {
}

/// Get whatever stats are available. Uses (parquet struct) parsed_stats if present, falling back to json stats.
#[cfg(any(feature = "parquet", feature = "parquet2"))]
pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
match self.get_stats_parsed() {
Ok(Some(stats)) => Ok(Some(stats)),
@@ -247,6 +248,12 @@ impl Add {
}
}

/// Get whatever stats are available.
#[cfg(not(any(feature = "parquet", feature = "parquet2")))]
pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
self.get_json_stats()
}

/// Returns the serde_json representation of stats contained in the action if present.
/// Since stats are defined as optional in the protocol, this may be None.
pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
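
The new cfg-gated `get_stats` variants above keep the same call site regardless of which parquet feature is enabled. Below is a minimal sketch of consuming that API; the `adds` slice is assumed to come from an already-loaded table's state.

```rust
use deltalake::action::Add;

fn print_record_counts(adds: &[Add]) -> Result<(), serde_json::Error> {
    for add in adds {
        // With `parquet`/`parquet2` this prefers the parsed checkpoint stats and
        // falls back to JSON stats; without either feature it reads JSON stats only.
        if let Some(stats) = add.get_stats()? {
            println!("{}: {} records", add.path, stats.num_records);
        }
    }
    Ok(())
}
```
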
14 changes: 11 additions & 3 deletions rust/src/delta.rs
@@ -92,7 +92,7 @@ pub enum DeltaTableError {
},

/// Error returned when parsing checkpoint parquet.
// #[cfg(feature = "parquet")]
#[cfg(any(feature = "parquet", feature = "parquet2"))]
#[error("Failed to parse parquet: {}", .source)]
Parquet {
/// Parquet error details returned when reading the checkpoint failed.
@@ -676,6 +676,7 @@ impl DeltaTable {
Ok(())
}

#[cfg(any(feature = "parquet", feature = "parquet2"))]
async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;

@@ -787,6 +788,7 @@ impl DeltaTable {

/// Updates the DeltaTable to the most recent state committed to the transaction log by
/// loading the last checkpoint and incrementally applying each version since.
#[cfg(any(feature = "parquet", feature = "parquet2"))]
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
match self.get_last_checkpoint().await {
Ok(last_check_point) => {
Expand All @@ -803,6 +805,12 @@ impl DeltaTable {
}
}

/// Updates the DeltaTable to the most recent state committed to the transaction log.
#[cfg(not(any(feature = "parquet", feature = "parquet2")))]
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
self.update_incremental().await
}

/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
@@ -838,8 +846,9 @@ impl DeltaTable {
}
}

let mut next_version;
let mut next_version = 0;
// 1. find latest checkpoint below version
#[cfg(any(feature = "parquet", feature = "parquet2"))]
match self.find_latest_check_point_for_version(version).await? {
Some(check_point) => {
self.restore_checkpoint(check_point).await?;
Expand All @@ -848,7 +857,6 @@ impl DeltaTable {
None => {
// no checkpoint found, clear table state and start from the beginning
self.state = DeltaTableState::with_version(0);
next_version = 0;
}
}

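
The gating above changes what `update` does depending on the enabled features: with `parquet` or `parquet2` it restores the latest checkpoint and then applies newer commits, otherwise it falls back to incremental log replay only. The call site is unchanged either way; a minimal sketch (assuming `tokio` and the test-fixture path) follows.

```rust
#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    let mut table = deltalake::open_table("./tests/data/simple_table").await?;

    // Pick up any commits written after the table was opened; the checkpoint
    // fast path is used only when a parquet feature is compiled in.
    table.update().await?;

    println!("{:?}", table.get_files());
    Ok(())
}
```
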
24 changes: 11 additions & 13 deletions rust/src/lib.rs
@@ -41,8 +41,11 @@
//! or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS
//! instead of native TLS implementation.
//! - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
//! - `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation
//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
//! for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
//! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features
//! are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
//!
//! # Querying Delta Tables with Datafusion
//!
@@ -64,23 +67,18 @@
//! .await.unwrap();
//! };
//! ```
//!
//! It's important to note that the DataFusion library is evolving quickly, often with breaking api
//! changes, and this may cause compilation issues as a result. If you are having issues with the most
//! recently released `delta-rs` you can set a specific branch or commit in your `Cargo.toml`.
//!
//! ```toml
//! datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07bc2c754805f536fe1cd873dbe6adfc0a21cbb3" }
//! ```

#![deny(warnings)]
#![deny(missing_docs)]

#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
"Feature parquet and parquet2 are mutually exclusive and cannot be enabled together"
"Features parquet and parquet2 are mutually exclusive and cannot be enabled together"
);

#[cfg(all(feature = "s3", feature = "s3-rustls"))]
[Review comment — Member] 👍

compile_error!("Features s3 and s3-rustls are mutually exclusive and cannot be enabled together");

pub mod action;
pub mod builder;
pub mod data_catalog;
@@ -98,11 +96,11 @@ pub mod vacuum;
pub mod checkpoints;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod delta_arrow;
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod operations;
#[cfg(feature = "parquet")]
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod optimize;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;
@@ -117,7 +115,7 @@ pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, Object
// convenience exports for consumers to avoid aligning crate versions
#[cfg(feature = "arrow")]
pub use arrow;
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
pub use datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub use operations::DeltaOps;
16 changes: 8 additions & 8 deletions rust/src/operations/mod.rs
@@ -14,20 +14,20 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
pub mod create;
pub mod transaction;

#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
use self::{load::LoadBuilder, write::WriteBuilder};
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
pub use datafusion::physical_plan::common::collect as collect_sendable_stream;

#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
mod load;
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
pub mod write;
// TODO the writer module does not actually depend on datafusion,
// eventually we should consolidate with the record batch writer
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
mod writer;

/// Maximum supported writer version
@@ -93,14 +93,14 @@ impl DeltaOps {
}

/// Load data from a DeltaTable
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
#[must_use]
pub fn load(self) -> LoadBuilder {
LoadBuilder::default().with_object_store(self.0.object_store())
}

/// Write data to Delta table
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
#[must_use]
pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
WriteBuilder::default()
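
For the `datafusion`-gated `write` builder above, a minimal sketch of a round trip: the table path is a placeholder, `DeltaOps::try_from_uri` is assumed as the entry point, and the caller is expected to supply an Arrow `RecordBatch`. This is illustration, not code from this PR.

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{DeltaOps, DeltaTableError};

async fn write_example(batch: RecordBatch) -> Result<(), DeltaTableError> {
    // Resolve the storage backend from a table URI (a local path here).
    let ops = DeltaOps::try_from_uri("./tmp/example-table").await?;

    // `write` returns a WriteBuilder; awaiting it executes the write and
    // returns the updated table.
    let table = ops.write(vec![batch]).await?;
    println!("table now has {} files", table.get_files().len());
    Ok(())
}
```
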
3 changes: 3 additions & 0 deletions rust/src/operations/write.rs
@@ -285,6 +285,9 @@ impl std::future::IntoFuture for WriteBuilder {
let schema = batches[0].schema();

if let Ok(meta) = table.get_metadata() {
// NOTE the schema generated from the delta schema will have the delta field metadata included,
// so we need to compare the field names and datatypes instead.
// TODO update comparison logic, once we have column mappings supported.
let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);

if !schema_eq(curr_schema, schema.clone()) {
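
The new comment explains that the Arrow schema derived from the Delta schema carries extra field metadata, so plain schema equality is too strict. A minimal sketch of the name/datatype comparison it describes follows; nullability handling and column mappings are deliberately left out.

```rust
use deltalake::arrow::datatypes::SchemaRef;

// Compare only field names and data types, ignoring field-level metadata.
fn schemas_match(left: &SchemaRef, right: &SchemaRef) -> bool {
    left.fields().len() == right.fields().len()
        && left
            .fields()
            .iter()
            .zip(right.fields().iter())
            .all(|(l, r)| l.name() == r.name() && l.data_type() == r.data_type())
}
```
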
4 changes: 2 additions & 2 deletions rust/src/storage/mod.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use tokio::io::AsyncWrite;

use crate::get_storage_backend;
#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
use datafusion::datasource::object_store::ObjectStoreUrl;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -143,7 +143,7 @@ impl DeltaObjectStore {
self.config.to_uri(&Path::from(""))
}

#[cfg(feature = "datafusion-ext")]
#[cfg(feature = "datafusion")]
/// generate a unique enough url to identify the store in datafusion.
pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
// we are certain that the URL can be parsed, since
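
`object_store_url` only needs to produce a stable, unique-ish identifier for DataFusion's object-store registry. A minimal sketch of that idea, assuming the table root URI is available as a string; the `delta-rs://` scheme and the character substitutions are illustrative, not the exact implementation.

```rust
use deltalake::datafusion::datasource::object_store::ObjectStoreUrl;

// Fold a table root URI into a URL authority so each table root maps to a
// distinct, parseable object store URL.
fn object_store_url(root_uri: &str) -> ObjectStoreUrl {
    let authority = root_uri.replace("://", "-").replace('/', "-").replace(':', "-");
    ObjectStoreUrl::parse(format!("delta-rs://{authority}"))
        .expect("sanitised authority should form a valid URL")
}
```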