Skip to content

Commit

Permalink
docs: add simple operations example (#953)
Browse files Browse the repository at this point in the history
# Description

Adding a simple example of how to use the operations APIs to create and work
with Delta tables, along with some minor documentation tweaks.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->

Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
roeap and wjones127 authored Nov 22, 2022
1 parent f3176e0 commit 7a2f647
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 20 deletions.
4 changes: 4 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ integration_test = ["fs_extra", "tempdir"]
[[bench]]
name = "read_checkpoint"
harness = false

[[example]]
name = "basic_operations"
required-features = ["datafusion-ext"]
77 changes: 77 additions & 0 deletions rust/examples/basic_operations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use arrow::{
array::{Int32Array, StringArray},
datatypes::{DataType, Field, Schema as ArrowSchema},
record_batch::RecordBatch,
};
use deltalake::operations::collect_sendable_stream;
use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use std::sync::Arc;

/// Returns the column definitions of the example table.
///
/// Two primitive columns are declared: a non-nullable `int` column of type
/// `integer` and a nullable `string` column of type `string`. Each field is
/// created with default (empty) field metadata.
fn get_table_columns() -> Vec<SchemaField> {
    // Both columns are primitive and carry default metadata, so a single
    // constructor closure covers them.
    let primitive_field = |name: &str, type_name: &str, nullable: bool| {
        SchemaField::new(
            name.to_owned(),
            SchemaDataType::primitive(type_name.to_owned()),
            nullable,
            Default::default(),
        )
    };

    vec![
        primitive_field("int", "integer", false),
        primitive_field("string", "string", true),
    ]
}

/// Builds the single sample [`RecordBatch`] written by the example.
///
/// The batch has eleven rows and two columns: a non-nullable `Int32` column
/// named `int` holding the values 1 through 11, and a nullable `Utf8` column
/// named `string` holding a mix of `"A"` and `"B"`.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the schema/column combination.
fn get_table_batches() -> RecordBatch {
    let fields = vec![
        Field::new("int", DataType::Int32, false),
        Field::new("string", DataType::Utf8, true),
    ];
    let arrow_schema = Arc::new(ArrowSchema::new(fields));

    let numbers = Int32Array::from((1..=11).collect::<Vec<i32>>());
    let labels = StringArray::from(vec![
        "A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A",
    ]);

    RecordBatch::try_new(arrow_schema, vec![Arc::new(numbers), Arc::new(labels)]).unwrap()
}

/// Walks through the basic operations API on an in-memory table.
///
/// The example creates a table (version 0), appends one batch (version 1),
/// overwrites the table with the same batch (version 2), and finally loads the
/// table contents and prints them to stdout.
///
/// # Errors
///
/// Propagates any error returned by the create, write, load, or
/// stream-collection steps as a [`deltalake::DeltaTableError`].
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Create a delta operations client pointing at an un-initialized in-memory location.
    // In a production environment this would be created with "try_new" and point at
    // a real storage location.
    let ops = DeltaOps::new_in_memory();

    // The operations module uses a builder pattern that allows specifying several options
    // on how the command behaves. The builders implement `IntoFuture`, so once
    // options are set you can run the command using `.await`.
    let table = ops
        .create()
        .with_columns(get_table_columns())
        .with_table_name("my_table")
        .with_comment("A table to show how delta-rs works")
        .await?;

    assert_eq!(table.version(), 0);

    let batch = get_table_batches();
    // The batch is written a second time below, so hand this write a clone.
    let table = DeltaOps(table).write(vec![batch.clone()]).await?;

    assert_eq!(table.version(), 1);

    // To overwrite instead of append (which is the default), use `.with_save_mode`:
    // This is the last use of `batch`, so it is moved rather than cloned.
    let table = DeltaOps(table)
        .write(vec![batch])
        .with_save_mode(SaveMode::Overwrite)
        .await?;

    assert_eq!(table.version(), 2);

    // Loading yields the table together with a stream of record batches.
    let (_table, stream) = DeltaOps(table).load().await?;
    let data: Vec<RecordBatch> = collect_sendable_stream(stream).await?;

    println!("{:?}", data);

    Ok(())
}
80 changes: 65 additions & 15 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::storage::DeltaObjectStore;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

use futures::future::BoxFuture;
use serde_json::Value;
use serde_json::{Map, Value};

#[derive(thiserror::Error, Debug)]
enum CreateError {
Expand Down Expand Up @@ -51,10 +51,11 @@ pub struct CreateBuilder {
comment: Option<String>,
columns: Vec<SchemaField>,
partition_columns: Option<Vec<String>>,
properties: HashMap<String, Value>,
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
object_store: Option<Arc<DeltaObjectStore>>,
configuration: HashMap<String, Option<String>>,
metadata: Option<Map<String, Value>>,
}

impl Default for CreateBuilder {
Expand All @@ -71,12 +72,13 @@ impl CreateBuilder {
location: None,
mode: SaveMode::ErrorIfExists,
comment: None,
columns: Vec::new(),
columns: Default::default(),
partition_columns: None,
properties: HashMap::new(),
storage_options: None,
actions: Vec::new(),
actions: Default::default(),
object_store: None,
configuration: Default::default(),
metadata: Default::default(),
}
}

Expand Down Expand Up @@ -138,12 +140,6 @@ impl CreateBuilder {
self
}

/// Specify a table property
pub fn with_property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
self.properties.insert(key.into(), value.into());
self
}

/// Set options used to initialize storage backend
///
/// Options may be passed in the HashMap or set as environment variables.
Expand All @@ -157,7 +153,36 @@ impl CreateBuilder {
self
}

/// Replace the whole configuration map used for the created table.
///
/// Any configuration entries set earlier on this builder are discarded.
pub fn with_configuration(self, configuration: HashMap<String, Option<String>>) -> Self {
    let mut builder = self;
    builder.configuration = configuration;
    builder
}

/// Set a single property in the table configuration.
///
/// An existing entry stored under the same `key` is overwritten. Passing
/// `None` as `value` stores the key with no value.
pub fn with_configuration_property(
    mut self,
    key: impl Into<String>,
    value: Option<impl Into<String>>,
) -> Self {
    let name: String = key.into();
    let setting: Option<String> = value.map(Into::into);
    self.configuration.insert(name, setting);
    self
}

/// Attach custom (application-specific) metadata to the commit.
///
/// Typical content is provenance information, for example an id of the user
/// that made the commit or of the program that created it. Metadata set by
/// an earlier call on this builder is replaced.
pub fn with_metadata(self, metadata: Map<String, Value>) -> Self {
    let mut builder = self;
    builder.metadata = Some(metadata);
    builder
}

/// Specify additional actions to be added to the commit.
///
/// This method is mainly meant for internal use. Manually adding inconsistent
/// actions to a create operation may have undesired effects - use with caution.
pub fn with_actions(mut self, actions: impl IntoIterator<Item = Action>) -> Self {
self.actions.extend(actions);
self
Expand Down Expand Up @@ -222,7 +247,7 @@ impl CreateBuilder {
None,
SchemaTypeStruct::new(self.columns),
self.partition_columns.unwrap_or_default(),
HashMap::new(),
self.configuration,
);

let operation = DeltaOperation::Create {
Expand Down Expand Up @@ -255,6 +280,7 @@ impl std::future::IntoFuture for CreateBuilder {

Box::pin(async move {
let mode = this.mode.clone();
let metadata = this.metadata.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
if table.object_store().is_delta_table_location().await? {
match mode {
Expand All @@ -269,8 +295,14 @@ impl std::future::IntoFuture for CreateBuilder {
}
}
}
let version =
commit(table.object_store().as_ref(), 0, actions, operation, None).await?;
let version = commit(
table.object_store().as_ref(),
0,
actions,
operation,
metadata,
)
.await?;
table.load_version(version).await?;

Ok(table)
Expand All @@ -282,6 +314,7 @@ impl std::future::IntoFuture for CreateBuilder {
mod tests {
use super::*;
use crate::operations::DeltaOps;
use crate::table_properties::APPEND_ONLY;
use crate::writer::test_utils::get_delta_schema;

#[tokio::test]
Expand Down Expand Up @@ -323,7 +356,24 @@ mod tests {
.await
.unwrap();
assert_eq!(table.get_min_reader_version(), 0);
assert_eq!(table.get_min_writer_version(), 0)
assert_eq!(table.get_min_writer_version(), 0);

let table = CreateBuilder::new()
.with_location("memory://")
.with_columns(schema.get_fields().clone())
.with_configuration_property(APPEND_ONLY, Some("true"))
.await
.unwrap();
let append = table
.get_metadata()
.unwrap()
.configuration
.get(APPEND_ONLY)
.unwrap()
.as_ref()
.unwrap()
.clone();
assert_eq!(String::from("true"), append)
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions rust/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl Default for LoadBuilder {
}

impl LoadBuilder {
/// Create a new [`LoadBuilder`]
pub fn new() -> Self {
Self {
location: None,
Expand Down
15 changes: 11 additions & 4 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
//! High level delta commands that can be executed against a delta table
//! High level operations API to interact with Delta tables
//!
//! At the heart of the high level operations APIs is the [`DeltaOps`] struct,
//! which consumes a [`DeltaTable`] and exposes methods to attain builders for
//! several high level operations. The specific builder structs allow fine-tuning
//! the operations' behaviors and will return an updated table potentially in conjunction
//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream],
//! if the operation returns data as well.

use self::create::CreateBuilder;
use crate::builder::DeltaTableBuilder;
Expand Down Expand Up @@ -54,7 +61,7 @@ impl DeltaOps {
/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
/// THe main purpose of in-memory tables is for use in testing.
/// The main purpose of in-memory tables is for use in testing.
///
/// ```
/// use deltalake::DeltaOps;
Expand Down Expand Up @@ -85,7 +92,7 @@ impl DeltaOps {
CreateBuilder::default().with_object_store(self.0.object_store())
}

/// Write data to Delta table
/// Load data from a DeltaTable
#[cfg(feature = "datafusion-ext")]
#[must_use]
pub fn load(self) -> LoadBuilder {
Expand All @@ -95,7 +102,7 @@ impl DeltaOps {
/// Write data to Delta table
#[cfg(feature = "datafusion-ext")]
#[must_use]
pub fn write(self, batches: Vec<RecordBatch>) -> WriteBuilder {
pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
WriteBuilder::default()
.with_input_batches(batches)
.with_object_store(self.0.object_store())
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl PartitionWriter {
}

async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
// replace counter / buffers adn close the current writer
// replace counter / buffers and close the current writer
let (writer, buffer) = self.replace_arrow_buffer(vec![])?;
let null_counts = std::mem::take(&mut self.null_counts);
let metadata = writer.close()?;
Expand Down

0 comments on commit 7a2f647

Please sign in to comment.