Skip to content

Commit

Permalink
feat: Add convert_to_delta
Browse files Browse the repository at this point in the history
Add a convert_to_delta method for converting a Parquet table to a Delta
Table in place.
  • Loading branch information
junjunjd committed Oct 1, 2023
1 parent fcfd1bf commit 781ea5b
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 0 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"
hyper = { version = "0.14", features = ["server"] }
test-case = "3.2.1"

[features]
azure = ["object_store/azure"]
Expand Down
267 changes: 267 additions & 0 deletions rust/src/table/convert_to_delta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
//! Convert a Parquet table to a Delta table in place

use crate::{
errors::DeltaTableError,
open_table,
operations::create::CreateBuilder,
protocol::{Action, Add, SaveMode},
schema::Schema,
storage::DeltaObjectStore,
DeltaTable, DeltaTablePartition, SchemaDataType, SchemaField,
};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use futures::StreamExt;
use log::*;
use object_store::{ObjectMeta, ObjectStore};
use parquet::{
arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
errors::ParquetError,
};
use std::{
collections::{HashMap, HashSet},
num::TryFromIntError,
sync::Arc,
};
use url::Url;

/// Error converting a Parquet table to a Delta table
#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Object store error
#[error("Object store error: {0}")]
ObjectStore(#[from] object_store::Error),
/// Arrow error
#[error("Arrow error: {0}")]
Arrow(#[from] ArrowError),
/// Parquet error
#[error("Parquet error: {0}")]
Parquet(#[from] ParquetError),
/// DeltaTable error
#[error("DeltaTable error: {0}")]
DeltaTable(#[from] DeltaTableError),
/// Error converting usize to i64
#[error("Error converting usize to i64: {0}")]
TryFromUsize(#[from] TryFromIntError),
/// Error canonicalizing the given path
#[error("Error canonicalizing the given path: {0}")]
CanonicalizePath(#[from] std::io::Error),
/// Error converting the path into an URL
#[error("Error converting the path into an URL")]
UrlFromFilePath,
}

/// Convert a Parquet table to a Delta table in place
pub async fn convert_to_delta(path: &str) -> Result<DeltaTable, Error> {
match open_table(path).await {
Ok(table) => {
info!("A Delta table already exists in the given path");
Ok(table)
}
Err(_) => {
info!("Converting Parquet table into a Delta table...");
let url = url(path)?;
let store = object_store(url.clone())?;
let parquet_files = parquet_files(store.clone()).await?;
delta_table(parquet_files, store, url).await
}
}
}

fn url(path: &str) -> Result<Url, Error> {
Ok(if let Ok(url) = Url::parse(path) {
url
} else {
info!(
"Cannot parse an absolute URL from the given path {path:?}. Canonicalizing the path..."
);
Url::from_file_path(std::fs::canonicalize(path)?).map_err(|_| Error::UrlFromFilePath)?
})
}

fn object_store(url: Url) -> Result<Arc<DeltaObjectStore>, Error> {
debug!("Creating an object store for URL: {url:#?}");
Ok(Arc::new(DeltaObjectStore::try_new(url, HashMap::new())?))
}

async fn parquet_files(store: Arc<DeltaObjectStore>) -> Result<Vec<ObjectMeta>, Error> {
let objects = store.list(None).await?;
Ok(objects
.filter_map(|path| async move {
if let Ok(meta) = path {
if Some("parquet") == meta.location.extension() {
debug!("Found parquet file in {:#?}", meta.location);
Some(meta)
} else {
None
}
} else {
None
}
})
.collect()
.await)
}

async fn arrow_schema(
file: &ObjectMeta,
store: Arc<DeltaObjectStore>,
) -> Result<ArrowSchema, Error> {
Ok(
ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(store.clone(), file.clone()))
.await?
.schema()
.as_ref()
.clone(),
)
}

fn add_action(
file: &ObjectMeta,
partition_values: HashMap<String, Option<String>>,
) -> Result<Action, Error> {
Ok(Action::add(Add {
path: file.location.to_string(),
size: i64::try_from(file.size)?,
partition_values,
modification_time: file.last_modified.timestamp_millis(),
..Default::default()
}))
}

fn partitions(
file: &ObjectMeta,
// A HashSet for all unique partition columns in a Parquet table
partition_columns: &mut HashSet<String>,
// A Vector for schema fields of all unique partition columns in a Parquet table
partition_schema_fields: &mut Vec<SchemaField>,
) -> Result<HashMap<String, Option<String>>, Error> {
// A HashMap from partition column to value for this parquet file
let mut partitions = HashMap::new();
for s in file.location.as_ref().split('/') {
if !s.ends_with(".parquet") {
let partition = DeltaTablePartition::try_from(s)?;
let (key, val) = (partition.key.to_string(), partition.value.to_string());
debug!(
"Found partition {partition:#?} in parquet file {:#?}",
file.location
);
partitions.insert(key.clone(), Some(val));
if partition_columns.insert(key.clone()) {
partition_schema_fields.push(SchemaField::new(
key,
SchemaDataType::primitive("string".to_string()),
true,
HashMap::new(),
));
}
}
}
Ok(partitions)
}

async fn delta_table(
files: Vec<ObjectMeta>,
store: Arc<DeltaObjectStore>,
url: Url,
) -> Result<DeltaTable, Error> {
let (mut schemas, mut actions, mut partition_columns, mut partition_schema_fields) =
(Vec::new(), Vec::new(), HashSet::new(), Vec::new());
for file in files {
debug!("Processing parquet file: {:#?}", file.location);
schemas.push(arrow_schema(&file, store.clone()).await?);
let partition_values =
partitions(&file, &mut partition_columns, &mut partition_schema_fields)?;
actions.push(add_action(&file, partition_values)?);
}
let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(schemas)?)?
.get_fields()
.clone();
schema_fields.append(&mut partition_schema_fields);
debug!("Schema fields for the parquet table: {schema_fields:#?}");
Ok(CreateBuilder::new()
.with_location(url)
.with_object_store(store.clone())
.with_columns(schema_fields)
.with_partition_columns(partition_columns.into_iter())
.with_actions(actions)
.with_save_mode(SaveMode::Ignore)
.await?)
}

#[cfg(test)]
mod tests {
use super::{
object_store, parquet_files, partitions, url, HashMap, HashSet, SchemaDataType, SchemaField,
};
use object_store::path::Path;
use test_case::test_case;

fn schema_field(key: &str) -> SchemaField {
SchemaField::new(
key.to_string(),
SchemaDataType::primitive("string".to_string()),
true,
HashMap::new(),
)
}

#[test_case("tests/data/delta-0.8.0" =>
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
]
)]
#[tokio::test]
async fn test_parquet_files(path: &str) -> Vec<Path> {
let files = parquet_files(
object_store(url(path).expect("Failed to get URL from the path"))
.expect("Failed to create an object store"),
)
.await
.expect("Failed to get parquet files");
files
.into_iter()
.map(|meta| meta.location)
.collect::<Vec<_>>()
}

#[test_case(
"tests/data/delta-0.8.0-partitioned" => (
HashSet::from(["month".to_string(), "day".to_string(), "year".to_string()]),
vec![
schema_field("year"),
schema_field("month"),
schema_field("day"),
]
))]
#[tokio::test]
async fn test_partitions(path: &str) -> (HashSet<String>, Vec<SchemaField>) {
let files = parquet_files(
object_store(url(path).expect("Failed to get URL from the path"))
.expect("Failed to create an object store"),
)
.await
.expect("Failed to get parquet files");
let (mut partition_columns, mut partition_schema_fields) = (HashSet::new(), Vec::new());
for file in files {
partitions(&file, &mut partition_columns, &mut partition_schema_fields)
.expect("Failed to get partitions");
}
(partition_columns, partition_schema_fields)
}

// TODO
#[allow(dead_code)]
fn test_arrow_schema() {}
// TODO
#[allow(dead_code)]
fn test_add_action() {}
// TODO
#[allow(dead_code)]
fn test_delta_table() {}
// TODO
#[allow(dead_code)]
fn test_convert_to_delta() {}
}
1 change: 1 addition & 0 deletions rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::storage::{commit_uri_from_version, ObjectStoreRef};

pub mod builder;
pub mod config;
pub mod convert_to_delta;
pub mod state;
#[cfg(feature = "arrow")]
pub mod state_arrow;
Expand Down

0 comments on commit 781ea5b

Please sign in to comment.