Skip to content

Commit

Permalink
[rust] Low level create table API (#342)
Browse files Browse the repository at this point in the history
Co-authored-by: QP Hou <dave2008713@gmail.com>
  • Loading branch information
Smurphy000 and houqp authored Aug 4, 2021
1 parent 773a31e commit 129940f
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 7 deletions.
30 changes: 26 additions & 4 deletions rust/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,36 @@ impl Add {
}

/// Describes the data format of files in the table.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Format {
/// Name of the encoding for files in this table.
provider: String,
/// A map containing configuration options for the format.
options: Option<HashMap<String, String>>,
}

impl Format {
/// Allows creation of a new action::Format
pub fn new(provider: String, options: Option<HashMap<String, String>>) -> Self {
Self { provider, options }
}

/// Return the Format provider
pub fn get_provider(self) -> String {
self.provider
}
}

// Assuming this is a more appropriate default than derived Default
impl Default for Format {
fn default() -> Self {
Self {
provider: "parquet".to_string(),
options: Default::default(),
}
}
}

/// Action that describes the metadata of the table.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down Expand Up @@ -604,7 +626,7 @@ impl Remove {

/// Action used by streaming systems to track progress using application-specific versions to
/// enable idempotency.
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Txn {
/// A unique identifier for the application performing the transaction.
Expand Down Expand Up @@ -657,7 +679,7 @@ impl Txn {

/// Action used to increase the version of the Delta protocol required to read or write to the
/// table.
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Protocol {
/// Minimum version of the Delta read protocol a client must implement to correctly read the
Expand Down Expand Up @@ -702,7 +724,7 @@ impl Protocol {

/// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed
/// on the table, so the full list of actions is required to properly read a table.
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Action {
/// Changes the current metadata of the table. Must be present in the first version of a table.
/// Subsequent `metaData` actions completely overwrite previous metadata.
Expand Down
140 changes: 138 additions & 2 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub enum DeltaTableError {
}

/// Delta table metadata
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct DeltaTableMetaData {
/// Unique identifier for this table
pub id: Guid,
Expand All @@ -207,6 +207,31 @@ pub struct DeltaTableMetaData {
pub configuration: HashMap<String, String>,
}

impl DeltaTableMetaData {
/// Create metadata for a DeltaTable from scratch
pub fn new(
name: Option<String>,
description: Option<String>,
format: Option<action::Format>,
schema: Schema,
partition_columns: Vec<String>,
configuration: HashMap<String, String>,
) -> Self {
// Reference implementation uses uuid v4 to create GUID:
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L350
Self {
id: Uuid::new_v4().to_string(),
name,
description,
format: format.unwrap_or_default(),
schema,
partition_columns,
created_time: Utc::now().timestamp_millis(),
configuration,
}
}
}

impl fmt::Display for DeltaTableMetaData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
Expand Down Expand Up @@ -1035,6 +1060,31 @@ impl DeltaTable {
})
}

/// Create a DeltaTable with version 0 given the provided MetaData and Protocol
pub async fn create(
&mut self,
metadata: DeltaTableMetaData,
protocol: action::Protocol,
) -> Result<(), DeltaTransactionError> {
let meta = action::MetaData::try_from(metadata)?;

// TODO add commit info action
let actions = vec![Action::protocol(protocol), Action::metaData(meta)];
let mut transaction = self.create_transaction(None);
transaction.add_actions(actions.clone());

let prepared_commit = transaction.prepare_commit(None).await?;
self.try_commit_transaction(&prepared_commit, 0).await?;

// Mutate the DeltaTable's state using process_action()
// in order to get most up-to-date state based on the commit above
for action in actions {
let _ = process_action(&mut self.state, action)?;
}

Ok(())
}

/// Time travel Delta table to latest version that's created at or before provided `datetime`
/// argument.
///
Expand Down Expand Up @@ -1493,7 +1543,7 @@ pub fn crate_version() -> &'static str {
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::{collections::HashMap, fs::File, path::Path};

#[test]
fn state_records_new_txn_version() {
Expand Down Expand Up @@ -1578,4 +1628,90 @@ mod tests {
assert!(parquet_filename.contains("col1=a/col2=b/part-00000-"));
}
}

#[tokio::test]
async fn test_create_delta_table() {
// Setup
let test_schema = Schema::new(
"test".to_string(),
vec![
SchemaField::new(
"Id".to_string(),
SchemaDataType::primitive("integer".to_string()),
true,
HashMap::new(),
),
SchemaField::new(
"Name".to_string(),
SchemaDataType::primitive("string".to_string()),
true,
HashMap::new(),
),
],
);

let delta_md = DeltaTableMetaData::new(
Some("Test Table Create".to_string()),
Some("This table is made to test the create function for a DeltaTable".to_string()),
None,
test_schema,
vec![],
HashMap::new(),
);

let protocol = action::Protocol {
min_reader_version: 1,
min_writer_version: 2,
};

let tmp_dir = tempdir::TempDir::new("create_table_test").unwrap();
let table_dir = tmp_dir.path().join("test_create");

let path = table_dir.to_str().unwrap();
let backend = Box::new(storage::file::FileStorageBackend::new(
tmp_dir.path().to_str().unwrap(),
));
let mut dt = DeltaTable::new(path, backend).unwrap();

// Action
dt.create(delta_md.clone(), protocol.clone()).await.unwrap();

// Validation
// assert DeltaTable version is now 0 and no data files have been added
assert_eq!(dt.version, 0);
assert_eq!(dt.state.files.len(), 0);

// assert new _delta_log file created in tempDir
let table_path = Path::new(&dt.table_uri);
assert!(table_path.exists());

let delta_log = table_path.join("_delta_log");
assert!(delta_log.exists());

let version_file = delta_log.join("00000000000000000000.json");
assert!(version_file.exists());

// Checking the data written to delta table is the same when read back
let version_data = File::open(version_file).unwrap();
let lines = BufReader::new(version_data).lines();

for line in lines {
let action: Action = serde_json::from_str(line.unwrap().as_str()).unwrap();
match action {
Action::protocol(action) => {
assert_eq!(action, protocol);
}
Action::metaData(action) => {
assert_eq!(DeltaTableMetaData::try_from(action).unwrap(), delta_md);
}
_ => (),
}
}

// assert DeltaTableState metadata matches fields in above DeltaTableMetaData
// assert metadata name
let current_metadata = dt.get_metadata().unwrap();
assert!(current_metadata.partition_columns.is_empty());
assert!(current_metadata.configuration.is_empty());
}
}
22 changes: 21 additions & 1 deletion rust/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ pub struct SchemaField {
}

impl SchemaField {
/// Create a new SchemaField from scratch
pub fn new(
name: String,
r#type: SchemaDataType,
nullable: bool,
metadata: HashMap<String, String>,
) -> Self {
Self {
name,
r#type,
nullable,
metadata,
}
}

/// The column name of the schema field.
pub fn get_name(&self) -> &str {
&self.name
Expand Down Expand Up @@ -147,7 +162,7 @@ pub enum SchemaDataType {
}

/// Represents the schema of the delta table.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Schema {
r#type: String,
fields: Vec<SchemaField>,
Expand All @@ -158,4 +173,9 @@ impl Schema {
pub fn get_fields(&self) -> &Vec<SchemaField> {
&self.fields
}

/// Create a new Schema using a vector of SchemaFields
pub fn new(r#type: String, fields: Vec<SchemaField>) -> Self {
Self { r#type, fields }
}
}

0 comments on commit 129940f

Please sign in to comment.