Fix checkpoint compatibility for remove fields (#427)
xianwill authored Sep 13, 2021
1 parent 92e7beb commit eb0ecf7
Showing 2 changed files with 131 additions and 55 deletions.
23 changes: 18 additions & 5 deletions rust/src/checkpoints.rs
@@ -170,6 +170,12 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi

let tombstones = state.unexpired_tombstones();

// If any tombstones do not include extended file metadata, we must omit the extended metadata fields from the remove schema.
// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
let use_extended_remove_schema = tombstones
.iter()
.all(|r| r.extended_file_metadata == Some(true));

// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
@@ -193,11 +199,17 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
}),
)
// removes
.chain(
tombstones
.iter()
.map(|f| action::Action::remove((*f).clone())),
)
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}

action::Action::remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ArrowError::from))
// adds
.chain(state.files().iter().map(|f| {
@@ -208,6 +220,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
let arrow_schema = delta_log_schema_for_table(
<ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
current_metadata.partition_columns.as_slice(),
use_extended_remove_schema,
);

debug!("Writing to checkpoint parquet buffer...");
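The heart of the checkpoints.rs change, restated as a self-contained sketch. The `Remove` struct below is a hypothetical stand-in for the crate's real remove action (which carries more fields); it shows only the two decisions made in the diff: use the extended remove schema only when every tombstone carries extended metadata, and, as a "new writer", always write an explicit `extendedFileMetadata` flag.

#[derive(Clone, Debug)]
struct Remove {
    path: String,
    extended_file_metadata: Option<bool>,
}

/// Decide the checkpoint remove schema and normalize tombstones for writing.
fn normalize_removes(tombstones: &[Remove]) -> (bool, Vec<Remove>) {
    // The extended columns are only safe when every tombstone actually has them.
    let use_extended_remove_schema = tombstones
        .iter()
        .all(|r| r.extended_file_metadata == Some(true));

    // Always persist an explicit flag; an absent value means "not extended".
    let normalized = tombstones
        .iter()
        .cloned()
        .map(|mut r| {
            if r.extended_file_metadata.is_none() {
                r.extended_file_metadata = Some(false);
            }
            r
        })
        .collect();

    (use_extended_remove_schema, normalized)
}

fn main() {
    let tombstones = vec![
        Remove { path: "a.parquet".into(), extended_file_metadata: Some(true) },
        Remove { path: "b.parquet".into(), extended_file_metadata: None },
    ];
    let (use_extended, normalized) = normalize_removes(&tombstones);
    // One tombstone lacks extended metadata, so the plain schema is used.
    assert!(!use_extended);
    assert!(normalized.iter().all(|r| r.extended_file_metadata.is_some()));
    println!("use_extended_remove_schema = {}, removes = {:?}", use_extended, normalized);
}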
163 changes: 113 additions & 50 deletions rust/src/delta_arrow.rs
@@ -162,9 +162,18 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
}
}

/// Returns an arrow schema representing the delta log for use in checkpoints
///
/// # Arguments
///
/// * `table_schema` - The arrow schema representing the table backed by the delta log
/// * `partition_columns` - The list of partition columns of the table.
/// * `use_extended_remove_schema` - Whether to include extended file metadata in remove action schema.
/// Required for compatibility with different versions of Databricks runtime.
pub(crate) fn delta_log_schema_for_table(
table_schema: ArrowSchema,
partition_columns: &[String],
use_extended_remove_schema: bool,
) -> SchemaRef {
lazy_static! {
static ref SCHEMA_FIELDS: Vec<ArrowField> = vec![
@@ -241,47 +250,6 @@ pub(crate) fn delta_log_schema_for_table(
]),
true
),
ArrowField::new(
"remove",
ArrowDataType::Struct(vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
ArrowField::new("dataChange", ArrowDataType::Boolean, true),
ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
ArrowField::new("size", ArrowDataType::Int64, true),
ArrowField::new(
"partitionValues",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
),
ArrowField::new(
"tags",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
)
]),
true
)
];
static ref ADD_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
Expand Down Expand Up @@ -320,17 +288,55 @@ pub(crate) fn delta_log_schema_for_table(
true
)
];
static ref REMOVE_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
ArrowField::new("dataChange", ArrowDataType::Boolean, true),
ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
];
static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("size", ArrowDataType::Int64, true),
ArrowField::new(
"partitionValues",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
),
ArrowField::new(
"tags",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
)
];
}

// create add fields according to the specific data table schema
let (partition_fields, non_partition_fields): (Vec<ArrowField>, Vec<ArrowField>) = table_schema
.fields()
.iter()
.map(|f| f.to_owned())
.partition(|field| partition_columns.contains(field.name()));

let mut stats_parsed_fields: Vec<ArrowField> =
vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)];

if !non_partition_fields.is_empty() {
let mut max_min_vec = Vec::new();
non_partition_fields
@@ -352,15 +358,12 @@ pub(crate) fn delta_log_schema_for_table(

stats_parsed_fields.push(null_count_struct);
}

let mut add_fields = ADD_FIELDS.clone();

add_fields.push(ArrowField::new(
"stats_parsed",
ArrowDataType::Struct(stats_parsed_fields),
true,
));

if !partition_fields.is_empty() {
add_fields.push(ArrowField::new(
"partitionValues_parsed",
Expand All @@ -369,12 +372,24 @@ pub(crate) fn delta_log_schema_for_table(
));
}

// create remove fields with or without extendedFileMetadata
let mut remove_fields = REMOVE_FIELDS.clone();
if use_extended_remove_schema {
remove_fields.extend(REMOVE_EXTENDED_FILE_METADATA_FIELDS.clone());
}

// include add and remove fields in checkpoint schema
let mut schema_fields = SCHEMA_FIELDS.clone();
schema_fields.push(ArrowField::new(
"add",
ArrowDataType::Struct(add_fields),
true,
));
schema_fields.push(ArrowField::new(
"remove",
ArrowDataType::Struct(remove_fields),
true,
));

let arrow_schema = ArrowSchema::new(schema_fields);

@@ -442,12 +457,17 @@ mod tests {
ArrowField::new("col1", ArrowDataType::Int32, true),
]);
let partition_columns = vec!["pcol".to_string()];
let log_schema = delta_log_schema_for_table(table_schema, partition_columns.as_slice());
let log_schema =
delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false);

// verify top-level schema contains all expected fields and they are named correctly.
let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"];
for f in log_schema.fields().iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
assert_eq!(5, log_schema.fields().len());

// verify add fields match as expected. a lot of transformation goes into these.
let add_fields: Vec<_> = log_schema
.fields()
.iter()
Expand All @@ -462,12 +482,10 @@ mod tests {
.flatten()
.collect();
assert_eq!(9, add_fields.len());

let add_field_map: HashMap<_, _> = add_fields
.iter()
.map(|f| (f.name().to_owned(), f.clone()))
.collect();

let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap();
if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() {
assert_eq!(1, fields.len());
Expand All @@ -476,7 +494,6 @@ mod tests {
} else {
unreachable!();
}

let stats_parsed = add_field_map.get("stats_parsed").unwrap();
if let ArrowDataType::Struct(fields) = stats_parsed.data_type() {
assert_eq!(4, fields.len());
Expand Down Expand Up @@ -508,5 +525,51 @@ mod tests {
} else {
unreachable!();
}

// verify extended remove schema fields **ARE NOT** included when `use_extended_remove_schema` is false.
let remove_fields: Vec<_> = log_schema
.fields()
.iter()
.filter(|f| f.name() == "remove")
.map(|f| {
if let ArrowDataType::Struct(fields) = f.data_type() {
fields.iter().map(|f| f.clone())
} else {
unreachable!();
}
})
.flatten()
.collect();
assert_eq!(4, remove_fields.len());

// verify extended remove schema fields **ARE** included when `use_extended_remove_schema` is true.
let log_schema =
delta_log_schema_for_table(table_schema, partition_columns.as_slice(), true);
let remove_fields: Vec<_> = log_schema
.fields()
.iter()
.filter(|f| f.name() == "remove")
.map(|f| {
if let ArrowDataType::Struct(fields) = f.data_type() {
fields.iter().map(|f| f.clone())
} else {
unreachable!();
}
})
.flatten()
.collect();
assert_eq!(7, remove_fields.len());
let expected_fields = vec![
"path",
"deletionTimestamp",
"dataChange",
"extendedFileMetadata",
"partitionValues",
"size",
"tags",
];
for f in remove_fields.iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
}
}
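For reference, the schema-side pattern from delta_arrow.rs condensed into a standalone sketch: a base remove field set, extended only when the flag is set. It assumes an arrow version whose DataType::Struct takes a Vec<Field>, matching the diff above; the map-typed partitionValues and tags fields are left out for brevity, so the real extended schema has seven fields rather than the five shown here.

use arrow::datatypes::{DataType, Field, Schema};

// Build the remove struct from base fields plus, optionally, the extended
// file metadata fields. Per the commit, omitting the extended fields when
// tombstones lack them keeps checkpoints compatible with different versions
// of the Databricks runtime.
fn remove_schema(use_extended_remove_schema: bool) -> Schema {
    let mut remove_fields = vec![
        Field::new("path", DataType::Utf8, true),
        Field::new("deletionTimestamp", DataType::Int64, true),
        Field::new("dataChange", DataType::Boolean, true),
        Field::new("extendedFileMetadata", DataType::Boolean, true),
    ];

    if use_extended_remove_schema {
        remove_fields.push(Field::new("size", DataType::Int64, true));
        // partitionValues and tags (map fields) omitted in this sketch.
    }

    Schema::new(vec![Field::new(
        "remove",
        DataType::Struct(remove_fields),
        true,
    )])
}

fn main() {
    for flag in [false, true] {
        let schema = remove_schema(flag);
        if let DataType::Struct(fields) = schema.field(0).data_type() {
            println!("extended={} -> {} remove fields", flag, fields.len());
        }
    }
}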
