diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs
index d18686bd9b..f81e13866f 100644
--- a/rust/src/checkpoints.rs
+++ b/rust/src/checkpoints.rs
@@ -170,6 +170,12 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
     let tombstones = state.unexpired_tombstones();
 
+    // if any tombstones do not include extended file metadata, we must omit the extended metadata fields from the remove schema
+    // See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
+    let use_extended_remove_schema = tombstones
+        .iter()
+        .all(|r| r.extended_file_metadata == Some(true));
+
     // protocol
     let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
         min_reader_version: state.min_reader_version(),
@@ -193,11 +199,17 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
         }),
     )
     // removes
-    .chain(
-        tombstones
-            .iter()
-            .map(|f| action::Action::remove((*f).clone())),
-    )
+    .chain(tombstones.iter().map(|r| {
+        let mut r = (*r).clone();
+
+        // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
+        // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
+        if None == r.extended_file_metadata {
+            r.extended_file_metadata = Some(false);
+        }
+
+        action::Action::remove(r)
+    }))
     .map(|a| serde_json::to_value(a).map_err(ArrowError::from))
     // adds
     .chain(state.files().iter().map(|f| {
@@ -208,6 +220,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
     let arrow_schema = delta_log_schema_for_table(
         <ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
         current_metadata.partition_columns.as_slice(),
+        use_extended_remove_schema,
     );
 
     debug!("Writing to checkpoint parquet buffer...");
diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs
index 263d4dbbca..40ec89aa52 100644
--- a/rust/src/delta_arrow.rs
+++ b/rust/src/delta_arrow.rs
@@ -162,9 +162,18 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
     }
 }
 
+/// Returns an arrow schema representing the delta log for use in checkpoints
+///
+/// # Arguments
+///
+/// * `table_schema` - The arrow schema representing the table backed by the delta log
+/// * `partition_columns` - The list of partition columns of the table.
+/// * `use_extended_remove_schema` - Whether to include extended file metadata in remove action schema.
+///   Required for compatibility with different versions of Databricks runtime.
 pub(crate) fn delta_log_schema_for_table(
     table_schema: ArrowSchema,
     partition_columns: &[String],
+    use_extended_remove_schema: bool,
 ) -> SchemaRef {
     lazy_static! {
         static ref SCHEMA_FIELDS: Vec<ArrowField> = vec![
@@ -241,47 +250,6 @@ pub(crate) fn delta_log_schema_for_table(
                 ]),
                 true
             ),
-            ArrowField::new(
-                "remove",
-                ArrowDataType::Struct(vec![
-                    ArrowField::new("path", ArrowDataType::Utf8, true),
-                    ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
-                    ArrowField::new("dataChange", ArrowDataType::Boolean, true),
-                    ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
-                    ArrowField::new("size", ArrowDataType::Int64, true),
-                    ArrowField::new(
-                        "partitionValues",
-                        ArrowDataType::Map(
-                            Box::new(ArrowField::new(
-                                "key_value",
-                                ArrowDataType::Struct(vec![
-                                    ArrowField::new("key", ArrowDataType::Utf8, false),
-                                    ArrowField::new("value", ArrowDataType::Utf8, true),
-                                ]),
-                                false
-                            )),
-                            false
-                        ),
-                        true
-                    ),
-                    ArrowField::new(
-                        "tags",
-                        ArrowDataType::Map(
-                            Box::new(ArrowField::new(
-                                "key_value",
-                                ArrowDataType::Struct(vec![
-                                    ArrowField::new("key", ArrowDataType::Utf8, false),
-                                    ArrowField::new("value", ArrowDataType::Utf8, true),
-                                ]),
-                                false
-                            )),
-                            false
-                        ),
-                        true
-                    )
-                ]),
-                true
-            )
         ];
         static ref ADD_FIELDS: Vec<ArrowField> = vec![
             ArrowField::new("path", ArrowDataType::Utf8, true),
@@ -320,17 +288,55 @@ pub(crate) fn delta_log_schema_for_table(
                 true
             )
         ];
+        static ref REMOVE_FIELDS: Vec<ArrowField> = vec![
+            ArrowField::new("path", ArrowDataType::Utf8, true),
+            ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
+            ArrowField::new("dataChange", ArrowDataType::Boolean, true),
+            ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
+        ];
+        static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec<ArrowField> = vec![
+            ArrowField::new("size", ArrowDataType::Int64, true),
+            ArrowField::new(
+                "partitionValues",
+                ArrowDataType::Map(
+                    Box::new(ArrowField::new(
+                        "key_value",
+                        ArrowDataType::Struct(vec![
+                            ArrowField::new("key", ArrowDataType::Utf8, false),
+                            ArrowField::new("value", ArrowDataType::Utf8, true),
+                        ]),
+                        false
+                    )),
+                    false
+                ),
+                true
+            ),
+            ArrowField::new(
+                "tags",
+                ArrowDataType::Map(
+                    Box::new(ArrowField::new(
+                        "key_value",
+                        ArrowDataType::Struct(vec![
+                            ArrowField::new("key", ArrowDataType::Utf8, false),
+                            ArrowField::new("value", ArrowDataType::Utf8, true),
+                        ]),
+                        false
+                    )),
+                    false
+                ),
+                true
+            )
+        ];
     }
 
+    // create add fields according to the specific data table schema
     let (partition_fields, non_partition_fields): (Vec<ArrowField>, Vec<ArrowField>) = table_schema
         .fields()
         .iter()
        .map(|f| f.to_owned())
        .partition(|field| partition_columns.contains(field.name()));
-
     let mut stats_parsed_fields: Vec<ArrowField> = vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)];
-
     if !non_partition_fields.is_empty() {
         let mut max_min_vec = Vec::new();
         non_partition_fields
@@ -352,15 +358,12 @@ pub(crate) fn delta_log_schema_for_table(
         stats_parsed_fields.push(null_count_struct);
     }
-
     let mut add_fields = ADD_FIELDS.clone();
-
     add_fields.push(ArrowField::new(
         "stats_parsed",
         ArrowDataType::Struct(stats_parsed_fields),
         true,
     ));
-
     if !partition_fields.is_empty() {
         add_fields.push(ArrowField::new(
             "partitionValues_parsed",
@@ -369,12 +372,24 @@ pub(crate) fn delta_log_schema_for_table(
         ));
     }
 
+    // create remove fields with or without extendedFileMetadata
+    let mut remove_fields = REMOVE_FIELDS.clone();
+    if use_extended_remove_schema {
+        remove_fields.extend(REMOVE_EXTENDED_FILE_METADATA_FIELDS.clone());
+    }
+
+    // include add and remove fields in checkpoint schema
     let mut schema_fields = SCHEMA_FIELDS.clone();
     schema_fields.push(ArrowField::new(
         "add",
         ArrowDataType::Struct(add_fields),
         true,
     ));
+    schema_fields.push(ArrowField::new(
"remove", + ArrowDataType::Struct(remove_fields), + true, + )); let arrow_schema = ArrowSchema::new(schema_fields); @@ -442,12 +457,17 @@ mod tests { ArrowField::new("col1", ArrowDataType::Int32, true), ]); let partition_columns = vec!["pcol".to_string()]; - let log_schema = delta_log_schema_for_table(table_schema, partition_columns.as_slice()); + let log_schema = + delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false); + // verify top-level schema contains all expected fields and they are named correctly. let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"]; for f in log_schema.fields().iter() { assert!(expected_fields.contains(&f.name().as_str())); } + assert_eq!(5, log_schema.fields().len()); + + // verify add fields match as expected. a lot of transformation goes into these. let add_fields: Vec<_> = log_schema .fields() .iter() @@ -462,12 +482,10 @@ mod tests { .flatten() .collect(); assert_eq!(9, add_fields.len()); - let add_field_map: HashMap<_, _> = add_fields .iter() .map(|f| (f.name().to_owned(), f.clone())) .collect(); - let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap(); if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() { assert_eq!(1, fields.len()); @@ -476,7 +494,6 @@ mod tests { } else { unreachable!(); } - let stats_parsed = add_field_map.get("stats_parsed").unwrap(); if let ArrowDataType::Struct(fields) = stats_parsed.data_type() { assert_eq!(4, fields.len()); @@ -508,5 +525,51 @@ mod tests { } else { unreachable!(); } + + // verify extended remove schema fields **ARE NOT** included when `use_extended_remove_schema` is false. + let remove_fields: Vec<_> = log_schema + .fields() + .iter() + .filter(|f| f.name() == "remove") + .map(|f| { + if let ArrowDataType::Struct(fields) = f.data_type() { + fields.iter().map(|f| f.clone()) + } else { + unreachable!(); + } + }) + .flatten() + .collect(); + assert_eq!(4, remove_fields.len()); + + // verify extended remove schema fields **ARE** included when `use_extended_remove_schema` is true. + let log_schema = + delta_log_schema_for_table(table_schema, partition_columns.as_slice(), true); + let remove_fields: Vec<_> = log_schema + .fields() + .iter() + .filter(|f| f.name() == "remove") + .map(|f| { + if let ArrowDataType::Struct(fields) = f.data_type() { + fields.iter().map(|f| f.clone()) + } else { + unreachable!(); + } + }) + .flatten() + .collect(); + assert_eq!(7, remove_fields.len()); + let expected_fields = vec![ + "path", + "deletionTimestamp", + "dataChange", + "extendedFileMetadata", + "partitionValues", + "size", + "tags", + ]; + for f in remove_fields.iter() { + assert!(expected_fields.contains(&f.name().as_str())); + } } }