Skip to content

Commit

Permalink
feat: Add convert_to_delta
Browse files Browse the repository at this point in the history
- Fix flaky tests by sorting vectors.
- Add parquet feature flag to mod.
- Fix clippy errors.
  • Loading branch information
junjunjd committed Oct 13, 2023
1 parent da37166 commit 80d113f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 72 deletions.
1 change: 1 addition & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub use protocol::checkpoints;
#[cfg(feature = "integration_test")]
pub mod test_utils;

#[cfg(feature = "parquet")]
const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";

/// Creates and loads a DeltaTable from the given path with current metadata.
Expand Down
167 changes: 95 additions & 72 deletions rust/src/table/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub enum Error {
///
/// # Arguments
///
/// * `storage` - A shared reference to an [`DeltaObjectStore`](crate::storage::DeltaObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
/// * `storage` - A shared reference to a [`DeltaObjectStore`] with "/" pointing at the delta table root (i.e. where `_delta_log` is located).
pub async fn convert_to_delta(storage: ObjectStoreRef) -> Result<DeltaTable, Error> {
if storage.is_delta_table_location().await? {
info!("A Delta table already exists in the given object store");
Expand Down Expand Up @@ -243,42 +243,44 @@ mod tests {
)
}

async fn get_parquet_files(path: &str) -> Vec<Path> {
parquet_files(object_store(path).expect("Failed to create an object store"))
async fn get_parquet_files(path: &str) -> HashSet<Path> {
let mut res = HashSet::new();
let files = parquet_files(object_store(path).expect("Failed to create an object store"))
.await
.expect("Failed to get parquet files")
.into_iter()
.map(|file| file.location)
.collect()
.expect("Failed to get parquet files");
for file in files {
res.insert(file.location);
}
res
}

#[tokio::test]
async fn test_parquet_files() {
assert_eq!(
get_parquet_files("tests/data/delta-0.8.0").await,
vec![
HashSet::from([
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
],
]),
);
assert_eq!(
get_parquet_files("tests/data/delta-0.8.0-null-partition").await,
vec![
HashSet::from([
Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"),
Path::from("k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet"),
],
]),
);
assert_eq!(
get_parquet_files("tests/data/delta-0.8.0-numeric-partition").await,
vec![
HashSet::from([
Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"),
Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"),
],
]),
);
assert_eq!(
get_parquet_files("tests/data/delta-0.8.0-special-partition").await,
vec![
HashSet::from([
Path::parse(
"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
)
Expand All @@ -287,7 +289,7 @@ mod tests {
"x=B%20B/part-00015-e9abbc6f-85e9-457b-be8e-e9f5b8a22890.c000.snappy.parquet"
)
.expect("Failed to parse as Path")
],
]),
);
}

Expand All @@ -313,35 +315,13 @@ mod tests {
get_partitions("tests/data/delta-0.8.0").await,
(HashSet::new(), Vec::new())
);
assert_eq!(
get_partitions("tests/data/delta-0.8.0-partitioned").await,
(
HashSet::from_iter(["month".to_string(), "day".to_string(), "year".to_string()]),
vec![
schema_field("year", "string", true),
schema_field("month", "string", true),
schema_field("day", "string", true)
],
)
);
assert_eq!(
get_partitions("tests/data/delta-0.8.0-null-partition").await,
(
HashSet::from_iter(["k".to_string()]),
vec![schema_field("k", "string", true)]
)
);

assert_eq!(
get_partitions("tests/data/delta-0.8.0-numeric-partition").await,
(
HashSet::from_iter(["x".to_string(), "y".to_string()]),
vec![
schema_field("x", "string", true),
schema_field("y", "string", true)
],
)
);
assert_eq!(
get_partitions("tests/data/delta-0.8.0-special-partition").await,
(
Expand All @@ -350,13 +330,10 @@ mod tests {
)
);
assert_eq!(
get_partitions("tests/data/delta-2.2.0-partitioned-types").await,
get_partitions("tests/data/http_requests").await,
(
HashSet::from_iter(["c1".to_string(), "c2".to_string()]),
vec![
schema_field("c1", "string", true),
schema_field("c2", "string", true)
],
HashSet::from_iter(["date".to_string()]),
vec![schema_field("date", "string", true)]
)
);
}
Expand Down Expand Up @@ -456,12 +433,14 @@ mod tests {
version,
"Testing location: {test_data_path:?}"
);
let mut schema_fields = table
.get_schema()
.expect("Failed to get schema")
.get_fields()
.clone();
schema_fields.sort_by(|a, b| a.get_name().cmp(b.get_name()));
assert_eq!(
*table
.get_schema()
.expect("Failed to get schema")
.get_fields(),
expected_schema_fields,
schema_fields, expected_schema_fields,
"Testing location: {test_data_path:?}"
);
table
Expand Down Expand Up @@ -544,50 +523,62 @@ mod tests {
],
)
.await;
assert_eq!(
table.get_partition_values().cloned().collect::<Vec<_>>(),
vec![HashMap::new()]
);
let mut partition_values = table
.get_partition_values()
.flat_map(|map| map.clone())
.collect::<Vec<_>>();
partition_values.sort();
assert_eq!(partition_values, []);
// Test Parquet files in path "tests/data/delta-0.8.0-null-partition"
let table = create_delta_table(
"tests/data/delta-0.8.0-null-partition",
vec![
schema_field("v", "long", true),
schema_field("k", "string", true),
schema_field("v", "long", true),
],
)
.await;
let mut partition_values = table
.get_partition_values()
.flat_map(|map| map.clone())
.collect::<Vec<_>>();
partition_values.sort();
assert_eq!(
table.get_partition_values().cloned().collect::<Vec<_>>(),
vec![
HashMap::from([("k".to_string(), None)]),
HashMap::from([("k".to_string(), Some("A".to_string()))]),
partition_values,
[
("k".to_string(), None),
("k".to_string(), Some("A".to_string()))
]
);
// Test Parquet files in path "tests/data/delta-0.8.0-special-partition"
let table = create_delta_table(
"tests/data/delta-0.8.0-special-partition",
vec![
schema_field("y", "long", true),
schema_field("x", "string", true),
schema_field("y", "long", true),
],
)
.await;
let mut partition_values = table
.get_partition_values()
.flat_map(|map| map.clone())
.collect::<Vec<_>>();
partition_values.sort();
assert_eq!(
table.get_partition_values().cloned().collect::<Vec<_>>(),
vec![
HashMap::from([("x".to_string(), Some("A/A".to_string()))]),
HashMap::from([("x".to_string(), Some("B B".to_string()))])
partition_values,
[
("x".to_string(), Some("A/A".to_string())),
("x".to_string(), Some("B B".to_string())),
]
);
// Test Parquet files in path "tests/data/delta-0.8.0-partitioned"
let table = create_delta_table(
"tests/data/delta-0.8.0-partitioned",
vec![
schema_field("day", "string", true),
schema_field("month", "string", true),
schema_field("value", "string", true),
schema_field("year", "string", true),
schema_field("month", "string", true),
schema_field("day", "string", true),
],
)
.await;
Expand Down Expand Up @@ -685,12 +676,14 @@ mod tests {
version,
"Testing location: {test_data_path:?}"
);
let mut schema_fields = table
.get_schema()
.expect("Failed to get schema")
.get_fields()
.clone();
schema_fields.sort_by(|a, b| a.get_name().cmp(b.get_name()));
assert_eq!(
*table
.get_schema()
.expect("Failed to get schema")
.get_fields(),
expected_schema_fields,
schema_fields, expected_schema_fields,
"Testing location: {test_data_path:?}"
);
table
Expand All @@ -713,7 +706,7 @@ mod tests {
#[tokio::test]
async fn test_convert_to_delta_from_path() {
// Test delta table location "tests/data/delta-2.2.0-partitioned-types"
get_delta_table_from_path(
let table = get_delta_table_from_path(
"tests/data/delta-2.2.0-partitioned-types",
0,
vec![
Expand All @@ -724,15 +717,45 @@ mod tests {
"tests/data/delta-2.2.0-partitioned-types",
)
.await;
let mut partition_values = table
.get_partition_values()
.flat_map(|map| map.clone())
.collect::<Vec<_>>();
partition_values.sort();
assert_eq!(
partition_values,
[
("c1".to_string(), Some("4".to_string())),
("c1".to_string(), Some("5".to_string())),
("c1".to_string(), Some("6".to_string())),
("c2".to_string(), Some("a".to_string())),
("c2".to_string(), Some("b".to_string())),
("c2".to_string(), Some("c".to_string())),
]
);
// Test Parquet files in path "tests/data/delta-0.8.0-numeric-partition"
create_delta_table_from_path(
let table = create_delta_table_from_path(
"tests/data/delta-0.8.0-numeric-partition",
vec![
schema_field("z", "string", true),
schema_field("x", "string", true),
schema_field("y", "string", true),
schema_field("z", "string", true),
],
)
.await;
let mut partition_values = table
.get_partition_values()
.flat_map(|map| map.clone())
.collect::<Vec<_>>();
partition_values.sort();
assert_eq!(
partition_values,
[
("x".to_string(), Some("10".to_string())),
("x".to_string(), Some("9".to_string())),
("y".to_string(), Some("10.0".to_string())),
("y".to_string(), Some("9.9".to_string())),
]
);
}
}
1 change: 1 addition & 0 deletions rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::storage::{commit_uri_from_version, ObjectStoreRef};

pub mod builder;
pub mod config;
#[cfg(feature = "parquet")]
pub mod convert_to_delta;
pub mod state;
#[cfg(feature = "arrow")]
Expand Down

0 comments on commit 80d113f

Please sign in to comment.