Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move transaction module into actions/ and rename to set_transaction #386

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, TransactionVisitor,
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
Expand Down Expand Up @@ -62,7 +62,7 @@ enum Action {
Protocol(delta_kernel::actions::Protocol, usize),
Remove(delta_kernel::actions::Remove, usize),
Add(delta_kernel::actions::Add, usize),
SetTransaction(delta_kernel::actions::Transaction, usize),
SetTransaction(delta_kernel::actions::SetTransaction, usize),
}

impl Action {
Expand Down Expand Up @@ -161,7 +161,7 @@ impl DataVisitor for LogVisitor {
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
self.actions.push(Action::SetTransaction(
TransactionVisitor::visit_txn(
SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
Expand Down
34 changes: 21 additions & 13 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
//! Provides parsing and manipulation of the various actions defined in the [Delta
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

pub mod deletion_vector;
pub(crate) mod schemas;
#[cfg(feature = "developer-visibility")]
pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

use delta_kernel_derive::Schema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand All @@ -19,23 +12,33 @@ use crate::actions::schemas::GetStructField;
use crate::features::{ReaderFeatures, WriterFeatures};
use crate::{schema::StructType, DeltaResult, EngineData};

pub mod deletion_vector;
pub mod set_transaction;

pub(crate) mod schemas;
#[cfg(feature = "developer-visibility")]
pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

pub(crate) const ADD_NAME: &str = "add";
pub(crate) const REMOVE_NAME: &str = "remove";
pub(crate) const METADATA_NAME: &str = "metaData";
pub(crate) const PROTOCOL_NAME: &str = "protocol";
pub(crate) const TRANSACTION_NAME: &str = "txn";
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";

static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
StructType::new([
Option::<Add>::get_struct_field(ADD_NAME),
Option::<Remove>::get_struct_field(REMOVE_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
Option::<Transaction>::get_struct_field(TRANSACTION_NAME),
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
// We don't support the following actions yet
//Option<Cdc>::get_field(CDC_NAME),
//Option<CommitInfo>::get_field(COMMIT_INFO_NAME),
//Option<DomainMetadata>::get_field(DOMAIN_METADATA_NAME),
//Option::<Cdc>::get_struct_field(CDC_NAME),
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
});

Expand Down Expand Up @@ -133,6 +136,11 @@ impl Protocol {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct CommitInfo {
pub kernel_version: Option<String>,
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
Expand Down Expand Up @@ -244,7 +252,7 @@ impl Remove {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct Transaction {
pub struct SetTransaction {
/// A unique identifier for the application performing the transaction.
pub app_id: String,

Expand Down
41 changes: 22 additions & 19 deletions kernel/src/transaction.rs → kernel/src/actions/set_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
use std::sync::Arc;

use crate::actions::visitors::TransactionVisitor;
use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME};
use crate::actions::visitors::SetTransactionVisitor;
use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef};

pub use crate::actions::visitors::TransactionMap;
pub struct TransactionScanner {
pub use crate::actions::visitors::SetTransactionMap;
pub struct SetTransactionScanner {
snapshot: Arc<Snapshot>,
}

impl TransactionScanner {
impl SetTransactionScanner {
pub fn new(snapshot: Arc<Snapshot>) -> Self {
TransactionScanner { snapshot }
SetTransactionScanner { snapshot }
}

/// Scan the entire log for all application ids but terminate early if a specific application id is provided
fn scan_application_transactions(
&self,
engine: &dyn Engine,
application_id: Option<&str>,
) -> DeltaResult<TransactionMap> {
) -> DeltaResult<SetTransactionMap> {
let schema = Self::get_txn_schema()?;
let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned()));
let mut visitor = SetTransactionVisitor::new(application_id.map(|s| s.to_owned()));
// If a specific id is requested then we can terminate log replay early as soon as it was
// found. If all ids are requested then we are forced to replay the entire log.
for maybe_data in self.replay_for_app_ids(engine, schema.clone())? {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
// if a specific id is requested and a transaction was found, then return
if application_id.is_some() && !visitor.transactions.is_empty() {
if application_id.is_some() && !visitor.set_transactions.is_empty() {
break;
}
}

Ok(visitor.transactions)
Ok(visitor.set_transactions)
}

// Factored out to facilitate testing
fn get_txn_schema() -> DeltaResult<SchemaRef> {
get_log_schema().project(&[TRANSACTION_NAME])
get_log_schema().project(&[SET_TRANSACTION_NAME])
}

// Factored out to facilitate testing
Expand All @@ -63,13 +63,13 @@ impl TransactionScanner {
&self,
engine: &dyn Engine,
application_id: &str,
) -> DeltaResult<Option<Transaction>> {
) -> DeltaResult<Option<SetTransaction>> {
let mut transactions = self.scan_application_transactions(engine, Some(application_id))?;
Ok(transactions.remove(application_id))
}

/// Scan the Delta Log to obtain the latest transaction for all applications
pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult<TransactionMap> {
pub fn application_transactions(&self, engine: &dyn Engine) -> DeltaResult<SetTransactionMap> {
self.scan_application_transactions(engine, None)
}
}
Expand All @@ -83,14 +83,17 @@ mod tests {
use crate::Table;
use itertools::Itertools;

fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
fn get_latest_transactions(
path: &str,
app_id: &str,
) -> (SetTransactionMap, Option<SetTransaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let txn_scan = TransactionScanner::new(snapshot.into());
let txn_scan = SetTransactionScanner::new(snapshot.into());

(
txn_scan.application_transactions(&engine).unwrap(),
Expand All @@ -110,7 +113,7 @@ mod tests {
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
Some(SetTransaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
Expand All @@ -124,7 +127,7 @@ mod tests {
assert_eq!(txns.get("my-app"), txn.as_ref());
assert_eq!(
txns.get("my-app2"),
Some(Transaction {
Some(SetTransaction {
app_id: "my-app2".to_owned(),
version: 2,
last_updated: None
Expand All @@ -141,8 +144,8 @@ mod tests {

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let txn = TransactionScanner::new(snapshot.into());
let txn_schema = TransactionScanner::get_txn_schema().unwrap();
let txn = SetTransactionScanner::new(snapshot.into());
let txn_schema = SetTransactionScanner::get_txn_schema().unwrap();

// The checkpoint has five parts, each containing one action. There are two app ids.
let data: Vec<_> = txn
Expand Down
39 changes: 20 additions & 19 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
};

use super::{
deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove, Transaction,
deletion_vector::DeletionVectorDescriptor, Add, Format, Metadata, Protocol, Remove,
SetTransaction,
};

#[derive(Default)]
Expand Down Expand Up @@ -244,7 +245,7 @@ impl DataVisitor for RemoveVisitor {
}
}

pub type TransactionMap = HashMap<String, Transaction>;
pub type SetTransactionMap = HashMap<String, SetTransaction>;

/// Extact application transaction actions from the log into a map
///
Expand All @@ -257,16 +258,16 @@ pub type TransactionMap = HashMap<String, Transaction>;
#[derive(Default, Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct TransactionVisitor {
pub(crate) transactions: TransactionMap,
pub(crate) struct SetTransactionVisitor {
pub(crate) set_transactions: SetTransactionMap,
pub(crate) application_id: Option<String>,
}

impl TransactionVisitor {
impl SetTransactionVisitor {
/// Create a new visitor. When application_id is set then bookkeeping is only for that id only
pub(crate) fn new(application_id: Option<String>) -> Self {
TransactionVisitor {
transactions: HashMap::default(),
SetTransactionVisitor {
set_transactions: HashMap::default(),
application_id,
}
}
Expand All @@ -277,18 +278,18 @@ impl TransactionVisitor {
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Transaction> {
) -> DeltaResult<SetTransaction> {
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
let version: i64 = getters[1].get(row_index, "txn.version")?;
let last_updated: Option<i64> = getters[2].get_long(row_index, "txn.lastUpdated")?;
Ok(Transaction {
Ok(SetTransaction {
app_id,
version,
last_updated,
})
}
}

impl DataVisitor for TransactionVisitor {
impl DataVisitor for SetTransactionVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
// Assumes batches are visited in reverse order relative to the log
for i in 0..row_count {
Expand All @@ -299,9 +300,9 @@ impl DataVisitor for TransactionVisitor {
.as_ref()
.is_some_and(|requested| !requested.eq(&app_id))
{
let txn = TransactionVisitor::visit_txn(i, app_id, getters)?;
if !self.transactions.contains_key(&txn.app_id) {
self.transactions.insert(txn.app_id.clone(), txn);
let txn = SetTransactionVisitor::visit_txn(i, app_id, getters)?;
if !self.set_transactions.contains_key(&txn.app_id) {
self.set_transactions.insert(txn.app_id.clone(), txn);
}
}
}
Expand Down Expand Up @@ -345,7 +346,7 @@ mod tests {

use super::*;
use crate::{
actions::{get_log_schema, ADD_NAME, TRANSACTION_NAME},
actions::{get_log_schema, ADD_NAME, SET_TRANSACTION_NAME},
engine::arrow_data::ArrowEngineData,
engine::sync::{json::SyncJsonHandler, SyncEngine},
Engine, EngineData, JsonHandler,
Expand Down Expand Up @@ -501,22 +502,22 @@ mod tests {
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
let add_schema = get_log_schema()
.project(&[TRANSACTION_NAME])
.project(&[SET_TRANSACTION_NAME])
.expect("Can't get txn schema");
let mut txn_visitor = TransactionVisitor::default();
let mut txn_visitor = SetTransactionVisitor::default();
batch.extract(add_schema, &mut txn_visitor).unwrap();
let mut actual = txn_visitor.transactions;
let mut actual = txn_visitor.set_transactions;
assert_eq!(
actual.remove("myApp2"),
Some(Transaction {
Some(SetTransaction {
app_id: "myApp2".to_string(),
version: 4,
last_updated: Some(1670892998177),
},)
);
assert_eq!(
actual.remove("myApp"),
Some(Transaction {
Some(SetTransaction {
app_id: "myApp".to_string(),
version: 3,
last_updated: None,
Expand Down
1 change: 0 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table;
pub mod transaction;
pub(crate) mod utils;

pub use engine_data::{DataVisitor, EngineData};
Expand Down
Loading