fix: delta scan partition ordering bug (#1789)
# Description
Sometimes the order of partition columns in the Delta schema does not
match the order of partition columns in the DeltaTable metadata. When
the two orders differ, `DeltaScan` provides incorrect values for the
partition columns. This is fixed by having `DeltaScan` use the metadata
as the source of truth for partition column ordering.
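
As a rough sketch of the failure mode (hypothetical values; the column names are borrowed from the new test below, not from the crate's API): when partition values are assigned positionally, the two orderings produce different rows.

```rust
use std::collections::HashMap;

fn main() {
    // Partition values recorded for a single file, keyed by column name.
    let values = HashMap::from([("modified", "2021-02-01"), ("id", "A")]);

    // Order of the partition columns in the table metadata.
    let metadata_order = ["modified", "id"];
    // Order in which the same columns might appear in the schema.
    let schema_order = ["id", "modified"];

    // Values are reported positionally, so the iteration order decides
    // which value ends up attached to which partition column.
    let by_metadata: Vec<_> = metadata_order.iter().map(|c| values[c]).collect();
    let by_schema: Vec<_> = schema_order.iter().map(|c| values[c]).collect();

    // ["2021-02-01", "A"] vs ["A", "2021-02-01"]: the schema-ordered row
    // pairs "A" with `modified` and "2021-02-01" with `id`.
    assert_ne!(by_metadata, by_schema);
}
```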

# Related Issue(s)
- closes #1787
Blajda authored Nov 4, 2023
1 parent bcd3e0d commit 45e7841
Showing 1 changed file with 91 additions and 20 deletions.
crates/deltalake-core/src/delta_datafusion/mod.rs: 91 additions & 20 deletions
@@ -586,8 +586,14 @@ impl<'a> DeltaScanBuilder<'a> {
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();

+        let table_partition_cols = &self
+            .snapshot
+            .current_metadata()
+            .ok_or(DeltaTableError::NoMetadata)?
+            .partition_columns;

for action in files.iter() {
-            let mut part = partitioned_file_from_action(action, &schema);
+            let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);

if config.file_column_name.is_some() {
part.partition_values
@@ -602,13 +608,6 @@ impl<'a> DeltaScanBuilder<'a> {
.push(part);
}

-        let table_partition_cols = self
-            .snapshot
-            .current_metadata()
-            .ok_or(DeltaTableError::NoMetadata)?
-            .partition_columns
-            .clone();

let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
@@ -923,20 +922,30 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue>

pub(crate) fn partitioned_file_from_action(
action: &protocol::Add,
+    partition_columns: &[String],
schema: &ArrowSchema,
) -> PartitionedFile {
-    let partition_values = schema
-        .fields()
+    let partition_values = partition_columns
.iter()
-        .filter_map(|f| {
-            action.partition_values.get(f.name()).map(|val| match val {
-                Some(value) => to_correct_scalar_value(
-                    &serde_json::Value::String(value.to_string()),
-                    f.data_type(),
-                )
-                .unwrap_or(ScalarValue::Null),
-                None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
-            })
+        .map(|part| {
+            action
+                .partition_values
+                .get(part)
+                .map(|val| {
+                    schema
+                        .field_with_name(part)
+                        .map(|field| match val {
+                            Some(value) => to_correct_scalar_value(
+                                &serde_json::Value::String(value.to_string()),
+                                field.data_type(),
+                            )
+                            .unwrap_or(ScalarValue::Null),
+                            None => get_null_of_arrow_type(field.data_type())
+                                .unwrap_or(ScalarValue::Null),
+                        })
+                        .unwrap_or(ScalarValue::Null)
+                })
+                .unwrap_or(ScalarValue::Null)
})
.collect::<Vec<_>>();

@@ -1618,6 +1627,7 @@ pub async fn find_files<'a>(

#[cfg(test)]
mod tests {
+    use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
@@ -1797,7 +1807,8 @@ mod tests {
Field::new("month", ArrowDataType::Int64, true),
]);

-        let file = partitioned_file_from_action(&action, &schema);
+        let part_columns = vec!["year".to_string(), "month".to_string()];
+        let file = partitioned_file_from_action(&action, &part_columns, &schema);
let ref_file = PartitionedFile {
object_meta: object_store::ObjectMeta {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
@@ -1929,4 +1940,64 @@ mod tests {
];
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn delta_scan_mixed_partition_order() {
// Tests issue (1787) where partition columns were incorrect when they
// have a different order in the metadata and table schema
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("modified", DataType::Utf8, true),
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]));

let table = crate::DeltaOps::new_in_memory()
.create()
.with_columns(get_delta_schema().get_fields().clone())
.with_partition_columns(["modified", "id"])
.await
.unwrap();
assert_eq!(table.version(), 0);

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-01",
"2021-02-01",
"2021-02-02",
"2021-02-02",
])),
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();
// write some data
let table = crate::DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();

let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

let df = ctx.sql("select * from test").await.unwrap();
let actual = df.collect().await.unwrap();
let expected = vec![
"+-------+------------+----+",
"| value | modified | id |",
"+-------+------------+----+",
"| 1 | 2021-02-01 | A |",
"| 10 | 2021-02-01 | B |",
"| 100 | 2021-02-02 | D |",
"| 20 | 2021-02-02 | C |",
"+-------+------------+----+",
];
assert_batches_sorted_eq!(&expected, &actual);
}
}
