diff --git a/Cargo.lock b/Cargo.lock index c9cd0e5bda..7da44862bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,15 +91,16 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d68391300d5237f6725f0f869ae7cb65d45fcf8a6d18f6ceecd328fb803bef93" +checksum = "e24e2bcd431a4aa0ff003fdd2dc21c78cfb42f31459c89d2312c2746fe17a5ac" dependencies = [ "ahash 0.8.2", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", + "arrow-select", "bitflags", "chrono", "comfy-table", @@ -120,9 +121,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0bb00c5862b5eea683812083c495bef01a9a5149da46ad2f4c0e4aa8800f64d" +checksum = "c9044300874385f19e77cbf90911e239bd23630d8f23bb0f948f9067998a13b7" dependencies = [ "ahash 0.8.2", "arrow-buffer", @@ -136,18 +137,19 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e594d0fe0026a8bc2459bdc5ac9623e5fb666724a715e0acbc96ba30c5d4cc7" +checksum = "78476cbe9e3f808dcecab86afe42d573863c63e149c62e6e379ed2522743e626" dependencies = [ "half 2.1.0", + "num", ] [[package]] name = "arrow-data" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8500df05060d86fdc53e9b5cb32e51bfeaacc040fdeced3eb99ac0d59200ff45" +checksum = "4d916feee158c485dad4f701cba31bc9a90a8db87d9df8e2aa8adc0c20a2bbb9" dependencies = [ "arrow-buffer", "arrow-schema", @@ -157,13 +159,41 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d1fef01f25e1452c86fa6887f078de8e0aaeeb828370feab205944cfc30e27" +checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37" dependencies = [ "serde", ] +[[package]] +name = "arrow-select" +version = "26.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6593a01586751c74498495d2f5a01fcd438102b52965c11dd98abf4ebcacef37" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -317,6 +347,27 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +[[package]] +name = "bzip2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6afcd980b5f3a45017c57e57a2fcccbb351cc43a356ce117ef760ef8052b89b0" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -681,14 +732,16 @@ dependencies = [ [[package]] name = "datafusion" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bdec06a3db088da76fc28cb0877b8b5438ca6b6025e04d975bace0fd85df19" +checksum = "e7a8411475928479fe57af18698626f0a44f3c29153e051dce45f7455c08a6d5" dependencies = [ "ahash 0.8.2", "arrow", + "async-compression", "async-trait", "bytes", + "bzip2", "chrono", "datafusion-common", "datafusion-expr", @@ -696,6 +749,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-row", "datafusion-sql", + "flate2", "futures", "glob", "hashbrown", @@ -708,6 +762,7 @@ dependencies = [ "parking_lot", "parquet", "paste", + "percent-encoding", "pin-project-lite", "rand 0.8.5", "smallvec", @@ -715,17 +770,19 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", + "tokio-util", "url", "uuid", ] [[package]] name = "datafusion-common" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "506eab038bf2d39ac02c22be30b019873ca01f887148b939d309a0e9523f4515" +checksum = "15f1ffcbc1f040c9ab99f41db1c743d95aff267bb2e7286aaa010738b7402251" dependencies = [ "arrow", + "chrono", "object_store", "ordered-float 3.4.0", "parquet", @@ -734,21 +791,22 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d2810e369c735d69479e27fe8410e97a76ed07484aa9b3ad7c039efa504257" +checksum = "1883d9590d303ef38fa295567e7fdb9f8f5f511fcc167412d232844678cd295c" dependencies = [ "ahash 0.8.2", "arrow", "datafusion-common", + "log", "sqlparser", ] [[package]] name = "datafusion-optimizer" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60f3b80326243629d02e33f37e955a7114781c6c44caf9d8b254618157de7143" +checksum = "2127d46d566ab3463d70da9675fc07b9d634be8d17e80d0e1ce79600709fe651" dependencies = [ "arrow", "async-trait", @@ -762,34 +820,40 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9bf3b7ae861d351a85174fd4fddb29d249950b2f23671318971960452b4b9ab" +checksum = "0d108b6fe8eeb317ecad1d74619e8758de49cccc8c771b56c97962fd52eaae23" dependencies = [ "ahash 0.8.2", "arrow", + "arrow-buffer", + "arrow-schema", "blake2", "blake3", "chrono", "datafusion-common", "datafusion-expr", "datafusion-row", + "half 2.1.0", "hashbrown", + "itertools", "lazy_static", "md-5 0.10.5", + "num-traits", "ordered-float 3.4.0", "paste", "rand 0.8.5", "regex", "sha2 0.10.6", "unicode-segmentation", + "uuid", ] [[package]] name = "datafusion-row" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f44a2a722719c569b437b3aa2108a99dc911369e8d86c44e6293225c3387147" +checksum = "43537b6377d506e4788bf21e9ed943340e076b48ca4d077e6ea4405ca5e54a1c" dependencies = [ "arrow", "datafusion-common", @@ -799,9 +863,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e98493e04385c924d1d3d7ab8739c41f1ebf676a46863181103a0fb9c7168fa9" +checksum = "244d08d4710e1088d9c0949c9b5b8d68d9cf2cde7203134a4cc389e870fe2354" dependencies = [ "arrow", "datafusion-common", @@ -1039,12 +1103,11 @@ dependencies = [ [[package]] name = "flatbuffers" -version = "2.1.2" +version = "22.9.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b428b715fdbdd1c364b84573b5fdc0f84f8e423661b9f398735278bc7f2b6a" +checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70" dependencies = [ "bitflags", - "smallvec", "thiserror", ] @@ -2054,9 +2117,9 @@ dependencies = [ [[package]] name = "parquet" -version = "24.0.0" +version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74fd590f0672998df84503d1bcbebc69732583d03cc3495c7dd8d3e5a1d8437f" +checksum = "3bf8fa7ab6572791325a8595f55dc532dde88b996ae10a5ca8a2db746784ecc4" dependencies = [ "ahash 0.8.2", "arrow", @@ -2070,7 +2133,6 @@ dependencies = [ "lz4", "num", "num-bigint", - "rand 0.8.5", "seq-macro", "snap", "thrift", @@ -2967,9 +3029,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "sqlparser" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0781f2b6bd03e5adf065c8e772b49eaea9f640d06a1b9130330fe8bd2563f4fd" +checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb" dependencies = [ "log", ] diff --git a/python/Cargo.toml b/python/Cargo.toml index c409aa53b3..e8370c0292 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -17,7 +17,7 @@ crate-type = ["cdylib"] name = "deltalake._internal" [dependencies] -arrow-schema = { version = "24", features = ["serde"] } +arrow-schema = { version = "26", features = ["serde"] } chrono = "0" env_logger = "0" futures = "0.3" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b99b0f1096..3d919c6d73 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -10,7 +10,7 @@ description = "Native Delta Lake implementation in Rust" edition = "2021" [dependencies] -arrow = { version = "24", optional = true } +arrow = { version = "26", optional = true } async-trait = "0.1" bytes = "1" chrono = "0.4.22" @@ -25,7 +25,7 @@ num-traits = "0.2.15" object_store = "0.5.1" once_cell = "1.16.0" parking_lot = "0.12" -parquet = { version = "24", features = ["async"], optional = true } +parquet = { version = "26", features = ["async"], optional = true } parquet2 = { version = "0.16", optional = true } percent-encoding = "2" serde = { version = "1", features = ["derive"] } @@ -46,9 +46,9 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true rusoto_glue = { version = "0.48", default-features = false, optional = true } # Datafusion -datafusion = { version = "13", optional = true } -datafusion-expr = { version = "13", optional = true } -datafusion-common = { version = "13", optional = true } +datafusion = { version = "14", optional = true } +datafusion-expr = { version = "14", optional = true } +datafusion-common = { version = "14", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 2ef8e0336b..4ede6226c6 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -38,12 +38,13 @@ use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType}; use datafusion::execution::context::{SessionContext, SessionState}; +use datafusion::optimizer::utils::conjunction; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, DataFusionError, Result as DataFusionResult}; -use datafusion_expr::{combine_filters, Expr}; +use datafusion_expr::Expr; use object_store::{path::Path, ObjectMeta}; use url::Url; @@ -332,7 +333,9 @@ impl TableProvider for DeltaTable { // and partitions are somewhat evenly distributed, probably not the worst choice ... // However we may want to do some additional balancing in case we are far off from the above. let mut file_groups: HashMap, Vec> = HashMap::new(); - if let Some(Some(predicate)) = (!filters.is_empty()).then_some(combine_filters(filters)) { + if let Some(Some(predicate)) = + (!filters.is_empty()).then_some(conjunction(filters.iter().cloned())) + { let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?; let files_to_prune = pruning_predicate.prune(self)?; self.get_state() @@ -377,6 +380,7 @@ impl TableProvider for DeltaTable { projection: projection.clone(), limit, table_partition_cols, + config_options: Default::default(), }, filters, ) diff --git a/rust/src/time_utils.rs b/rust/src/time_utils.rs index b8b8b0b12a..f5ef17bc1a 100644 --- a/rust/src/time_utils.rs +++ b/rust/src/time_utils.rs @@ -22,7 +22,7 @@ mod temporal_conversions { /// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`] #[inline] - pub fn timestamp_ms_to_datetime(v: i64) -> NaiveDateTime { + pub fn timestamp_ms_to_datetime(v: i64) -> Option { let (sec, milli_sec) = split_second(v, MILLISECONDS); NaiveDateTime::from_timestamp_opt( @@ -31,12 +31,11 @@ mod temporal_conversions { // discard extracted seconds and convert milliseconds to nanoseconds milli_sec * MICROSECONDS as u32, ) - .unwrap() } /// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`] #[inline] - pub fn timestamp_us_to_datetime(v: i64) -> NaiveDateTime { + pub fn timestamp_us_to_datetime(v: i64) -> Option { let (sec, micro_sec) = split_second(v, MICROSECONDS); NaiveDateTime::from_timestamp_opt( @@ -45,12 +44,11 @@ mod temporal_conversions { // discard extracted seconds and convert microseconds to nanoseconds micro_sec * MILLISECONDS as u32, ) - .unwrap() } /// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`] #[inline] - pub fn timestamp_ns_to_datetime(v: i64) -> NaiveDateTime { + pub fn timestamp_ns_to_datetime(v: i64) -> Option { let (sec, nano_sec) = split_second(v, NANOSECONDS); NaiveDateTime::from_timestamp_opt( @@ -58,7 +56,6 @@ mod temporal_conversions { sec, // discard extracted seconds nano_sec, ) - .unwrap() } /// @@ -88,26 +85,26 @@ pub fn timestamp_micros_from_stats_string(s: &str) -> Result String { +pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option { let dt = match time_unit { TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n), TimeUnit::MICROS(_) => temporal_conversions::timestamp_us_to_datetime(n), TimeUnit::NANOS(_) => temporal_conversions::timestamp_ns_to_datetime(n), - }; + }?; - format!("{}", dt.format("%Y-%m-%dT%H:%M:%S%.3fZ")) + Some(format!("{}", dt.format("%Y-%m-%dT%H:%M:%S%.3fZ"))) } /// Convert the timestamp to a ISO-8601 style format suitable for JSON statistics. #[cfg(feature = "parquet2")] -pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> String { +pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option { let dt = match time_unit { TimeUnit::Milliseconds => temporal_conversions::timestamp_ms_to_datetime(n), TimeUnit::Microseconds => temporal_conversions::timestamp_us_to_datetime(n), TimeUnit::Nanoseconds => temporal_conversions::timestamp_ns_to_datetime(n), - }; + }?; - format!("{}", dt.format("%Y-%m-%dT%H:%M:%S%.3fZ")) + Some(format!("{}", dt.format("%Y-%m-%dT%H:%M:%S%.3fZ"))) } #[cfg(test)] @@ -120,28 +117,32 @@ mod tests { #[test] fn test_timestamp_to_delta_stats_string() { let s = - timestamp_to_delta_stats_string(1628685199541, &TimeUnit::MILLIS(MilliSeconds::new())); + timestamp_to_delta_stats_string(1628685199541, &TimeUnit::MILLIS(MilliSeconds::new())) + .unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); let s = timestamp_to_delta_stats_string( 1628685199541000, &TimeUnit::MICROS(MicroSeconds::new()), - ); + ) + .unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); let s = timestamp_to_delta_stats_string( 1628685199541000000, &TimeUnit::NANOS(NanoSeconds::new()), - ); + ) + .unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); } #[cfg(feature = "parquet2")] #[test] fn test_timestamp_to_delta_stats_string() { - let s = timestamp_to_delta_stats_string(1628685199541, &TimeUnit::Milliseconds); + let s = timestamp_to_delta_stats_string(1628685199541, &TimeUnit::Milliseconds).unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); - let s = timestamp_to_delta_stats_string(1628685199541000, &TimeUnit::Microseconds); + let s = timestamp_to_delta_stats_string(1628685199541000, &TimeUnit::Microseconds).unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); - let s = timestamp_to_delta_stats_string(1628685199541000000, &TimeUnit::Nanoseconds); + let s = + timestamp_to_delta_stats_string(1628685199541000000, &TimeUnit::Nanoseconds).unwrap(); assert_eq!("2021-08-11T12:33:19.541Z".to_string(), s); } diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index 1fb3467796..e14b7dc333 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -315,8 +315,10 @@ fn min_and_max_from_parquet_statistics( match column_descr.logical_type().as_ref() { Some(LogicalType::Timestamp { unit, .. }) => { - let min = min.map(|n| Value::String(timestamp_to_delta_stats_string(n, unit))); - let max = max.map(|n| Value::String(timestamp_to_delta_stats_string(n, unit))); + let min = min + .and_then(|n| timestamp_to_delta_stats_string(n, unit).map(Value::String)); + let max = max + .and_then(|n| timestamp_to_delta_stats_string(n, unit).map(Value::String)); Ok((min, max)) }