feat: implement transaction identifiers #2327
Conversation
@roeap Please provide some feedback on how I'm interacting with the log replay. I implemented the hook with a visitor trait called |
I like exposing Txn 😄 kafka-delta-ingest is the only application I've seen in real life that really makes heavy use of them 😄
@@ -337,6 +339,20 @@ impl Snapshot {
        }
    }

    /// TODO!
❓
struct PrintVistor {}
impl ReplayVistor for PrintVistor {
    fn visit_batch(&mut self, _batch: &RecordBatch) -> DeltaResult<()> {
        println!("Hello world!");
❓
It's still WIP / draft. Opened this to get some feedback on the new mechanism of implementing a visitor pattern for hooking into the log replay. Currently the implementation is only tested when a brand new table is created. It will also need to be tested with reading from existing tables.
@Blajda - very sorry for the long delay! Will do a review now.
Looking good, and I do like the visitor pattern to extract more actions during replay.
There are some thoughts I had during review that I am keen to get your opinion on.
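For readers following along, the visitor idea under discussion can be sketched roughly as below. This is a simplified illustration and not the PR's code: `Batch`, `Visitor`, `CountTxn`, and `replay` are hypothetical names, and a plain `Vec<String>` stands in for an Arrow `RecordBatch`.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: one string per action.
type Batch = Vec<String>;

/// Hypothetical visitor trait: called once for every batch seen during replay.
trait Visitor {
    fn visit_batch(&mut self, batch: &Batch) -> Result<(), String>;
}

/// Example visitor that counts the actions whose name starts with "txn".
struct CountTxn {
    seen: usize,
}

impl Visitor for CountTxn {
    fn visit_batch(&mut self, batch: &Batch) -> Result<(), String> {
        self.seen += batch.iter().filter(|a| a.starts_with("txn")).count();
        Ok(())
    }
}

/// Walks all batches in order and hands each one to the visitor,
/// stopping at the first error the visitor reports.
fn replay(batches: &[Batch], visitor: &mut dyn Visitor) -> Result<(), String> {
    for batch in batches {
        visitor.visit_batch(batch)?;
    }
    Ok(())
}
```

The replay loop itself stays unaware of what each visitor extracts, which is what makes it possible to pull out additional actions without changing the loop.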
/// Create a new application transactions. See [`Txn`] for details.
pub fn new_with_last_update(
    app_id: &dyn ToString,
- app_id: &dyn ToString,
+ app_id: impl Into<String>,
Personally I usually use `Into<String>`, since `ToString` usually gets implemented via the `Display` trait, which would often not really make much of a difference. Also just preference, but I somehow prefer `impl ...`. Then again, it's just preference :)
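A minimal sketch of what the suggested signature looks like from the caller's side. `AppTransaction` and its `new` constructor are hypothetical stand-ins for illustration, not the type or function in this PR.

```rust
/// Hypothetical stand-in for the transaction type discussed above.
#[derive(Debug, Clone, PartialEq)]
struct AppTransaction {
    app_id: String,
    version: i64,
}

impl AppTransaction {
    /// Accepts anything convertible into a `String` (`&str`, `String`, ...).
    /// An owned `String` is moved in without an extra allocation.
    fn new(app_id: impl Into<String>, version: i64) -> Self {
        Self {
            app_id: app_id.into(),
            version,
        }
    }
}
```

With this signature both `AppTransaction::new("my-app", 1)` and `AppTransaction::new(String::from("my-app"), 1)` compile, and the second call reuses the caller's allocation.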
        ActionType::Txn.schema_field().clone(),
    ]);
    pub(super) static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![
        ActionType::Add.schema_field().clone(),
        ActionType::Txn.schema_field().clone(),
Would it maybe make sense to make this configurable? The goal is to be as minimal as possible when reading the log and eventually update the parquet reader to leverage push-down as best we can.
We will always require add / remove but can maybe read txns and others based on config. Another alternative would maybe be to just read the transactions separately and in another PR add a simple caching layer for reading especially the json files?
What do you think?
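One way the configurable variant could look, purely as a sketch: `ReplayConfig`, its `read_app_transactions` flag, and `replay_columns` are hypothetical names, and plain column names stand in for the schema fields built from `ActionType`.

```rust
/// Hypothetical configuration for which optional actions to read during replay.
#[derive(Default)]
struct ReplayConfig {
    /// When true, the `txn` column is read in addition to the required ones.
    read_app_transactions: bool,
}

/// Returns the action columns to read: `add` and `remove` are always
/// included, `txn` only when the config asks for it.
fn replay_columns(config: &ReplayConfig) -> Vec<&'static str> {
    let mut columns = vec!["add", "remove"];
    if config.read_app_transactions {
        columns.push("txn");
    }
    columns
}
```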
fn visit_batch(&mut self, batch: &arrow_array::RecordBatch) -> DeltaResult<()> {
    if batch.column_by_name("txn").is_none() {
        return Ok(());
    }

    let txn_col = ex::extract_and_cast::<StructArray>(batch, "txn")?;
    let filter = is_not_null(txn_col)?;

    let filtered = filter_record_batch(batch, &filter)?;
    let arr = ex::extract_and_cast::<StructArray>(&filtered, "txn")?;

    let id = ex::extract_and_cast::<StringArray>(arr, "appId")?;
    let version = ex::extract_and_cast::<Int64Array>(arr, "version")?;

    for idx in 0..id.len() {
        let app = ex::read_str(id, idx)?;
        let version = ex::read_primitive(version, idx)?;

        self.app_transaction_version.insert(app.to_owned(), version);
    }

    Ok(())
}
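Never actually worked with the transaction IDs, but I think we may have to insert a transaction ID only when it's not yet inserted. Reason being that we replay the log from highest commit to lowest, so the first value we see for an app ID is the most recent one and a later insert would overwrite it with an older version.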
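A small sketch of the insert-if-absent behaviour, assuming the replay really does hand over actions newest commit first. `AppTransactionVisitor`, `record`, and `replay_newest_first` are hypothetical names used only for illustration; the `entry(..).or_insert(..)` call is the standard `HashMap` API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the visitor state in the diff above.
#[derive(Default)]
struct AppTransactionVisitor {
    app_transaction_version: HashMap<String, i64>,
}

impl AppTransactionVisitor {
    /// Records `version` for `app_id` only if no version has been recorded yet,
    /// so the first (newest) value seen during replay is the one that is kept.
    fn record(&mut self, app_id: &str, version: i64) {
        self.app_transaction_version
            .entry(app_id.to_owned())
            .or_insert(version);
    }
}

/// Feeds `(app_id, version)` pairs, ordered newest commit first, to the visitor
/// and returns the resulting map.
fn replay_newest_first(actions: &[(&str, i64)]) -> HashMap<String, i64> {
    let mut visitor = AppTransactionVisitor::default();
    for (app_id, version) in actions {
        visitor.record(app_id, *version);
    }
    visitor.app_transaction_version
}
```

With a plain `insert`, the older entry for the same app ID would win because it is visited last; `or_insert` leaves the already recorded newer version in place.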
Description
Enables users to read transaction identifiers on a table and allows writing transaction identifiers for all operations.
Related Issue(s)