diff --git a/rust/src/lib.rs b/rust/src/lib.rs index eedec98a8e..c3cc6127dd 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -126,6 +126,7 @@ pub use protocol::checkpoints; #[cfg(feature = "integration_test")] pub mod test_utils; +#[cfg(feature = "parquet")] const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; /// Creates and loads a DeltaTable from the given path with current metadata. diff --git a/rust/src/table/convert_to_delta.rs b/rust/src/table/convert_to_delta.rs index 7bdb2dd90f..d0439d09fe 100644 --- a/rust/src/table/convert_to_delta.rs +++ b/rust/src/table/convert_to_delta.rs @@ -61,7 +61,7 @@ pub enum Error { /// /// # Arguments /// -/// * `storage` - A shared reference to an [`DeltaObjectStore`](crate::storage::DeltaObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). +/// * `storage` - A shared reference to an [`DeltaObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). pub async fn convert_to_delta(storage: ObjectStoreRef) -> Result { if storage.is_delta_table_location().await? { info!("A Delta table already exists in the given object store"); @@ -243,42 +243,44 @@ mod tests { ) } - async fn get_parquet_files(path: &str) -> Vec { - parquet_files(object_store(path).expect("Failed to create an object store")) + async fn get_parquet_files(path: &str) -> HashSet { + let mut res = HashSet::new(); + let files = parquet_files(object_store(path).expect("Failed to create an object store")) .await - .expect("Failed to get parquet files") - .into_iter() - .map(|file| file.location) - .collect() + .expect("Failed to get parquet files"); + for file in files { + res.insert(file.location); + } + res } #[tokio::test] async fn test_parquet_files() { assert_eq!( get_parquet_files("tests/data/delta-0.8.0").await, - vec![ + HashSet::from([ Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"), Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"), - ], + ]), ); assert_eq!( get_parquet_files("tests/data/delta-0.8.0-null-partition").await, - vec![ + HashSet::from([ Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"), Path::from("k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"), - ], + ]), ); assert_eq!( get_parquet_files("tests/data/delta-0.8.0-numeric-partition").await, - vec![ + HashSet::from([ Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"), Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"), - ], + ]), ); assert_eq!( get_parquet_files("tests/data/delta-0.8.0-special-partition").await, - vec![ + HashSet::from([ Path::parse( "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet" ) @@ -287,7 +289,7 @@ mod tests { "x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet" ) .expect("Failed to parse as Path") - ], + ]), ); } @@ -313,17 +315,6 @@ mod tests { get_partitions("tests/data/delta-0.8.0").await, (HashSet::new(), Vec::new()) ); - assert_eq!( - get_partitions("tests/data/delta-0.8.0-partitioned").await, - ( - HashSet::from_iter(["month".to_string(), "day".to_string(), "year".to_string()]), - vec![ - schema_field("year", "string", true), - schema_field("month", "string", true), - schema_field("day", "string", true) - ], - ) - ); assert_eq!( get_partitions("tests/data/delta-0.8.0-null-partition").await, ( @@ -331,17 +322,6 @@ mod tests { vec![schema_field("k", "string", true)] ) ); - - assert_eq!( - get_partitions("tests/data/delta-0.8.0-numeric-partition").await, - ( - HashSet::from_iter(["x".to_string(), "y".to_string()]), - vec![ - schema_field("x", "string", true), - schema_field("y", "string", true) - ], - ) - ); assert_eq!( get_partitions("tests/data/delta-0.8.0-special-partition").await, ( @@ -350,13 +330,10 @@ mod tests { ) ); assert_eq!( - get_partitions("tests/data/delta-2.2.0-partitioned-types").await, + get_partitions("tests/data/http_requests").await, ( - HashSet::from_iter(["c1".to_string(), "c2".to_string()]), - vec![ - schema_field("c1", "string", true), - schema_field("c2", "string", true) - ], + HashSet::from_iter(["date".to_string()]), + vec![schema_field("date", "string", true)] ) ); } @@ -456,12 +433,14 @@ mod tests { version, "Testing location: {test_data_path:?}" ); + let mut schema_fields = table + .get_schema() + .expect("Failed to get schema") + .get_fields() + .clone(); + schema_fields.sort_by(|a, b| a.get_name().cmp(b.get_name())); assert_eq!( - *table - .get_schema() - .expect("Failed to get schema") - .get_fields(), - expected_schema_fields, + schema_fields, expected_schema_fields, "Testing location: {test_data_path:?}" ); table @@ -544,50 +523,62 @@ mod tests { ], ) .await; - assert_eq!( - table.get_partition_values().cloned().collect::>(), - vec![HashMap::new()] - ); + let mut partition_values = table + .get_partition_values() + .flat_map(|map| map.clone()) + .collect::>(); + partition_values.sort(); + assert_eq!(partition_values, []); // Test Parquet files in path "tests/data/delta-0.8.0-null-partition" let table = create_delta_table( "tests/data/delta-0.8.0-null-partition", vec![ - schema_field("v", "long", true), schema_field("k", "string", true), + schema_field("v", "long", true), ], ) .await; + let mut partition_values = table + .get_partition_values() + .flat_map(|map| map.clone()) + .collect::>(); + partition_values.sort(); assert_eq!( - table.get_partition_values().cloned().collect::>(), - vec![ - HashMap::from([("k".to_string(), None)]), - HashMap::from([("k".to_string(), Some("A".to_string()))]), + partition_values, + [ + ("k".to_string(), None), + ("k".to_string(), Some("A".to_string())) ] ); // Test Parquet files in path "tests/data/delta-0.8.0-special-partition" let table = create_delta_table( "tests/data/delta-0.8.0-special-partition", vec![ - schema_field("y", "long", true), schema_field("x", "string", true), + schema_field("y", "long", true), ], ) .await; + let mut partition_values = table + .get_partition_values() + .flat_map(|map| map.clone()) + .collect::>(); + partition_values.sort(); assert_eq!( - table.get_partition_values().cloned().collect::>(), - vec![ - HashMap::from([("x".to_string(), Some("A/A".to_string()))]), - HashMap::from([("x".to_string(), Some("B B".to_string()))]) + partition_values, + [ + ("x".to_string(), Some("A/A".to_string())), + ("x".to_string(), Some("B B".to_string())), ] ); // Test Parquet files in path "tests/data/delta-0.8.0-partitioned" let table = create_delta_table( "tests/data/delta-0.8.0-partitioned", vec![ + schema_field("day", "string", true), + schema_field("month", "string", true), schema_field("value", "string", true), schema_field("year", "string", true), - schema_field("month", "string", true), - schema_field("day", "string", true), ], ) .await; @@ -685,12 +676,14 @@ mod tests { version, "Testing location: {test_data_path:?}" ); + let mut schema_fields = table + .get_schema() + .expect("Failed to get schema") + .get_fields() + .clone(); + schema_fields.sort_by(|a, b| a.get_name().cmp(b.get_name())); assert_eq!( - *table - .get_schema() - .expect("Failed to get schema") - .get_fields(), - expected_schema_fields, + schema_fields, expected_schema_fields, "Testing location: {test_data_path:?}" ); table @@ -713,7 +706,7 @@ mod tests { #[tokio::test] async fn test_convert_to_delta_from_path() { // Test delta table location "tests/data/delta-2.2.0-partitioned-types" - get_delta_table_from_path( + let table = get_delta_table_from_path( "tests/data/delta-2.2.0-partitioned-types", 0, vec![ @@ -724,15 +717,45 @@ mod tests { "tests/data/delta-2.2.0-partitioned-types", ) .await; + let mut partition_values = table + .get_partition_values() + .flat_map(|map| map.clone()) + .collect::>(); + partition_values.sort(); + assert_eq!( + partition_values, + [ + ("c1".to_string(), Some("4".to_string())), + ("c1".to_string(), Some("5".to_string())), + ("c1".to_string(), Some("6".to_string())), + ("c2".to_string(), Some("a".to_string())), + ("c2".to_string(), Some("b".to_string())), + ("c2".to_string(), Some("c".to_string())), + ] + ); // Test Parquet files in path "tests/data/delta-0.8.0-numeric-partition" - create_delta_table_from_path( + let table = create_delta_table_from_path( "tests/data/delta-0.8.0-numeric-partition", vec![ - schema_field("z", "string", true), schema_field("x", "string", true), schema_field("y", "string", true), + schema_field("z", "string", true), ], ) .await; + let mut partition_values = table + .get_partition_values() + .flat_map(|map| map.clone()) + .collect::>(); + partition_values.sort(); + assert_eq!( + partition_values, + [ + ("x".to_string(), Some("10".to_string())), + ("x".to_string(), Some("9".to_string())), + ("y".to_string(), Some("10.0".to_string())), + ("y".to_string(), Some("9.9".to_string())), + ] + ); } } diff --git a/rust/src/table/mod.rs b/rust/src/table/mod.rs index a599afda10..80101143a8 100644 --- a/rust/src/table/mod.rs +++ b/rust/src/table/mod.rs @@ -30,6 +30,7 @@ use crate::storage::{commit_uri_from_version, ObjectStoreRef}; pub mod builder; pub mod config; +#[cfg(feature = "parquet")] pub mod convert_to_delta; pub mod state; #[cfg(feature = "arrow")]