diff --git a/rust/src/action.rs b/rust/src/action.rs index de5b9a10bb..40c3cc8bfa 100644 --- a/rust/src/action.rs +++ b/rust/src/action.rs @@ -354,7 +354,7 @@ impl Add { } /// Describes the data format of files in the table. -#[derive(Serialize, Deserialize, Debug, Default, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Format { /// Name of the encoding for files in this table. provider: String, @@ -362,6 +362,28 @@ pub struct Format { options: Option>, } +impl Format { + /// Allows creation of a new action::Format + pub fn new(provider: String, options: Option>) -> Self { + Self { provider, options } + } + + /// Return the Format provider + pub fn get_provider(self) -> String { + self.provider + } +} + +// Assuming this is a more appropriate default than derived Default +impl Default for Format { + fn default() -> Self { + Self { + provider: "parquet".to_string(), + options: Default::default(), + } + } +} + /// Action that describes the metadata of the table. /// This is a top-level action in Delta log entries. #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -604,7 +626,7 @@ impl Remove { /// Action used by streaming systems to track progress using application-specific versions to /// enable idempotency. -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(rename_all = "camelCase")] pub struct Txn { /// A unique identifier for the application performing the transaction. @@ -657,7 +679,7 @@ impl Txn { /// Action used to increase the version of the Delta protocol required to read or write to the /// table. -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Protocol { /// Minimum version of the Delta read protocol a client must implement to correctly read the @@ -702,7 +724,7 @@ impl Protocol { /// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed /// on the table, so the full list of actions is required to properly read a table. -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum Action { /// Changes the current metadata of the table. Must be present in the first version of a table. /// Subsequent `metaData` actions completely overwrite previous metadata. diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 9afa3a59cd..1860d9fc9e 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -187,7 +187,7 @@ pub enum DeltaTableError { } /// Delta table metadata -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct DeltaTableMetaData { /// Unique identifier for this table pub id: Guid, @@ -207,6 +207,31 @@ pub struct DeltaTableMetaData { pub configuration: HashMap, } +impl DeltaTableMetaData { + /// Create metadata for a DeltaTable from scratch + pub fn new( + name: Option, + description: Option, + format: Option, + schema: Schema, + partition_columns: Vec, + configuration: HashMap, + ) -> Self { + // Reference implementation uses uuid v4 to create GUID: + // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L350 + Self { + id: Uuid::new_v4().to_string(), + name, + description, + format: format.unwrap_or_default(), + schema, + partition_columns, + created_time: Utc::now().timestamp_millis(), + configuration, + } + } +} + impl fmt::Display for DeltaTableMetaData { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( @@ -1035,6 +1060,31 @@ impl DeltaTable { }) } + /// Create a DeltaTable with version 0 given the provided MetaData and Protocol + pub async fn create( + &mut self, + metadata: DeltaTableMetaData, + protocol: action::Protocol, + ) -> Result<(), DeltaTransactionError> { + let meta = action::MetaData::try_from(metadata)?; + + // TODO add commit info action + let actions = vec![Action::protocol(protocol), Action::metaData(meta)]; + let mut transaction = self.create_transaction(None); + transaction.add_actions(actions.clone()); + + let prepared_commit = transaction.prepare_commit(None).await?; + self.try_commit_transaction(&prepared_commit, 0).await?; + + // Mutate the DeltaTable's state using process_action() + // in order to get most up-to-date state based on the commit above + for action in actions { + let _ = process_action(&mut self.state, action)?; + } + + Ok(()) + } + /// Time travel Delta table to latest version that's created at or before provided `datetime` /// argument. /// @@ -1493,7 +1543,7 @@ pub fn crate_version() -> &'static str { mod tests { use super::*; use pretty_assertions::assert_eq; - use std::collections::HashMap; + use std::{collections::HashMap, fs::File, path::Path}; #[test] fn state_records_new_txn_version() { @@ -1578,4 +1628,90 @@ mod tests { assert!(parquet_filename.contains("col1=a/col2=b/part-00000-")); } } + + #[tokio::test] + async fn test_create_delta_table() { + // Setup + let test_schema = Schema::new( + "test".to_string(), + vec![ + SchemaField::new( + "Id".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "Name".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + ], + ); + + let delta_md = DeltaTableMetaData::new( + Some("Test Table Create".to_string()), + Some("This table is made to test the create function for a DeltaTable".to_string()), + None, + test_schema, + vec![], + HashMap::new(), + ); + + let protocol = action::Protocol { + min_reader_version: 1, + min_writer_version: 2, + }; + + let tmp_dir = tempdir::TempDir::new("create_table_test").unwrap(); + let table_dir = tmp_dir.path().join("test_create"); + + let path = table_dir.to_str().unwrap(); + let backend = Box::new(storage::file::FileStorageBackend::new( + tmp_dir.path().to_str().unwrap(), + )); + let mut dt = DeltaTable::new(path, backend).unwrap(); + + // Action + dt.create(delta_md.clone(), protocol.clone()).await.unwrap(); + + // Validation + // assert DeltaTable version is now 0 and no data files have been added + assert_eq!(dt.version, 0); + assert_eq!(dt.state.files.len(), 0); + + // assert new _delta_log file created in tempDir + let table_path = Path::new(&dt.table_uri); + assert!(table_path.exists()); + + let delta_log = table_path.join("_delta_log"); + assert!(delta_log.exists()); + + let version_file = delta_log.join("00000000000000000000.json"); + assert!(version_file.exists()); + + // Checking the data written to delta table is the same when read back + let version_data = File::open(version_file).unwrap(); + let lines = BufReader::new(version_data).lines(); + + for line in lines { + let action: Action = serde_json::from_str(line.unwrap().as_str()).unwrap(); + match action { + Action::protocol(action) => { + assert_eq!(action, protocol); + } + Action::metaData(action) => { + assert_eq!(DeltaTableMetaData::try_from(action).unwrap(), delta_md); + } + _ => (), + } + } + + // assert DeltaTableState metadata matches fields in above DeltaTableMetaData + // assert metadata name + let current_metadata = dt.get_metadata().unwrap(); + assert!(current_metadata.partition_columns.is_empty()); + assert!(current_metadata.configuration.is_empty()); + } } diff --git a/rust/src/schema.rs b/rust/src/schema.rs index 098f66caed..cb13ca4e55 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema.rs @@ -44,6 +44,21 @@ pub struct SchemaField { } impl SchemaField { + /// Create a new SchemaField from scratch + pub fn new( + name: String, + r#type: SchemaDataType, + nullable: bool, + metadata: HashMap, + ) -> Self { + Self { + name, + r#type, + nullable, + metadata, + } + } + /// The column name of the schema field. pub fn get_name(&self) -> &str { &self.name @@ -147,7 +162,7 @@ pub enum SchemaDataType { } /// Represents the schema of the delta table. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Schema { r#type: String, fields: Vec, @@ -158,4 +173,9 @@ impl Schema { pub fn get_fields(&self) -> &Vec { &self.fields } + + /// Create a new Schema using a vector of SchemaFields + pub fn new(r#type: String, fields: Vec) -> Self { + Self { r#type, fields } + } }