Skip to content

Commit

Permalink
https://github.com/delta-io/delta-rs/pull/2327
Browse files Browse the repository at this point in the history
  • Loading branch information
Nekit2217 committed Apr 16, 2024
1 parent 03488bb commit 78a2a78
Show file tree
Hide file tree
Showing 10 changed files with 1,125 additions and 143 deletions.
655 changes: 655 additions & 0 deletions benchmarks/src/bin/merge.rs

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,26 @@ pub struct Txn {
pub last_updated: Option<i64>,
}

impl Txn {
    /// Create a new application transaction with no `last_updated` timestamp.
    /// See [`Txn`] for details.
    pub fn new(app_id: &dyn ToString, version: i64) -> Self {
        Self::new_with_last_update(app_id, version, None)
    }

    /// Create a new application transaction, additionally recording an optional
    /// `last_updated` timestamp for the transaction. See [`Txn`] for details.
    ///
    /// `app_id` is converted to an owned `String` via [`ToString`].
    pub fn new_with_last_update(
        app_id: &dyn ToString,
        version: i64,
        last_updated: Option<i64>,
    ) -> Self {
        Txn {
            app_id: app_id.to_string(),
            version,
            last_updated,
        }
    }
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ lazy_static! {
pub(super) static ref COMMIT_SCHEMA: StructType = StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Remove.schema_field().clone(),
ActionType::Txn.schema_field().clone(),
]);
pub(super) static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Txn.schema_field().clone(),
]);
pub(super) static ref CHECKPOINT_SCHEMA: StructType =
StructType::new(vec![ActionType::Add.schema_field().clone(),]);
pub(super) static ref TOMBSTONE_SCHEMA: StructType =
StructType::new(vec![ActionType::Remove.schema_field().clone(),]);
}
Expand Down
53 changes: 40 additions & 13 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ impl Snapshot {
}

/// Get the files in the snapshot
pub fn files(
pub fn files<'a>(
&self,
store: Arc<dyn ObjectStore>,
) -> DeltaResult<ReplayStream<BoxStream<'_, DeltaResult<RecordBatch>>>> {
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<ReplayStream<'a, BoxStream<'_, DeltaResult<RecordBatch>>>> {
let log_stream = self.log_segment.commit_stream(
store.clone(),
&log_segment::COMMIT_SCHEMA,
Expand All @@ -211,7 +212,7 @@ impl Snapshot {
&log_segment::CHECKPOINT_SCHEMA,
&self.config,
);
ReplayStream::try_new(log_stream, checkpoint_stream, self)
ReplayStream::try_new(log_stream, checkpoint_stream, self, visitors)
}

/// Get the commit infos in the snapshot
Expand Down Expand Up @@ -331,6 +332,12 @@ impl Snapshot {
}
}

/// Allows hooking into the reading of commit files and checkpoints whenever a table is loaded or updated.
pub trait ReplayVisitor: Send {
    /// Process a single record batch of log actions as it is read during log replay.
    ///
    /// Returning an error surfaces it through the replay stream and aborts processing
    /// of the current batch.
    fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>;
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
#[derive(Debug, Clone, PartialEq)]
pub struct EagerSnapshot {
Expand All @@ -347,9 +354,20 @@ impl EagerSnapshot {
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
Self::try_new_with_visitor(table_root, store, config, version, vec![]).await
}

/// Create a new [`EagerSnapshot`] instance
pub async fn try_new_with_visitor(
table_root: &Path,
store: Arc<dyn ObjectStore>,
config: DeltaTableConfig,
version: Option<i64>,
visitors: Vec<&mut dyn ReplayVisitor>,
) -> DeltaResult<Self> {
let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?;
let files = snapshot.files(store)?.try_collect().await?;
let files = snapshot.files(store, visitors)?.try_collect().await?;
Ok(Self { snapshot, files })
}

Expand All @@ -368,14 +386,16 @@ impl EagerSnapshot {
}

/// Update the snapshot to the given version
pub async fn update(
pub async fn update<'a>(
&mut self,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<()> {
if Some(self.version()) == target_version {
return Ok(());
}

let new_slice = self
.snapshot
.update_inner(log_store.clone(), target_version)
Expand All @@ -399,10 +419,11 @@ impl EagerSnapshot {
.boxed()
};
let mapper = LogMapper::try_new(&self.snapshot)?;
let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;
let files =
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, visitors)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;

self.files = files;
}
Expand Down Expand Up @@ -476,6 +497,7 @@ impl EagerSnapshot {
pub fn advance<'a>(
&mut self,
commits: impl IntoIterator<Item = &'a CommitData>,
mut visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<i64> {
let mut metadata = None;
let mut protocol = None;
Expand Down Expand Up @@ -506,7 +528,11 @@ impl EagerSnapshot {
let mut scanner = LogReplayScanner::new();

for batch in actions {
files.push(scanner.process_files_batch(&batch?, true)?);
let batch = batch?;
files.push(scanner.process_files_batch(&batch, true)?);
for visitor in &mut visitors {
visitor.visit_batch(&batch)?;
}
}

let mapper = LogMapper::try_new(&self.snapshot)?;
Expand Down Expand Up @@ -652,7 +678,7 @@ mod tests {
assert_eq!(tombstones.len(), 31);

let batches = snapshot
.files(store.clone())?
.files(store.clone(), vec![])?
.try_collect::<Vec<_>>()
.await?;
let expected = [
Expand Down Expand Up @@ -778,9 +804,10 @@ mod tests {
predicate: None,
};

let actions = vec![CommitData::new(removes, operation, HashMap::new()).unwrap()];
let actions =
vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()];

let new_version = snapshot.advance(&actions)?;
let new_version = snapshot.advance(&actions, vec![])?;
assert_eq!(new_version, version + 1);

let new_files = snapshot.file_actions()?.map(|f| f.path).collect::<Vec<_>>();
Expand Down
47 changes: 35 additions & 12 deletions crates/core/src/kernel/snapshot/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::ReplayVisitor;
use super::Snapshot;

pin_project! {
pub struct ReplayStream<S> {
pub struct ReplayStream<'a, S> {
scanner: LogReplayScanner,

mapper: Arc<LogMapper>,

visitors: Vec<&'a mut dyn ReplayVisitor>,

#[pin]
commits: S,

Expand All @@ -39,8 +42,13 @@ pin_project! {
}
}

impl<S> ReplayStream<S> {
pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
impl<'a, S> ReplayStream<'a, S> {
pub(super) fn try_new(
commits: S,
checkpoint: S,
snapshot: &Snapshot,
visitors: Vec<&'a mut dyn ReplayVisitor>,
) -> DeltaResult<Self> {
let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
let mapper = Arc::new(LogMapper {
stats_schema,
Expand All @@ -50,6 +58,7 @@ impl<S> ReplayStream<S> {
commits,
checkpoint,
mapper,
visitors,
scanner: LogReplayScanner::new(),
})
}
Expand Down Expand Up @@ -127,7 +136,7 @@ fn map_batch(
Ok(batch)
}

impl<S> Stream for ReplayStream<S>
impl<'a, S> Stream for ReplayStream<'a, S>
where
S: Stream<Item = DeltaResult<RecordBatch>>,
{
Expand All @@ -136,19 +145,33 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let res = this.commits.poll_next(cx).map(|b| match b {
Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
},
Some(Ok(batch)) => {
for visitor in this.visitors.iter_mut() {
if let Err(e) = visitor.visit_batch(&batch) {
return Some(Err(e));
}
}
match this.scanner.process_files_batch(&batch, true) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
}
}
Some(Err(e)) => Some(Err(e)),
None => None,
});
if matches!(res, Poll::Ready(None)) {
this.checkpoint.poll_next(cx).map(|b| match b {
Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, false) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
},
Some(Ok(batch)) => {
for visitor in this.visitors.iter_mut() {
if let Err(e) = visitor.visit_batch(&batch) {
return Some(Err(e));
}
}
match this.scanner.process_files_batch(&batch, false) {
Ok(filtered) => Some(this.mapper.map_batch(filtered)),
Err(e) => Some(Err(e)),
}
}
Some(Err(e)) => Some(Err(e)),
None => None,
})
Expand Down
Loading

0 comments on commit 78a2a78

Please sign in to comment.