From 8ba7f4d105ccd3e3e43e70709011f8b25dfe3ec3 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:31:04 -0700 Subject: [PATCH 1/9] move transaction to actions --- kernel/src/actions/mod.rs | 22 +++++++++++++++------- kernel/src/{ => actions}/transaction.rs | 0 kernel/src/lib.rs | 1 - 3 files changed, 15 insertions(+), 8 deletions(-) rename kernel/src/{ => actions}/transaction.rs (100%) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0a218fb7..f1969793 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -1,10 +1,6 @@ //! Provides parsing and manipulation of the various actions defined in the [Delta //! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md) -pub mod deletion_vector; -pub(crate) mod schemas; -pub(crate) mod visitors; - use delta_kernel_derive::Schema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -16,11 +12,18 @@ use crate::actions::schemas::GetStructField; use crate::features::{ReaderFeatures, WriterFeatures}; use crate::{schema::StructType, DeltaResult, EngineData}; +pub mod deletion_vector; +pub mod transaction; + +pub(crate) mod schemas; +pub(crate) mod visitors; + pub(crate) const ADD_NAME: &str = "add"; pub(crate) const REMOVE_NAME: &str = "remove"; pub(crate) const METADATA_NAME: &str = "metaData"; pub(crate) const PROTOCOL_NAME: &str = "protocol"; pub(crate) const TRANSACTION_NAME: &str = "txn"; +pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; static LOG_SCHEMA: LazyLock = LazyLock::new(|| { StructType::new(vec![ @@ -29,10 +32,10 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), Option::::get_struct_field(TRANSACTION_NAME), + Option::::get_struct_field(COMMIT_INFO_NAME), // We don't support the following actions yet - //Option::get_field(CDC_NAME), - //Option::get_field(COMMIT_INFO_NAME), - //Option::get_field(DOMAIN_METADATA_NAME), + //Option::::get_struct_field(CDC_NAME), + //Option::::get_struct_field(DOMAIN_METADATA_NAME), ]) }); @@ -128,6 +131,11 @@ impl Protocol { } } +#[derive(Debug, Clone, PartialEq, Eq, Schema)] +pub struct CommitInfo { + pub kernel_version: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Add { /// A relative path to a data file from the root of the table or an absolute path to a file diff --git a/kernel/src/transaction.rs b/kernel/src/actions/transaction.rs similarity index 100% rename from kernel/src/transaction.rs rename to kernel/src/actions/transaction.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index d9eabcbc..72725e82 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -73,7 +73,6 @@ pub mod scan; pub mod schema; pub mod snapshot; pub mod table; -pub mod transaction; pub(crate) mod utils; pub use engine_data::{DataVisitor, EngineData}; From 2935d439ce443bc05071b80697a11c7ff4039c69 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:35:53 -0700 Subject: [PATCH 2/9] rename to set_transaction --- kernel/src/actions/mod.rs | 6 +++--- .../actions/{transaction.rs => set_transaction.rs} | 13 ++++++++----- kernel/src/actions/visitors.rs | 13 +++++++------ 3 files changed, 18 insertions(+), 14 deletions(-) rename kernel/src/actions/{transaction.rs => set_transaction.rs} (94%) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index f1969793..96dfb09a 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -13,7 +13,7 @@ use crate::features::{ReaderFeatures, WriterFeatures}; use crate::{schema::StructType, DeltaResult, EngineData}; pub mod deletion_vector; -pub mod transaction; +pub mod set_transaction; pub(crate) mod schemas; pub(crate) mod visitors; @@ -31,7 +31,7 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { Option::::get_struct_field(REMOVE_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), - Option::::get_struct_field(TRANSACTION_NAME), + Option::::get_struct_field(TRANSACTION_NAME), Option::::get_struct_field(COMMIT_INFO_NAME), // We don't support the following actions yet //Option::::get_struct_field(CDC_NAME), @@ -245,7 +245,7 @@ impl Remove { } #[derive(Debug, Clone, PartialEq, Eq, Schema)] -pub struct Transaction { +pub struct SetTransaction { /// A unique identifier for the application performing the transaction. pub app_id: String, diff --git a/kernel/src/actions/transaction.rs b/kernel/src/actions/set_transaction.rs similarity index 94% rename from kernel/src/actions/transaction.rs rename to kernel/src/actions/set_transaction.rs index 00658154..875fc59f 100644 --- a/kernel/src/actions/transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::actions::visitors::TransactionVisitor; -use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME}; +use crate::actions::{get_log_schema, SetTransaction, TRANSACTION_NAME}; use crate::snapshot::Snapshot; use crate::{DeltaResult, Engine, EngineData, SchemaRef}; @@ -58,7 +58,7 @@ impl TransactionScanner { &self, engine: &dyn Engine, application_id: &str, - ) -> DeltaResult> { + ) -> DeltaResult> { let mut transactions = self.scan_application_transactions(engine, Some(application_id))?; Ok(transactions.remove(application_id)) } @@ -78,7 +78,10 @@ mod tests { use crate::Table; use itertools::Itertools; - fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option) { + fn get_latest_transactions( + path: &str, + app_id: &str, + ) -> (TransactionMap, Option) { let path = std::fs::canonicalize(PathBuf::from(path)).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); @@ -105,7 +108,7 @@ mod tests { assert_eq!(txns.get("my-app"), txn.as_ref()); assert_eq!( txns.get("my-app2"), - Some(Transaction { + Some(SetTransaction { app_id: "my-app2".to_owned(), version: 2, last_updated: None @@ -119,7 +122,7 @@ mod tests { assert_eq!(txns.get("my-app"), txn.as_ref()); assert_eq!( txns.get("my-app2"), - Some(Transaction { + Some(SetTransaction { app_id: "my-app2".to_owned(), version: 2, last_updated: None diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 25858301..c81583c4 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -9,7 +9,8 @@ use crate::{ }; use super::{ - deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, Transaction, + deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, + SetTransaction, }; #[derive(Default)] @@ -230,7 +231,7 @@ impl DataVisitor for RemoveVisitor { } } -pub type TransactionMap = HashMap; +pub type TransactionMap = HashMap; /// Extact application transaction actions from the log into a map /// @@ -259,10 +260,10 @@ impl TransactionVisitor { row_index: usize, app_id: String, getters: &[&'a dyn GetData<'a>], - ) -> DeltaResult { + ) -> DeltaResult { let version: i64 = getters[1].get(row_index, "txn.version")?; let last_updated: Option = getters[2].get_long(row_index, "txn.lastUpdated")?; - Ok(Transaction { + Ok(SetTransaction { app_id, version, last_updated, @@ -490,7 +491,7 @@ mod tests { let mut actual = txn_visitor.transactions; assert_eq!( actual.remove("myApp2"), - Some(Transaction { + Some(SetTransaction { app_id: "myApp2".to_string(), version: 4, last_updated: Some(1670892998177), @@ -498,7 +499,7 @@ mod tests { ); assert_eq!( actual.remove("myApp"), - Some(Transaction { + Some(SetTransaction { app_id: "myApp".to_string(), version: 3, last_updated: None, From c1555f2b71e8c2de53e65ae447616cec0465243d Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:53:06 -0700 Subject: [PATCH 3/9] Update kernel/src/actions/mod.rs Co-authored-by: Nick Lanham --- kernel/src/actions/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 96dfb09a..4e0af6e5 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -22,7 +22,7 @@ pub(crate) const ADD_NAME: &str = "add"; pub(crate) const REMOVE_NAME: &str = "remove"; pub(crate) const METADATA_NAME: &str = "metaData"; pub(crate) const PROTOCOL_NAME: &str = "protocol"; -pub(crate) const TRANSACTION_NAME: &str = "txn"; +pub(crate) const SET_TRANSACTION_NAME: &str = "txn"; pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; static LOG_SCHEMA: LazyLock = LazyLock::new(|| { From fe60f592d1fa3dabb5bf196d9ac3aac2de7a743c Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:53:11 -0700 Subject: [PATCH 4/9] Update kernel/src/actions/mod.rs Co-authored-by: Nick Lanham --- kernel/src/actions/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 4e0af6e5..65d68815 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -31,7 +31,7 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { Option::::get_struct_field(REMOVE_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), - Option::::get_struct_field(TRANSACTION_NAME), + Option::::get_struct_field(SET_TRANSACTION_NAME), Option::::get_struct_field(COMMIT_INFO_NAME), // We don't support the following actions yet //Option::::get_struct_field(CDC_NAME), From 489ebb06a532c7440ccdcf371ebb9168b83f6b63 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:54:35 -0700 Subject: [PATCH 5/9] fix --- kernel/src/actions/set_transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 875fc59f..29a9eda4 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::actions::visitors::TransactionVisitor; -use crate::actions::{get_log_schema, SetTransaction, TRANSACTION_NAME}; +use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME}; use crate::snapshot::Snapshot; use crate::{DeltaResult, Engine, EngineData, SchemaRef}; From ef09a4542521e7e10bcf78b481da2710b4216e5f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 15:56:10 -0700 Subject: [PATCH 6/9] fix --- kernel/src/actions/set_transaction.rs | 2 +- kernel/src/actions/visitors.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 29a9eda4..9a3c41d1 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -39,7 +39,7 @@ impl TransactionScanner { // Factored out to facilitate testing fn get_txn_schema() -> DeltaResult { - get_log_schema().project(&[TRANSACTION_NAME]) + get_log_schema().project(&[SET_TRANSACTION_NAME]) } // Factored out to facilitate testing diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index c81583c4..e554d8d0 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -328,7 +328,7 @@ mod tests { use super::*; use crate::{ - actions::{get_log_schema, ADD_NAME, TRANSACTION_NAME}, + actions::{get_log_schema, ADD_NAME, SET_TRANSACTION_NAME}, engine::arrow_data::ArrowEngineData, engine::sync::{json::SyncJsonHandler, SyncEngine}, Engine, EngineData, JsonHandler, @@ -484,7 +484,7 @@ mod tests { .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); let add_schema = get_log_schema() - .project(&[TRANSACTION_NAME]) + .project(&[SET_TRANSACTION_NAME]) .expect("Can't get txn schema"); let mut txn_visitor = TransactionVisitor::default(); batch.extract(add_schema, &mut txn_visitor).unwrap(); From ee33fa94d99e32b6435f091e4b70c3922b4f0f01 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 16:37:01 -0700 Subject: [PATCH 7/9] trigger ci From 2480e8ec8eac7a4d503fbcebccd33ea0393e2696 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 16:52:30 -0700 Subject: [PATCH 8/9] more rename --- kernel/src/actions/set_transaction.rs | 28 +++++++++++++-------------- kernel/src/actions/visitors.rs | 24 +++++++++++------------ 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index c37bf4a6..d6701c19 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -1,18 +1,18 @@ use std::sync::Arc; -use crate::actions::visitors::TransactionVisitor; +use crate::actions::visitors::SetTransactionVisitor; use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME}; use crate::snapshot::Snapshot; use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef}; -pub use crate::actions::visitors::TransactionMap; -pub struct TransactionScanner { +pub use crate::actions::visitors::SetTransactionMap; +pub struct SetTransactionScanner { snapshot: Arc, } -impl TransactionScanner { +impl SetTransactionScanner { pub fn new(snapshot: Arc) -> Self { - TransactionScanner { snapshot } + SetTransactionScanner { snapshot } } /// Scan the entire log for all application ids but terminate early if a specific application id is provided @@ -20,21 +20,21 @@ impl TransactionScanner { &self, engine: &dyn Engine, application_id: Option<&str>, - ) -> DeltaResult { + ) -> DeltaResult { let schema = Self::get_txn_schema()?; - let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned())); + let mut visitor = SetTransactionVisitor::new(application_id.map(|s| s.to_owned())); // If a specific id is requested then we can terminate log replay early as soon as it was // found. If all ids are requested then we are forced to replay the entire log. for maybe_data in self.replay_for_app_ids(engine, schema.clone())? { let (txns, _) = maybe_data?; txns.extract(schema.clone(), &mut visitor)?; // if a specific id is requested and a transaction was found, then return - if application_id.is_some() && !visitor.transactions.is_empty() { + if application_id.is_some() && !visitor.set_transactions.is_empty() { break; } } - Ok(visitor.transactions) + Ok(visitor.set_transactions) } // Factored out to facilitate testing @@ -69,7 +69,7 @@ impl TransactionScanner { } /// Scan the Delta Log to obtain the latest transaction for all applications - pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult { + pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult { self.scan_application_transactions(engine, None) } } @@ -86,14 +86,14 @@ mod tests { fn get_latest_transactions( path: &str, app_id: &str, - ) -> (TransactionMap, Option) { + ) -> (SetTransactionMap, Option) { let path = std::fs::canonicalize(PathBuf::from(path)).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); let table = Table::new(url); let snapshot = table.snapshot(&engine, None).unwrap(); - let txn_scan = TransactionScanner::new(snapshot.into()); + let txn_scan = SetTransactionScanner::new(snapshot.into()); ( txn_scan.application_transactions(&engine).unwrap(), @@ -144,8 +144,8 @@ mod tests { let table = Table::new(url); let snapshot = table.snapshot(&engine, None).unwrap(); - let txn = TransactionScanner::new(snapshot.into()); - let txn_schema = TransactionScanner::get_txn_schema().unwrap(); + let txn = SetTransactionScanner::new(snapshot.into()); + let txn_schema = SetTransactionScanner::get_txn_schema().unwrap(); // The checkpoint has five parts, each containing one action. There are two app ids. let data: Vec<_> = txn diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index e554d8d0..25118cc8 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -231,7 +231,7 @@ impl DataVisitor for RemoveVisitor { } } -pub type TransactionMap = HashMap; +pub type SetTransactionMap = HashMap; /// Extact application transaction actions from the log into a map /// @@ -242,16 +242,16 @@ pub type TransactionMap = HashMap; /// required. /// #[derive(Default, Debug)] -pub(crate) struct TransactionVisitor { - pub(crate) transactions: TransactionMap, +pub(crate) struct SetTransactionVisitor { + pub(crate) set_transactions: SetTransactionMap, pub(crate) application_id: Option, } -impl TransactionVisitor { +impl SetTransactionVisitor { /// Create a new visitor. When application_id is set then bookkeeping is only for that id only pub(crate) fn new(application_id: Option) -> Self { - TransactionVisitor { - transactions: HashMap::default(), + SetTransactionVisitor { + set_transactions: HashMap::default(), application_id, } } @@ -271,7 +271,7 @@ impl TransactionVisitor { } } -impl DataVisitor for TransactionVisitor { +impl DataVisitor for SetTransactionVisitor { fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { // Assumes batches are visited in reverse order relative to the log for i in 0..row_count { @@ -282,9 +282,9 @@ impl DataVisitor for TransactionVisitor { .as_ref() .is_some_and(|requested| !requested.eq(&app_id)) { - let txn = TransactionVisitor::visit_txn(i, app_id, getters)?; - if !self.transactions.contains_key(&txn.app_id) { - self.transactions.insert(txn.app_id.clone(), txn); + let txn = SetTransactionVisitor::visit_txn(i, app_id, getters)?; + if !self.set_transactions.contains_key(&txn.app_id) { + self.set_transactions.insert(txn.app_id.clone(), txn); } } } @@ -486,9 +486,9 @@ mod tests { let add_schema = get_log_schema() .project(&[SET_TRANSACTION_NAME]) .expect("Can't get txn schema"); - let mut txn_visitor = TransactionVisitor::default(); + let mut txn_visitor = SetTransactionVisitor::default(); batch.extract(add_schema, &mut txn_visitor).unwrap(); - let mut actual = txn_visitor.transactions; + let mut actual = txn_visitor.set_transactions; assert_eq!( actual.remove("myApp2"), Some(SetTransaction { From 09fc9f5cfe3304d9c9ae93fbe0c8fa76376904df Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 9 Oct 2024 16:55:47 -0700 Subject: [PATCH 9/9] fix inspect-table --- kernel/examples/inspect-table/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 33515bb6..ebb0978b 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -1,6 +1,6 @@ use delta_kernel::actions::get_log_schema; use delta_kernel::actions::visitors::{ - AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, TransactionVisitor, + AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor, }; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; @@ -62,7 +62,7 @@ enum Action { Protocol(delta_kernel::actions::Protocol, usize), Remove(delta_kernel::actions::Remove, usize), Add(delta_kernel::actions::Add, usize), - SetTransaction(delta_kernel::actions::Transaction, usize), + SetTransaction(delta_kernel::actions::SetTransaction, usize), } impl Action { @@ -161,7 +161,7 @@ impl DataVisitor for LogVisitor { } if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? { self.actions.push(Action::SetTransaction( - TransactionVisitor::visit_txn( + SetTransactionVisitor::visit_txn( i, app_id, &getters[self.set_transaction_offset..],