Skip to content

Commit

Permalink
feat: make more efficient use of snapshot in write command
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Pack authored and roeap committed Apr 14, 2023
1 parent 3c62e8d commit 64d7c74
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 145 deletions.
4 changes: 1 addition & 3 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ impl DeltaOps {
#[cfg(feature = "datafusion")]
#[must_use]
pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
WriteBuilder::default()
.with_input_batches(batches)
.with_object_store(self.0.object_store())
WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches)
}

/// Vacuum stale files from delta table
Expand Down
238 changes: 105 additions & 133 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
//! In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally
//! replace data that matches a predicate.
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
Expand All @@ -25,11 +23,11 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::MAX_SUPPORTED_WRITER_VERSION;
use super::{transaction::commit, CreateBuilder};
use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode};
use crate::builder::DeltaTableBuilder;
use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
use crate::delta_datafusion::DeltaDataChecker;
use crate::schema::Schema;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::writer::record_batch::divide_by_partition_values;
use crate::writer::utils::PartitionPath;

Expand Down Expand Up @@ -76,12 +74,14 @@ impl From<WriteError> for DeltaTableError {
/// Write data into a DeltaTable
#[derive(Debug, Clone)]
pub struct WriteBuilder {
/// A snapshot of the to-be-loaded table's state
snapshot: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// The input plan
input: Option<Arc<dyn ExecutionPlan>>,
/// Datafusion session state relevant for executing the input plan
state: Option<SessionState>,
/// Location where the table is stored
location: Option<String>,
/// SaveMode defines how to treat data already written to table location
mode: SaveMode,
/// Column names for table partitioning
Expand All @@ -92,45 +92,27 @@ pub struct WriteBuilder {
target_file_size: Option<usize>,
/// Number of records to be written in single batch to underlying writer
write_batch_size: Option<usize>,
/// An object store to be used as backend for delta table
object_store: Option<Arc<DeltaObjectStore>>,
/// Storage options used to create a new storage backend
storage_options: Option<HashMap<String, String>>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
}

impl Default for WriteBuilder {
fn default() -> Self {
Self::new()
}
}

impl WriteBuilder {
/// Create a new [`WriteBuilder`]
pub fn new() -> Self {
pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
Self {
snapshot,
store,
input: None,
state: None,
location: None,
mode: SaveMode::Append,
partition_columns: None,
predicate: None,
storage_options: None,
target_file_size: None,
write_batch_size: None,
object_store: None,
batches: None,
}
}

/// Specify the path to the location where table data is stored,
/// which could be a path on distributed storage.
pub fn with_location(mut self, location: impl Into<String>) -> Self {
self.location = Some(location.into());
self
}

/// Specify the behavior when a table exists at location
pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
self.mode = save_mode;
Expand Down Expand Up @@ -171,20 +153,6 @@ impl WriteBuilder {
self
}

/// Set options used to initialize storage backend
///
/// Options may be passed in the HashMap or set as environment variables.
pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
self.storage_options = Some(storage_options);
self
}

/// Provide a [`DeltaObjectStore`] instance, that points at table location
pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
self.object_store = Some(object_store);
self
}

/// Specify the target file size for data files written to the delta table.
pub fn with_target_file_size(mut self, target_file_size: usize) -> Self {
self.target_file_size = Some(target_file_size);
Expand All @@ -196,90 +164,75 @@ impl WriteBuilder {
self.write_batch_size = Some(write_batch_size);
self
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.store.is_delta_table_location().await? {
true => {
let min_writer = self.snapshot.min_writer_version();
if min_writer > MAX_SUPPORTED_WRITER_VERSION {
Err(WriteError::UnsupportedWriterVersion(min_writer).into())
} else {
match self.mode {
SaveMode::ErrorIfExists => {
Err(WriteError::AlreadyExists(self.store.root_uri()).into())
}
_ => Ok(vec![]),
}
}
}
false => {
let schema: Schema = if let Some(plan) = &self.input {
Ok(plan.schema().try_into()?)
} else if let Some(batches) = &self.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
}
Ok(batches[0].schema().try_into()?)
} else {
Err(WriteError::MissingData)
}?;
let mut builder = CreateBuilder::new()
.with_object_store(self.store.clone())
.with_columns(schema.get_fields().clone());
if let Some(partition_columns) = self.partition_columns.as_ref() {
builder = builder.with_partition_columns(partition_columns.clone())
}
let (_, actions, _) = builder.into_table_and_actions()?;
Ok(actions)
}
}
}
}

impl std::future::IntoFuture for WriteBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
schema
.fields()
.iter()
.map(|f| (f.name().to_owned(), f.data_type().clone()))
.collect::<Vec<_>>()
}

fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
}
let mut this = self;

Box::pin(async move {
let object_store = if let Some(store) = this.object_store {
Ok(store)
} else {
DeltaTableBuilder::from_uri(this.location.unwrap())
.with_storage_options(this.storage_options.unwrap_or_default())
.build_storage()
}?;
// Create table actions to initialize table in case it does not yet exist and should be created
let mut actions = this.check_preconditions().await?;

// TODO we can find a more optimized config. Of course we want to pass in the state anyhow..
let mut table = DeltaTable::new(object_store.clone(), Default::default());
let mut actions = match table.load().await {
Err(DeltaTableError::NotATable(_)) => {
let schema: Schema = if let Some(plan) = &this.input {
Ok(plan.schema().try_into()?)
} else if let Some(batches) = &this.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
}
Ok(batches[0].schema().try_into()?)
} else {
Err(WriteError::MissingData)
}?;
let mut builder = CreateBuilder::new()
.with_object_store(table.object_store())
.with_columns(schema.get_fields().clone());
if let Some(partition_columns) = this.partition_columns.as_ref() {
builder = builder.with_partition_columns(partition_columns.clone())
}
let (_, actions, _) = builder.into_table_and_actions()?;
Ok(actions)
}
Ok(_) => {
if table.get_min_writer_version() > MAX_SUPPORTED_WRITER_VERSION {
Err(
WriteError::UnsupportedWriterVersion(table.get_min_writer_version())
.into(),
)
} else {
match this.mode {
SaveMode::ErrorIfExists => {
Err(WriteError::AlreadyExists(table.table_uri()).into())
}
_ => Ok(vec![]),
}
}
}
Err(err) => Err(err),
}?;
let active_partitions = this
.snapshot
.current_metadata()
.map(|meta| meta.partition_columns.clone());

// validate partition columns
let partition_columns = if let Ok(meta) = table.get_metadata() {
let partition_columns = if let Some(active_part) = active_partitions {
if let Some(ref partition_columns) = this.partition_columns {
if &meta.partition_columns != partition_columns {
if &active_part != partition_columns {
Err(WriteError::PartitionColumnMismatch {
expected: table.get_metadata()?.partition_columns.clone(),
expected: active_part,
got: partition_columns.to_vec(),
})
} else {
Ok(partition_columns.clone())
}
} else {
Ok(meta.partition_columns.clone())
Ok(active_part)
}
} else {
Ok(this.partition_columns.unwrap_or_default())
Expand All @@ -292,13 +245,15 @@ impl std::future::IntoFuture for WriteBuilder {
Err(WriteError::MissingData)
} else {
let schema = batches[0].schema();

if let Ok(meta) = table.get_metadata() {
// NOTE the schema generated from the delta schema will have the delta field metadata included,
// so we need to compare the field names and datatypes instead.
// TODO update comparison logic, once we have column mappings supported.
let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);

let table_schema = this
.snapshot
.physical_arrow_schema(this.store.clone())
.await
.or_else(|_| this.snapshot.arrow_schema());

// we cannot get a schema, if there is not data and or meta data in the table,
// i.e. not initialized
if let Ok(curr_schema) = table_schema {
if !schema_eq(curr_schema, schema.clone()) {
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
Expand Down Expand Up @@ -345,9 +300,10 @@ impl std::future::IntoFuture for WriteBuilder {
Err(WriteError::MissingData)
}?;

let invariants = table
.get_metadata()
.and_then(|meta| meta.schema.get_invariants())
let invariants = this
.snapshot
.current_metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();
let checker = DeltaDataChecker::new(invariants);

Expand All @@ -368,7 +324,7 @@ impl std::future::IntoFuture for WriteBuilder {
this.target_file_size,
this.write_batch_size,
);
let mut writer = DeltaWriter::new(object_store.clone(), config);
let mut writer = DeltaWriter::new(this.store.clone(), config);
let checker_stream = checker.clone();
let mut stream = inner_plan.execute(i, task_ctx)?;
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
Expand Down Expand Up @@ -396,6 +352,7 @@ impl std::future::IntoFuture for WriteBuilder {
.into_iter()
.map(Action::add)
.collect::<Vec<_>>();

actions.extend(add_actions);

// Collect remove actions if we are overwriting the table
Expand Down Expand Up @@ -424,8 +381,8 @@ impl std::future::IntoFuture for WriteBuilder {
todo!("Overwriting data based on predicate is not yet implemented")
}
_ => {
let remove_actions = table
.get_state()
let remove_actions = this
.snapshot
.files()
.iter()
.map(to_remove_action)
Expand All @@ -435,34 +392,49 @@ impl std::future::IntoFuture for WriteBuilder {
}
};

// Finally, commit ...
let operation = DeltaOperation::Write {
mode: this.mode,
partition_by: if !partition_columns.is_empty() {
Some(partition_columns)
} else {
None
},
predicate: this.predicate,
};
let _version = commit(
table.storage.as_ref(),
let version = commit(
this.store.as_ref(),
&actions,
operation,
&table.state,
DeltaOperation::Write {
mode: this.mode,
partition_by: if !partition_columns.is_empty() {
Some(partition_columns)
} else {
None
},
predicate: this.predicate,
},
&this.snapshot,
// TODO pass through metadata
None,
)
.await?;
table.update().await?;

// TODO we do not have the table config available, but since we are merging only our newly
// created actions, it may be safe to assume, that we want to include all actions.
// then again, having only some tombstones may be misleading.
this.snapshot
.merge(DeltaTableState::from_actions(actions, version)?, true, true);

// TODO should we build checkpoints based on config?

Ok(table)
Ok(DeltaTable::new_with_state(this.store, this.snapshot))
})
}
}

fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
schema
.fields()
.iter()
.map(|f| (f.name().to_owned(), f.data_type().clone()))
.collect::<Vec<_>>()
}

fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 64d7c74

Please sign in to comment.