Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement transaction identifiers #2327

Closed
wants to merge 11 commits into from
20 changes: 20 additions & 0 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,26 @@ pub struct Txn {
pub last_updated: Option<i64>,
}

impl Txn {
/// Create a new application transactions. See [`Txn`] for details.
pub fn new(app_id: &dyn ToString, version: i64) -> Self {
Self::new_with_last_update(app_id, version, None)
}

/// Create a new application transactions. See [`Txn`] for details.
pub fn new_with_last_update(
app_id: &dyn ToString,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
app_id: &dyn ToString,
app_id: impl Into<String>,

personally I usually use `Into<String>`, since `ToString` is blanket-implemented via the `Display` trait, so accepting it here would often not make much of a difference. Also just a preference, but I somehow prefer `impl ...` — then again, it's just preference :)

version: i64,
last_updated: Option<i64>,
) -> Self {
Txn {
app_id: app_id.to_string(),
version,
last_updated,
}
}
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ lazy_static! {
pub(super) static ref COMMIT_SCHEMA: StructType = StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Remove.schema_field().clone(),
ActionType::Txn.schema_field().clone(),
]);
pub(super) static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Txn.schema_field().clone(),
Comment on lines +33 to +37
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it maybe make sense to make this configurable? The goal is to be as minimal as possible when reading the log, and eventually to update the parquet reader to leverage push-down as best we can.

We will always require add / remove but can maybe read txns and others based on config. Another alternative would maybe be to just read the transactions separately and in another PR add a simple caching layer for reading especially the json files?

What do you think?

]);
pub(super) static ref CHECKPOINT_SCHEMA: StructType =
StructType::new(vec![ActionType::Add.schema_field().clone(),]);
pub(super) static ref TOMBSTONE_SCHEMA: StructType =
StructType::new(vec![ActionType::Remove.schema_field().clone(),]);
}
Expand Down
55 changes: 41 additions & 14 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ impl Snapshot {
}

/// Get the files in the snapshot
pub fn files(
pub fn files<'a>(
&self,
store: Arc<dyn ObjectStore>,
) -> DeltaResult<ReplayStream<BoxStream<'_, DeltaResult<RecordBatch>>>> {
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<ReplayStream<'a, BoxStream<'_, DeltaResult<RecordBatch>>>> {
let log_stream = self.log_segment.commit_stream(
store.clone(),
&log_segment::COMMIT_SCHEMA,
Expand All @@ -211,7 +212,7 @@ impl Snapshot {
&log_segment::CHECKPOINT_SCHEMA,
&self.config,
);
ReplayStream::try_new(log_stream, checkpoint_stream, self)
ReplayStream::try_new(log_stream, checkpoint_stream, self, visitors)
}

/// Get the commit infos in the snapshot
Expand Down Expand Up @@ -331,6 +332,12 @@ impl Snapshot {
}
}

/// Allows hooking into the reading of commit files and checkpoints whenever a table is loaded or updated.
///
/// Implementations are passed, mutably, through log replay (see `ReplayStream`
/// and `EagerSnapshot::advance`), which invokes [`ReplayVisitor::visit_batch`]
/// on each raw record batch before file-action filtering is applied.
pub trait ReplayVisitor: Send {
/// Process a batch
///
/// Called once per [`RecordBatch`] of commit or checkpoint data read during
/// replay; returning an error aborts the replay with that error.
fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>;
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
#[derive(Debug, Clone, PartialEq)]
pub struct EagerSnapshot {
Expand All @@ -347,9 +354,20 @@ impl EagerSnapshot {
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
Self::try_new_with_visitor(table_root, store, config, version, vec![]).await
}

/// Create a new [`EagerSnapshot`] instance
pub async fn try_new_with_visitor(
table_root: &Path,
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
version: Option<i64>,
visitors: Vec<&mut dyn ReplayVisitor>,
) -> DeltaResult<Self> {
let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?;
let files = snapshot.files(store)?.try_collect().await?;
let files = snapshot.files(store, visitors)?.try_collect().await?;
Ok(Self { snapshot, files })
}

Expand All @@ -368,14 +386,16 @@ impl EagerSnapshot {
}

/// Update the snapshot to the given version
pub async fn update(
pub async fn update<'a>(
&mut self,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<()> {
if Some(self.version()) == target_version {
return Ok(());
}

let new_slice = self
.snapshot
.update_inner(log_store.clone(), target_version)
Expand All @@ -399,10 +419,11 @@ impl EagerSnapshot {
.boxed()
};
let mapper = LogMapper::try_new(&self.snapshot)?;
let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;
let files =
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, visitors)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;

self.files = files;
}
Expand Down Expand Up @@ -476,6 +497,7 @@ impl EagerSnapshot {
pub fn advance<'a>(
&mut self,
commits: impl IntoIterator<Item = &'a CommitData>,
mut visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<i64> {
let mut metadata = None;
let mut protocol = None;
Expand Down Expand Up @@ -506,7 +528,11 @@ impl EagerSnapshot {
let mut scanner = LogReplayScanner::new();

for batch in actions {
files.push(scanner.process_files_batch(&batch?, true)?);
let batch = batch?;
files.push(scanner.process_files_batch(&batch, true)?);
for visitor in &mut visitors {
visitor.visit_batch(&batch)?;
}
}

let mapper = LogMapper::try_new(&self.snapshot)?;
Expand Down Expand Up @@ -652,7 +678,7 @@ mod tests {
assert_eq!(tombstones.len(), 31);

let batches = snapshot
.files(store.clone())?
.files(store.clone(), vec![])?
.try_collect::<Vec<_>>()
.await?;
let expected = [
Expand Down Expand Up @@ -682,7 +708,7 @@ mod tests {
)
.await?;
let batches = snapshot
.files(store.clone())?
.files(store.clone(), vec![])?
.try_collect::<Vec<_>>()
.await?;
let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::<i64>();
Expand Down Expand Up @@ -778,9 +804,10 @@ mod tests {
predicate: None,
};

let actions = vec![CommitData::new(removes, operation, HashMap::new()).unwrap()];
let actions =
vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()];

let new_version = snapshot.advance(&actions)?;
let new_version = snapshot.advance(&actions, vec![])?;
assert_eq!(new_version, version + 1);

let new_files = snapshot.file_actions()?.map(|f| f.path).collect::<Vec<_>>();
Expand Down
47 changes: 35 additions & 12 deletions crates/core/src/kernel/snapshot/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::ReplayVisitor;
use super::Snapshot;

pin_project! {
pub struct ReplayStream<S> {
pub struct ReplayStream<'a, S> {
scanner: LogReplayScanner,

mapper: Arc<LogMapper>,

visitors: Vec<&'a mut dyn ReplayVisitor>,

#[pin]
commits: S,

Expand All @@ -39,8 +42,13 @@ pin_project! {
}
}

impl<S> ReplayStream<S> {
pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
impl<'a, S> ReplayStream<'a, S> {
pub(super) fn try_new(
commits: S,
checkpoint: S,
snapshot: &Snapshot,
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<Self> {
let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
let mapper = Arc::new(LogMapper {
stats_schema,
Expand All @@ -50,6 +58,7 @@ impl<S> ReplayStream<S> {
commits,
checkpoint,
mapper,
visitors,
scanner: LogReplayScanner::new(),
})
}
Expand Down Expand Up @@ -127,7 +136,7 @@ fn map_batch(
Ok(batch)
}

impl<S> Stream for ReplayStream<S>
impl<'a, S> Stream for ReplayStream<'a, S>
where
S: Stream<Item = DeltaResult<RecordBatch>>,
{
Expand All @@ -136,19 +145,33 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let res = this.commits.poll_next(cx).map(|b| match b {
Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
},
Some(Ok(batch)) => {
for visitor in this.visitors.iter_mut() {
if let Err(e) = visitor.visit_batch(&batch) {
return Some(Err(e));
}
}
match this.scanner.process_files_batch(&batch, true) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
}
}
Some(Err(e)) => Some(Err(e)),
None => None,
});
if matches!(res, Poll::Ready(None)) {
this.checkpoint.poll_next(cx).map(|b| match b {
Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, false) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
},
Some(Ok(batch)) => {
for visitor in this.visitors.iter_mut() {
if let Err(e) = visitor.visit_batch(&batch) {
return Some(Err(e));
}
}
match this.scanner.process_files_batch(&batch, false) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
}
}
Some(Err(e)) => Some(Err(e)),
None => None,
})
Expand Down
Loading
Loading