diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 14cd34cad3..817e3c1d38 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,7 +13,7 @@ readme = "README.md" edition = "2021" [dependencies] -arrow = { version = "33.0.0", optional = true } +arrow = { version = "33", optional = true } async-trait = "0.1" bytes = "1" chrono = { version = "0.4.22", default-features = false, features = ["clock"] } @@ -26,7 +26,7 @@ log = "0" libc = ">=0.2.90, <1" num-bigint = "0.4" num-traits = "0.2.15" -object_store = "0.5.3" +object_store = "0.5.6" once_cell = "1.16.0" parking_lot = "0.12" parquet = { version = "33", features = [ @@ -57,6 +57,8 @@ datafusion = { version = "19", optional = true } datafusion-expr = { version = "19", optional = true } datafusion-common = { version = "19", optional = true } datafusion-proto = { version = "19", optional = true } +datafusion-sql = { version = "19", optional = true } +sqlparser = { version = "0.30", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } @@ -91,6 +93,8 @@ datafusion = [ "datafusion-expr", "datafusion-common", "datafusion-proto", + "datafusion-sql", + "sqlparser", "arrow", "parquet", ] diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index ef6222b05b..732b2c24c2 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -544,6 +544,13 @@ pub enum DeltaOperation { /// The predicate used during the write. predicate: Option<String>, }, + + /// Delete data matching predicate from delta table + Delete { + /// The condition that data to be deleted must match + predicate: Option<String>, + }, + /// Represents a Delta `StreamingUpdate` operation. #[serde(rename_all = "camelCase")] StreamingUpdate { @@ -580,6 +587,7 @@ impl DeltaOperation { } DeltaOperation::Create { .. } => "CREATE TABLE", DeltaOperation::Write { .. } => "WRITE", + DeltaOperation::Delete { .. } => "DELETE", DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", DeltaOperation::Optimize { .. } => "OPTIMIZE", DeltaOperation::FileSystemCheck { .. } => "FSCK", @@ -622,7 +630,8 @@ impl DeltaOperation { Self::Create { .. } | Self::FileSystemCheck {} | Self::StreamingUpdate { .. } - | Self::Write { .. } => true, + | Self::Write { .. } + | Self::Delete { .. } => true, } } @@ -641,9 +650,20 @@ impl DeltaOperation { match self { // TODO add more operations Self::Write { predicate, .. } => predicate.clone(), + Self::Delete { predicate, .. } => predicate.clone(), _ => None, } } + + /// Denotes if the operation reads the entire table + pub fn read_whole_table(&self) -> bool { + match self { + // TODO just adding one operation example, as currently none of the + // implemented operations scan the entire table. + Self::Write { predicate, ..
} if predicate.is_none() => false, + _ => false, + } + } } /// The SaveMode used when performing a DeltaOperation diff --git a/rust/src/delta.rs b/rust/src/delta.rs index be5263ef5f..300da985dd 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -17,6 +17,7 @@ use super::schema::*; use super::table_state::DeltaTableState; use crate::action::{Add, Stats}; use crate::delta_config::DeltaConfigError; +use crate::operations::transaction::TransactionError; use crate::operations::vacuum::VacuumBuilder; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; @@ -216,6 +217,12 @@ pub enum DeltaTableError { #[from] source: std::io::Error, }, + /// Error raised while committing transaction + #[error("Transaction failed: {source}")] + Transaction { + /// The source error + source: TransactionError, + }, /// Error returned when transaction is failed to be committed because given version already exists. #[error("Delta transaction failed, version {0} already exists.")] VersionAlreadyExists(DeltaDataTypeVersion), diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 921f171d63..029a4974a1 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -390,7 +390,7 @@ impl TableProvider for DeltaTable { (!filters.is_empty()).then_some(conjunction(filters.iter().cloned())) { let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?; - let files_to_prune = pruning_predicate.prune(self)?; + let files_to_prune = pruning_predicate.prune(&self.state)?; self.get_state() .files() .iter() @@ -630,7 +630,7 @@ fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> { } } -fn to_correct_scalar_value( +pub(crate) fn to_correct_scalar_value( stat_val: &serde_json::Value, field_dt: &ArrowDataType, ) -> Option<ScalarValue> { diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index a393091f6c..9384aa4767 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -285,7 +285,6 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); - let metadata = this.metadata.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; if table.object_store().is_delta_table_location().await?
{ match mode { @@ -302,10 +301,10 @@ impl std::future::IntoFuture for CreateBuilder { } let version = commit( table.object_store().as_ref(), - 0, &actions, operation, - metadata, + &table.state, + None, ) .await?; table.load_version(version).await?; diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index b5fc569637..26be9e9113 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -15,7 +15,6 @@ use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::operations::transaction::commit; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; -use crate::DeltaDataTypeVersion; use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; use futures::future::BoxFuture; use futures::StreamExt; @@ -50,8 +49,6 @@ pub struct FileSystemCheckMetrics { } struct FileSystemCheckPlan { - /// Version of the snapshot provided - version: DeltaDataTypeVersion, /// Delta object store for handling data files store: Arc<DeltaObjectStore>, /// Files that no longer exist in the underlying ObjectStore but have active add actions @@ -88,7 +85,6 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.state.files().len()); - let version = self.state.version(); let store = self.store.clone(); for active in self.state.files() { @@ -118,14 +114,13 @@ impl FileSystemCheckBuilder { Ok(FileSystemCheckPlan { files_to_remove, - version, store, }) } } impl FileSystemCheckPlan { - pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> { + pub async fn execute(self, snapshot: &DeltaTableState) -> DeltaResult<FileSystemCheckMetrics> { if self.files_to_remove.is_empty() { return Ok(FileSystemCheckMetrics { dry_run: false, @@ -135,8 +130,6 @@ impl FileSystemCheckPlan { let mut actions = Vec::with_capacity(self.files_to_remove.len()); let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); - let version = self.version; - let store = &self.store; for file in self.files_to_remove { let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); @@ -154,10 +147,11 @@ impl FileSystemCheckPlan { } commit( - store, - version + 1, + self.store.as_ref(), &actions, DeltaOperation::FileSystemCheck {}, + snapshot, + // TODO pass through metadata None, ) .await?; @@ -188,7 +182,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { )); } - let metrics = plan.execute().await?; + let metrics = plan.execute(&this.state).await?; let mut table = DeltaTable::new_with_state(this.store, this.state); table.update().await?; Ok((table, metrics)) diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index 747a94e7da..f3a5c5f68d 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -173,7 +173,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { this.target_size.to_owned(), writer_properties, )?; - let metrics = plan.execute(this.store.clone()).await?; + let metrics = plan.execute(this.store.clone(), &this.snapshot).await?; let mut table = DeltaTable::new_with_state(this.store, this.snapshot); table.update().await?; Ok((table, metrics)) @@ -270,7 +270,11 @@ pub struct MergePlan { impl MergePlan { /// Perform the operations outlined in the plan.
- pub async fn execute(self, object_store: ObjectStoreRef) -> Result { + pub async fn execute( + self, + object_store: ObjectStoreRef, + snapshot: &DeltaTableState, + ) -> Result { let mut actions = vec![]; let mut metrics = self.metrics; @@ -368,9 +372,9 @@ impl MergePlan { commit( object_store.as_ref(), - self.read_table_version + 1, &actions, self.input_parameters.into(), + snapshot, Some(metadata), ) .await?; diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs new file mode 100644 index 0000000000..a8403f1e67 --- /dev/null +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -0,0 +1,862 @@ +//! Helper module to check if a transaction can be committed in case of conflicting commits. +use std::collections::HashSet; +use std::io::{BufRead, BufReader, Cursor}; + +use object_store::ObjectStore; + +use super::CommitInfo; +use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove}; +use crate::delta_config::IsolationLevel; +use crate::storage::commit_uri_from_version; +use crate::{table_state::DeltaTableState, DeltaDataTypeVersion, DeltaResult, DeltaTableError}; + +#[cfg(feature = "datafusion")] +use super::state::AddContainer; +#[cfg(feature = "datafusion")] +use datafusion_expr::Expr; +#[cfg(feature = "datafusion")] +use itertools::Either; + +/// Exceptions raised during commit conflict resolution +#[derive(thiserror::Error, Debug)] +pub enum CommitConflictError { + /// This exception occurs when a concurrent operation adds files in the same partition + /// (or anywhere in an un-partitioned table) that your operation reads. The file additions + /// can be caused by INSERT, DELETE, UPDATE, or MERGE operations. + #[error("Commit failed: a concurrent transactions added new files.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")] + ConcurrentAppend, + + /// This exception occurs when a concurrent operation deleted a file that your operation read. + /// Common causes are a DELETE, UPDATE, or MERGE operation that rewrites files. + #[error("Commit failed: a concurrent transaction deleted a file this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")] + ConcurrentDeleteRead, + + /// This exception occurs when a concurrent operation deleted a file that your operation also deletes. + /// This could be caused by two concurrent compaction operations rewriting the same files. + #[error("Concurrent delete-delete failed.")] + ConcurrentDeleteDelete, + + /// This exception occurs when a concurrent transaction updates the metadata of a Delta table. + /// Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table. + #[error("Metadata changed since last commit.")] + MetadataChanged, + + /// If a streaming query using the same checkpoint location is started multiple times concurrently + /// and tries to write to the Delta table at the same time. You should never have two streaming + /// queries use the same checkpoint location and run at the same time. + #[error("Concurrent transaction failed.")] + ConcurrentTransaction, + + /// This exception can occur in the following cases: + /// - When your Delta table is upgraded to a new version. 
For future operations to succeed + /// you may need to upgrade your Delta Lake version. + /// - When multiple writers are creating or replacing a table at the same time. + /// - When multiple writers are writing to an empty path at the same time. + #[error("Protocol changed since last commit.")] + ProtocolChanged, + + /// Error returned when the table requires an unsupported writer version + #[error("Delta-rs does not support writer version {0}")] + UnsupportedWriterVersion(i32), + + /// Error returned when the table requires an unsupported writer version + #[error("Delta-rs does not support reader version {0}")] + UnsupportedReaderVersion(i32), + + /// Error returned when the snapshot has missing or corrupted data + #[error("Snapshot is corrupted: {source}")] + CorruptedState { + /// Source error + source: Box, + }, + + /// Error returned when evaluating predicate + #[error("Error evaluating predicate: {source}")] + Predicate { + /// Source error + source: Box, + }, + + /// Error returned when no metadata was found in the DeltaTable. + #[error("No metadata found, please make sure table is loaded.")] + NoMetadata, +} + +/// A struct representing different attributes of current transaction needed for conflict detection. +#[allow(unused)] +pub(crate) struct TransactionInfo<'a> { + txn_id: String, + /// partition predicates by which files have been queried by the transaction + /// + /// If any new data files or removed data files match this predicate, the + /// transaction should fail. + #[cfg(not(feature = "datafusion"))] + read_predicates: Option, + /// partition predicates by which files have been queried by the transaction + #[cfg(feature = "datafusion")] + read_predicates: Option, + /// appIds that have been seen by the transaction + pub(crate) read_app_ids: HashSet, + /// delta log actions that the transaction wants to commit + actions: &'a Vec, + /// read [`DeltaTableState`] used for the transaction + pub(crate) read_snapshot: &'a DeltaTableState, + /// Whether the transaction tainted the whole table + read_whole_table: bool, +} + +impl<'a> TransactionInfo<'a> { + #[cfg(feature = "datafusion")] + pub fn try_new( + read_snapshot: &'a DeltaTableState, + read_predicates: Option, + actions: &'a Vec, + read_whole_table: bool, + ) -> DeltaResult { + let read_predicates = read_predicates + .map(|pred| read_snapshot.parse_predicate_expression(pred)) + .transpose()?; + Ok(Self { + txn_id: "".into(), + read_predicates, + read_app_ids: Default::default(), + actions, + read_snapshot, + read_whole_table, + }) + } + + #[cfg(feature = "datafusion")] + #[allow(unused)] + pub fn new( + read_snapshot: &'a DeltaTableState, + read_predicates: Option, + actions: &'a Vec, + read_whole_table: bool, + ) -> Self { + Self { + txn_id: "".into(), + read_predicates, + read_app_ids: Default::default(), + actions, + read_snapshot, + read_whole_table, + } + } + + #[cfg(not(feature = "datafusion"))] + pub fn try_new( + read_snapshot: &'a DeltaTableState, + read_predicates: Option, + actions: &'a Vec, + read_whole_table: bool, + ) -> DeltaResult { + Ok(Self { + txn_id: "".into(), + read_predicates, + read_app_ids: Default::default(), + actions, + read_snapshot, + read_whole_table, + }) + } + + /// Whether the transaction changed the tables metadatas + pub fn metadata_changed(&self) -> bool { + self.actions + .iter() + .any(|a| matches!(a, Action::metaData(_))) + } + + #[cfg(feature = "datafusion")] + /// Files read by the transaction + pub fn read_files(&self) -> Result, CommitConflictError> { + if let Some(predicate) 
= &self.read_predicates { + Ok(Either::Left( + self.read_snapshot + .files_matching_predicate(&[predicate.clone()]) + .map_err(|err| CommitConflictError::Predicate { + source: Box::new(err), + })?, + )) + } else { + Ok(Either::Right(std::iter::empty())) + } + } + + #[cfg(not(feature = "datafusion"))] + /// Files read by the transaction + pub fn read_files(&self) -> Result, CommitConflictError> { + Ok(self.read_snapshot.files().iter()) + } + + /// Whether the whole table was read during the transaction + pub fn read_whole_table(&self) -> bool { + self.read_whole_table + } +} + +/// Summary of the Winning commit against which we want to check the conflict +pub(crate) struct WinningCommitSummary { + pub actions: Vec, + pub commit_info: Option, +} + +impl WinningCommitSummary { + pub async fn try_new( + object_store: &dyn ObjectStore, + read_version: DeltaDataTypeVersion, + winning_commit_version: DeltaDataTypeVersion, + ) -> DeltaResult { + // NOTE using asser, since a wrong version would right now mean a bug in our code. + assert_eq!(winning_commit_version, read_version + 1); + + let commit_uri = commit_uri_from_version(winning_commit_version); + let commit_log_bytes = object_store.get(&commit_uri).await?.bytes().await?; + + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + + let actions = reader + .lines() + .map(|maybe_line| { + let line = maybe_line?; + serde_json::from_str::(line.as_str()).map_err(|e| { + DeltaTableError::InvalidJsonLog { + json_err: e, + version: winning_commit_version, + line, + } + }) + }) + .collect::, _>>()?; + + let commit_info = actions + .iter() + .find(|action| matches!(action, Action::commitInfo(_))) + .map(|action| match action { + Action::commitInfo(info) => info.clone(), + _ => unreachable!(), + }); + + Ok(Self { + actions, + commit_info, + }) + } + + pub fn metadata_updates(&self) -> Vec { + self.actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::metaData(metadata) => Some(metadata), + _ => None, + }) + .collect() + } + + pub fn app_level_transactions(&self) -> HashSet { + self.actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::txn(txn) => Some(txn.app_id), + _ => None, + }) + .collect() + } + + pub fn protocol(&self) -> Vec { + self.actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::protocol(protocol) => Some(protocol), + _ => None, + }) + .collect() + } + + pub fn removed_files(&self) -> Vec { + self.actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::remove(remove) => Some(remove), + _ => None, + }) + .collect() + } + + pub fn added_files(&self) -> Vec { + self.actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::add(add) => Some(add), + _ => None, + }) + .collect() + } + + pub fn blind_append_added_files(&self) -> Vec { + if self.is_blind_append().unwrap_or(false) { + self.added_files() + } else { + vec![] + } + } + + pub fn changed_data_added_files(&self) -> Vec { + if self.is_blind_append().unwrap_or(false) { + vec![] + } else { + self.added_files() + } + } + + // pub fn only_add_files(&self) -> bool { + // !self + // .actions + // .iter() + // .any(|action| matches!(action, Action::remove(_))) + // } + + pub fn is_blind_append(&self) -> Option { + self.commit_info + .as_ref() + .map(|opt| opt.is_blind_append.unwrap_or(false)) + } +} + +/// Checks if a failed commit may be committed after a conflicting winning commit +pub(crate) struct ConflictChecker<'a> { + /// transaction information for current transaction 
at start of check + txn_info: TransactionInfo<'a>, + /// Summary of the transaction, that has been committed ahead of the current transaction + winning_commit_summary: WinningCommitSummary, + /// Isolation level for the current transaction + isolation_level: IsolationLevel, +} + +impl<'a> ConflictChecker<'a> { + pub fn new( + transaction_info: TransactionInfo<'a>, + winning_commit_summary: WinningCommitSummary, + operation: Option<&DeltaOperation>, + ) -> ConflictChecker<'a> { + let isolation_level = operation + .and_then(|op| { + if can_downgrade_to_snapshot_isolation( + &winning_commit_summary.actions, + op, + &transaction_info + .read_snapshot + .table_config() + .isolation_level(), + ) { + Some(IsolationLevel::SnapshotIsolation) + } else { + None + } + }) + .unwrap_or_else(|| { + transaction_info + .read_snapshot + .table_config() + .isolation_level() + }); + + Self { + txn_info: transaction_info, + winning_commit_summary, + isolation_level, + } + } + + /// This function checks conflict of the `initial_current_transaction_info` against the + /// `winning_commit_version` and returns an updated [`TransactionInfo`] that represents + /// the transaction as if it had started while reading the `winning_commit_version`. + pub fn check_conflicts(&self) -> Result<(), CommitConflictError> { + self.check_protocol_compatibility()?; + self.check_no_metadata_updates()?; + self.check_for_added_files_that_should_have_been_read_by_current_txn()?; + self.check_for_deleted_files_against_current_txn_read_files()?; + self.check_for_deleted_files_against_current_txn_deleted_files()?; + self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?; + Ok(()) + } + + /// Asserts that the client is up to date with the protocol and is allowed + /// to read and write against the protocol set by the committed transaction. + fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> { + for p in self.winning_commit_summary.protocol() { + if self.txn_info.read_snapshot.min_reader_version() < p.min_reader_version + || self.txn_info.read_snapshot.min_writer_version() < p.min_writer_version + { + return Err(CommitConflictError::ProtocolChanged); + }; + } + if !self.winning_commit_summary.protocol().is_empty() + && self + .txn_info + .actions + .iter() + .any(|a| matches!(a, Action::protocol(_))) + { + return Err(CommitConflictError::ProtocolChanged); + }; + Ok(()) + } + + /// Check if the committed transaction has changed metadata. + fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> { + // Fail if the metadata is different than what the txn read. + if !self.winning_commit_summary.metadata_updates().is_empty() { + Err(CommitConflictError::MetadataChanged) + } else { + Ok(()) + } + } + + /// Check if the new files added by the already committed transactions + /// should have been read by the current transaction. + fn check_for_added_files_that_should_have_been_read_by_current_txn( + &self, + ) -> Result<(), CommitConflictError> { + // Skip check, if the operation can be downgraded to snapshot isolation + if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) { + return Ok(()); + } + + // Fail if new files have been added that the txn should have read. 
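+        // Which concurrently added files count as conflicts depends on the effective isolation
+        // level: WriteSerializable ignores blind appends (unless this transaction changed
+        // metadata), Serializable checks every added file, and SnapshotIsolation skips the check.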
+ let added_files_to_check = match self.isolation_level { + IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => { + // don't conflict with blind appends + self.winning_commit_summary.changed_data_added_files() + } + IsolationLevel::Serializable | IsolationLevel::WriteSerializable => { + let mut files = self.winning_commit_summary.changed_data_added_files(); + files.extend(self.winning_commit_summary.blind_append_added_files()); + files + } + IsolationLevel::SnapshotIsolation => vec![], + }; + + // Here we need to check if the current transaction would have read the + // added files. for this we need to be able to evaluate predicates. Err on the safe side is + // to assume all files match + cfg_if::cfg_if! { + if #[cfg(feature = "datafusion")] { + let added_files_matching_predicates = if let (Some(predicate), false) = ( + &self.txn_info.read_predicates, + self.txn_info.read_whole_table(), + ) { + let arrow_schema = self.txn_info.read_snapshot.arrow_schema().map_err(|err| { + CommitConflictError::CorruptedState { + source: Box::new(err), + } + })?; + let partition_columns = &self + .txn_info + .read_snapshot + .current_metadata() + .ok_or(CommitConflictError::NoMetadata)? + .partition_columns; + AddContainer::new(&added_files_to_check, partition_columns, arrow_schema) + .predicate_matches(predicate.clone()) + .map_err(|err| CommitConflictError::Predicate { + source: Box::new(err), + })? + .cloned() + .collect::>() + } else if self.txn_info.read_whole_table() { + added_files_to_check + } else { + vec![] + }; + } else { + let added_files_matching_predicates = if self.txn_info.read_whole_table() + { + added_files_to_check + } else { + vec![] + }; + } + } + + if !added_files_matching_predicates.is_empty() { + Err(CommitConflictError::ConcurrentAppend) + } else { + Ok(()) + } + } + + /// Check if [Remove] actions added by already committed transactions + /// conflicts with files read by the current transaction. + fn check_for_deleted_files_against_current_txn_read_files( + &self, + ) -> Result<(), CommitConflictError> { + // Fail if files have been deleted that the txn read. + let read_file_path: HashSet = self + .txn_info + .read_files()? + .map(|f| f.path.clone()) + .collect(); + let deleted_read_overlap = self + .winning_commit_summary + .removed_files() + .iter() + // TODO remove cloned + .cloned() + .find(|f| read_file_path.contains(&f.path)); + if deleted_read_overlap.is_some() + || (!self.winning_commit_summary.removed_files().is_empty() + && self.txn_info.read_whole_table()) + { + Err(CommitConflictError::ConcurrentDeleteRead) + } else { + Ok(()) + } + } + + /// Check if [Remove] actions added by already committed transactions conflicts + /// with [Remove] actions this transaction is trying to add. + fn check_for_deleted_files_against_current_txn_deleted_files( + &self, + ) -> Result<(), CommitConflictError> { + // Fail if a file is deleted twice. 
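+        // Compare the paths removed by this transaction with those removed by the winning
+        // commit; any overlap (e.g. two compactions rewriting the same file) is a conflict.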
+ let txn_deleted_files: HashSet = self + .txn_info + .actions + .iter() + .cloned() + .filter_map(|action| match action { + Action::remove(remove) => Some(remove.path), + _ => None, + }) + .collect(); + let winning_deleted_files: HashSet = self + .winning_commit_summary + .removed_files() + .iter() + .cloned() + .map(|r| r.path) + .collect(); + let intersection: HashSet<&String> = txn_deleted_files + .intersection(&winning_deleted_files) + .collect(); + + if !intersection.is_empty() { + Err(CommitConflictError::ConcurrentDeleteDelete) + } else { + Ok(()) + } + } + + /// Checks if the winning transaction corresponds to some AppId on which + /// current transaction also depends. + fn check_for_updated_application_transaction_ids_that_current_txn_depends_on( + &self, + ) -> Result<(), CommitConflictError> { + // Fail if the appIds seen by the current transaction has been updated by the winning + // transaction i.e. the winning transaction have [Txn] corresponding to + // some appId on which current transaction depends on. Example - This can happen when + // multiple instances of the same streaming query are running at the same time. + let winning_txns = self.winning_commit_summary.app_level_transactions(); + let txn_overlap: HashSet<&String> = winning_txns + .intersection(&self.txn_info.read_app_ids) + .collect(); + if !txn_overlap.is_empty() { + Err(CommitConflictError::ConcurrentTransaction) + } else { + Ok(()) + } + } +} + +// implementation and comments adopted from +// https://github.com/delta-io/delta/blob/1c18c1d972e37d314711b3a485e6fb7c98fce96d/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1268 +// +// For no-data-change transactions such as OPTIMIZE/Auto Compaction/ZorderBY, we can +// change the isolation level to SnapshotIsolation. SnapshotIsolation allows reduced conflict +// detection by skipping the +// [ConflictChecker::check_for_added_files_that_should_have_been_read_by_current_txn] check i.e. +// don't worry about concurrent appends. +// +// We can also use SnapshotIsolation for empty transactions. e.g. consider a commit: +// t0 - Initial state of table +// t1 - Q1, Q2 starts +// t2 - Q1 commits +// t3 - Q2 is empty and wants to commit. +// In this scenario, we can always allow Q2 to commit without worrying about new files +// generated by Q1. +// +// The final order which satisfies both Serializability and WriteSerializability is: Q2, Q1 +// Note that Metadata only update transactions shouldn't be considered empty. If Q2 above has +// a Metadata update (say schema change/identity column high watermark update), then Q2 can't +// be moved above Q1 in the final SERIALIZABLE order. This is because if Q2 is moved above Q1, +// then Q1 should see the updates from Q2 - which actually didn't happen. +pub(super) fn can_downgrade_to_snapshot_isolation<'a>( + actions: impl IntoIterator, + operation: &DeltaOperation, + isolation_level: &IsolationLevel, +) -> bool { + let mut data_changed = false; + let mut has_non_file_actions = false; + for action in actions { + match action { + Action::add(act) if act.data_change => data_changed = true, + Action::remove(rem) if rem.data_change => data_changed = true, + _ => has_non_file_actions = true, + } + } + + if has_non_file_actions { + // if Non-file-actions are present (e.g. METADATA etc.), then don't downgrade the isolation level. 
+ return false; + } + + match isolation_level { + IsolationLevel::Serializable => !data_changed, + IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(), + IsolationLevel::SnapshotIsolation => false, // this case should never happen, since spanpshot isolation canot be configured on table + } +} + +#[cfg(test)] +#[allow(unused)] +mod tests { + use super::super::test_utils as tu; + use super::super::test_utils::init_table_actions; + use super::*; + use crate::action::Action; + #[cfg(feature = "datafusion")] + use datafusion_expr::{col, lit}; + use serde_json::json; + + fn get_stats(min: i64, max: i64) -> Option { + let data = json!({ + "numRecords": 18, + "minValues": { + "value": min + }, + "maxValues": { + "value": max + }, + "nullCount": { + "value": 0 + } + }); + Some(data.to_string()) + } + + #[test] + fn test_can_downgrade_to_snapshot_isolation() { + let isolation = IsolationLevel::WriteSerializable; + let operation = DeltaOperation::Optimize { + predicate: None, + target_size: 0, + }; + let add = tu::create_add_action("p", false, None); + let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation); + assert!(!res) + } + + // Check whether the test transaction conflict with the concurrent writes by executing the + // given params in the following order: + // - setup (including setting table isolation level + // - reads + // - concurrentWrites + // - actions + #[cfg(feature = "datafusion")] + fn execute_test( + setup: Option>, + reads: Option, + concurrent: Vec, + actions: Vec, + read_whole_table: bool, + ) -> Result<(), CommitConflictError> { + let setup_actions = setup.unwrap_or_else(init_table_actions); + let state = DeltaTableState::from_actions(setup_actions, 0).unwrap(); + let transaction_info = TransactionInfo::new(&state, reads, &actions, read_whole_table); + let summary = WinningCommitSummary { + actions: concurrent, + commit_info: None, + }; + let checker = ConflictChecker::new(transaction_info, summary, None); + checker.check_conflicts() + } + + #[tokio::test] + #[cfg(feature = "datafusion")] + // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94 + async fn test_allowed_concurrent_actions() { + // append - append + // append file to table while a concurrent writer also appends a file + let file1 = tu::create_add_action("file1", true, get_stats(1, 10)); + let file2 = tu::create_add_action("file2", true, get_stats(1, 10)); + let result = execute_test(None, None, vec![file1], vec![file2], false); + assert!(result.is_ok()); + + // disjoint delete - read + // the concurrent transaction deletes a file that the current transaction did NOT read + let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10)); + let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000)); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_not_read); + setup_actions.push(file_read); + let result = execute_test( + Some(setup_actions), + Some(col("value").gt(lit::(10))), + vec![tu::create_remove_action("file_not_read", true)], + vec![], + false, + ); + assert!(result.is_ok()); + + // disjoint add - read + // concurrently add file, that the current transaction would not have read + let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); + let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000)); + let mut setup_actions = 
init_table_actions(); + setup_actions.push(file_read); + let result = execute_test( + Some(setup_actions), + Some(col("value").gt(lit::(10))), + vec![file_added], + vec![], + false, + ); + assert!(result.is_ok()); + + // TODO enable test once we have isolation level downcast + // add / read + no write + // transaction would have read file added by concurrent txn, but does not write data, + // so no real conflicting change even though data was added + // let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); + // let result = execute_test( + // None, + // Some(col("value").gt(lit::(5))), + // vec![file_added], + // vec![], + // false, + // ); + // assert!(result.is_ok()); + + // TODO disjoint transactions + } + + #[tokio::test] + #[cfg(feature = "datafusion")] + // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94 + async fn test_disallowed_concurrent_actions() { + // delete - delete + // remove file from table that has previously been removed + let removed_file = tu::create_remove_action("removed_file", true); + let result = execute_test( + None, + None, + vec![removed_file.clone()], + vec![removed_file], + false, + ); + assert!(matches!( + result, + Err(CommitConflictError::ConcurrentDeleteDelete) + )); + + // add / read + write + // a file is concurrently added that should have been read by the current transaction + let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); + let file_should_have_read = + tu::create_add_action("file_should_have_read", true, get_stats(1, 10)); + let result = execute_test( + None, + Some(col("value").lt_eq(lit::(10))), + vec![file_should_have_read], + vec![file_added], + false, + ); + assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend))); + + // delete / read + // transaction reads a file that is removed by concurrent transaction + let file_read = tu::create_add_action("file_read", true, get_stats(1, 10)); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_read); + let result = execute_test( + Some(setup_actions), + Some(col("value").lt_eq(lit::(10))), + vec![tu::create_remove_action("file_read", true)], + vec![], + false, + ); + assert!(matches!( + result, + Err(CommitConflictError::ConcurrentDeleteRead) + )); + + // schema change + // concurrent transactions changes table metadata + let result = execute_test( + None, + None, + vec![tu::create_metadata_action(None, None)], + vec![], + false, + ); + assert!(matches!(result, Err(CommitConflictError::MetadataChanged))); + + // upgrade / upgrade + // current and concurrent transactions change the protocol version + let result = execute_test( + None, + None, + vec![tu::create_protocol_action(None, None)], + vec![tu::create_protocol_action(None, None)], + false, + ); + assert!(matches!(result, Err(CommitConflictError::ProtocolChanged))); + + // taint whole table + // `read_whole_table` should disallow any concurrent change, even if the change + // is disjoint with the earlier filter + let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); + let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); + let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000)); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_part1); + let result = execute_test( + Some(setup_actions), + // filter matches neither exisiting nor 
added files + Some(col("value").lt(lit::(0))), + vec![file_part2], + vec![file_part3], + true, + ); + assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend))); + + // taint whole table + concurrent remove + // `read_whole_table` should disallow any concurrent remove actions + let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); + let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_part1); + let result = execute_test( + Some(setup_actions), + None, + vec![tu::create_remove_action("file_part1", true)], + vec![file_part2], + true, + ); + assert!(matches!( + result, + Err(CommitConflictError::ConcurrentDeleteRead) + )); + + // TODO "add in part=2 / read from part=1,2 and write to part=1" + + // TODO conflicting txns + } +} diff --git a/rust/src/operations/transaction.rs b/rust/src/operations/transaction/mod.rs similarity index 60% rename from rust/src/operations/transaction.rs rename to rust/src/operations/transaction/mod.rs index da4a5cf06c..47ea68fae2 100644 --- a/rust/src/operations/transaction.rs +++ b/rust/src/operations/transaction/mod.rs @@ -1,17 +1,29 @@ //! Delta transactions -use crate::action::{Action, DeltaOperation}; -use crate::storage::DeltaObjectStore; -use crate::{crate_version, DeltaDataTypeVersion, DeltaResult, DeltaTableError}; - use chrono::Utc; +use conflict_checker::ConflictChecker; use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::{Map, Value}; +use crate::action::{Action, CommitInfo, DeltaOperation}; +use crate::storage::commit_uri_from_version; +use crate::table_state::DeltaTableState; +use crate::{crate_version, DeltaDataTypeVersion, DeltaResult, DeltaTableError}; + +mod conflict_checker; +#[cfg(feature = "datafusion")] +mod state; +#[cfg(test)] +pub(crate) mod test_utils; + +use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; + const DELTA_LOG_FOLDER: &str = "_delta_log"; +/// Error raised while committing transaction #[derive(thiserror::Error, Debug)] -pub(crate) enum TransactionError { +pub enum TransactionError { + /// Version already exists #[error("Tried committing existing table version: {0}")] VersionAlreadyExists(DeltaDataTypeVersion), @@ -29,6 +41,12 @@ pub(crate) enum TransactionError { #[from] source: ObjectStoreError, }, + /// Error returned when a commit conflict occurred + #[error("Failed to commit transaction: {0}")] + CommitConflict(#[from] CommitConflictError), + /// Error returned when the maximum number of commit attempts is exceeded + #[error("Failed to commit transaction: {0}")] + MaxCommitAttempts(i32), } impl From<TransactionError> for DeltaTableError { @@ -41,16 +59,11 @@ impl From<TransactionError> for DeltaTableError { DeltaTableError::SerializeLogJson { json_err } } TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source }, + other => DeltaTableError::Transaction { source: other }, } } } -/// Return the uri of commit version. -fn commit_uri_from_version(version: DeltaDataTypeVersion) -> Path { - let version = format!("{version:020}.json"); - Path::from_iter([DELTA_LOG_FOLDER, &version]) -} - // Convert actions to their json representation fn log_entry_from_actions<'a>( actions: impl IntoIterator<Item = &'a Action>, @@ -113,13 +126,13 @@ pub(crate) async fn prepare_commit<'a>( Ok(path) } -/// Tries to commit a prepared commit file. Returns [`DeltaTableError::VersionAlreadyExists`] +/// Tries to commit a prepared commit file.
Returns [DeltaTableError::VersionAlreadyExists] /// if the given `version` already exists. The caller should handle the retry logic itself. /// This is low-level transaction API. If user does not want to maintain the commit loop then /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` /// with retry logic. async fn try_commit_transaction( - storage: &DeltaObjectStore, + storage: &dyn ObjectStore, tmp_commit: &Path, version: DeltaDataTypeVersion, ) -> Result { @@ -138,80 +151,89 @@ async fn try_commit_transaction( } pub(crate) async fn commit( - storage: &DeltaObjectStore, - version: DeltaDataTypeVersion, + storage: &dyn ObjectStore, actions: &Vec, operation: DeltaOperation, + read_snapshot: &DeltaTableState, app_metadata: Option>, ) -> DeltaResult { let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?; - match try_commit_transaction(storage, &tmp_commit, version).await { - Ok(version) => Ok(version), - Err(TransactionError::VersionAlreadyExists(version)) => { - storage.delete(&tmp_commit).await?; - Err(DeltaTableError::VersionAlreadyExists(version)) + + let max_attempts = 5; + let mut attempt_number = 1; + + while attempt_number <= max_attempts { + let version = read_snapshot.version() + attempt_number; + match try_commit_transaction(storage, &tmp_commit, version).await { + Ok(version) => return Ok(version), + Err(TransactionError::VersionAlreadyExists(version)) => { + let summary = WinningCommitSummary::try_new(storage, version - 1, version).await?; + let transaction_info = TransactionInfo::try_new( + read_snapshot, + operation.read_predicate(), + actions, + // TODO allow tainting whole table + false, + )?; + let conflict_checker = + ConflictChecker::new(transaction_info, summary, Some(&operation)); + match conflict_checker.check_conflicts() { + Ok(_) => { + attempt_number += 1; + } + Err(err) => { + storage.delete(&tmp_commit).await?; + return Err(TransactionError::CommitConflict(err).into()); + } + }; + } + Err(err) => { + storage.delete(&tmp_commit).await?; + return Err(err.into()); + } } - Err(err) => Err(err.into()), } + + Err(TransactionError::MaxCommitAttempts(max_attempts as i32).into()) } #[cfg(all(test, feature = "parquet"))] mod tests { + use self::test_utils::init_table_actions; use super::*; - use crate::action::{DeltaOperation, Protocol, SaveMode}; - use crate::storage::utils::flatten_list_stream; - use crate::writer::test_utils::get_delta_metadata; - use crate::{DeltaTable, DeltaTableBuilder}; + use object_store::memory::InMemory; #[test] - fn test_commit_version() { + fn test_commit_uri_from_version() { let version = commit_uri_from_version(0); assert_eq!(version, Path::from("_delta_log/00000000000000000000.json")); let version = commit_uri_from_version(123); assert_eq!(version, Path::from("_delta_log/00000000000000000123.json")) } + #[test] + fn test_log_entry_from_actions() { + let actions = init_table_actions(); + let entry = log_entry_from_actions(&actions).unwrap(); + let lines: Vec<_> = entry.lines().collect(); + // writes every action to a line + assert_eq!(actions.len(), lines.len()) + } + #[tokio::test] - async fn test_commits_writes_file() { - let metadata = get_delta_metadata(&[]); - let operation = DeltaOperation::Create { - mode: SaveMode::Append, - location: "memory://".into(), - protocol: Protocol { - min_reader_version: 1, - min_writer_version: 1, - }, - metadata, - }; - - let commit_path = Path::from("_delta_log/00000000000000000000.json"); - let storage = 
DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap(); - - // successfully write in clean location - commit(storage.as_ref(), 0, &vec![], operation.clone(), None) - .await - .unwrap(); - let head = storage.head(&commit_path).await; - assert!(head.is_ok()); - assert_eq!(head.as_ref().unwrap().location, commit_path); - - // fail on overwriting - let failed_commit = commit(storage.as_ref(), 0, &vec![], operation, None).await; - assert!(failed_commit.is_err()); - assert!(matches!( - failed_commit.unwrap_err(), - DeltaTableError::VersionAlreadyExists(_) - )); - - // check we clean up after ourselves - let objects = flatten_list_stream(storage.as_ref(), None).await.unwrap(); - assert_eq!(objects.len(), 1); - - // table can be loaded - let mut table = DeltaTable::new(storage, Default::default()); - table.load().await.unwrap(); - assert_eq!(table.version(), 0) + async fn test_try_commit_transaction() { + let store = InMemory::new(); + let tmp_path = Path::from("_delta_log/tmp"); + let version_path = Path::from("_delta_log/00000000000000000000.json"); + store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); + store.put(&version_path, bytes::Bytes::new()).await.unwrap(); + + // fails if file version already exists + let res = try_commit_transaction(&store, &tmp_path, 0).await; + assert!(res.is_err()); + + // succeeds for next version + let res = try_commit_transaction(&store, &tmp_path, 1).await.unwrap(); + assert_eq!(res, 1); } } diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs new file mode 100644 index 0000000000..f521ac2062 --- /dev/null +++ b/rust/src/operations/transaction/state.rs @@ -0,0 +1,335 @@ +use std::convert::TryFrom; +use std::sync::Arc; + +use arrow::array::ArrayRef; +use arrow::datatypes::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use datafusion::optimizer::utils::conjunction; +use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::scalar::ScalarValue; +use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference}; +use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource}; +use datafusion_sql::planner::{ContextProvider, SqlToRel}; +use itertools::Either; +use sqlparser::dialect::GenericDialect; +use sqlparser::parser::Parser; +use sqlparser::tokenizer::Tokenizer; + +use crate::action::Add; +use crate::delta_datafusion::to_correct_scalar_value; +use crate::table_state::DeltaTableState; +use crate::DeltaResult; +use crate::{schema, DeltaTableError}; + +impl DeltaTableState { + /// Get the table schema as an [`ArrowSchemaRef`] + pub fn arrow_schema(&self) -> DeltaResult { + Ok(Arc::new( + >::try_from( + self.schema().ok_or(DeltaTableError::NoMetadata)?, + )?, + )) + } + + /// Iterate over all files in the log matching a predicate + pub fn files_matching_predicate( + &self, + filters: &[Expr], + ) -> DeltaResult> { + if let Some(Some(predicate)) = + (!filters.is_empty()).then_some(conjunction(filters.iter().cloned())) + { + let pruning_predicate = PruningPredicate::try_new(predicate, self.arrow_schema()?)?; + Ok(Either::Left( + self.files() + .iter() + .zip(pruning_predicate.prune(self)?.into_iter()) + .filter_map( + |(action, keep_file)| { + if keep_file { + Some(action) + } else { + None + } + }, + ), + )) + } else { + Ok(Either::Right(self.files().iter())) + } + } + + /// Parse an expression string into a datafusion [`Expr`] + pub fn parse_predicate_expression(&self, 
expr: impl AsRef) -> DeltaResult { + let dialect = &GenericDialect {}; + let mut tokenizer = Tokenizer::new(dialect, expr.as_ref()); + let tokens = tokenizer + .tokenize() + .map_err(|err| DeltaTableError::GenericError { + source: Box::new(err), + })?; + let sql = Parser::new(dialect) + .with_tokens(tokens) + .parse_expr() + .map_err(|err| DeltaTableError::GenericError { + source: Box::new(err), + })?; + + // TODO should we add the table name as qualifier when available? + let df_schema = DFSchema::try_from_qualified_schema("", self.arrow_schema()?.as_ref())?; + let context_provider = DummyContextProvider::default(); + let sql_to_rel = SqlToRel::new(&context_provider); + + Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?) + } +} + +pub struct AddContainer<'a> { + inner: &'a Vec, + partition_columns: &'a Vec, + schema: ArrowSchemaRef, +} + +impl<'a> AddContainer<'a> { + /// Create a new instance of [`AddContainer`] + pub fn new( + adds: &'a Vec, + partition_columns: &'a Vec, + schema: ArrowSchemaRef, + ) -> Self { + Self { + inner: adds, + partition_columns, + schema, + } + } + + pub fn get_prune_stats(&self, column: &Column, get_max: bool) -> Option { + let (_, field) = self.schema.column_with_name(&column.name)?; + + // See issue 1214. Binary type does not support natural order which is required for Datafusion to prune + if field.data_type() == &DataType::Binary { + return None; + } + + let values = self.inner.iter().map(|add| { + if self.partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + let value = match value { + Some(v) => serde_json::Value::String(v.to_string()), + None => serde_json::Value::Null, + }; + to_correct_scalar_value(&value, field.data_type()).unwrap_or(ScalarValue::Null) + } else if let Ok(Some(statistics)) = add.get_stats() { + let values = if get_max { + statistics.max_values + } else { + statistics.min_values + }; + + values + .get(&column.name) + .and_then(|f| to_correct_scalar_value(f.as_value()?, field.data_type())) + .unwrap_or(ScalarValue::Null) + } else { + ScalarValue::Null + } + }); + ScalarValue::iter_to_array(values).ok() + } + + /// Get an iterator of add actions / files, that MAY contain data matching the predicate. + /// + /// Expressions are evaluated for file statistics, essentially column-wise min max bounds, + /// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log) + /// to not contain matches by the predicate expression. + pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult> { + let pruning_predicate = PruningPredicate::try_new(predicate, self.schema.clone())?; + Ok(self + .inner + .iter() + .zip(pruning_predicate.prune(self)?.into_iter()) + .filter_map( + |(action, keep_file)| { + if keep_file { + Some(action) + } else { + None + } + }, + )) + } +} + +impl<'a> PruningStatistics for AddContainer<'a> { + /// return the minimum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows + fn min_values(&self, column: &Column) -> Option { + self.get_prune_stats(column, false) + } + + /// return the maximum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows. + fn max_values(&self, column: &Column) -> Option { + self.get_prune_stats(column, true) + } + + /// return the number of containers (e.g. 
row groups) being + /// pruned with these statistics + fn num_containers(&self) -> usize { + self.inner.len() + } + + /// return the number of null values for the named column as an + /// `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows. + fn null_counts(&self, column: &Column) -> Option { + let values = self.inner.iter().map(|add| { + if let Ok(Some(statistics)) = add.get_stats() { + if self.partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + match value { + Some(_) => ScalarValue::UInt64(Some(0)), + None => ScalarValue::UInt64(Some(statistics.num_records as u64)), + } + } else { + statistics + .null_count + .get(&column.name) + .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64))) + .unwrap_or(ScalarValue::UInt64(None)) + } + } else if self.partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + match value { + Some(_) => ScalarValue::UInt64(Some(0)), + None => ScalarValue::UInt64(None), + } + } else { + ScalarValue::UInt64(None) + } + }); + ScalarValue::iter_to_array(values).ok() + } +} + +impl PruningStatistics for DeltaTableState { + /// return the minimum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows + fn min_values(&self, column: &Column) -> Option { + let partition_columns = &self.current_metadata()?.partition_columns; + let container = + AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); + container.min_values(column) + } + + /// return the maximum values for the named column, if known. + /// Note: the returned array must contain `num_containers()` rows. + fn max_values(&self, column: &Column) -> Option { + let partition_columns = &self.current_metadata()?.partition_columns; + let container = + AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); + container.max_values(column) + } + + /// return the number of containers (e.g. row groups) being + /// pruned with these statistics + fn num_containers(&self) -> usize { + self.files().len() + } + + /// return the number of null values for the named column as an + /// `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows. 
+ fn null_counts(&self, column: &Column) -> Option { + let partition_columns = &self.current_metadata()?.partition_columns; + let container = + AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); + container.null_counts(column) + } +} + +#[derive(Default)] +struct DummyContextProvider { + options: ConfigOptions, +} + +impl ContextProvider for DummyContextProvider { + fn get_table_provider(&self, _name: TableReference) -> DFResult> { + unimplemented!() + } + + fn get_function_meta(&self, _name: &str) -> Option> { + unimplemented!() + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + unimplemented!() + } + + fn get_variable_type(&self, _: &[String]) -> Option { + unimplemented!() + } + + fn options(&self) -> &ConfigOptions { + &self.options + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::operations::transaction::test_utils::{create_add_action, init_table_actions}; + use datafusion_expr::{col, lit}; + + #[test] + fn test_parse_predicate_expression() { + let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + + // parses simple expression + let parsed = state.parse_predicate_expression("value > 10").unwrap(); + let expected = col("value").gt(lit::(10)); + assert_eq!(parsed, expected); + + // fails for unknown column + let parsed = state.parse_predicate_expression("non_existent > 10"); + assert!(parsed.is_err()); + + // parses complex expression + let parsed = state + .parse_predicate_expression("value > 10 OR value <= 0") + .unwrap(); + let expected = col("value") + .gt(lit::(10)) + .or(col("value").lt_eq(lit::(0))); + assert_eq!(parsed, expected) + } + + #[test] + fn test_files_matching_predicate() { + let mut actions = init_table_actions(); + actions.push(create_add_action("excluded", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into()))); + actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into()))); + actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into()))); + + let state = DeltaTableState::from_actions(actions, 0).unwrap(); + let files = state + .files_matching_predicate(&[]) + .unwrap() + .collect::>(); + assert_eq!(files.len(), 3); + + let predictate = col("value") + .gt(lit::(10)) + .or(col("value").lt_eq(lit::(0))); + + let files = state + .files_matching_predicate(&[predictate]) + .unwrap() + .collect::>(); + assert_eq!(files.len(), 2); + assert!(files.iter().all(|add| add.path.contains("included"))); + } +} diff --git a/rust/src/operations/transaction/test_utils.rs b/rust/src/operations/transaction/test_utils.rs new file mode 100644 index 0000000000..92d981d5e1 --- /dev/null +++ b/rust/src/operations/transaction/test_utils.rs @@ -0,0 +1,155 @@ +#![allow(unused)] +use super::{prepare_commit, try_commit_transaction, CommitInfo}; +use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove, SaveMode}; +use crate::table_state::DeltaTableState; +use crate::{ + DeltaTable, DeltaTableBuilder, DeltaTableMetaData, Schema, SchemaDataType, SchemaField, +}; +use std::collections::HashMap; + +pub fn create_add_action( + path: impl Into, + data_change: bool, + stats: Option, +) -> Action { + Action::add(Add { + path: path.into(), + size: 100, + data_change, + stats, + ..Default::default() + }) +} + +pub 
fn create_remove_action(path: impl Into, data_change: bool) -> Action { + Action::remove(Remove { + path: path.into(), + data_change, + ..Default::default() + }) +} + +pub fn create_protocol_action(max_reader: Option, max_writer: Option) -> Action { + let protocol = Protocol { + min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION), + min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION), + }; + Action::protocol(protocol) +} + +pub fn create_metadata_action( + parttiton_columns: Option>, + configuration: Option>>, +) -> Action { + let table_schema = Schema::new(vec![ + SchemaField::new( + "id".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "value".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "modified".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + ]); + let metadata = DeltaTableMetaData::new( + None, + None, + None, + table_schema, + parttiton_columns.unwrap_or_default(), + configuration.unwrap_or_default(), + ); + Action::metaData(MetaData::try_from(metadata).unwrap()) +} + +pub fn init_table_actions() -> Vec { + let raw = r#" + { + "timestamp": 1670892998177, + "operation": "WRITE", + "operationParameters": { + "mode": "Append", + "partitionBy": "[\"c1\",\"c2\"]" + }, + "isolationLevel": "Serializable", + "isBlindAppend": true, + "operationMetrics": { + "numFiles": "3", + "numOutputRows": "3", + "numOutputBytes": "1356" + }, + "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0", + "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c" + }"#; + + let commit_info = serde_json::from_str::(raw).unwrap(); + vec![ + Action::commitInfo(commit_info), + create_protocol_action(None, None), + create_metadata_action(None, None), + ] +} + +pub async fn create_initialized_table( + partition_cols: &[String], + configuration: Option>>, +) -> DeltaTable { + let storage = DeltaTableBuilder::from_uri("memory://") + .build_storage() + .unwrap(); + let table_schema = Schema::new(vec![ + SchemaField::new( + "id".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "value".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "modified".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + ]); + let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + let operation = DeltaOperation::Create { + mode: SaveMode::ErrorIfExists, + location: "location".into(), + protocol: Protocol { + min_reader_version: 1, + min_writer_version: 1, + }, + metadata: DeltaTableMetaData::new( + None, + None, + None, + table_schema, + partition_cols.to_vec(), + configuration.unwrap_or_default(), + ), + }; + let actions = init_table_actions(); + let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None) + .await + .unwrap(); + try_commit_transaction(storage.as_ref(), &prepared_commit, 0) + .await + .unwrap(); + DeltaTable::new_with_state(storage, state) +} diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index bcddbb390c..203e0f53f9 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -446,10 +446,11 @@ impl std::future::IntoFuture for WriteBuilder { predicate: this.predicate, }; let _version = commit( - &table.storage, 
- table.version() + 1, + table.storage.as_ref(), &actions, operation, + &table.state, + // TODO pass through metadata None, ) .await?; diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index 1f4eb999a1..e447bc687a 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -4,7 +4,7 @@ use super::utils::str_is_truthy; use crate::{DeltaResult, DeltaTableError}; use object_store::memory::InMemory; use object_store::path::Path; -use object_store::prefix::PrefixObjectStore; +use object_store::prefix::PrefixStore; use object_store::DynObjectStore; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -111,14 +111,14 @@ pub(crate) enum ObjectStoreImpl { impl ObjectStoreImpl { pub(crate) fn into_prefix(self, prefix: Path) -> Arc { match self { - ObjectStoreImpl::Local(store) => Arc::new(PrefixObjectStore::new(store, prefix)), - ObjectStoreImpl::InMemory(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + ObjectStoreImpl::Local(store) => Arc::new(PrefixStore::new(store, prefix)), + ObjectStoreImpl::InMemory(store) => Arc::new(PrefixStore::new(store, prefix)), #[cfg(feature = "azure")] - ObjectStoreImpl::Azure(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + ObjectStoreImpl::Azure(store) => Arc::new(PrefixStore::new(store, prefix)), #[cfg(any(feature = "s3", feature = "s3-native-tls"))] - ObjectStoreImpl::S3(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + ObjectStoreImpl::S3(store) => Arc::new(PrefixStore::new(store, prefix)), #[cfg(feature = "gcs")] - ObjectStoreImpl::Google(store) => Arc::new(PrefixObjectStore::new(store, prefix)), + ObjectStoreImpl::Google(store) => Arc::new(PrefixStore::new(store, prefix)), } } diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index afc6297640..06584cc077 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -113,7 +113,7 @@ async fn test_filesystem_check_partitioned() -> TestResult { #[tokio::test] #[serial] -async fn test_filesystem_check_outdated() -> TestResult { +async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult { // Validate failure when a non dry only executes on the latest version let context = IntegrationContext::new(StorageIntegration::Local)?; context.load_table(TestTables::Simple).await?; @@ -132,6 +132,34 @@ async fn test_filesystem_check_outdated() -> TestResult { let op = DeltaOps::from(table); let res = op.filesystem_check().with_dry_run(false).await; + // TODO check more specific error + assert!(matches!(res, Err(DeltaTableError::Transaction { .. }))); + + Ok(()) +} + +#[tokio::test] +#[serial] +#[ignore = "should this actually fail? 
with conflict resolution, we are retrying again."] +async fn test_filesystem_check_outdated() -> TestResult { + // Validate that a non dry run only executes against the latest table version + let context = IntegrationContext::new(StorageIntegration::Local)?; + context.load_table(TestTables::Simple).await?; + let file = "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet"; + let path = Path::from_iter([&TestTables::Simple.as_name(), file]); + + // Delete an active file from underlying storage without an update to the log to simulate an external fault + context.object_store().delete(&path).await?; + + let table = context + .table_builder(TestTables::Simple) + .with_version(2) + .load() + .await?; + + let op = DeltaOps::from(table); + let res = op.filesystem_check().with_dry_run(false).await; + println!("{:?}", res); if let Err(DeltaTableError::VersionAlreadyExists(version)) = res { assert!(version == 3); } else { diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index 9952c00825..81a292867a 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -318,13 +318,12 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> { transaction.add_action(Action::remove(remove)); transaction.commit(None, None).await?; - let maybe_metrics = plan.execute(dt.object_store()).await; + let maybe_metrics = plan.execute(dt.object_store(), &dt.state).await; assert!(maybe_metrics.is_err()); assert_eq!(dt.version(), version + 1); Ok(()) } -#[ignore = "we do not yet re-try in operations commits."] #[tokio::test] /// Validate that optimize succeeds when only add actions occur for an optimized partition async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> { @@ -367,9 +366,11 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> { ) .await?; - let metrics = plan.execute(dt.object_store()).await?; + let metrics = plan.execute(dt.object_store(), &dt.state).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); + + dt.update().await.unwrap(); assert_eq!(dt.version(), version + 2); Ok(()) }