diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index a1845b9503..74a6492248 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -9,8 +9,8 @@ use crate::DeltaTableError; use crate::SchemaDataType; use crate::SchemaTypeStruct; use arrow::array::{ - ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray, - StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, + Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray, + StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, }; use arrow::compute::cast; use arrow::compute::kernels::cast_utils::Parser; @@ -304,53 +304,74 @@ impl DeltaTableState { max_values: Option, } + let filter_out_empty_stats = |stats: &Result| -> bool { + let is_field_empty = |arr: &Option| -> bool { + match arr { + Some(arr) => arr.len() == arr.null_count(), + None => true, + } + }; + + let is_stats_empty = |stats: &ColStats| -> bool { + is_field_empty(&stats.null_count) + && is_field_empty(&stats.min_values) + && is_field_empty(&stats.max_values) + }; + + if let Ok(stats) = stats { + !is_stats_empty(stats) + } else { + true // keep errs + } + }; + let mut columnar_stats: Vec = SchemaLeafIterator::new(schema) .filter(|(_path, datatype)| !matches!(datatype, SchemaDataType::r#struct(_))) .map(|(path, datatype)| -> Result { - let null_count: Option = stats + let null_count = stats .iter() .flat_map(|maybe_stat| { maybe_stat .as_ref() .map(|stat| resolve_column_count_stat(&stat.null_count, &path)) }) - .collect::>>() - .map(arrow::array::Int64Array::from) - .map(|arr| -> ArrayRef { Arc::new(arr) }); + .collect::>>(); + let null_count = Some(value_vec_to_array(null_count, |values| { + Ok(Arc::new(arrow::array::Int64Array::from(values))) + })?); let arrow_type: arrow::datatypes::DataType = datatype.try_into()?; // Min and max are collected for primitive values, not list or maps let min_values = if matches!(datatype, SchemaDataType::primitive(_)) { - stats + let min_values = stats .iter() .flat_map(|maybe_stat| { maybe_stat .as_ref() .map(|stat| resolve_column_value_stat(&stat.min_values, &path)) }) - .collect::>>() - .map(|min_values| { - json_value_to_array_general(&arrow_type, min_values.into_iter()) - }) - .transpose()? + .collect::>>(); + + Some(value_vec_to_array(min_values, |values| { + json_value_to_array_general(&arrow_type, values.into_iter()) + })?) } else { None }; let max_values = if matches!(datatype, SchemaDataType::primitive(_)) { - stats + let max_values = stats .iter() .flat_map(|maybe_stat| { maybe_stat .as_ref() .map(|stat| resolve_column_value_stat(&stat.max_values, &path)) }) - .collect::>>() - .map(|max_values| { - json_value_to_array_general(&arrow_type, max_values.into_iter()) - }) - .transpose()? + .collect::>>(); + Some(value_vec_to_array(max_values, |values| { + json_value_to_array_general(&arrow_type, values.into_iter()) + })?) } else { None }; @@ -362,6 +383,7 @@ impl DeltaTableState { max_values, }) }) + .filter(filter_out_empty_stats) .collect::>()?; let mut out_columns: Vec<(Cow, ArrayRef)> = @@ -407,7 +429,7 @@ impl DeltaTableState { .last() .expect("paths must have at least one element"), values.data_type().clone(), - false, + true, ); Some((field, Arc::clone(values))) } else { @@ -480,6 +502,20 @@ impl DeltaTableState { } } +fn value_vec_to_array( + value_vec: Vec>, + map_fn: F, +) -> Result +where + F: FnOnce(Vec>) -> Result, +{ + if value_vec.iter().all(Option::is_none) { + Ok(Arc::new(NullArray::new(value_vec.len()))) + } else { + map_fn(value_vec) + } +} + fn resolve_column_value_stat<'a>( values: &'a HashMap, path: &[&'a str], @@ -546,41 +582,55 @@ impl<'a> std::iter::Iterator for SchemaLeafIterator<'a> { fn json_value_to_array_general<'a>( datatype: &arrow::datatypes::DataType, - values: impl Iterator, + values: impl Iterator>, ) -> Result { match datatype { - DataType::Boolean => Ok(Arc::new( + DataType::Boolean => Ok(Arc::new(BooleanArray::from( values - .map(|value| value.as_bool()) - .collect::(), - )), + .map(|value| value.and_then(serde_json::Value::as_bool)) + .collect_vec(), + ))), DataType::Int64 | DataType::Int32 | DataType::Int16 | DataType::Int8 => { - let i64_arr: ArrayRef = - Arc::new(values.map(|value| value.as_i64()).collect::()); + let i64_arr: ArrayRef = Arc::new(Int64Array::from( + values + .map(|value| value.and_then(serde_json::Value::as_i64)) + .collect_vec(), + )); Ok(arrow::compute::cast(&i64_arr, datatype)?) } DataType::Float32 | DataType::Float64 | DataType::Decimal128(_, _) => { - let f64_arr: ArrayRef = - Arc::new(values.map(|value| value.as_f64()).collect::()); + let f64_arr: ArrayRef = Arc::new(Float64Array::from( + values + .map(|value| value.and_then(serde_json::Value::as_f64)) + .collect_vec(), + )); Ok(arrow::compute::cast(&f64_arr, datatype)?) } - DataType::Utf8 => Ok(Arc::new( - values.map(|value| value.as_str()).collect::(), - )), - DataType::Binary => Ok(Arc::new( - values.map(|value| value.as_str()).collect::(), - )), + DataType::Utf8 => Ok(Arc::new(StringArray::from( + values + .map(|value| value.and_then(serde_json::Value::as_str)) + .collect_vec(), + ))), + DataType::Binary => Ok(Arc::new(BinaryArray::from( + values + .map(|value| value.and_then(|value| value.as_str().map(|value| value.as_bytes()))) + .collect_vec(), + ))), DataType::Timestamp(TimeUnit::Microsecond, None) => { Ok(Arc::new(TimestampMicrosecondArray::from( values - .map(|value| value.as_str().and_then(TimestampMicrosecondType::parse)) - .collect::>>(), + .map(|value| { + value.and_then(|value| { + value.as_str().and_then(TimestampMicrosecondType::parse) + }) + }) + .collect_vec(), ))) } DataType::Date32 => Ok(Arc::new(Date32Array::from( values - .map(|value| value.as_str().and_then(Date32Type::parse)) - .collect::>>(), + .map(|value| value.and_then(|value| value.as_str().and_then(Date32Type::parse))) + .collect_vec(), ))), _ => Err(DeltaTableError::Generic("Invalid datatype".to_string())), } diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs index b787868051..3ee5c8dff1 100644 --- a/rust/tests/add_actions_test.rs +++ b/rust/tests/add_actions_test.rs @@ -204,10 +204,14 @@ async fn test_only_struct_stats() { "null_count.null", Arc::new(array::Int64Array::from(vec![1])), ), + ("min.null", Arc::new(array::NullArray::new(1))), + ("max.null", Arc::new(array::NullArray::new(1))), ( "null_count.boolean", Arc::new(array::Int64Array::from(vec![0])), ), + ("min.boolean", Arc::new(array::NullArray::new(1))), + ("max.boolean", Arc::new(array::NullArray::new(1))), ( "null_count.double", Arc::new(array::Int64Array::from(vec![0])), @@ -256,6 +260,8 @@ async fn test_only_struct_stats() { "null_count.binary", Arc::new(array::Int64Array::from(vec![0])), ), + ("min.binary", Arc::new(array::NullArray::new(1))), + ("max.binary", Arc::new(array::NullArray::new(1))), ( "null_count.date", Arc::new(array::Int64Array::from(vec![0])),