Fix checkpoint compatibility for remove fields #427

Merged
23 changes: 18 additions & 5 deletions rust/src/checkpoints.rs
@@ -170,6 +170,12 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi

let tombstones = state.unexpired_tombstones();

// if any tombstones do not include extended file metadata, we must omit the extended metadata fields from the remove schema
// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
let use_extended_remove_schema = tombstones
.iter()
.all(|r| r.extended_file_metadata == Some(true));
Comment on lines +175 to +177

Contributor:

Is that to avoid extending the parquet schema with null metadata? E.g., because it would make DBR 8.x fail, I suppose?

Collaborator (Author):

Exactly - this is to avoid writing out the additional fields in the schema and prevent the break in DBR 8.x.
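
For context, a minimal sketch of the predicate in isolation (a stripped-down Remove struct is assumed here; the real action::Remove in this crate carries more fields, and the helper name is hypothetical):

struct Remove {
    extended_file_metadata: Option<bool>,
}

fn should_use_extended_remove_schema(tombstones: &[Remove]) -> bool {
    // Keep the extended fields (size, partitionValues, tags) only when
    // every tombstone actually carries them. Note that `all` over an empty
    // iterator returns true, so a table with no unexpired tombstones still
    // writes the full extended schema.
    tombstones
        .iter()
        .all(|r| r.extended_file_metadata == Some(true))
}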


// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
@@ -193,11 +199,17 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
}),
)
// removes
.chain(
tombstones
.iter()
.map(|f| action::Action::remove((*f).clone())),
)
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}

action::Action::remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ArrowError::from))
// adds
.chain(state.files().iter().map(|f| {
@@ -208,6 +220,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
let arrow_schema = delta_log_schema_for_table(
<ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
current_metadata.partition_columns.as_slice(),
use_extended_remove_schema,
);

debug!("Writing to checkpoint parquet buffer...");
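As a worked illustration of the write path above, reusing the simplified Remove struct and hypothetical naming from the earlier sketch, the "new writer" normalization amounts to:

fn normalize_for_checkpoint(mut r: Remove) -> Remove {
    // Tombstones produced by older writers may omit the flag entirely;
    // defaulting it to Some(false) means a checkpoint never serializes a
    // null extendedFileMetadata, mirroring the reference implementation's
    // "new writer" behavior linked above.
    if r.extended_file_metadata.is_none() {
        r.extended_file_metadata = Some(false);
    }
    r
}

The flag only affects the serialized value; whether the size, partitionValues, and tags columns exist at all is decided separately, by the schema flag passed into delta_log_schema_for_table below.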
163 changes: 113 additions & 50 deletions rust/src/delta_arrow.rs
@@ -162,9 +162,18 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
}
}

/// Returns an arrow schema representing the delta log for use in checkpoints
///
/// # Arguments
///
/// * `table_schema` - The arrow schema representing the table backed by the delta log
/// * `partition_columns` - The list of partition columns of the table.
/// * `use_extended_remove_schema` - Whether to include the extended file metadata fields in the remove action schema.
///   Required for compatibility with different versions of the Databricks runtime.
pub(crate) fn delta_log_schema_for_table(
table_schema: ArrowSchema,
partition_columns: &[String],
use_extended_remove_schema: bool,
) -> SchemaRef {
lazy_static! {
static ref SCHEMA_FIELDS: Vec<ArrowField> = vec![
@@ -241,47 +250,6 @@ pub(crate) fn delta_log_schema_for_table(
]),
true
),
ArrowField::new(
"remove",
ArrowDataType::Struct(vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
ArrowField::new("dataChange", ArrowDataType::Boolean, true),
ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
ArrowField::new("size", ArrowDataType::Int64, true),
ArrowField::new(
"partitionValues",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
),
ArrowField::new(
"tags",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
)
]),
true
)
];
static ref ADD_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
@@ -320,17 +288,55 @@ pub(crate) fn delta_log_schema_for_table(
true
)
];
static ref REMOVE_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("path", ArrowDataType::Utf8, true),
ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
ArrowField::new("dataChange", ArrowDataType::Boolean, true),
ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
];
static ref REMOVE_EXTENDED_FILE_METADATA_FIELDS: Vec<ArrowField> = vec![
ArrowField::new("size", ArrowDataType::Int64, true),
ArrowField::new(
"partitionValues",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
),
ArrowField::new(
"tags",
ArrowDataType::Map(
Box::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
]),
false
)),
false
),
true
)
];
}

// create add fields according to the specific data table schema
let (partition_fields, non_partition_fields): (Vec<ArrowField>, Vec<ArrowField>) = table_schema
.fields()
.iter()
.map(|f| f.to_owned())
.partition(|field| partition_columns.contains(field.name()));

let mut stats_parsed_fields: Vec<ArrowField> =
vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)];

if !non_partition_fields.is_empty() {
let mut max_min_vec = Vec::new();
non_partition_fields
@@ -352,15 +358,12 @@

stats_parsed_fields.push(null_count_struct);
}

let mut add_fields = ADD_FIELDS.clone();

add_fields.push(ArrowField::new(
"stats_parsed",
ArrowDataType::Struct(stats_parsed_fields),
true,
));

if !partition_fields.is_empty() {
add_fields.push(ArrowField::new(
"partitionValues_parsed",
@@ -369,12 +372,24 @@
));
}

// create remove fields with or without extendedFileMetadata
let mut remove_fields = REMOVE_FIELDS.clone();
if use_extended_remove_schema {
remove_fields.extend(REMOVE_EXTENDED_FILE_METADATA_FIELDS.clone());
}

Comment on lines +375 to +380

Contributor:

@xianwill btw, wouldn't the easier hotfix be to just write extended_file_metadata=false without any other columns?

Contributor:

However, for full compatibility with delta 1.0 I agree that we must include them.

Collaborator (Author):

@mosyp - Just setting extended_file_metadata to false doesn't fix the issue. The same error still happens. If it's false, the schema for the other three must be omitted entirely.
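
To make that concrete, a hedged sketch of the two schema variants, using the crate's ArrowField/ArrowDataType aliases; the map-typed partitionValues and tags fields are elided for brevity, and the function name is illustrative only:

fn remove_schema(use_extended_remove_schema: bool) -> Vec<ArrowField> {
    let mut fields = vec![
        ArrowField::new("path", ArrowDataType::Utf8, true),
        ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true),
        ArrowField::new("dataChange", ArrowDataType::Boolean, true),
        ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true),
    ];
    if use_extended_remove_schema {
        // Only in this branch do the extended columns exist in the parquet
        // schema at all. Writing extendedFileMetadata = false while still
        // declaring these columns (filled with nulls) is exactly what broke
        // DBR 8.x, which is why flipping the value alone is not enough.
        fields.push(ArrowField::new("size", ArrowDataType::Int64, true));
        // ...plus the partitionValues and tags map fields, as declared in
        // REMOVE_EXTENDED_FILE_METADATA_FIELDS above.
    }
    fields
}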

// include add and remove fields in checkpoint schema
let mut schema_fields = SCHEMA_FIELDS.clone();
schema_fields.push(ArrowField::new(
"add",
ArrowDataType::Struct(add_fields),
true,
));
schema_fields.push(ArrowField::new(
"remove",
ArrowDataType::Struct(remove_fields),
true,
));

let arrow_schema = ArrowSchema::new(schema_fields);

@@ -442,12 +457,17 @@ mod tests {
ArrowField::new("col1", ArrowDataType::Int32, true),
]);
let partition_columns = vec!["pcol".to_string()];
let log_schema = delta_log_schema_for_table(table_schema, partition_columns.as_slice());
let log_schema =
delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false);

// verify top-level schema contains all expected fields and they are named correctly.
let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"];
for f in log_schema.fields().iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
assert_eq!(5, log_schema.fields().len());

// verify add fields match as expected; a lot of transformation goes into these.
let add_fields: Vec<_> = log_schema
.fields()
.iter()
@@ -462,12 +482,10 @@
.flatten()
.collect();
assert_eq!(9, add_fields.len());

let add_field_map: HashMap<_, _> = add_fields
.iter()
.map(|f| (f.name().to_owned(), f.clone()))
.collect();

let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap();
if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() {
assert_eq!(1, fields.len());
@@ -476,7 +494,6 @@
} else {
unreachable!();
}

let stats_parsed = add_field_map.get("stats_parsed").unwrap();
if let ArrowDataType::Struct(fields) = stats_parsed.data_type() {
assert_eq!(4, fields.len());
@@ -508,5 +525,51 @@
} else {
unreachable!();
}

// verify extended remove schema fields **ARE NOT** included when `use_extended_remove_schema` is false.
let remove_fields: Vec<_> = log_schema
.fields()
.iter()
.filter(|f| f.name() == "remove")
.map(|f| {
if let ArrowDataType::Struct(fields) = f.data_type() {
fields.iter().map(|f| f.clone())
} else {
unreachable!();
}
})
.flatten()
.collect();
assert_eq!(4, remove_fields.len());

// verify extended remove schema fields **ARE** included when `use_extended_remove_schema` is true.
let log_schema =
delta_log_schema_for_table(table_schema, partition_columns.as_slice(), true);
let remove_fields: Vec<_> = log_schema
.fields()
.iter()
.filter(|f| f.name() == "remove")
.map(|f| {
if let ArrowDataType::Struct(fields) = f.data_type() {
fields.iter().map(|f| f.clone())
} else {
unreachable!();
}
})
.flatten()
.collect();
assert_eq!(7, remove_fields.len());
let expected_fields = vec![
"path",
"deletionTimestamp",
"dataChange",
"extendedFileMetadata",
"partitionValues",
"size",
"tags",
];
for f in remove_fields.iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
}
}