diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 931ab75b26..a0fad6579d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -111,3 +111,7 @@ integration_test = ["fs_extra", "tempdir"] [[bench]] name = "read_checkpoint" harness = false + +[[example]] +name = "basic_operations" +required-features = ["datafusion-ext"] diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs new file mode 100644 index 0000000000..2b50f9968a --- /dev/null +++ b/rust/examples/basic_operations.rs @@ -0,0 +1,77 @@ +use arrow::{ + array::{Int32Array, StringArray}, + datatypes::{DataType, Field, Schema as ArrowSchema}, + record_batch::RecordBatch, +}; +use deltalake::operations::collect_sendable_stream; +use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField}; +use std::sync::Arc; + +fn get_table_columns() -> Vec { + vec![ + SchemaField::new( + String::from("int"), + SchemaDataType::primitive(String::from("integer")), + false, + Default::default(), + ), + SchemaField::new( + String::from("string"), + SchemaDataType::primitive(String::from("string")), + true, + Default::default(), + ), + ] +} + +fn get_table_batches() -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("int", DataType::Int32, false), + Field::new("string", DataType::Utf8, true), + ])); + + let int_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + let str_values = StringArray::from(vec!["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]); + + RecordBatch::try_new(schema, vec![Arc::new(int_values), Arc::new(str_values)]).unwrap() +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), deltalake::DeltaTableError> { + // Create a delta operations client pointing at an un-initialized in-memory location. + // In a production environment this would be created with "try_new" and point at + // a real storage location. 
+ let ops = DeltaOps::new_in_memory(); + + // The operations module uses a builder pattern that allows specifying several options + // on how the command behaves. The builders implement `IntoFuture`, so once + // options are set you can run the command using `.await`. + let table = ops + .create() + .with_columns(get_table_columns()) + .with_table_name("my_table") + .with_comment("A table to show how delta-rs works") + .await?; + + assert_eq!(table.version(), 0); + + let batch = get_table_batches(); + let table = DeltaOps(table).write(vec![batch.clone()]).await?; + + assert_eq!(table.version(), 1); + + // To overwrite instead of append (which is the default), use `.with_save_mode`: + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Overwrite) + .await?; + + assert_eq!(table.version(), 2); + + let (_table, stream) = DeltaOps(table).load().await?; + let data: Vec = collect_sendable_stream(stream).await?; + + println!("{:?}", data); + + Ok(()) +} diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index e1204addc1..622afed005 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -14,7 +14,7 @@ use crate::storage::DeltaObjectStore; use crate::{DeltaResult, DeltaTable, DeltaTableError}; use futures::future::BoxFuture; -use serde_json::Value; +use serde_json::{Map, Value}; #[derive(thiserror::Error, Debug)] enum CreateError { @@ -51,10 +51,11 @@ pub struct CreateBuilder { comment: Option, columns: Vec, partition_columns: Option>, - properties: HashMap, storage_options: Option>, actions: Vec, object_store: Option>, + configuration: HashMap>, + metadata: Option>, } impl Default for CreateBuilder { @@ -71,12 +72,13 @@ impl CreateBuilder { location: None, mode: SaveMode::ErrorIfExists, comment: None, - columns: Vec::new(), + columns: Default::default(), partition_columns: None, - properties: HashMap::new(), storage_options: None, - actions: Vec::new(), + actions: Default::default(), 
object_store: None, + configuration: Default::default(), + metadata: Default::default(), } } @@ -138,12 +140,6 @@ impl CreateBuilder { self } - /// Specify a table property - pub fn with_property(mut self, key: impl Into, value: impl Into) -> Self { - self.properties.insert(key.into(), value.into()); - self - } - /// Set options used to initialize storage backend /// /// Options may be passed in the HashMap or set as environment variables. @@ -157,7 +153,36 @@ impl CreateBuilder { self } + /// Set configuration on created table + pub fn with_configuration(mut self, configuration: HashMap>) -> Self { + self.configuration = configuration; + self + } + + /// Specify a table property in the table configuration + pub fn with_configuration_property( + mut self, + key: impl Into, + value: Option>, + ) -> Self { + self.configuration + .insert(key.into(), value.map(|v| v.into())); + self + } + + /// Append custom (application-specific) metadata to the commit. + /// + /// This might include provenance information such as an id of the + /// user that made the commit or the program that created it. + pub fn with_metadata(mut self, metadata: Map) -> Self { + self.metadata = Some(metadata); + self + } + /// Specify additional actions to be added to the commit. + /// + /// This method is mainly meant for internal use. Manually adding inconsistent + /// actions to a create operation may have undesired effects - use with caution. 
pub fn with_actions(mut self, actions: impl IntoIterator) -> Self { self.actions.extend(actions); self @@ -222,7 +247,7 @@ impl CreateBuilder { None, SchemaTypeStruct::new(self.columns), self.partition_columns.unwrap_or_default(), - HashMap::new(), + self.configuration, ); let operation = DeltaOperation::Create { @@ -255,6 +280,7 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); + let metadata = this.metadata.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; if table.object_store().is_delta_table_location().await? { match mode { @@ -269,8 +295,14 @@ impl std::future::IntoFuture for CreateBuilder { } } } - let version = - commit(table.object_store().as_ref(), 0, actions, operation, None).await?; + let version = commit( + table.object_store().as_ref(), + 0, + actions, + operation, + metadata, + ) + .await?; table.load_version(version).await?; Ok(table) @@ -282,6 +314,7 @@ impl std::future::IntoFuture for CreateBuilder { mod tests { use super::*; use crate::operations::DeltaOps; + use crate::table_properties::APPEND_ONLY; use crate::writer::test_utils::get_delta_schema; #[tokio::test] @@ -323,7 +356,24 @@ mod tests { .await .unwrap(); assert_eq!(table.get_min_reader_version(), 0); - assert_eq!(table.get_min_writer_version(), 0) + assert_eq!(table.get_min_writer_version(), 0); + + let table = CreateBuilder::new() + .with_location("memory://") + .with_columns(schema.get_fields().clone()) + .with_configuration_property(APPEND_ONLY, Some("true")) + .await + .unwrap(); + let append = table + .get_metadata() + .unwrap() + .configuration + .get(APPEND_ONLY) + .unwrap() + .as_ref() + .unwrap() + .clone(); + assert_eq!(String::from("true"), append) } #[tokio::test] diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 7ca233e20c..821b11b0f5 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -25,6 +25,7 @@ impl Default for LoadBuilder { } impl 
LoadBuilder { + /// Create a new [`LoadBuilder`] pub fn new() -> Self { Self { location: None, diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index ffc0bafad7..db9ae818a1 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -1,4 +1,11 @@ -//! High level delta commands that can be executed against a delta table +//! High level operations API to interact with Delta tables +//! +//! At the heart of the high level operations APIs is the [`DeltaOps`] struct, +//! which consumes a [`DeltaTable`] and exposes methods to obtain builders for +//! several high level operations. The specific builder structs allow fine-tuning +//! the operations' behaviors and will return an updated table potentially in conjunction +//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream], +//! if the operation returns data as well. use self::create::CreateBuilder; use crate::builder::DeltaTableBuilder; @@ -54,7 +61,7 @@ impl DeltaOps { /// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table /// /// Using this will not persist any changes beyond the lifetime of the table object. - /// THe main purpose of in-memory tables is for use in testing. + /// The main purpose of in-memory tables is for use in testing. 
/// /// ``` /// use deltalake::DeltaOps; @@ -85,7 +92,7 @@ impl DeltaOps { CreateBuilder::default().with_object_store(self.0.object_store()) } - /// Write data to Delta table + /// Load data from a DeltaTable #[cfg(feature = "datafusion-ext")] #[must_use] pub fn load(self) -> LoadBuilder { @@ -95,7 +102,7 @@ impl DeltaOps { /// Write data to Delta table #[cfg(feature = "datafusion-ext")] #[must_use] - pub fn write(self, batches: Vec) -> WriteBuilder { + pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { WriteBuilder::default() .with_input_batches(batches) .with_object_store(self.0.object_store()) diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs index f49a00d9b5..4a98102199 100644 --- a/rust/src/operations/writer.rs +++ b/rust/src/operations/writer.rs @@ -333,7 +333,7 @@ impl PartitionWriter { } async fn flush_arrow_writer(&mut self) -> DeltaResult<()> { - // replace counter / buffers adn close the current writer + // replace counter / buffers and close the current writer let (writer, buffer) = self.replace_arrow_buffer(vec![])?; let null_counts = std::mem::take(&mut self.null_counts); let metadata = writer.close()?;