fix: get_prune_stats returns homogenous ArrayRef (#1413)
# Description
Switch the `get_prune_stats` functions to represent `null` with the typed `None` variant of the column's Arrow type (produced by `get_null_of_arrow_type`) instead of the untyped `ScalarValue::Null`, because all values collected into an `ArrayRef` must share the same type.
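The constraint comes from `ScalarValue::iter_to_array`, which collects the per-file scalars into a single `ArrayRef` and needs every scalar to be the same variant. A minimal, illustrative sketch of the difference (not code from this repository; it assumes a DataFusion version that re-exports `ScalarValue` at `datafusion::scalar` and that mixed variants are rejected with an error):

```rust
use datafusion::scalar::ScalarValue;

fn main() {
    // Untyped null mixed with a typed value: the iterator is heterogeneous,
    // so building one ArrayRef from it fails.
    let mixed = vec![ScalarValue::Int64(Some(8)), ScalarValue::Null];
    assert!(ScalarValue::iter_to_array(mixed).is_err());

    // Typed null of the same variant: every entry is Int64, and the
    // conversion yields an Int64 array with one null slot.
    let typed = vec![ScalarValue::Int64(Some(8)), ScalarValue::Int64(None)];
    assert!(ScalarValue::iter_to_array(typed).is_ok());
}
```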

# Related Issue(s)

- closes #1374 

# Documentation


https://github.com/apache/arrow-datafusion/blob/dd5e1dbbfd20539b40ae65acb8883f7e392cba92/datafusion/core/src/physical_optimizer/pruning.rs#L54-L72
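For context, the linked lines define DataFusion's `PruningStatistics` trait, which these per-file statistics feed. Paraphrased below (shape only, not copied from that revision; the import paths are assumptions): each column's minimums and maximums come back as one `ArrayRef` spanning all files, which is why every per-file entry must share the column's type.

```rust
use datafusion::arrow::array::ArrayRef;
use datafusion::common::Column;

// Shape of the trait as described by the linked pruning.rs; see the link
// above for the authoritative definition at that revision.
pub trait PruningStatistics {
    /// Minimum value of the column for each container (file), as one array.
    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Maximum value of the column for each container (file), as one array.
    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Number of containers (files) the statistics cover.
    fn num_containers(&self) -> usize;
    /// Null count of the column for each container, as one array.
    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
}
```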

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
cmackenzie1 and rtyler authored Jun 2, 2023
1 parent 3ddb146 commit f67ac09
Showing 9 changed files with 71 additions and 10 deletions.
19 changes: 14 additions & 5 deletions rust/src/delta_datafusion.rs
@@ -239,6 +239,7 @@ impl DeltaTableState {
     }
 }
 
+// TODO: Collapse with operations/transaction/state.rs method of same name
 fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
     let field = table
         .get_schema()
@@ -262,7 +263,9 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
                 Some(v) => serde_json::Value::String(v.to_string()),
                 None => serde_json::Value::Null,
             };
-            to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null)
+            to_correct_scalar_value(&value, &data_type).unwrap_or(
+                get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
+            )
         } else if let Ok(Some(statistics)) = add.get_stats() {
             let values = if get_max {
                 statistics.max_values
@@ -273,9 +276,12 @@
             values
                 .get(&column.name)
                 .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
-                .unwrap_or(ScalarValue::Null)
+                .unwrap_or(
+                    get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
+                )
         } else {
-            ScalarValue::Null
+            // No statistics available
+            get_null_of_arrow_type(&data_type).expect("Could not determine null type")
         }
     });
     ScalarValue::iter_to_array(values).ok()
@@ -547,7 +553,7 @@ impl ExecutionPlan for DeltaScan {
     }
 }
 
-fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
+pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
     match t {
         ArrowDataType::Null => Ok(ScalarValue::Null),
         ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
Expand Down Expand Up @@ -584,11 +590,14 @@ fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
})
}
ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
k.clone(),
Box::new(get_null_of_arrow_type(v).unwrap()),
)),
//Unsupported types...
ArrowDataType::Float16
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::Union(_, _)
| ArrowDataType::Dictionary(_, _)
| ArrowDataType::LargeList(_)
| ArrowDataType::Struct(_)
| ArrowDataType::List(_)
Expand Down
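The new `Dictionary` arm above recurses into the dictionary's value type instead of leaving dictionaries unsupported. A rough sketch of the intended mapping, assuming the call is made from inside the crate (the function is only `pub(crate)`) and that the `Int32` arm, not shown in the excerpt, mirrors the `Boolean` arm:

```rust
use arrow::datatypes::DataType as ArrowDataType;
use datafusion::scalar::ScalarValue;

use crate::delta_datafusion::get_null_of_arrow_type;
use crate::DeltaResult;

fn demo() -> DeltaResult<()> {
    // A primitive type maps to its typed null variant.
    assert_eq!(
        get_null_of_arrow_type(&ArrowDataType::Int32)?,
        ScalarValue::Int32(None)
    );

    // A dictionary type keeps its key type and wraps the typed null
    // of its value type.
    let dict = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::UInt16),
        Box::new(ArrowDataType::Utf8),
    );
    assert_eq!(
        get_null_of_arrow_type(&dict)?,
        ScalarValue::Dictionary(
            Box::new(ArrowDataType::UInt16),
            Box::new(ScalarValue::Utf8(None)),
        )
    );
    Ok(())
}
```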
18 changes: 13 additions & 5 deletions rust/src/operations/transaction/state.rs
@@ -20,7 +20,9 @@ use sqlparser::parser::Parser;
 use sqlparser::tokenizer::Tokenizer;
 
 use crate::action::Add;
-use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value};
+use crate::delta_datafusion::{
+    get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value,
+};
 use crate::table_state::DeltaTableState;
 use crate::DeltaResult;
 use crate::DeltaTableError;
@@ -190,14 +192,18 @@ impl<'a> AddContainer<'a> {
             return None;
         }
 
+        let data_type = field.data_type();
+
         let values = self.inner.iter().map(|add| {
             if self.partition_columns.contains(&column.name) {
                 let value = add.partition_values.get(&column.name).unwrap();
                 let value = match value {
                     Some(v) => serde_json::Value::String(v.to_string()),
                     None => serde_json::Value::Null,
                 };
-                to_correct_scalar_value(&value, field.data_type()).unwrap_or(ScalarValue::Null)
+                to_correct_scalar_value(&value, data_type).unwrap_or(
+                    get_null_of_arrow_type(data_type).expect("Could not determine null type"),
+                )
             } else if let Ok(Some(statistics)) = add.get_stats() {
                 let values = if get_max {
                     statistics.max_values
@@ -207,10 +213,12 @@
 
                 values
                     .get(&column.name)
-                    .and_then(|f| to_correct_scalar_value(f.as_value()?, field.data_type()))
-                    .unwrap_or(ScalarValue::Null)
+                    .and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
+                    .unwrap_or(
+                        get_null_of_arrow_type(data_type).expect("Could not determine null type"),
+                    )
             } else {
-                ScalarValue::Null
+                get_null_of_arrow_type(data_type).expect("Could not determine null type")
             }
         });
         ScalarValue::iter_to_array(values).ok()
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":1}}
{"metaData":{"id":"d5ad9276-c21f-474e-bfa8-996099dce265","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"temperature\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"createdTime":1684886484991,"configuration":{}}}
{"commitInfo":{"timestamp":1684886484992,"operation":"CREATE TABLE","operationParameters":{"mode":"ErrorIfExists","metadata":"{\"configuration\":{},\"created_time\":1684886484991,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"d5ad9276-c21f-474e-bfa8-996099dce265\",\"name\":null,\"partition_columns\":[\"date\"],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"timestamp\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"temperature\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"date\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}}","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":1}","location":"file:///Users/cole/github.com/cmackenzie1/delta-rs/rust/tests/data/issue_1374"},"clientVersion":"delta-rs.0.11.0"}}
Binary file not shown.
@@ -0,0 +1,3 @@
{"add":{"path":"date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd49c-c000.snappy.parquet","size":1021,"partitionValues":{"date":"2023-05-24"},"modificationTime":1684886485017,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"timestamp\":null,\"temperature\":8},\"maxValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":90},\"nullCount\":{\"temperature\":0,\"timestamp\":0}}","tags":null}}
{"add":{"path":"date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd47d-c000.snappy.parquet","size":1021,"partitionValues":{"date":"2023-05-24"},"modificationTime":1684886485017,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":8},\"maxValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":90},\"nullCount\":{\"temperature\":0,\"timestamp\":0}}","tags":null}}
{"commitInfo":{"timestamp":1685483647338,"clientVersion":"delta-rs.0.11.0"}}
1 change: 1 addition & 0 deletions rust/tests/data/issue_1374/_delta_log/_last_checkpoint
@@ -0,0 +1 @@
{"parts":null,"size":20622,"version":1}
Binary file not shown.
Binary file not shown.
37 changes: 37 additions & 0 deletions rust/tests/datafusion_test.rs
@@ -873,3 +873,40 @@ async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_issue_1374() -> Result<()> {
+    let ctx = SessionContext::new();
+    let table = deltalake::open_table("./tests/data/issue_1374")
+        .await
+        .unwrap();
+    ctx.register_table("t", Arc::new(table))?;
+
+    let batches = ctx
+        .sql(
+            r#"SELECT *
+        FROM t
+        WHERE timestamp BETWEEN '2023-05-24T00:00:00.000Z' AND '2023-05-25T00:00:00.000Z'
+        LIMIT 5
+        "#,
+        )
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----------------------------+-------------+------------+",
+        "| timestamp                  | temperature | date       |",
+        "+----------------------------+-------------+------------+",
+        "| 2023-05-24T00:01:25.010301 | 8           | 2023-05-24 |",
+        "| 2023-05-24T00:01:25.013902 | 21          | 2023-05-24 |",
+        "| 2023-05-24T00:01:25.013972 | 58          | 2023-05-24 |",
+        "| 2023-05-24T00:01:25.014025 | 24          | 2023-05-24 |",
+        "| 2023-05-24T00:01:25.014072 | 90          | 2023-05-24 |",
+        "+----------------------------+-------------+------------+",
+    ];
+
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    Ok(())
+}
