From 64d7c743bfec42b2d206a099251a59d349d1fed4 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 9 Apr 2023 19:22:39 +0200
Subject: [PATCH] feat: make more efficient use of snapshot in write command

---
 rust/src/operations/mod.rs    |   4 +-
 rust/src/operations/write.rs  | 238 +++++++++++++++-------------------
 rust/tests/datafusion_test.rs |  18 +--
 3 files changed, 115 insertions(+), 145 deletions(-)

diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index 1f17d4b967..47febc388e 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -109,9 +109,7 @@ impl DeltaOps {
     #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
-        WriteBuilder::default()
-            .with_input_batches(batches)
-            .with_object_store(self.0.object_store())
+        WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches)
     }
 
     /// Vacuum stale files from delta table
diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 203e0f53f9..d59b018341 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -15,8 +15,6 @@
 //! In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally
 //! replace data that matches a predicate.
 
-// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
-
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -25,11 +23,11 @@
 use super::writer::{DeltaWriter, WriterConfig};
 use super::MAX_SUPPORTED_WRITER_VERSION;
 use super::{transaction::commit, CreateBuilder};
 use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode};
-use crate::builder::DeltaTableBuilder;
 use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
 use crate::delta_datafusion::DeltaDataChecker;
 use crate::schema::Schema;
 use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
 use crate::writer::record_batch::divide_by_partition_values;
 use crate::writer::utils::PartitionPath;
 
@@ -76,12 +74,14 @@ impl From<WriteError> for DeltaTableError {
 /// Write data into a DeltaTable
 #[derive(Debug, Clone)]
 pub struct WriteBuilder {
+    /// A snapshot of the to-be-loaded table's state
+    snapshot: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
     /// The input plan
     input: Option<Arc<dyn ExecutionPlan>>,
     /// Datafusion session state relevant for executing the input plan
     state: Option<SessionState>,
-    /// Location where the table is stored
-    location: Option<String>,
     /// SaveMode defines how to treat data already written to table location
     mode: SaveMode,
     /// Column names for table partitioning
@@ -92,45 +92,27 @@ pub struct WriteBuilder {
     target_file_size: Option<usize>,
     /// Number of records to be written in single batch to underlying writer
     write_batch_size: Option<usize>,
-    /// An object store to be used as backend for delta table
-    object_store: Option<Arc<DeltaObjectStore>>,
-    /// Storage options used to create a new storage backend
-    storage_options: Option<HashMap<String, String>>,
     /// RecordBatches to be written into the table
     batches: Option<Vec<RecordBatch>>,
 }
 
-impl Default for WriteBuilder {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl WriteBuilder {
     /// Create a new [`WriteBuilder`]
-    pub fn new() -> Self {
+    pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
         Self {
+            snapshot,
+            store,
             input: None,
             state: None,
-            location: None,
             mode: SaveMode::Append,
             partition_columns: None,
            predicate: None,
-            storage_options: None,
             target_file_size: None,
             write_batch_size: None,
-            object_store: None,
             batches: None,
         }
     }
 
-    /// Specify the path to the location where table data is stored,
-    /// which could be a path on distributed storage.
-    pub fn with_location(mut self, location: impl Into<String>) -> Self {
-        self.location = Some(location.into());
-        self
-    }
-
     /// Specify the behavior when a table exists at location
     pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
         self.mode = save_mode;
         self
     }
@@ -171,20 +153,6 @@ impl WriteBuilder {
         self
     }
 
-    /// Set options used to initialize storage backend
-    ///
-    /// Options may be passed in the HashMap or set as environment variables.
-    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
-        self.storage_options = Some(storage_options);
-        self
-    }
-
-    /// Provide a [`DeltaObjectStore`] instance, that points at table location
-    pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
-
     /// Specify the target file size for data files written to the delta table.
     pub fn with_target_file_size(mut self, target_file_size: usize) -> Self {
         self.target_file_size = Some(target_file_size);
         self
     }
@@ -196,6 +164,44 @@ impl WriteBuilder {
         self.write_batch_size = Some(write_batch_size);
         self
     }
+
+    async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
+        match self.store.is_delta_table_location().await? {
+            true => {
+                let min_writer = self.snapshot.min_writer_version();
+                if min_writer > MAX_SUPPORTED_WRITER_VERSION {
+                    Err(WriteError::UnsupportedWriterVersion(min_writer).into())
+                } else {
+                    match self.mode {
+                        SaveMode::ErrorIfExists => {
+                            Err(WriteError::AlreadyExists(self.store.root_uri()).into())
+                        }
+                        _ => Ok(vec![]),
+                    }
+                }
+            }
+            false => {
+                let schema: Schema = if let Some(plan) = &self.input {
+                    Ok(plan.schema().try_into()?)
+                } else if let Some(batches) = &self.batches {
+                    if batches.is_empty() {
+                        return Err(WriteError::MissingData.into());
+                    }
+                    Ok(batches[0].schema().try_into()?)
+                } else {
+                    Err(WriteError::MissingData)
+                }?;
+                let mut builder = CreateBuilder::new()
+                    .with_object_store(self.store.clone())
+                    .with_columns(schema.get_fields().clone());
+                if let Some(partition_columns) = self.partition_columns.as_ref() {
+                    builder = builder.with_partition_columns(partition_columns.clone())
+                }
+                let (_, actions, _) = builder.into_table_and_actions()?;
+                Ok(actions)
+            }
+        }
+    }
 }
 
 impl std::future::IntoFuture for WriteBuilder {
     type IntoFuture = BoxFuture<'static, Self::Output>;
 
     fn into_future(self) -> Self::IntoFuture {
-        let this = self;
-
-        fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
-            schema
-                .fields()
-                .iter()
-                .map(|f| (f.name().to_owned(), f.data_type().clone()))
-                .collect::<Vec<_>>()
-        }
-
-        fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
-            schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
-        }
+        let mut this = self;
 
         Box::pin(async move {
-            let object_store = if let Some(store) = this.object_store {
-                Ok(store)
-            } else {
-                DeltaTableBuilder::from_uri(this.location.unwrap())
-                    .with_storage_options(this.storage_options.unwrap_or_default())
-                    .build_storage()
-            }?;
+            // Create table actions to initialize table in case it does not yet exist and should be created
+            let mut actions = this.check_preconditions().await?;
 
-            // TODO we can find a more optimized config. Of course we want to pass in the state anyhow..
-            let mut table = DeltaTable::new(object_store.clone(), Default::default());
-            let mut actions = match table.load().await {
-                Err(DeltaTableError::NotATable(_)) => {
-                    let schema: Schema = if let Some(plan) = &this.input {
-                        Ok(plan.schema().try_into()?)
-                    } else if let Some(batches) = &this.batches {
-                        if batches.is_empty() {
-                            return Err(WriteError::MissingData.into());
-                        }
-                        Ok(batches[0].schema().try_into()?)
-                    } else {
-                        Err(WriteError::MissingData)
-                    }?;
-                    let mut builder = CreateBuilder::new()
-                        .with_object_store(table.object_store())
-                        .with_columns(schema.get_fields().clone());
-                    if let Some(partition_columns) = this.partition_columns.as_ref() {
-                        builder = builder.with_partition_columns(partition_columns.clone())
-                    }
-                    let (_, actions, _) = builder.into_table_and_actions()?;
-                    Ok(actions)
-                }
-                Ok(_) => {
-                    if table.get_min_writer_version() > MAX_SUPPORTED_WRITER_VERSION {
-                        Err(
-                            WriteError::UnsupportedWriterVersion(table.get_min_writer_version())
-                                .into(),
-                        )
-                    } else {
-                        match this.mode {
-                            SaveMode::ErrorIfExists => {
-                                Err(WriteError::AlreadyExists(table.table_uri()).into())
-                            }
-                            _ => Ok(vec![]),
-                        }
-                    }
-                }
-                Err(err) => Err(err),
-            }?;
+            let active_partitions = this
+                .snapshot
+                .current_metadata()
+                .map(|meta| meta.partition_columns.clone());
 
             // validate partition columns
-            let partition_columns = if let Ok(meta) = table.get_metadata() {
+            let partition_columns = if let Some(active_part) = active_partitions {
                 if let Some(ref partition_columns) = this.partition_columns {
-                    if &meta.partition_columns != partition_columns {
+                    if &active_part != partition_columns {
                         Err(WriteError::PartitionColumnMismatch {
-                            expected: table.get_metadata()?.partition_columns.clone(),
+                            expected: active_part,
                             got: partition_columns.to_vec(),
                         })
                     } else {
                         Ok(partition_columns.clone())
                     }
                 } else {
-                    Ok(meta.partition_columns.clone())
+                    Ok(active_part)
                 }
             } else {
                 Ok(this.partition_columns.unwrap_or_default())
             }?;
@@ -292,13 +245,15 @@ impl std::future::IntoFuture for WriteBuilder {
                 Err(WriteError::MissingData)
             } else {
                 let schema = batches[0].schema();
-
-                if let Ok(meta) = table.get_metadata() {
-                    // NOTE the schema generated from the delta schema will have the delta field metadata included,
-                    // so we need to compare the field names and datatypes instead.
-                    // TODO update comparison logic, once we have column mappings supported.
-                    let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
-
+                let table_schema = this
+                    .snapshot
+                    .physical_arrow_schema(this.store.clone())
+                    .await
+                    .or_else(|_| this.snapshot.arrow_schema());
+
+                // We cannot get a schema if there is no data or metadata in the table,
+                // i.e. the table is not yet initialized.
+                if let Ok(curr_schema) = table_schema {
                     if !schema_eq(curr_schema, schema.clone()) {
                         return Err(DeltaTableError::Generic(
                             "Updating table schema not yet implemented".to_string(),
@@ -345,9 +300,10 @@ impl std::future::IntoFuture for WriteBuilder {
                 Err(WriteError::MissingData)
             }?;
 
-            let invariants = table
-                .get_metadata()
-                .and_then(|meta| meta.schema.get_invariants())
+            let invariants = this
+                .snapshot
+                .current_metadata()
+                .and_then(|meta| meta.schema.get_invariants().ok())
                 .unwrap_or_default();
             let checker = DeltaDataChecker::new(invariants);
@@ -368,7 +324,7 @@ impl std::future::IntoFuture for WriteBuilder {
                         this.target_file_size,
                         this.write_batch_size,
                     );
-                    let mut writer = DeltaWriter::new(object_store.clone(), config);
+                    let mut writer = DeltaWriter::new(this.store.clone(), config);
                     let checker_stream = checker.clone();
                     let mut stream = inner_plan.execute(i, task_ctx)?;
                     let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
@@ -396,6 +352,7 @@ impl std::future::IntoFuture for WriteBuilder {
                 .into_iter()
                 .map(Action::add)
                 .collect::<Vec<_>>();
+
             actions.extend(add_actions);
 
             // Collect remove actions if we are overwriting the table
@@ -424,8 +381,8 @@ impl std::future::IntoFuture for WriteBuilder {
                     todo!("Overwriting data based on predicate is not yet implemented")
                 }
                 _ => {
-                    let remove_actions = table
-                        .get_state()
+                    let remove_actions = this
+                        .snapshot
                         .files()
                         .iter()
                         .map(to_remove_action)
                         .collect::<Vec<_>>();
@@ -435,34 +392,49 @@ impl std::future::IntoFuture for WriteBuilder {
                 }
             };
 
-            // Finally, commit ...
-            let operation = DeltaOperation::Write {
-                mode: this.mode,
-                partition_by: if !partition_columns.is_empty() {
-                    Some(partition_columns)
-                } else {
-                    None
-                },
-                predicate: this.predicate,
-            };
-            let _version = commit(
-                table.storage.as_ref(),
+            let version = commit(
+                this.store.as_ref(),
                 &actions,
-                operation,
-                &table.state,
+                DeltaOperation::Write {
+                    mode: this.mode,
+                    partition_by: if !partition_columns.is_empty() {
+                        Some(partition_columns)
+                    } else {
+                        None
+                    },
+                    predicate: this.predicate,
+                },
+                &this.snapshot,
                 // TODO pass through metadata
                 None,
             )
             .await?;
-            table.update().await?;
+
+            // TODO we do not have the table config available, but since we are merging only our newly
+            // created actions, it may be safe to assume that we want to include all actions.
+            // Then again, having only some tombstones may be misleading.
+            this.snapshot
+                .merge(DeltaTableState::from_actions(actions, version)?, true, true);
 
             // TODO should we build checkpoints based on config?
-            Ok(table)
+            Ok(DeltaTable::new_with_state(this.store, this.snapshot))
         })
     }
 }
 
+fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
+    schema
+        .fields()
+        .iter()
+        .map(|f| (f.name().to_owned(), f.data_type().clone()))
+        .collect::<Vec<_>>()
+}
+
+fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
+    schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index a6d18aad7e..dcb69e2338 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -188,13 +188,14 @@ async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
 
     // Trying to execute the write from the input plan without providing Datafusion with a session
     // state containing the referenced object store in the registry results in an error.
-    assert!(WriteBuilder::new()
-        .with_input_execution_plan(source_scan.clone())
-        .with_object_store(target_table.object_store())
-        .await
-        .unwrap_err()
-        .to_string()
-        .contains("No suitable object store found for delta-rs://"));
+    assert!(
+        WriteBuilder::new(target_table.object_store(), target_table.state.clone())
+            .with_input_execution_plan(source_scan.clone())
+            .await
+            .unwrap_err()
+            .to_string()
+            .contains("No suitable object store found for delta-rs://")
+    );
 
     // Register the missing source table object store
     let source_uri = source_scan
@@ -214,10 +215,9 @@ async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
     );
 
     // Execute write to the target table with the proper state
-    let target_table = WriteBuilder::new()
+    let target_table = WriteBuilder::new(target_table.object_store(), target_table.state.clone())
        .with_input_execution_plan(source_scan)
         .with_input_session_state(state)
-        .with_object_store(target_table.object_store())
         .await?;
     ctx.register_table("target", Arc::new(target_table))?;
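Usage note (not part of the patch): after this change a `WriteBuilder` is no longer
constructed via `Default` and pointed at a location or object store afterwards;
callers hand it the table's object store and a `DeltaTableState` snapshot up front,
as `DeltaOps::write` and the updated test above now do. Below is a minimal sketch of
the new calling convention. The surrounding setup (the table path, the `batch`
contents) is illustrative only, and the exact import paths are assumed from the
crate layout shown in the diff.

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use deltalake::action::SaveMode;
    use deltalake::operations::write::WriteBuilder;

    async fn append_example() -> Result<(), Box<dyn std::error::Error>> {
        // Load an existing table; it carries both the object store and the
        // state snapshot that `WriteBuilder::new` now requires.
        let table = deltalake::open_table("./data/my_table").await?;

        // A single-column batch used as example input.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

        // The builder implements `IntoFuture`, so awaiting it executes the write
        // and returns a `DeltaTable` whose snapshot already includes the commit,
        // without a separate `table.update()` round-trip.
        let table = WriteBuilder::new(table.object_store(), table.state.clone())
            .with_input_batches(vec![batch])
            .with_save_mode(SaveMode::Append)
            .await?;

        println!("table is now at version {}", table.version());
        Ok(())
    }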