diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 33515bb6..ebb0978b 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -1,6 +1,6 @@ use delta_kernel::actions::get_log_schema; use delta_kernel::actions::visitors::{ - AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, TransactionVisitor, + AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor, }; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; @@ -62,7 +62,7 @@ enum Action { Protocol(delta_kernel::actions::Protocol, usize), Remove(delta_kernel::actions::Remove, usize), Add(delta_kernel::actions::Add, usize), - SetTransaction(delta_kernel::actions::Transaction, usize), + SetTransaction(delta_kernel::actions::SetTransaction, usize), } impl Action { @@ -161,7 +161,7 @@ impl DataVisitor for LogVisitor { } if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? { self.actions.push(Action::SetTransaction( - TransactionVisitor::visit_txn( + SetTransactionVisitor::visit_txn( i, app_id, &getters[self.set_transaction_offset..], diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index b17e49db..3d14eaa0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -1,13 +1,6 @@ //! Provides parsing and manipulation of the various actions defined in the [Delta //! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md) -pub mod deletion_vector; -pub(crate) mod schemas; -#[cfg(feature = "developer-visibility")] -pub mod visitors; -#[cfg(not(feature = "developer-visibility"))] -pub(crate) mod visitors; - use delta_kernel_derive::Schema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -19,11 +12,21 @@ use crate::actions::schemas::GetStructField; use crate::features::{ReaderFeatures, WriterFeatures}; use crate::{schema::StructType, DeltaResult, EngineData}; +pub mod deletion_vector; +pub mod set_transaction; + +pub(crate) mod schemas; +#[cfg(feature = "developer-visibility")] +pub mod visitors; +#[cfg(not(feature = "developer-visibility"))] +pub(crate) mod visitors; + pub(crate) const ADD_NAME: &str = "add"; pub(crate) const REMOVE_NAME: &str = "remove"; pub(crate) const METADATA_NAME: &str = "metaData"; pub(crate) const PROTOCOL_NAME: &str = "protocol"; -pub(crate) const TRANSACTION_NAME: &str = "txn"; +pub(crate) const SET_TRANSACTION_NAME: &str = "txn"; +pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; static LOG_SCHEMA: LazyLock = LazyLock::new(|| { StructType::new([ @@ -31,11 +34,11 @@ static LOG_SCHEMA: LazyLock = LazyLock::new(|| { Option::::get_struct_field(REMOVE_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), - Option::::get_struct_field(TRANSACTION_NAME), + Option::::get_struct_field(SET_TRANSACTION_NAME), + Option::::get_struct_field(COMMIT_INFO_NAME), // We don't support the following actions yet - //Option::get_field(CDC_NAME), - //Option::get_field(COMMIT_INFO_NAME), - //Option::get_field(DOMAIN_METADATA_NAME), + //Option::::get_struct_field(CDC_NAME), + //Option::::get_struct_field(DOMAIN_METADATA_NAME), ]) }); @@ -133,6 +136,11 @@ impl Protocol { } } +#[derive(Debug, Clone, PartialEq, Eq, Schema)] +pub struct CommitInfo { + pub kernel_version: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Add { /// A relative path to a data file from the root of the table or an absolute path to a file @@ -244,7 +252,7 @@ impl Remove { } #[derive(Debug, Clone, PartialEq, Eq, Schema)] -pub struct Transaction { +pub struct SetTransaction { /// A unique identifier for the application performing the transaction. pub app_id: String, diff --git a/kernel/src/transaction.rs b/kernel/src/actions/set_transaction.rs similarity index 81% rename from kernel/src/transaction.rs rename to kernel/src/actions/set_transaction.rs index c2c78b35..d6701c19 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -1,18 +1,18 @@ use std::sync::Arc; -use crate::actions::visitors::TransactionVisitor; -use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME}; +use crate::actions::visitors::SetTransactionVisitor; +use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME}; use crate::snapshot::Snapshot; use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef}; -pub use crate::actions::visitors::TransactionMap; -pub struct TransactionScanner { +pub use crate::actions::visitors::SetTransactionMap; +pub struct SetTransactionScanner { snapshot: Arc, } -impl TransactionScanner { +impl SetTransactionScanner { pub fn new(snapshot: Arc) -> Self { - TransactionScanner { snapshot } + SetTransactionScanner { snapshot } } /// Scan the entire log for all application ids but terminate early if a specific application id is provided @@ -20,26 +20,26 @@ impl TransactionScanner { &self, engine: &dyn Engine, application_id: Option<&str>, - ) -> DeltaResult { + ) -> DeltaResult { let schema = Self::get_txn_schema()?; - let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned())); + let mut visitor = SetTransactionVisitor::new(application_id.map(|s| s.to_owned())); // If a specific id is requested then we can terminate log replay early as soon as it was // found. If all ids are requested then we are forced to replay the entire log. for maybe_data in self.replay_for_app_ids(engine, schema.clone())? { let (txns, _) = maybe_data?; txns.extract(schema.clone(), &mut visitor)?; // if a specific id is requested and a transaction was found, then return - if application_id.is_some() && !visitor.transactions.is_empty() { + if application_id.is_some() && !visitor.set_transactions.is_empty() { break; } } - Ok(visitor.transactions) + Ok(visitor.set_transactions) } // Factored out to facilitate testing fn get_txn_schema() -> DeltaResult { - get_log_schema().project(&[TRANSACTION_NAME]) + get_log_schema().project(&[SET_TRANSACTION_NAME]) } // Factored out to facilitate testing @@ -63,13 +63,13 @@ impl TransactionScanner { &self, engine: &dyn Engine, application_id: &str, - ) -> DeltaResult> { + ) -> DeltaResult> { let mut transactions = self.scan_application_transactions(engine, Some(application_id))?; Ok(transactions.remove(application_id)) } /// Scan the Delta Log to obtain the latest transaction for all applications - pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult { + pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult { self.scan_application_transactions(engine, None) } } @@ -83,14 +83,17 @@ mod tests { use crate::Table; use itertools::Itertools; - fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option) { + fn get_latest_transactions( + path: &str, + app_id: &str, + ) -> (SetTransactionMap, Option) { let path = std::fs::canonicalize(PathBuf::from(path)).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); let table = Table::new(url); let snapshot = table.snapshot(&engine, None).unwrap(); - let txn_scan = TransactionScanner::new(snapshot.into()); + let txn_scan = SetTransactionScanner::new(snapshot.into()); ( txn_scan.application_transactions(&engine).unwrap(), @@ -110,7 +113,7 @@ mod tests { assert_eq!(txns.get("my-app"), txn.as_ref()); assert_eq!( txns.get("my-app2"), - Some(Transaction { + Some(SetTransaction { app_id: "my-app2".to_owned(), version: 2, last_updated: None @@ -124,7 +127,7 @@ mod tests { assert_eq!(txns.get("my-app"), txn.as_ref()); assert_eq!( txns.get("my-app2"), - Some(Transaction { + Some(SetTransaction { app_id: "my-app2".to_owned(), version: 2, last_updated: None @@ -141,8 +144,8 @@ mod tests { let table = Table::new(url); let snapshot = table.snapshot(&engine, None).unwrap(); - let txn = TransactionScanner::new(snapshot.into()); - let txn_schema = TransactionScanner::get_txn_schema().unwrap(); + let txn = SetTransactionScanner::new(snapshot.into()); + let txn_schema = SetTransactionScanner::get_txn_schema().unwrap(); // The checkpoint has five parts, each containing one action. There are two app ids. let data: Vec<_> = txn diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 748d913a..b01723c0 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -9,7 +9,8 @@ use crate::{ }; use super::{ - deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, Transaction, + deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, + SetTransaction, }; #[derive(Default)] @@ -244,7 +245,7 @@ impl DataVisitor for RemoveVisitor { } } -pub type TransactionMap = HashMap; +pub type SetTransactionMap = HashMap; /// Extact application transaction actions from the log into a map /// @@ -257,16 +258,16 @@ pub type TransactionMap = HashMap; #[derive(Default, Debug)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct TransactionVisitor { - pub(crate) transactions: TransactionMap, +pub(crate) struct SetTransactionVisitor { + pub(crate) set_transactions: SetTransactionMap, pub(crate) application_id: Option, } -impl TransactionVisitor { +impl SetTransactionVisitor { /// Create a new visitor. When application_id is set then bookkeeping is only for that id only pub(crate) fn new(application_id: Option) -> Self { - TransactionVisitor { - transactions: HashMap::default(), + SetTransactionVisitor { + set_transactions: HashMap::default(), application_id, } } @@ -277,10 +278,10 @@ impl TransactionVisitor { row_index: usize, app_id: String, getters: &[&'a dyn GetData<'a>], - ) -> DeltaResult { + ) -> DeltaResult { let version: i64 = getters[1].get(row_index, "txn.version")?; let last_updated: Option = getters[2].get_long(row_index, "txn.lastUpdated")?; - Ok(Transaction { + Ok(SetTransaction { app_id, version, last_updated, @@ -288,7 +289,7 @@ impl TransactionVisitor { } } -impl DataVisitor for TransactionVisitor { +impl DataVisitor for SetTransactionVisitor { fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { // Assumes batches are visited in reverse order relative to the log for i in 0..row_count { @@ -299,9 +300,9 @@ impl DataVisitor for TransactionVisitor { .as_ref() .is_some_and(|requested| !requested.eq(&app_id)) { - let txn = TransactionVisitor::visit_txn(i, app_id, getters)?; - if !self.transactions.contains_key(&txn.app_id) { - self.transactions.insert(txn.app_id.clone(), txn); + let txn = SetTransactionVisitor::visit_txn(i, app_id, getters)?; + if !self.set_transactions.contains_key(&txn.app_id) { + self.set_transactions.insert(txn.app_id.clone(), txn); } } } @@ -345,7 +346,7 @@ mod tests { use super::*; use crate::{ - actions::{get_log_schema, ADD_NAME, TRANSACTION_NAME}, + actions::{get_log_schema, ADD_NAME, SET_TRANSACTION_NAME}, engine::arrow_data::ArrowEngineData, engine::sync::{json::SyncJsonHandler, SyncEngine}, Engine, EngineData, JsonHandler, @@ -501,14 +502,14 @@ mod tests { .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); let add_schema = get_log_schema() - .project(&[TRANSACTION_NAME]) + .project(&[SET_TRANSACTION_NAME]) .expect("Can't get txn schema"); - let mut txn_visitor = TransactionVisitor::default(); + let mut txn_visitor = SetTransactionVisitor::default(); batch.extract(add_schema, &mut txn_visitor).unwrap(); - let mut actual = txn_visitor.transactions; + let mut actual = txn_visitor.set_transactions; assert_eq!( actual.remove("myApp2"), - Some(Transaction { + Some(SetTransaction { app_id: "myApp2".to_string(), version: 4, last_updated: Some(1670892998177), @@ -516,7 +517,7 @@ mod tests { ); assert_eq!( actual.remove("myApp"), - Some(Transaction { + Some(SetTransaction { app_id: "myApp".to_string(), version: 3, last_updated: None, diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index d9eabcbc..72725e82 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -73,7 +73,6 @@ pub mod scan; pub mod schema; pub mod snapshot; pub mod table; -pub mod transaction; pub(crate) mod utils; pub use engine_data::{DataVisitor, EngineData};