Support DataFusion 15 (#1021)
# Description

Add support for DataFusion 15. 

Upgrades dependencies:
- `DataFusion` from `14` to `15`
- `Arrow` from `26` to `28`

# Related Issue(s)

- closes #1020 

# Documentation
andrei-ionescu authored Dec 16, 2022
1 parent d1e68ca commit bd8db74
Showing 18 changed files with 125 additions and 50 deletions.
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -17,7 +17,7 @@ crate-type = ["cdylib"]
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "26", features = ["serde"] }
arrow-schema = { version = "28", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
2 changes: 1 addition & 1 deletion python/src/filesystem.rs
@@ -356,7 +356,7 @@ impl ObjectInputFile {
}
// reference is end of the stream; offset is usually negative
2 => {
self.pos = self.content_length as i64 + offset;
self.pos = self.content_length + offset;
}
_ => {
return Err(PyValueError::new_err(
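With arrow 28 and the matching object_store release, `content_length` is evidently already an `i64`, so the cast disappears. A minimal sketch of the whence-style seek arithmetic this hunk implements, using hypothetical standalone names:

```rust
/// Sketch of the seek arithmetic above; assumes `pos` and `content_length`
/// are both i64, so the end-of-stream case needs no cast.
fn new_position(pos: i64, content_length: i64, whence: i32, offset: i64) -> Result<i64, String> {
    match whence {
        // reference is the start of the stream
        0 => Ok(offset),
        // reference is the current position
        1 => Ok(pos + offset),
        // reference is the end of the stream; offset is usually negative
        2 => Ok(content_length + offset),
        _ => Err(format!("invalid whence value: {}", whence)),
    }
}
```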
12 changes: 6 additions & 6 deletions rust/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native Delta Lake implementation in Rust"
edition = "2021"

[dependencies]
arrow = { version = "26", optional = true }
arrow = { version = "28", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = "0.4.22"
@@ -25,7 +25,7 @@ num-traits = "0.2.15"
object_store = { version = "0.5.2", features = ["aws_profile"] }
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "26", features = ["async"], optional = true }
parquet = { version = "28", features = ["async"], optional = true }
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
@@ -46,10 +46,10 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true
rusoto_glue = { version = "0.48", default-features = false, optional = true }

# Datafusion
datafusion = { version = "14", optional = true }
datafusion-expr = { version = "14", optional = true }
datafusion-common = { version = "14", optional = true }
datafusion-proto = { version = "14", optional = true }
datafusion = { version = "15", optional = true }
datafusion-expr = { version = "15", optional = true }
datafusion-common = { version = "15", optional = true }
datafusion-proto = { version = "15", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
37 changes: 14 additions & 23 deletions rust/src/delta_arrow.rs
@@ -8,8 +8,6 @@ use arrow::datatypes::{
use arrow::error::ArrowError;
use lazy_static::lazy_static;
use regex::Regex;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::convert::TryFrom;

impl TryFrom<&schema::Schema> for ArrowSchema {
@@ -30,24 +28,20 @@ impl TryFrom<&schema::SchemaField> for ArrowField {
type Error = ArrowError;

fn try_from(f: &schema::SchemaField) -> Result<Self, ArrowError> {
let mut field = ArrowField::new(
let metadata = f
.get_metadata()
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.map_err(|err| ArrowError::JsonError(err.to_string()))?;

let field = ArrowField::new(
f.get_name(),
ArrowDataType::try_from(f.get_type())?,
f.is_nullable(),
);
)
.with_metadata(metadata);

let metadata: Option<BTreeMap<String, String>> = Some(f.get_metadata())
.filter(|metadata| metadata.is_empty())
.map(|metadata| {
metadata
.iter()
.map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?)))
.collect::<Result<_, serde_json::Error>>()
.map_err(|err| ArrowError::JsonError(err.to_string()))
})
.transpose()?;

field.set_metadata(metadata);
Ok(field)
}
}
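arrow 28 replaces `Field::set_metadata(Option<BTreeMap<_, _>>)` with a builder-style `with_metadata`, dropping the `Option` wrapper; the rewrite above also fixes the old inverted `is_empty` filter, which propagated metadata only when it was empty. A small sketch of the new pattern, assuming the method takes a `BTreeMap<String, String>` in this release:

```rust
use std::collections::BTreeMap;

use arrow::datatypes::{DataType, Field};

// Builder-style metadata as used in the conversion above; the container is
// assumed to be BTreeMap<String, String> in arrow 28.
fn annotated_field() -> Field {
    let mut metadata = BTreeMap::new();
    metadata.insert("comment".to_string(), "\"user id\"".to_string());
    Field::new("id", DataType::Int64, false).with_metadata(metadata)
}
```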
@@ -111,7 +105,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
))
})?;
let precision = extract.get(1).and_then(|v| v.as_str().parse::<u8>().ok());
let scale = extract.get(2).and_then(|v| v.as_str().parse::<u8>().ok());
let scale = extract.get(2).and_then(|v| v.as_str().parse::<i8>().ok());
match (precision, scale) {
// TODO how do we decide which variant (128 / 256) to use?
(Some(p), Some(s)) => Ok(ArrowDataType::Decimal128(p, s)),
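arrow 28 also widens the decimal `scale` parameter from `u8` to `i8`, hence the `parse::<i8>()` above; the signed type makes negative scales representable. A brief sketch:

```rust
use arrow::datatypes::DataType;

// Sketch: with scale as i8, both of these type-check in arrow 28.
fn decimal_examples() -> (DataType, DataType) {
    let cents = DataType::Decimal128(10, 2); // two digits after the point
    let tens = DataType::Decimal128(10, -1); // values are multiples of ten
    (cents, tens)
}
```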
@@ -205,12 +199,9 @@ impl TryFrom<&ArrowField> for schema::SchemaField {
arrow_field.is_nullable(),
arrow_field
.metadata()
.as_ref()
.map_or_else(HashMap::new, |m| {
m.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect()
}),
.iter()
.map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
.collect(),
))
}
}
37 changes: 27 additions & 10 deletions rust/src/delta_datafusion.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
@@ -42,14 +43,16 @@ use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig};
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::{Expr, Extension, LogicalPlan};
use datafusion_proto::logical_plan::{LogicalExtensionCodec, PhysicalExtensionCodec};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::{path::Path, ObjectMeta};
use url::Url;

@@ -331,7 +334,7 @@ impl TableProvider for DeltaTable {
async fn scan(
&self,
session: &SessionState,
projection: &Option<Vec<usize>>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
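DataFusion 15 changes the `projection` parameter of `TableProvider::scan` from `&Option<Vec<usize>>` to `Option<&Vec<usize>>`, which is why the call sites later in this diff pass `None` instead of `&None`. A hedged sketch of how a caller adapts; the helper is hypothetical:

```rust
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;

// Hypothetical helper showing both call shapes under the new signature.
async fn scan_two_ways(
    table: &dyn TableProvider,
    state: &SessionState,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
    let projection: Option<Vec<usize>> = Some(vec![0, 2]);
    let full = table.scan(state, None, &[], None).await?; // was `&None` in DataFusion 14
    let pruned = table.scan(state, projection.as_ref(), &[], None).await?; // was `&projection`
    Ok((full, pruned))
}
```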
@@ -382,16 +385,26 @@ impl TableProvider for DeltaTable {
.cloned()
.collect(),
));
let parquet_scan = ParquetFormat::default()

let parquet_scan = ParquetFormat::new(session.config_options())
.create_physical_plan(
FileScanConfig {
object_store_url: self.storage.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.datafusion_table_statistics(),
projection: projection.clone(),
projection: projection.cloned(),
limit,
table_partition_cols,
table_partition_cols: table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
output_ordering: None,
config_options: Default::default(),
},
filters,
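DataFusion 15 also changes `FileScanConfig::table_partition_cols` from plain column names to `(String, DataType)` pairs, with the type dictionary-wrapped via `partition_type_wrap` because partition values are dictionary-encoded in the scan output. The inline mapping above, factored into a sketch (the helper name is hypothetical):

```rust
use arrow::datatypes::{DataType, Schema};
use arrow::error::ArrowError;
use datafusion::physical_plan::file_format::partition_type_wrap;

// Hypothetical helper mirroring the mapping above: pair each partition
// column with its dictionary-wrapped data type.
fn partition_cols(
    schema: &Schema,
    names: &[String],
) -> Result<Vec<(String, DataType)>, ArrowError> {
    names
        .iter()
        .map(|name| {
            let data_type = schema.field_with_name(name)?.data_type().clone();
            Ok((name.clone(), partition_type_wrap(data_type)))
        })
        .collect()
}
```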
@@ -829,7 +842,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
fn try_encode_table_provider(
&self,
node: Arc<dyn TableProvider>,
mut buf: &mut Vec<u8>,
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError> {
let table = node
.as_ref()
@@ -838,7 +851,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
.ok_or_else(|| {
DataFusionError::Internal("Can't encode non-delta tables".to_string())
})?;
serde_json::to_writer(&mut buf, table)
serde_json::to_writer(buf, table)
.map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
}
}
@@ -848,8 +861,12 @@ pub struct DeltaTableFactory {}

#[async_trait]
impl TableProviderFactory for DeltaTableFactory {
async fn create(&self, url: &str) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = open_table(url).await.unwrap();
async fn create(
&self,
_ctx: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = open_table(cmd.to_owned().location).await.unwrap();
Ok(Arc::new(provider))
}
}
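Under the new signature the factory receives the parsed `CREATE EXTERNAL TABLE` statement and reads the table URI from `cmd.location`. End to end, the factory is exercised through SQL roughly as follows; this is a sketch with a hypothetical path, and the uppercase `DELTATABLE` key matches how DataFusion 15 normalizes the `STORED AS` token (see the test change below):

```rust
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

// Hedged sketch: create a Delta table via SQL once DeltaTableFactory is
// registered under the "DELTATABLE" key. The LOCATION path is hypothetical.
async fn create_via_sql(ctx: &SessionContext) -> Result<()> {
    let _ = ctx
        .sql("CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION './tests/data/simple_table'")
        .await?;
    Ok(())
}
```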
2 changes: 1 addition & 1 deletion rust/src/operations/load.rs
@@ -87,7 +87,7 @@ impl std::future::IntoFuture for LoadBuilder {
ctx.state()
.runtime_env
.register_object_store(scheme, "", store);
let scan_plan = table.scan(&ctx.state(), &None, &[], None).await?;
let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
let plan = CoalescePartitionsExec::new(scan_plan);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = plan.execute(0, task_ctx)?;
19 changes: 16 additions & 3 deletions rust/src/operations/write.rs
@@ -33,7 +33,7 @@ use crate::storage::DeltaObjectStore;
use crate::writer::record_batch::divide_by_partition_values;
use crate::writer::utils::PartitionPath;

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::datatypes::{DataType, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
@@ -196,11 +196,23 @@ impl std::future::IntoFuture for WriteBuilder {
fn into_future(self) -> Self::IntoFuture {
let this = self;

fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
schema
.fields()
.iter()
.map(|f| (f.name().to_owned(), f.data_type().clone()))
.collect::<Vec<_>>()
}

fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
}

Box::pin(async move {
let object_store = if let Some(store) = this.object_store {
Ok(store)
} else {
DeltaTableBuilder::from_uri(&this.location.unwrap())
DeltaTableBuilder::from_uri(this.location.unwrap())
.with_storage_options(this.storage_options.unwrap_or_default())
.build_storage()
}?;
@@ -274,7 +286,8 @@ impl std::future::IntoFuture for WriteBuilder {

if let Ok(meta) = table.get_metadata() {
let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
if schema != curr_schema {

if !schema_eq(curr_schema, schema.clone()) {
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
));
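The new `schema_eq` helper compares only field names and data types, so schemas that differ merely in nullability or field metadata no longer trip the schema-compatibility check. An illustration of that reading:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Strict Schema equality sees a nullability difference; the name-and-type
// comparison used by schema_eq above does not.
fn nullability_is_ignored() {
    let left: SchemaRef = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let right: SchemaRef = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    assert_ne!(left, right); // strict equality fails
    // schema_eq(left, right) would return true: same names, same types
}
```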
2 changes: 1 addition & 1 deletion rust/src/writer/json.rs
@@ -183,7 +183,7 @@ impl JsonWriter {
partition_columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
) -> Result<Self, DeltaTableError> {
let storage = DeltaTableBuilder::from_uri(&table_uri)
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build_storage()?;

2 changes: 1 addition & 1 deletion rust/src/writer/stats.rs
@@ -38,7 +38,7 @@ pub(crate) fn apply_null_counts(

array
.columns()
.into_iter()
.iter()
.zip(fields)
.for_each(|(column, field)| {
let key = field.name().to_owned();
Binary file not shown.
@@ -0,0 +1,6 @@
{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}
{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}
{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}
{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}
6 binary files not shown.
54 changes: 51 additions & 3 deletions rust/tests/datafusion_test.rs
@@ -7,6 +7,7 @@ use std::sync::Arc;
use arrow::array::*;
use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, TaskContext};
@@ -86,7 +87,7 @@ async fn prepare_table(
#[tokio::test]
async fn test_datafusion_sql_registration() -> Result<()> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
table_factories.insert("deltatable".to_string(), Arc::new(DeltaTableFactory {}));
table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
@@ -255,7 +256,7 @@ async fn test_files_scanned() -> Result<()> {
assert_eq!(table.version(), 2);

let ctx = SessionContext::new();
let plan = table.scan(&ctx.state(), &None, &[], None).await?;
let plan = table.scan(&ctx.state(), None, &[], None).await?;
let plan = CoalescePartitionsExec::new(plan.clone());

let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
@@ -270,7 +271,7 @@ async fn test_files_scanned() -> Result<()> {
Expr::Literal(ScalarValue::Int32(Some(5))),
);

let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), &None, &[filter], None).await?);
let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let _result = common::collect(plan.execute(0, task_ctx)?).await?;

@@ -280,3 +281,50 @@ async fn test_files_scanned() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_datafusion_partitioned_types() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;

let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;

let expected = vec![
"+----+----+----+",
"| c3 | c1 | c2 |",
"+----+----+----+",
"| 5 | 4 | c |",
"| 6 | 5 | b |",
"| 4 | 6 | a |",
"+----+----+----+",
];

assert_batches_sorted_eq!(&expected, &batches);

let expected_schema = ArrowSchema::new(vec![
ArrowField::new("c3", ArrowDataType::Int32, true),
ArrowField::new(
"c1",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Int32),
),
false,
),
ArrowField::new(
"c2",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Utf8),
),
false,
),
]);

assert_eq!(Arc::new(expected_schema), batches[0].schema());

Ok(())
}
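The expected schema documents the practical upshot of the DataFusion 15 scan changes: partition columns come back as `Dictionary(UInt16, …)` arrays. Downstream code that wants flat arrays can cast the dictionary away; a hedged sketch with a hypothetical helper:

```rust
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

// Hypothetical helper: flatten a dictionary-encoded partition column back to
// its value type; non-dictionary columns pass through unchanged.
fn flatten_partition_column(column: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    match column.data_type() {
        DataType::Dictionary(_, value_type) => cast(column, value_type),
        _ => Ok(column.clone()),
    }
}
```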
