chore: bump datafusion to 25 (#1389)
# Description

Updates the arrow and datafusion dependencies to their latest releases (arrow and parquet 39, datafusion 25).
roeap authored May 23, 2023
1 parent 2350d16 commit 930d16e
Showing 6 changed files with 47 additions and 50 deletions.

python/Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@ doc = false
 name = "deltalake._internal"
 
 [dependencies]
-arrow-schema = { version = "36", features = ["serde"] }
+arrow-schema = { version = "39", features = ["serde"] }
 chrono = "0"
 env_logger = "0"
 futures = "0.3"

rust/Cargo.toml (28 changes: 12 additions & 16 deletions)

@@ -13,10 +13,10 @@ readme = "README.md"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "38", optional = true }
-arrow-array = { version = "38", optional = true }
-arrow-cast = { version = "38", optional = true }
-arrow-schema = { version = "38", optional = true }
+arrow = { version = "39", optional = true }
+arrow-array = { version = "39", optional = true }
+arrow-cast = { version = "39", optional = true }
+arrow-schema = { version = "39", optional = true }
 async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -36,7 +36,7 @@ num-traits = "0.2.15"
 object_store = "0.5.6"
 once_cell = "1.16.0"
 parking_lot = "0.12"
-parquet = { version = "38", features = [
+parquet = { version = "39", features = [
     "async",
     "object_store",
 ], optional = true }
@@ -65,12 +65,12 @@ reqwest-middleware = { version = "0.2.1", optional = true }
 reqwest-retry = { version = "0.2.2", optional = true }
 
 # Datafusion
-datafusion = { version = "24", optional = true }
-datafusion-expr = { version = "24", optional = true }
-datafusion-common = { version = "24", optional = true }
-datafusion-proto = { version = "24", optional = true }
-datafusion-sql = { version = "24", optional = true }
-datafusion-physical-expr = { version = "24", optional = true }
+datafusion = { version = "25", optional = true }
+datafusion-expr = { version = "25", optional = true }
+datafusion-common = { version = "25", optional = true }
+datafusion-proto = { version = "25", optional = true }
+datafusion-sql = { version = "25", optional = true }
+datafusion-physical-expr = { version = "25", optional = true }
 
 sqlparser = { version = "0.33", optional = true }
@@ -142,11 +142,7 @@ s3 = [
     "object_store/aws",
     "object_store/aws_profile",
 ]
-unity-experimental = [
-    "reqwest",
-    "reqwest-middleware",
-    "reqwest-retry",
-]
+unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]
 
 [[bench]]
 name = "read_checkpoint"
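
Worth noting for downstream users: arrow, parquet, and datafusion have to move in lockstep, because arrow types appear directly in datafusion's public API and a major-version mismatch is a compile error rather than a runtime failure. A minimal sketch of that coupling, assuming a tokio runtime and the versions pinned above:

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // An arrow 39 RecordBatch is handed straight to datafusion 25; this
    // only compiles when both crates agree on the arrow major version.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;
    ctx.sql("SELECT SUM(v) FROM t").await?.show().await?;
    Ok(())
}
```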

rust/examples/recordbatch-writer.rs (6 changes: 2 additions & 4 deletions)

@@ -188,15 +188,13 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordB
  * Table in an existing directory that doesn't currently contain a Delta table
  */
 async fn create_initialized_table(table_path: &Path) -> DeltaTable {
-    let table = DeltaOps::try_from_uri(table_path)
+    DeltaOps::try_from_uri(table_path)
         .await
         .unwrap()
         .create()
         .with_columns(WeatherRecord::columns())
         .await
-        .unwrap();
-
-    table
+        .unwrap()
 }
 
 #[cfg(test)]
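
The example change is purely stylistic: the builder chain already evaluates to the created `DeltaTable`, so it becomes the function's tail expression rather than passing through a temporary binding. As a sketch, the same chain with `?` instead of `unwrap()` might look like this (the `SchemaField` re-export from the crate root is an assumption):

```rust
use deltalake::operations::DeltaOps;
use deltalake::{DeltaTable, DeltaTableError, SchemaField};

// Sketch only: the same builder chain as the example, but propagating
// errors instead of unwrapping; `columns` stands in for
// WeatherRecord::columns().
async fn create_table(
    uri: &str,
    columns: Vec<SchemaField>,
) -> Result<DeltaTable, DeltaTableError> {
    DeltaOps::try_from_uri(uri)
        .await?
        .create()
        .with_columns(columns)
        .await
}
```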

rust/src/operations/delete.rs (12 changes: 5 additions & 7 deletions)

@@ -50,12 +50,10 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::RecordBatchStream;
 use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
-use datafusion_common::tree_node::TreeNode;
-use datafusion_common::tree_node::TreeNodeVisitor;
-use datafusion_common::tree_node::VisitRecursion;
+use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::DFSchema;
-use datafusion_expr::col;
-use datafusion_expr::Volatility;
+use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
+use datafusion_expr::{col, Volatility};
 use futures::future::BoxFuture;
 use futures::stream::StreamExt;
 use parquet::file::properties::WriterProperties;
@@ -283,7 +281,7 @@ impl TreeNodeVisitor for ExprProperties {
             | Expr::Case(_)
             | Expr::Cast(_)
             | Expr::TryCast(_) => (),
-            Expr::ScalarFunction { fun, .. } => {
+            Expr::ScalarFunction(ScalarFunction { fun, .. }) => {
                 let v = fun.volatility();
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
@@ -293,7 +291,7 @@
                     return Ok(VisitRecursion::Stop);
                 }
             }
-            Expr::ScalarUDF { fun, .. } => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
                 let v = fun.signature.volatility;
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
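
Behind this hunk is a DataFusion 25 API change: `ScalarFunction` and `ScalarUDF` are now standalone structs in `datafusion_expr::expr`, and the corresponding `Expr` variants carry them as tuple payloads, so match arms destructure the inner struct instead of named fields on the variant. A minimal sketch of the new match shape (the real visitor in delete.rs walks the whole expression tree via `TreeNodeVisitor`):

```rust
use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion_expr::{Expr, Volatility};

// Sketch: extract the volatility that delete.rs checks against
// Volatility::Immutable. Note the tuple-variant destructuring
// required since datafusion 25.
fn volatility_of(expr: &Expr) -> Option<Volatility> {
    match expr {
        Expr::ScalarFunction(ScalarFunction { fun, .. }) => Some(fun.volatility()),
        Expr::ScalarUDF(ScalarUDF { fun, .. }) => Some(fun.signature.volatility),
        _ => None,
    }
}
```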

rust/src/table_state_arrow.rs (38 changes: 21 additions & 17 deletions)

@@ -13,7 +13,7 @@ use arrow::array::{
 };
 use arrow::compute::cast;
 use arrow::compute::kernels::cast_utils::Parser;
-use arrow::datatypes::{DataType, Date32Type, Field, TimeUnit, TimestampMicrosecondType};
+use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimeUnit, TimestampMicrosecondType};
 use itertools::Itertools;
 use std::borrow::Cow;
 use std::collections::{HashMap, HashSet, VecDeque};
@@ -193,14 +193,17 @@ impl DeltaTableState {
         let fields = partition_column_types
             .into_iter()
             .zip(metadata.partition_columns.iter())
-            .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true));
-        let field_arrays = fields
-            .zip(partition_columns.into_iter())
+            .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true))
             .collect::<Vec<_>>();
-        if field_arrays.is_empty() {
+
+        if fields.is_empty() {
             vec![]
         } else {
-            let arr = Arc::new(arrow::array::StructArray::from(field_arrays));
+            let arr = Arc::new(arrow::array::StructArray::try_new(
+                Fields::from(fields),
+                partition_columns,
+                None,
+            )?);
             vec![(Cow::Borrowed("partition_values"), arr)]
         }
     };
@@ -259,16 +262,13 @@ impl DeltaTableState {
                 .map(|(key, array)| (format!("tags.{key}"), array)),
             )?)
         } else {
+            let (fields, arrays): (Vec<_>, Vec<_>) = arrays
+                .into_iter()
+                .map(|(key, array)| (Field::new(key, array.data_type().clone(), true), array))
+                .unzip();
             Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![(
                 "tags",
-                Arc::new(StructArray::from(
-                    arrays
-                        .into_iter()
-                        .map(|(key, array)| {
-                            (Field::new(key, array.data_type().clone(), true), array)
-                        })
-                        .collect_vec(),
-                )) as ArrayRef,
+                Arc::new(StructArray::new(Fields::from(fields), arrays, None)) as ArrayRef,
             )])?)
         }
     }
@@ -418,7 +418,7 @@ impl DeltaTableState {
         let combine_arrays = |sub_fields: &Vec<ColStats>,
                               getter: for<'a> fn(&'a ColStats) -> &'a Option<ArrayRef>|
          -> Option<ArrayRef> {
-            let fields = sub_fields
+            let (fields, arrays): (Vec<_>, Vec<_>) = sub_fields
                 .iter()
                 .flat_map(|sub_field| {
                     if let Some(values) = getter(sub_field) {
@@ -435,11 +435,15 @@ impl DeltaTableState {
                         None
                     }
                 })
-                .collect::<Vec<_>>();
+                .unzip();
             if fields.is_empty() {
                 None
             } else {
-                Some(Arc::new(StructArray::from(fields)))
+                Some(Arc::new(StructArray::new(
+                    Fields::from(fields),
+                    arrays,
+                    None,
+                )))
             }
         };
 
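
The arrow side of the migration: `StructArray` is now assembled from a `Fields` list plus a parallel `Vec<ArrayRef>` and an optional null buffer, replacing the old `From<Vec<(Field, ArrayRef)>>` construction. `StructArray::new` panics on a mismatch while `StructArray::try_new` returns a `Result`, which is why the fallible call site above uses `try_new(...)?`. A small self-contained sketch:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray, StructArray};
use arrow::datatypes::{DataType, Field, Fields};

fn build_struct() -> StructArray {
    let fields = Fields::from(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![Some(1), None])),
        Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
    ];
    // Fields and columns are parallel; `new` panics if lengths or types
    // disagree, while `try_new` surfaces the same problem as an Err.
    StructArray::new(fields, columns, None)
}
```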

rust/tests/add_actions_test.rs (11 changes: 6 additions & 5 deletions)

@@ -3,7 +3,7 @@
 use arrow::array::{self, ArrayRef, StructArray};
 use arrow::compute::kernels::cast_utils::Parser;
 use arrow::compute::sort_to_indices;
-use arrow::datatypes::{DataType, Date32Type, Field, TimestampMicrosecondType};
+use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType};
 use arrow::record_batch::RecordBatch;
 use std::sync::Arc;
 
@@ -54,10 +54,11 @@ async fn test_with_partitions() {
 
     expected_columns[4] = (
         "partition_values",
-        Arc::new(array::StructArray::from(vec![(
-            Field::new("k", DataType::Utf8, true),
-            Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef,
-        )])),
+        Arc::new(array::StructArray::new(
+            Fields::from(vec![Field::new("k", DataType::Utf8, true)]),
+            vec![Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef],
+            None,
+        )),
     );
     let expected = RecordBatch::try_from_iter(expected_columns).unwrap();
