From 56ced4532bc87fb2b32e17ebc78d6ccbe94b90c5 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 14 Mar 2024 22:27:39 -0400 Subject: [PATCH 1/5] impl write app txn --- crates/core/src/kernel/models/actions.rs | 20 ++++++++++ crates/core/src/kernel/snapshot/mod.rs | 3 +- crates/core/src/operations/transaction/mod.rs | 37 ++++++++++++++++++- crates/core/src/operations/write.rs | 21 +++++++++++ crates/core/src/table/state.rs | 2 + 5 files changed, 80 insertions(+), 3 deletions(-) diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index f389102e52..a26bfef3d4 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -671,6 +671,26 @@ pub struct Txn { pub last_updated: Option, } +impl Txn { + /// Create a new application transactions. See [`Txn`] for details. + pub fn new(app_id: &dyn ToString, version: i64) -> Self { + Self::new_with_last_update(app_id, version, None) + } + + /// Create a new application transactions. See [`Txn`] for details. + pub fn new_with_last_update( + app_id: &dyn ToString, + version: i64, + last_updated: Option, + ) -> Self { + Txn { + app_id: app_id.to_string(), + version, + last_updated, + } + } +} + /// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored. /// However the reference implementation as well as delta-rs store useful information that may for instance /// allow us to be more permissive in commit conflict resolution. diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index e27fe3e2f0..6dfc741d57 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -759,7 +759,8 @@ mod tests { predicate: None, }; - let actions = vec![CommitData::new(removes, operation, HashMap::new()).unwrap()]; + let actions = + vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()]; let new_version = snapshot.advance(&actions)?; assert_eq!(new_version, version + 1); diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 64cb25c0be..4fdb21642f 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -77,7 +77,7 @@ use serde_json::Value; use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; use crate::errors::DeltaTableError; use crate::kernel::{ - Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, WriterFeatures, + Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Txn, WriterFeatures, }; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; @@ -245,6 +245,8 @@ pub struct CommitData { pub operation: DeltaOperation, /// The Metadata pub app_metadata: HashMap, + /// Application specific transaction + pub app_transactions: Vec, } impl CommitData { @@ -253,6 +255,7 @@ impl CommitData { mut actions: Vec, operation: DeltaOperation, mut app_metadata: HashMap, + app_transactions: Vec, ) -> Result { if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { let mut commit_info = operation.get_commit_info(); @@ -265,10 +268,18 @@ impl CommitData { commit_info.info = app_metadata.clone(); actions.push(Action::CommitInfo(commit_info)) } + + for txn in &app_transactions { + actions.push(Action::Txn(txn.clone())) + } + + dbg!("{:?}", &actions); + Ok(CommitData { actions, operation, app_metadata, + app_transactions, }) } @@ -298,6 +309,7 @@ impl CommitData { /// Enable controling commit behaviour and modifying metadata that is written during a commit. pub struct CommitProperties { pub(crate) app_metadata: HashMap, + pub(crate) app_transaction: Vec, max_retries: usize, } @@ -305,6 +317,7 @@ impl Default for CommitProperties { fn default() -> Self { Self { app_metadata: Default::default(), + app_transaction: Vec::new(), max_retries: DEFAULT_RETRIES, } } @@ -319,6 +332,18 @@ impl CommitProperties { self.app_metadata = HashMap::from_iter(metadata); self } + + /// Add an additonal application transaction to the commit + pub fn with_application_transaction(mut self, txn: Txn) -> Self { + self.app_transaction.push(txn); + self + } + + /// Override application transactions for the commit + pub fn with_application_transactions(mut self, txn: Vec) -> Self { + self.app_transaction = txn; + self + } } impl From for CommitBuilder { @@ -326,6 +351,7 @@ impl From for CommitBuilder { CommitBuilder { max_retries: value.max_retries, app_metadata: value.app_metadata, + app_transaction: value.app_transaction, ..Default::default() } } @@ -335,6 +361,7 @@ impl From for CommitBuilder { pub struct CommitBuilder { actions: Vec, app_metadata: HashMap, + app_transaction: Vec, max_retries: usize, } @@ -343,6 +370,7 @@ impl Default for CommitBuilder { CommitBuilder { actions: Vec::new(), app_metadata: HashMap::new(), + app_transaction: Vec::new(), max_retries: DEFAULT_RETRIES, } } @@ -368,7 +396,12 @@ impl<'a> CommitBuilder { log_store: LogStoreRef, operation: DeltaOperation, ) -> Result, CommitBuilderError> { - let data = CommitData::new(self.actions, operation, self.app_metadata)?; + let data = CommitData::new( + self.actions, + operation, + self.app_metadata, + self.app_transaction, + )?; Ok(PreCommit { log_store, table_data, diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index d2751a6b1f..d21dbc1f8c 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -866,6 +866,7 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE #[cfg(test)] mod tests { use super::*; + use crate::kernel::Txn; use crate::operations::{collect_sendable_stream, DeltaOps}; use crate::protocol::SaveMode; use crate::writer::test_utils::datafusion::write_batch; @@ -1610,4 +1611,24 @@ mod tests { let actual = get_data_sorted(&table, "id,value,modified").await; assert_batches_sorted_eq!(&expected, &actual); } + + #[tokio::test] + async fn test_app_txn() { + let batch = get_record_batch(None, false); + let table = DeltaOps::new_in_memory() + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::ErrorIfExists) + .with_partition_columns(["modified"]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_files_count(), 2); + + let app_txns = table.get_app_transaction_version(); + println!("{:?}", &app_txns); + assert_eq!(app_txns.len(), 1); + } } diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 89595e870a..ab4161ae8a 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -83,6 +83,7 @@ impl DeltaTableState { metadata: metadata.clone(), }, HashMap::new(), + Vec::new(), ) .unwrap()]; @@ -194,6 +195,7 @@ impl DeltaTableState { actions, operation: operation.clone(), app_metadata: HashMap::new(), + app_transactions: Vec::new(), }; let new_version = self.snapshot.advance(&vec![commit_data])?; if new_version != version { From 03d071eccf1b2ed400b0769254dccc4eb4b56bd0 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 24 Mar 2024 00:01:27 -0400 Subject: [PATCH 2/5] :WIP: POC for log replay visitor + txn ids --- .../core/src/kernel/snapshot/log_segment.rs | 7 ++- crates/core/src/kernel/snapshot/mod.rs | 56 +++++++++++++++---- crates/core/src/kernel/snapshot/replay.rs | 43 ++++++++++---- crates/core/src/operations/write.rs | 2 + crates/core/src/table/mod.rs | 4 +- crates/core/src/table/state.rs | 54 +++++++++++++++++- 6 files changed, 138 insertions(+), 28 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 0b82231ee8..c2d38db2b3 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -30,9 +30,12 @@ lazy_static! { pub(super) static ref COMMIT_SCHEMA: StructType = StructType::new(vec![ ActionType::Add.schema_field().clone(), ActionType::Remove.schema_field().clone(), + ActionType::Txn.schema_field().clone(), + ]); + pub(super) static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![ + ActionType::Add.schema_field().clone(), + ActionType::Txn.schema_field().clone(), ]); - pub(super) static ref CHECKPOINT_SCHEMA: StructType = - StructType::new(vec![ActionType::Add.schema_field().clone(),]); pub(super) static ref TOMBSTONE_SCHEMA: StructType = StructType::new(vec![ActionType::Remove.schema_field().clone(),]); } diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 6dfc741d57..daba3705aa 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -33,6 +33,7 @@ use crate::kernel::StructType; use crate::logstore::LogStore; use crate::operations::transaction::CommitData; use crate::table::config::TableConfig; +use crate::table::state::AppTransactionVisitor; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; mod log_data; @@ -197,10 +198,11 @@ impl Snapshot { } /// Get the files in the snapshot - pub fn files( + pub fn files<'a>( &self, store: Arc, - ) -> DeltaResult>>> { + visitors: Vec<&'a mut dyn ReplayVistor>, + ) -> DeltaResult>>> { let log_stream = self.log_segment.commit_stream( store.clone(), &log_segment::COMMIT_SCHEMA, @@ -211,7 +213,7 @@ impl Snapshot { &log_segment::CHECKPOINT_SCHEMA, &self.config, ); - ReplayStream::try_new(log_stream, checkpoint_stream, self) + ReplayStream::try_new(log_stream, checkpoint_stream, self, visitors) } /// Get the commit infos in the snapshot @@ -337,6 +339,20 @@ impl Snapshot { } } +/// TODO! +pub trait ReplayVistor: Send { + /// TODO! + fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>; +} + +struct PrintVistor {} +impl ReplayVistor for PrintVistor { + fn visit_batch(&mut self, _batch: &RecordBatch) -> DeltaResult<()> { + println!("Hello world!"); + Ok(()) + } +} + /// A snapshot of a Delta table that has been eagerly loaded into memory. #[derive(Debug, Clone, PartialEq)] pub struct EagerSnapshot { @@ -353,9 +369,22 @@ impl EagerSnapshot { store: Arc, config: DeltaTableConfig, version: Option, + ) -> DeltaResult { + let mut p = PrintVistor {}; + let mut i = AppTransactionVisitor::new(); + Self::try_new_with_visitor(table_root, store, config, version, vec![&mut p, &mut i]).await + } + + /// Create a new [`EagerSnapshot`] instance + pub async fn try_new_with_visitor( + table_root: &Path, + store: Arc, + config: DeltaTableConfig, + version: Option, + visitors: Vec<&mut dyn ReplayVistor>, ) -> DeltaResult { let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?; - let files = snapshot.files(store)?.try_collect().await?; + let files = snapshot.files(store, visitors)?.try_collect().await?; Ok(Self { snapshot, files }) } @@ -379,9 +408,13 @@ impl EagerSnapshot { log_store: Arc, target_version: Option, ) -> DeltaResult<()> { + dbg!("Update Call: {} {:?}", self.version(), target_version); if Some(self.version()) == target_version { return Ok(()); } + + let mut p = PrintVistor {}; + let visitors: Vec<&mut dyn ReplayVistor> = vec![&mut p]; let new_slice = self .snapshot .update_inner(log_store.clone(), target_version) @@ -404,11 +437,13 @@ impl EagerSnapshot { ) .boxed() }; + dbg!("here2"); let mapper = LogMapper::try_new(&self.snapshot)?; - let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)? - .map(|batch| batch.and_then(|b| mapper.map_batch(b))) - .try_collect() - .await?; + let files = + ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, visitors)? + .map(|batch| batch.and_then(|b| mapper.map_batch(b))) + .try_collect() + .await?; self.files = files; } @@ -483,6 +518,7 @@ impl EagerSnapshot { &mut self, commits: impl IntoIterator, ) -> DeltaResult { + println!("Advance!"); let mut metadata = None; let mut protocol = None; let mut send = Vec::new(); @@ -633,7 +669,7 @@ mod tests { assert_eq!(tombstones.len(), 31); let batches = snapshot - .files(store.clone())? + .files(store.clone(), vec![])? .try_collect::>() .await?; let expected = [ @@ -663,7 +699,7 @@ mod tests { ) .await?; let batches = snapshot - .files(store.clone())? + .files(store.clone(), vec![])? .try_collect::>() .await?; let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::(); diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 71408b27d5..42857efd84 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -23,14 +23,17 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; use crate::kernel::arrow::json; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; +use super::ReplayVistor; use super::Snapshot; pin_project! { - pub struct ReplayStream { + pub struct ReplayStream<'a, S> { scanner: LogReplayScanner, mapper: Arc, + visitors: Vec<&'a mut dyn ReplayVistor>, + #[pin] commits: S, @@ -39,8 +42,13 @@ pin_project! { } } -impl ReplayStream { - pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult { +impl<'a, S> ReplayStream<'a, S> { + pub(super) fn try_new( + commits: S, + checkpoint: S, + snapshot: &Snapshot, + visitors: Vec<&'a mut dyn ReplayVistor>, + ) -> DeltaResult { let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?); let mapper = Arc::new(LogMapper { stats_schema, @@ -50,6 +58,7 @@ impl ReplayStream { commits, checkpoint, mapper, + visitors, scanner: LogReplayScanner::new(), }) } @@ -127,7 +136,7 @@ fn map_batch( Ok(batch) } -impl Stream for ReplayStream +impl<'a, S> Stream for ReplayStream<'a, S> where S: Stream>, { @@ -136,19 +145,29 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let res = this.commits.poll_next(cx).map(|b| match b { - Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) { - Ok(filtered) => Some(this.mapper.map_batch(filtered)), - Err(e) => Some(Err(e)), - }, + Some(Ok(batch)) => { + for visitor in this.visitors.iter_mut() { + visitor.visit_batch(&batch).unwrap(); + } + match this.scanner.process_files_batch(&batch, true) { + Ok(filtered) => Some(this.mapper.map_batch(filtered)), + Err(e) => Some(Err(e)), + } + } Some(Err(e)) => Some(Err(e)), None => None, }); if matches!(res, Poll::Ready(None)) { this.checkpoint.poll_next(cx).map(|b| match b { - Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, false) { - Ok(filtered) => Some(this.mapper.map_batch(filtered)), - Err(e) => Some(Err(e)), - }, + Some(Ok(batch)) => { + for visitor in this.visitors.iter_mut() { + visitor.visit_batch(&batch).unwrap(); + } + match this.scanner.process_files_batch(&batch, false) { + Ok(filtered) => Some(this.mapper.map_batch(filtered)), + Err(e) => Some(Err(e)), + } + } Some(Err(e)) => Some(Err(e)), None => None, }) diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 868c4f53e6..c66455b50a 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -812,9 +812,11 @@ impl std::future::IntoFuture for WriteBuilder { // created actions, it may be safe to assume, that we want to include all actions. // then again, having only some tombstones may be misleading. if let Some(mut snapshot) = this.snapshot { + dbg!("merge1!"); snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?; Ok(DeltaTable::new_with_state(this.log_store, snapshot)) } else { + dbg!("update!"); let mut table = DeltaTable::new(this.log_store, Default::default()); table.update().await?; Ok(table) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 30150bb483..3078794d2e 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -341,13 +341,15 @@ impl DeltaTable { &mut self, max_version: Option, ) -> Result<(), DeltaTableError> { - debug!( + dbg!( "incremental update with version({}) and max_version({max_version:?})", self.version(), + max_version, ); match self.state.as_mut() { Some(state) => state.update(self.log_store.clone(), max_version).await, _ => { + dbg!("New!"); let state = DeltaTableState::try_new( &Path::default(), self.log_store.object_store(), diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index ab4161ae8a..a0250cd234 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -3,6 +3,9 @@ use std::collections::HashMap; use std::sync::Arc; +use arrow::compute::{filter_record_batch, is_not_null}; +use arrow_array::{Array, Int64Array, StringArray, StructArray}; +use arrow_cast::pretty::pretty_format_batches; use chrono::Utc; use futures::TryStreamExt; use object_store::{path::Path, ObjectStore}; @@ -10,9 +13,10 @@ use serde::{Deserialize, Serialize}; use super::config::TableConfig; use super::{get_partition_col_data_types, DeltaTableConfig}; +use crate::kernel::arrow::extract as ex; use crate::kernel::{ Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove, - StructType, + ReplayVistor, StructType, }; use crate::logstore::LogStore; use crate::operations::transaction::CommitData; @@ -20,6 +24,40 @@ use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTableError}; +pub(crate) struct AppTransactionVisitor { + app_transaction_version: HashMap, +} + +impl AppTransactionVisitor { + pub(crate) fn new() -> Self { + Self { + app_transaction_version: HashMap::new(), + } + } +} + +impl ReplayVistor for AppTransactionVisitor { + fn visit_batch(&mut self, batch: &arrow_array::RecordBatch) -> DeltaResult<()> { + let txn_col = ex::extract_and_cast::(batch, "txn")?; + let filter = is_not_null(txn_col)?; + + let filtered = filter_record_batch(batch, &filter)?; + let arr = ex::extract_and_cast::(&filtered, "txn")?; + + let id = ex::extract_and_cast::(arr, "appId")?; + let version = ex::extract_and_cast::(arr, "version")?; + + for idx in 0..id.len() { + let app = ex::read_str(id, idx)?; + let version = ex::read_primitive(version, idx)?; + + self.app_transaction_version.insert(app.to_owned(), version); + } + + Ok(()) + } +} + /// State snapshot currently held by the Delta Table instance. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -36,10 +74,19 @@ impl DeltaTableState { config: DeltaTableConfig, version: Option, ) -> DeltaResult { - let snapshot = EagerSnapshot::try_new(table_root, store.clone(), config, version).await?; + let mut app_visitor = AppTransactionVisitor::new(); + let visitors: Vec<&mut dyn ReplayVistor> = vec![&mut app_visitor]; + let snapshot = EagerSnapshot::try_new_with_visitor( + table_root, + store.clone(), + config, + version, + visitors, + ) + .await?; Ok(Self { snapshot, - app_transaction_version: HashMap::new(), + app_transaction_version: app_visitor.app_transaction_version, }) } @@ -215,6 +262,7 @@ impl DeltaTableState { log_store: Arc, version: Option, ) -> Result<(), DeltaTableError> { + println!("update table"); self.snapshot.update(log_store, version).await?; Ok(()) } From 776992499b3b23d049ffb9f96d5b4360cd1dfd4f Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 1 Apr 2024 18:11:55 -0400 Subject: [PATCH 3/5] implement conflict check & table read --- crates/core/src/kernel/snapshot/mod.rs | 34 ++--- crates/core/src/kernel/snapshot/replay.rs | 1 + .../transaction/conflict_checker.rs | 30 ++++- crates/core/src/operations/transaction/mod.rs | 4 +- crates/core/src/operations/write.rs | 123 +++++++++++++++++- crates/core/src/table/mod.rs | 7 - crates/core/src/table/state.rs | 33 ++++- 7 files changed, 190 insertions(+), 42 deletions(-) diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 121eecf9be..3c8b1a5a9b 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -33,7 +33,6 @@ use crate::kernel::StructType; use crate::logstore::LogStore; use crate::operations::transaction::CommitData; use crate::table::config::TableConfig; -use crate::table::state::AppTransactionVisitor; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; mod log_data; @@ -333,20 +332,12 @@ impl Snapshot { } } -/// TODO! +/// Allows hooking into the reading of commit files and checkpoints whenever a table is loaded or updated. pub trait ReplayVistor: Send { - /// TODO! + /// Process a batch fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>; } -struct PrintVistor {} -impl ReplayVistor for PrintVistor { - fn visit_batch(&mut self, _batch: &RecordBatch) -> DeltaResult<()> { - println!("Hello world!"); - Ok(()) - } -} - /// A snapshot of a Delta table that has been eagerly loaded into memory. #[derive(Debug, Clone, PartialEq)] pub struct EagerSnapshot { @@ -364,9 +355,7 @@ impl EagerSnapshot { config: DeltaTableConfig, version: Option, ) -> DeltaResult { - let mut p = PrintVistor {}; - let mut i = AppTransactionVisitor::new(); - Self::try_new_with_visitor(table_root, store, config, version, vec![&mut p, &mut i]).await + Self::try_new_with_visitor(table_root, store, config, version, vec![]).await } /// Create a new [`EagerSnapshot`] instance @@ -397,18 +386,16 @@ impl EagerSnapshot { } /// Update the snapshot to the given version - pub async fn update( + pub async fn update<'a>( &mut self, log_store: Arc, target_version: Option, + visitors: Vec<&'a mut dyn ReplayVistor>, ) -> DeltaResult<()> { - dbg!("Update Call: {} {:?}", self.version(), target_version); if Some(self.version()) == target_version { return Ok(()); } - let mut p = PrintVistor {}; - let visitors: Vec<&mut dyn ReplayVistor> = vec![&mut p]; let new_slice = self .snapshot .update_inner(log_store.clone(), target_version) @@ -431,7 +418,6 @@ impl EagerSnapshot { ) .boxed() }; - dbg!("here2"); let mapper = LogMapper::try_new(&self.snapshot)?; let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, visitors)? @@ -511,8 +497,8 @@ impl EagerSnapshot { pub fn advance<'a>( &mut self, commits: impl IntoIterator, + mut visitors: Vec<&'a mut dyn ReplayVistor>, ) -> DeltaResult { - println!("Advance!"); let mut metadata = None; let mut protocol = None; let mut send = Vec::new(); @@ -542,7 +528,11 @@ impl EagerSnapshot { let mut scanner = LogReplayScanner::new(); for batch in actions { - files.push(scanner.process_files_batch(&batch?, true)?); + let batch = batch?; + files.push(scanner.process_files_batch(&batch, true)?); + for visitor in &mut visitors { + visitor.visit_batch(&batch)?; + } } let mapper = LogMapper::try_new(&self.snapshot)?; @@ -817,7 +807,7 @@ mod tests { let actions = vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()]; - let new_version = snapshot.advance(&actions)?; + let new_version = snapshot.advance(&actions, vec![])?; assert_eq!(new_version, version + 1); let new_files = snapshot.file_actions()?.map(|f| f.path).collect::>(); diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 42857efd84..0af320794f 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -160,6 +160,7 @@ where if matches!(res, Poll::Ready(None)) { this.checkpoint.poll_next(cx).map(|b| match b { Some(Ok(batch)) => { + dbg!("Checkpoint Batch"); for visitor in this.visitors.iter_mut() { visitor.visit_batch(&batch).unwrap(); } diff --git a/crates/core/src/operations/transaction/conflict_checker.rs b/crates/core/src/operations/transaction/conflict_checker.rs index 9463a154b7..1be1c2fc93 100644 --- a/crates/core/src/operations/transaction/conflict_checker.rs +++ b/crates/core/src/operations/transaction/conflict_checker.rs @@ -117,14 +117,24 @@ impl<'a> TransactionInfo<'a> { ) -> DeltaResult { use datafusion::prelude::SessionContext; + use crate::kernel::Txn; + let session = SessionContext::new(); let read_predicates = read_predicates .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state())) .transpose()?; + + let mut read_app_ids = HashSet::::new(); + for action in actions.iter() { + if let Action::Txn(Txn { app_id, .. }) = action { + read_app_ids.insert(app_id.clone()); + } + } + Ok(Self { txn_id: "".into(), read_predicates, - read_app_ids: Default::default(), + read_app_ids: read_app_ids, actions, read_snapshot, read_whole_table, @@ -139,10 +149,18 @@ impl<'a> TransactionInfo<'a> { actions: &'a Vec, read_whole_table: bool, ) -> Self { + use crate::kernel::Txn; + + let mut read_app_ids = HashSet::::new(); + for action in actions.iter() { + if let Action::Txn(Txn { app_id, .. }) = action { + read_app_ids.insert(app_id.clone()); + } + } Self { txn_id: "".into(), read_predicates, - read_app_ids: Default::default(), + read_app_ids: read_app_ids, actions, read_snapshot, read_whole_table, @@ -156,10 +174,16 @@ impl<'a> TransactionInfo<'a> { actions: &'a Vec, read_whole_table: bool, ) -> DeltaResult { + let mut read_app_ids = HashSet::::new(); + for action in actions.iter() { + if let Action::Txn(Txn { app_id, .. }) = action { + read_app_ids.insert(app_id.clone()); + } + } Ok(Self { txn_id: "".into(), read_predicates, - read_app_ids: Default::default(), + read_app_ids: read_app_ids, actions, read_snapshot, read_whole_table, diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 700eb37c40..0ba90a0e6c 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -238,6 +238,7 @@ impl TableReference for DeltaTableState { } /// Data that was actually written to the log store. +#[derive(Debug)] pub struct CommitData { /// The actions pub actions: Vec, @@ -273,8 +274,6 @@ impl CommitData { actions.push(Action::Txn(txn.clone())) } - dbg!("{:?}", &actions); - Ok(CommitData { actions, operation, @@ -568,6 +567,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } /// A commit that successfully completed +#[derive(Debug)] pub struct FinalizedCommit { /// The winning version number of the commit pub version: i64, diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index a1559f5a69..34b9f70bb8 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -812,11 +812,10 @@ impl std::future::IntoFuture for WriteBuilder { // created actions, it may be safe to assume, that we want to include all actions. // then again, having only some tombstones may be misleading. if let Some(mut snapshot) = this.snapshot { - dbg!("merge1!"); snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?; + Ok(DeltaTable::new_with_state(this.log_store, snapshot)) } else { - dbg!("update!"); let mut table = DeltaTable::new(this.log_store, Default::default()); table.update().await?; Ok(table) @@ -902,7 +901,7 @@ mod tests { get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, get_record_batch_with_nested_struct, setup_table_with_configuration, }; - use crate::DeltaConfigKey; + use crate::{checkpoints, DeltaConfigKey, DeltaTableBuilder}; use arrow::datatypes::Field; use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray}; @@ -1640,9 +1639,21 @@ mod tests { } #[tokio::test] - async fn test_app_txn() { + async fn test_app_txn_workload() { + // Test that the transaction ids can be read from different scenarios + // 1. Write new table to storage + // 2. Read new table + // 3. Write to table a new txn id and then update a different table state that uses the same underlying table + // 4. Write a checkpoint and read that checkpoint. + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); + dbg!("{:?}", &tmp_path); + let batch = get_record_batch(None, false); - let table = DeltaOps::new_in_memory() + let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) + .await + .unwrap() .write(vec![batch.clone()]) .with_save_mode(SaveMode::ErrorIfExists) .with_partition_columns(["modified"]) @@ -1655,7 +1666,107 @@ mod tests { assert_eq!(table.get_files_count(), 2); let app_txns = table.get_app_transaction_version(); - println!("{:?}", &app_txns); + dbg!("{:?}", &app_txns); + assert_eq!(app_txns.len(), 1); + assert_eq!(app_txns.get("my-app"), Some(&1)); + + // Test Txn Id can be read from existing table + + let mut table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + let app_txns2 = table2.get_app_transaction_version(); + + assert_eq!(app_txns2.len(), 1); + assert_eq!(app_txns2.get("my-app"), Some(&1)); + dbg!("{:?}", &app_txns2); + + // Write new data to the table and check that `update` functions work + + let table = DeltaOps::from(table) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), + ) + .await + .unwrap(); + + assert_eq!(table.version(), 1); + let app_txns = table.get_app_transaction_version(); + dbg!("{:?}", &app_txns); assert_eq!(app_txns.len(), 1); + assert_eq!(app_txns.get("my-app"), Some(&3)); + + dbg!("do update"); + table2.update_incremental(None).await.unwrap(); + assert_eq!(table2.version(), 1); + let app_txns2 = table2.get_app_transaction_version(); + assert_eq!(app_txns2.len(), 1); + assert_eq!(app_txns2.get("my-app"), Some(&3)); + dbg!("{:?}", &app_txns2); + dbg!("done update"); + + // Create a checkpoint and then load + dbg!("Load checkpoint"); + checkpoints::create_checkpoint(&table).await.unwrap(); + let table3 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + let app_txns3 = table2.get_app_transaction_version(); + assert_eq!(app_txns3.len(), 1); + assert_eq!(app_txns3.get("my-app"), Some(&3)); + assert_eq!(table3.version(), 1); + } + + #[tokio::test] + async fn test_app_txn_conflict() { + // A conflict must be raised whenever the same application id is used for two concurrent transactions + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); + + let batch = get_record_batch(None, false); + let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) + .await + .unwrap() + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::ErrorIfExists) + .with_partition_columns(["modified"]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + assert_eq!(table2.version(), 0); + + let table = DeltaOps::from(table) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 2)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 1); + + let res = DeltaOps::from(table2) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), + ) + .await; + + let err = res.err().unwrap(); + assert_eq!( + err.to_string(), + "Transaction failed: Failed to commit transaction: Concurrent transaction failed." + ); } } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 3078794d2e..dfd8d7c31e 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -11,7 +11,6 @@ use object_store::{path::Path, ObjectStore}; use serde::de::{Error, SeqAccess, Visitor}; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tracing::debug; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; @@ -341,15 +340,9 @@ impl DeltaTable { &mut self, max_version: Option, ) -> Result<(), DeltaTableError> { - dbg!( - "incremental update with version({}) and max_version({max_version:?})", - self.version(), - max_version, - ); match self.state.as_mut() { Some(state) => state.update(self.log_store.clone(), max_version).await, _ => { - dbg!("New!"); let state = DeltaTableState::try_new( &Path::default(), self.log_store.object_store(), diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index a0250cd234..8b4077c0cf 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -36,8 +36,26 @@ impl AppTransactionVisitor { } } +impl AppTransactionVisitor { + pub fn merge(self, map: &HashMap) -> HashMap { + let mut clone = map.clone(); + for (key, value) in self.app_transaction_version { + clone.insert(key, value); + } + + return clone; + } +} + impl ReplayVistor for AppTransactionVisitor { fn visit_batch(&mut self, batch: &arrow_array::RecordBatch) -> DeltaResult<()> { + if batch.column_by_name("txn").is_none() { + return Ok(()); + } + + let s = pretty_format_batches(&[batch.to_owned()]) + .unwrap() + .to_string(); let txn_col = ex::extract_and_cast::(batch, "txn")?; let filter = is_not_null(txn_col)?; @@ -244,7 +262,14 @@ impl DeltaTableState { app_metadata: HashMap::new(), app_transactions: Vec::new(), }; - let new_version = self.snapshot.advance(&vec![commit_data])?; + + let mut app_txn_visitor = AppTransactionVisitor::new(); + let new_version = self + .snapshot + .advance(&vec![commit_data], vec![&mut app_txn_visitor])?; + + self.app_transaction_version = app_txn_visitor.merge(&self.app_transaction_version); + if new_version != version { return Err(DeltaTableError::Generic("Version mismatch".to_string())); } @@ -263,7 +288,11 @@ impl DeltaTableState { version: Option, ) -> Result<(), DeltaTableError> { println!("update table"); - self.snapshot.update(log_store, version).await?; + let mut app_txn_visitor = AppTransactionVisitor::new(); + self.snapshot + .update(log_store, version, vec![&mut app_txn_visitor]) + .await?; + self.app_transaction_version = app_txn_visitor.merge(&self.app_transaction_version); Ok(()) } From d91a87945bb4ed1e0e04871ed3f5ece6316b258e Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 1 Apr 2024 18:14:35 -0400 Subject: [PATCH 4/5] remove print --- crates/core/src/table/state.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 8b4077c0cf..088e9165a6 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -53,9 +53,6 @@ impl ReplayVistor for AppTransactionVisitor { return Ok(()); } - let s = pretty_format_batches(&[batch.to_owned()]) - .unwrap() - .to_string(); let txn_col = ex::extract_and_cast::(batch, "txn")?; let filter = is_not_null(txn_col)?; From 967581fbcf317b2c5fb7a652c6b35ab05fdf2ae2 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 1 Apr 2024 21:03:01 -0400 Subject: [PATCH 5/5] refactor for review --- crates/core/src/kernel/snapshot/mod.rs | 10 +- crates/core/src/kernel/snapshot/replay.rs | 15 +- .../src/operations/transaction/application.rs | 131 +++++++++++++++++ .../transaction/conflict_checker.rs | 11 +- crates/core/src/operations/transaction/mod.rs | 2 + crates/core/src/operations/write.rs | 132 ------------------ crates/core/src/table/state.rs | 8 +- 7 files changed, 154 insertions(+), 155 deletions(-) create mode 100644 crates/core/src/operations/transaction/application.rs diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 3c8b1a5a9b..492e0245af 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -200,7 +200,7 @@ impl Snapshot { pub fn files<'a>( &self, store: Arc, - visitors: Vec<&'a mut dyn ReplayVistor>, + visitors: Vec<&'a mut dyn ReplayVisitor>, ) -> DeltaResult>>> { let log_stream = self.log_segment.commit_stream( store.clone(), @@ -333,7 +333,7 @@ impl Snapshot { } /// Allows hooking into the reading of commit files and checkpoints whenever a table is loaded or updated. -pub trait ReplayVistor: Send { +pub trait ReplayVisitor: Send { /// Process a batch fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>; } @@ -364,7 +364,7 @@ impl EagerSnapshot { store: Arc, config: DeltaTableConfig, version: Option, - visitors: Vec<&mut dyn ReplayVistor>, + visitors: Vec<&mut dyn ReplayVisitor>, ) -> DeltaResult { let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?; let files = snapshot.files(store, visitors)?.try_collect().await?; @@ -390,7 +390,7 @@ impl EagerSnapshot { &mut self, log_store: Arc, target_version: Option, - visitors: Vec<&'a mut dyn ReplayVistor>, + visitors: Vec<&'a mut dyn ReplayVisitor>, ) -> DeltaResult<()> { if Some(self.version()) == target_version { return Ok(()); @@ -497,7 +497,7 @@ impl EagerSnapshot { pub fn advance<'a>( &mut self, commits: impl IntoIterator, - mut visitors: Vec<&'a mut dyn ReplayVistor>, + mut visitors: Vec<&'a mut dyn ReplayVisitor>, ) -> DeltaResult { let mut metadata = None; let mut protocol = None; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 0af320794f..2b1710b9a4 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -23,7 +23,7 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; use crate::kernel::arrow::json; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; -use super::ReplayVistor; +use super::ReplayVisitor; use super::Snapshot; pin_project! { @@ -32,7 +32,7 @@ pin_project! { mapper: Arc, - visitors: Vec<&'a mut dyn ReplayVistor>, + visitors: Vec<&'a mut dyn ReplayVisitor>, #[pin] commits: S, @@ -47,7 +47,7 @@ impl<'a, S> ReplayStream<'a, S> { commits: S, checkpoint: S, snapshot: &Snapshot, - visitors: Vec<&'a mut dyn ReplayVistor>, + visitors: Vec<&'a mut dyn ReplayVisitor>, ) -> DeltaResult { let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?); let mapper = Arc::new(LogMapper { @@ -147,7 +147,9 @@ where let res = this.commits.poll_next(cx).map(|b| match b { Some(Ok(batch)) => { for visitor in this.visitors.iter_mut() { - visitor.visit_batch(&batch).unwrap(); + if let Err(e) = visitor.visit_batch(&batch) { + return Some(Err(e)); + } } match this.scanner.process_files_batch(&batch, true) { Ok(filtered) => Some(this.mapper.map_batch(filtered)), @@ -160,9 +162,10 @@ where if matches!(res, Poll::Ready(None)) { this.checkpoint.poll_next(cx).map(|b| match b { Some(Ok(batch)) => { - dbg!("Checkpoint Batch"); for visitor in this.visitors.iter_mut() { - visitor.visit_batch(&batch).unwrap(); + if let Err(e) = visitor.visit_batch(&batch) { + return Some(Err(e)); + } } match this.scanner.process_files_batch(&batch, false) { Ok(filtered) => Some(this.mapper.map_batch(filtered)), diff --git a/crates/core/src/operations/transaction/application.rs b/crates/core/src/operations/transaction/application.rs new file mode 100644 index 0000000000..81f6fb49dc --- /dev/null +++ b/crates/core/src/operations/transaction/application.rs @@ -0,0 +1,131 @@ +#[cfg(test)] +mod tests { + use crate::{ + checkpoints, kernel::Txn, operations::transaction::CommitProperties, protocol::SaveMode, + writer::test_utils::get_record_batch, DeltaOps, DeltaTableBuilder, + }; + + #[tokio::test] + async fn test_app_txn_workload() { + // Test that the transaction ids can be read from different scenarios + // 1. Write new table to storage + // 2. Read new table + // 3. Write to table a new txn id and then update a different table state that uses the same underlying table + // 4. Write a checkpoint and read that checkpoint. + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); + + let batch = get_record_batch(None, false); + let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) + .await + .unwrap() + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::ErrorIfExists) + .with_partition_columns(["modified"]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_files_count(), 2); + + let app_txns = table.get_app_transaction_version(); + assert_eq!(app_txns.len(), 1); + assert_eq!(app_txns.get("my-app"), Some(&1)); + + // Test Txn Id can be read from existing table + + let mut table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + let app_txns2 = table2.get_app_transaction_version(); + + assert_eq!(app_txns2.len(), 1); + assert_eq!(app_txns2.get("my-app"), Some(&1)); + + // Write new data to the table and check that `update` functions work + + let table = DeltaOps::from(table) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), + ) + .await + .unwrap(); + + assert_eq!(table.version(), 1); + let app_txns = table.get_app_transaction_version(); + assert_eq!(app_txns.len(), 1); + assert_eq!(app_txns.get("my-app"), Some(&3)); + + table2.update_incremental(None).await.unwrap(); + assert_eq!(table2.version(), 1); + let app_txns2 = table2.get_app_transaction_version(); + assert_eq!(app_txns2.len(), 1); + assert_eq!(app_txns2.get("my-app"), Some(&3)); + + // Create a checkpoint and then load + checkpoints::create_checkpoint(&table).await.unwrap(); + let table3 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + let app_txns3 = table2.get_app_transaction_version(); + assert_eq!(app_txns3.len(), 1); + assert_eq!(app_txns3.get("my-app"), Some(&3)); + assert_eq!(table3.version(), 1); + } + + #[tokio::test] + async fn test_app_txn_conflict() { + // A conflict must be raised whenever the same application id is used for two concurrent transactions + + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); + + let batch = get_record_batch(None, false); + let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) + .await + .unwrap() + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::ErrorIfExists) + .with_partition_columns(["modified"]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) + .load() + .await + .unwrap(); + assert_eq!(table2.version(), 0); + + let table = DeltaOps::from(table) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 2)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 1); + + let res = DeltaOps::from(table2) + .write(vec![get_record_batch(None, false)]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), + ) + .await; + + let err = res.err().unwrap(); + assert_eq!( + err.to_string(), + "Transaction failed: Failed to commit transaction: Concurrent transaction failed." + ); + } +} diff --git a/crates/core/src/operations/transaction/conflict_checker.rs b/crates/core/src/operations/transaction/conflict_checker.rs index 1be1c2fc93..22bb873d20 100644 --- a/crates/core/src/operations/transaction/conflict_checker.rs +++ b/crates/core/src/operations/transaction/conflict_checker.rs @@ -6,6 +6,7 @@ use super::CommitInfo; use crate::delta_datafusion::DataFusionMixins; use crate::errors::DeltaResult; use crate::kernel::EagerSnapshot; +use crate::kernel::Txn; use crate::kernel::{Action, Add, Metadata, Protocol, Remove}; use crate::logstore::{get_actions, LogStore}; use crate::protocol::DeltaOperation; @@ -117,8 +118,6 @@ impl<'a> TransactionInfo<'a> { ) -> DeltaResult { use datafusion::prelude::SessionContext; - use crate::kernel::Txn; - let session = SessionContext::new(); let read_predicates = read_predicates .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state())) @@ -134,7 +133,7 @@ impl<'a> TransactionInfo<'a> { Ok(Self { txn_id: "".into(), read_predicates, - read_app_ids: read_app_ids, + read_app_ids, actions, read_snapshot, read_whole_table, @@ -149,8 +148,6 @@ impl<'a> TransactionInfo<'a> { actions: &'a Vec, read_whole_table: bool, ) -> Self { - use crate::kernel::Txn; - let mut read_app_ids = HashSet::::new(); for action in actions.iter() { if let Action::Txn(Txn { app_id, .. }) = action { @@ -160,7 +157,7 @@ impl<'a> TransactionInfo<'a> { Self { txn_id: "".into(), read_predicates, - read_app_ids: read_app_ids, + read_app_ids, actions, read_snapshot, read_whole_table, @@ -183,7 +180,7 @@ impl<'a> TransactionInfo<'a> { Ok(Self { txn_id: "".into(), read_predicates, - read_app_ids: read_app_ids, + read_app_ids, actions, read_snapshot, read_whole_table, diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 0ba90a0e6c..e99c731e36 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -88,6 +88,8 @@ use crate::{crate_version, DeltaResult}; pub use self::protocol::INSTANCE as PROTOCOL; +#[cfg(test)] +pub(crate) mod application; mod conflict_checker; mod protocol; #[cfg(feature = "datafusion")] diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 34b9f70bb8..ad6d428cdb 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -1637,136 +1637,4 @@ mod tests { let actual = get_data_sorted(&table, "id,value,modified").await; assert_batches_sorted_eq!(&expected, &actual); } - - #[tokio::test] - async fn test_app_txn_workload() { - // Test that the transaction ids can be read from different scenarios - // 1. Write new table to storage - // 2. Read new table - // 3. Write to table a new txn id and then update a different table state that uses the same underlying table - // 4. Write a checkpoint and read that checkpoint. - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - dbg!("{:?}", &tmp_path); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - assert_eq!(table.get_files_count(), 2); - - let app_txns = table.get_app_transaction_version(); - dbg!("{:?}", &app_txns); - assert_eq!(app_txns.len(), 1); - assert_eq!(app_txns.get("my-app"), Some(&1)); - - // Test Txn Id can be read from existing table - - let mut table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - let app_txns2 = table2.get_app_transaction_version(); - - assert_eq!(app_txns2.len(), 1); - assert_eq!(app_txns2.get("my-app"), Some(&1)); - dbg!("{:?}", &app_txns2); - - // Write new data to the table and check that `update` functions work - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), - ) - .await - .unwrap(); - - assert_eq!(table.version(), 1); - let app_txns = table.get_app_transaction_version(); - dbg!("{:?}", &app_txns); - assert_eq!(app_txns.len(), 1); - assert_eq!(app_txns.get("my-app"), Some(&3)); - - dbg!("do update"); - table2.update_incremental(None).await.unwrap(); - assert_eq!(table2.version(), 1); - let app_txns2 = table2.get_app_transaction_version(); - assert_eq!(app_txns2.len(), 1); - assert_eq!(app_txns2.get("my-app"), Some(&3)); - dbg!("{:?}", &app_txns2); - dbg!("done update"); - - // Create a checkpoint and then load - dbg!("Load checkpoint"); - checkpoints::create_checkpoint(&table).await.unwrap(); - let table3 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - let app_txns3 = table2.get_app_transaction_version(); - assert_eq!(app_txns3.len(), 1); - assert_eq!(app_txns3.get("my-app"), Some(&3)); - assert_eq!(table3.version(), 1); - } - - #[tokio::test] - async fn test_app_txn_conflict() { - // A conflict must be raised whenever the same application id is used for two concurrent transactions - - let tmp_dir = tempfile::tempdir().unwrap(); - let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap(); - - let batch = get_record_batch(None, false); - let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap()) - .await - .unwrap() - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::ErrorIfExists) - .with_partition_columns(["modified"]) - .with_commit_properties( - CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 0); - - let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap()) - .load() - .await - .unwrap(); - assert_eq!(table2.version(), 0); - - let table = DeltaOps::from(table) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 2)), - ) - .await - .unwrap(); - assert_eq!(table.version(), 1); - - let res = DeltaOps::from(table2) - .write(vec![get_record_batch(None, false)]) - .with_commit_properties( - CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)), - ) - .await; - - let err = res.err().unwrap(); - assert_eq!( - err.to_string(), - "Transaction failed: Failed to commit transaction: Concurrent transaction failed." - ); - } } diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 088e9165a6..7e89bb0f77 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use arrow::compute::{filter_record_batch, is_not_null}; use arrow_array::{Array, Int64Array, StringArray, StructArray}; -use arrow_cast::pretty::pretty_format_batches; use chrono::Utc; use futures::TryStreamExt; use object_store::{path::Path, ObjectStore}; @@ -16,7 +15,7 @@ use super::{get_partition_col_data_types, DeltaTableConfig}; use crate::kernel::arrow::extract as ex; use crate::kernel::{ Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove, - ReplayVistor, StructType, + ReplayVisitor, StructType, }; use crate::logstore::LogStore; use crate::operations::transaction::CommitData; @@ -47,7 +46,7 @@ impl AppTransactionVisitor { } } -impl ReplayVistor for AppTransactionVisitor { +impl ReplayVisitor for AppTransactionVisitor { fn visit_batch(&mut self, batch: &arrow_array::RecordBatch) -> DeltaResult<()> { if batch.column_by_name("txn").is_none() { return Ok(()); @@ -90,7 +89,7 @@ impl DeltaTableState { version: Option, ) -> DeltaResult { let mut app_visitor = AppTransactionVisitor::new(); - let visitors: Vec<&mut dyn ReplayVistor> = vec![&mut app_visitor]; + let visitors: Vec<&mut dyn ReplayVisitor> = vec![&mut app_visitor]; let snapshot = EagerSnapshot::try_new_with_visitor( table_root, store.clone(), @@ -284,7 +283,6 @@ impl DeltaTableState { log_store: Arc, version: Option, ) -> Result<(), DeltaTableError> { - println!("update table"); let mut app_txn_visitor = AppTransactionVisitor::new(); self.snapshot .update(log_store, version, vec![&mut app_txn_visitor])