Skip to content

Commit

Permalink
fix (manifest-list): added serde aliases to support both forms conven…
Browse files Browse the repository at this point in the history
…tions (#365)

* added serde aliases to support both forms conventions

* reading manifests without avro schema

* adding avro files of both versions and add a test to deser both

* fixed typo
  • Loading branch information
a-agmon committed May 9, 2024
1 parent 1c66e5a commit ffb691d
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
62 changes: 57 additions & 5 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ManifestList {
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider)
}
FormatVersion::V2 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
let reader = Reader::new(bs)?;
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider)
}
Expand Down Expand Up @@ -802,6 +802,9 @@ pub(super) mod _serde {
pub key_metadata: Option<ByteBuf>,
}

// Aliases were added to fields that were renamed in Iceberg 1.5.0 (https://github.com/apache/iceberg/pull/5338), in order to support both conventions/versions.
// In the current implementation deserialization is done using field names, and therefore these fields may appear as either.
// see issue that raised this here: https://github.com/apache/iceberg-rust/issues/338
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct ManifestFileV2 {
pub manifest_path: String,
Expand All @@ -811,8 +814,11 @@ pub(super) mod _serde {
pub sequence_number: i64,
pub min_sequence_number: i64,
pub added_snapshot_id: i64,
#[serde(alias = "added_data_files_count", alias = "added_files_count")]
pub added_data_files_count: i32,
#[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
pub existing_data_files_count: i32,
#[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
pub deleted_data_files_count: i32,
pub added_rows_count: i64,
pub existing_rows_count: i64,
Expand Down Expand Up @@ -1089,16 +1095,16 @@ pub(super) mod _serde {

#[cfg(test)]
mod test {
use apache_avro::{Reader, Schema};
use std::{collections::HashMap, fs, sync::Arc};

use tempfile::TempDir;

use crate::{
io::FileIOBuilder,
spec::{
manifest_list::{_serde::ManifestListV1, UNASSIGNED_SEQUENCE_NUMBER},
FieldSummary, Literal, ManifestContentType, ManifestFile, ManifestList,
ManifestListWriter, NestedField, PrimitiveType, StructType, Type,
manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType,
ManifestFile, ManifestList, ManifestListWriter, NestedField, PrimitiveType, StructType,
Type, UNASSIGNED_SEQUENCE_NUMBER,
},
};

Expand Down Expand Up @@ -1462,4 +1468,50 @@ mod test {

temp_dir.close().unwrap();
}

#[tokio::test]
async fn test_manifest_list_v2_deserializer_aliases() {
// reading avro manifest file generated by iceberg 1.4.0
let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
let bs_1 = fs::read(avro_1_path).unwrap();
let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
assert_eq!(
avro_1_fields,
"manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
);
// reading avro manifest file generated by iceberg 1.5.0
let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
let bs_2 = fs::read(avro_2_path).unwrap();
let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
assert_eq!(
avro_2_fields,
"manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
);
// deserializing both files to ManifestList struct
let _manifest_list_1 =
ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2, move |_id| {
Ok(Some(StructType::new(vec![])))
})
.unwrap();
let _manifest_list_2 =
ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2, move |_id| {
Ok(Some(StructType::new(vec![])))
})
.unwrap();
}

async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
let reader = Reader::new(&bs[..]).unwrap();
let schema = reader.writer_schema();
let fields: String = match schema {
Schema::Record(record) => record
.fields
.iter()
.map(|field| field.name.clone())
.collect::<Vec<String>>()
.join(", "),
_ => "".to_string(),
};
fields
}
}
Binary file not shown.
Binary file not shown.

0 comments on commit ffb691d

Please sign in to comment.