Skip to content

Commit

Permalink
Support txn actions
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Oct 9, 2024
1 parent f2beff4 commit 447224a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
19 changes: 18 additions & 1 deletion kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor,
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, TransactionVisitor,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
Expand Down Expand Up @@ -62,6 +62,7 @@ enum Action {
Protocol(delta_kernel::actions::Protocol, usize),
Remove(delta_kernel::actions::Remove, usize),
Add(delta_kernel::actions::Add, usize),
SetTransaction(delta_kernel::actions::Transaction, usize),
}

impl Action {
Expand All @@ -71,6 +72,7 @@ impl Action {
Action::Protocol(_, row) => *row,
Action::Remove(_, row) => *row,
Action::Add(_, row) => *row,
Action::SetTransaction(_, row) => *row,
}
}
}
Expand All @@ -89,6 +91,7 @@ struct LogVisitor {
remove_offset: usize,
protocol_offset: usize,
metadata_offset: usize,
set_transaction_offset: usize,
previous_rows_seen: usize,
}

Expand All @@ -99,12 +102,14 @@ impl LogVisitor {
let mut remove_offset = 0;
let mut protocol_offset = 0;
let mut metadata_offset = 0;
let mut set_transaction_offset = 0;
for field in log_schema.fields() {
match field.name().as_str() {
"add" => add_offset = offset,
"remove" => remove_offset = offset,
"protocol" => protocol_offset = offset,
"metaData" => metadata_offset = offset,
"txn" => set_transaction_offset = offset,
_ => {}
}
offset += fields_in(field);
Expand All @@ -115,6 +120,7 @@ impl LogVisitor {
remove_offset,
protocol_offset,
metadata_offset,
set_transaction_offset,
previous_rows_seen: 0,
}
}
Expand Down Expand Up @@ -153,6 +159,16 @@ impl DataVisitor for LogVisitor {
self.previous_rows_seen + i,
));
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
self.actions.push(Action::SetTransaction(
TransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
)?,
self.previous_rows_seen + i,
));
}
}
self.previous_rows_seen += row_count;
Ok(())
Expand Down Expand Up @@ -250,6 +266,7 @@ fn try_main() -> DeltaResult<()> {
Action::Protocol(p, row) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r, row) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a, row) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t, row) => println!("\nAction {row}:\n{:#?}", t),
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ pub type TransactionMap = HashMap<String, Transaction>;
/// required.
///
#[derive(Default, Debug)]
pub(crate) struct TransactionVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct TransactionVisitor {
pub(crate) transactions: TransactionMap,
pub(crate) application_id: Option<String>,
}
Expand All @@ -269,7 +271,9 @@ impl TransactionVisitor {
}
}

pub(crate) fn visit_txn<'a>(
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_txn<'a>(
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
Expand Down

0 comments on commit 447224a

Please sign in to comment.