Add a convert_to_delta method for converting a Parquet table to a Delta Table in place.
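A minimal usage sketch, assuming the new function ends up re-exported at the deltalake crate root (the exact public path is not shown in this diff); the table path is one of the diff's own test fixtures:

use deltalake::DeltaTable;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point at a directory of Parquet files. If a Delta table already
    // exists at the path, convert_to_delta simply opens and returns it.
    let table: DeltaTable = deltalake::convert_to_delta("tests/data/delta-0.8.0").await?;
    println!("Converted table is at version {}", table.version());
    Ok(())
}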
Showing 3 changed files with 279 additions and 0 deletions.
@@ -0,0 +1,277 @@
//! Convert a Parquet table to a Delta table in place

use crate::{
    errors::DeltaTableError,
    open_table,
    operations::create::CreateBuilder,
    protocol::{Action, Add, SaveMode},
    schema::Schema,
    storage::DeltaObjectStore,
    DeltaTable, DeltaTablePartition, SchemaDataType, SchemaField,
};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use futures::StreamExt;
use log::*;
use object_store::{ObjectMeta, ObjectStore};
use parquet::{
    arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
    errors::ParquetError,
};
use std::{
    collections::{HashMap, HashSet},
    num::TryFromIntError,
    sync::Arc,
};
use url::Url;

/// Error converting a Parquet table to a Delta table
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Object store error
    #[error("Object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
    /// Arrow error
    #[error("Arrow error: {0}")]
    Arrow(#[from] ArrowError),
    /// Parquet error
    #[error("Parquet error: {0}")]
    Parquet(#[from] ParquetError),
    /// DeltaTable error
    #[error("DeltaTable error: {0}")]
    DeltaTable(#[from] DeltaTableError),
    /// Error converting usize to i64
    #[error("Error converting usize to i64: {0}")]
    TryFromUsize(#[from] TryFromIntError),
    /// Error canonicalizing the given path
    #[error("Error canonicalizing the given path: {0}")]
    CanonicalizePath(#[from] std::io::Error),
    /// Error converting the path into a URL
    #[error("Error converting the path into a URL")]
    UrlFromFilePath,
}

/// Convert a Parquet table to a Delta table in place
pub async fn convert_to_delta(path: &str) -> Result<DeltaTable, Error> {
    match open_table(path).await {
        Ok(table) => {
            info!("A Delta table already exists in the given path");
            Ok(table)
        }
        Err(_) => {
            info!("Converting Parquet table into a Delta table...");
            let url = url(path)?;
            let store = object_store(url.clone())?;
            let parquet_files = parquet_files(store.clone()).await?;
            delta_table(parquet_files, store, url).await
        }
    }
}

fn url(path: &str) -> Result<Url, Error> {
    Ok(if let Ok(url) = Url::parse(path) {
        url
    } else {
        info!(
            "Cannot parse an absolute URL from the given path {path:?}. Canonicalizing the path..."
        );
        Url::from_file_path(std::fs::canonicalize(path)?).map_err(|_| Error::UrlFromFilePath)?
    })
}
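// Illustration (not from the original commit): `url` passes anything that
// already parses as an absolute URL through unchanged (e.g. "s3://bucket/table"),
// while a bare filesystem path such as "tests/data/delta-0.8.0" is
// canonicalized and converted into a "file://..." URL via Url::from_file_path.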

fn object_store(url: Url) -> Result<Arc<DeltaObjectStore>, Error> {
    debug!("Creating an object store for URL: {url:#?}");
    Ok(Arc::new(DeltaObjectStore::try_new(url, HashMap::new())?))
}

async fn parquet_files(store: Arc<DeltaObjectStore>) -> Result<Vec<ObjectMeta>, Error> {
    let objects = store.list(None).await?;
    Ok(objects
        .filter_map(|path| async move {
            if let Ok(meta) = path {
                if Some("parquet") == meta.location.extension() {
                    debug!("Found parquet file in {:#?}", meta.location);
                    Some(meta)
                } else {
                    None
                }
            } else {
                None
            }
        })
        .collect()
        .await)
}

async fn arrow_schema(
    file: &ObjectMeta,
    store: Arc<DeltaObjectStore>,
) -> Result<ArrowSchema, Error> {
    Ok(
        ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(store.clone(), file.clone()))
            .await?
            .schema()
            .as_ref()
            .clone(),
    )
}

fn add_action(
    file: &ObjectMeta,
    partition_values: HashMap<String, Option<String>>,
) -> Result<Action, Error> {
    Ok(Action::add(Add {
        path: file.location.to_string(),
        size: i64::try_from(file.size)?,
        partition_values,
        modification_time: file.last_modified.timestamp_millis(),
        ..Default::default()
    }))
}

fn partitions(
    file: &ObjectMeta,
    // A HashSet of all unique partition columns in a Parquet table
    partition_columns: &mut HashSet<String>,
    // A Vec of schema fields for all unique partition columns in a Parquet table
    partition_schema_fields: &mut Vec<SchemaField>,
) -> Result<HashMap<String, Option<String>>, Error> {
    // A HashMap from partition column to value for this parquet file
    let mut partitions = HashMap::new();
    for s in file.location.as_ref().split('/') {
        if !s.ends_with(".parquet") {
            let partition = DeltaTablePartition::try_from(s)?;
            let (key, val) = (partition.key.to_string(), partition.value.to_string());
            debug!(
                "Found partition {partition:#?} in parquet file {:#?}",
                file.location
            );
            partitions.insert(key.clone(), Some(val));
            if partition_columns.insert(key.clone()) {
                partition_schema_fields.push(SchemaField::new(
                    key,
                    SchemaDataType::primitive("string".to_string()),
                    true,
                    HashMap::new(),
                ));
            }
        }
    }
    Ok(partitions)
}
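// Worked example (not from the original commit): for the test file
// "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet",
// the two "key=value" directory segments yield the partition values
// {"x": Some("9"), "y": Some("9.9")}; "x" and "y" are each recorded once in
// `partition_columns` and get a nullable string field in `partition_schema_fields`.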

async fn delta_table(
    files: Vec<ObjectMeta>,
    store: Arc<DeltaObjectStore>,
    url: Url,
) -> Result<DeltaTable, Error> {
    let (mut schemas, mut actions, mut partition_columns, mut partition_schema_fields) =
        (Vec::new(), Vec::new(), HashSet::new(), Vec::new());
    for file in files {
        debug!("Processing parquet file: {:#?}", file.location);
        schemas.push(arrow_schema(&file, store.clone()).await?);
        let partition_values =
            partitions(&file, &mut partition_columns, &mut partition_schema_fields)?;
        actions.push(add_action(&file, partition_values)?);
    }
    let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(schemas)?)?
        .get_fields()
        .clone();
    schema_fields.append(&mut partition_schema_fields);
    debug!("Schema fields for the parquet table: {schema_fields:#?}");
    Ok(CreateBuilder::new()
        .with_location(url)
        .with_object_store(store.clone())
        .with_columns(schema_fields)
        .with_partition_columns(partition_columns.into_iter())
        .with_actions(actions)
        .with_save_mode(SaveMode::Ignore)
        .await?)
}
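// Note (not from the original commit): ArrowSchema::try_merge unions the
// data schemas read from every Parquet file, and the partition columns
// (which appear only in file paths, never inside the Parquet data) are
// appended afterwards so the resulting Delta schema covers both.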

#[cfg(test)]
mod tests {
    use super::{
        object_store, parquet_files, partitions, url, HashMap, HashSet, SchemaDataType, SchemaField,
    };
    use object_store::path::Path;
    use test_case::test_case;

    fn schema_field(key: &str) -> SchemaField {
        SchemaField::new(
            key.to_string(),
            SchemaDataType::primitive("string".to_string()),
            true,
            HashMap::new(),
        )
    }

    #[test_case("tests/data/delta-0.8.0" =>
        vec![
            Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
            Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
            Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
        ]
    )]
    #[test_case("tests/data/delta-0.8.0-numeric-partition" =>
        vec![
            Path::from("x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"),
            Path::from("x=10/y=10.0/part-00015-24eb4845-2d25-4448-b3bb-5ed7f12635ab.c000.snappy.parquet"),
        ]
    )]
    #[tokio::test]
    async fn test_parquet_files(path: &str) -> Vec<Path> {
        let files = parquet_files(
            object_store(url(path).expect("Failed to get URL from the path"))
                .expect("Failed to create an object store"),
        )
        .await
        .expect("Failed to get parquet files");
        files
            .into_iter()
            .map(|meta| meta.location)
            .collect::<Vec<_>>()
    }

    #[test_case(
        "tests/data/delta-0.8.0-partitioned" => (
            HashSet::from(["month".to_string(), "day".to_string(), "year".to_string()]),
            vec![
                schema_field("year"),
                schema_field("month"),
                schema_field("day"),
            ]
    ))]
    #[test_case(
        "tests/data/delta-0.8.0-special-partition" => (
            HashSet::from(["x".to_string()]),
            vec![
                schema_field("x"),
            ]
    ))]
    #[test_case(
        "tests/data/delta-0.8.0-null-partition" => (
            HashSet::from(["k".to_string()]),
            vec![
                schema_field("k"),
            ]
    ))]
    #[tokio::test]
    async fn test_partitions(path: &str) -> (HashSet<String>, Vec<SchemaField>) {
        let files = parquet_files(
            object_store(url(path).expect("Failed to get URL from the path"))
                .expect("Failed to create an object store"),
        )
        .await
        .expect("Failed to get parquet files");
        let (mut partition_columns, mut partition_schema_fields) = (HashSet::new(), Vec::new());
        for file in files {
            partitions(&file, &mut partition_columns, &mut partition_schema_fields)
                .expect("Failed to get partitions");
        }
        (partition_columns, partition_schema_fields)
    }

    // TODO
    #[allow(dead_code)]
    fn test_convert_to_delta() {}
}