Skip to content

Commit

Permalink
feat: add commit infos apis to new snapshots
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
  • Loading branch information
roeap committed Jan 19, 2025
1 parent 27accd7 commit e7c7766
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ debug = "line-tables-only"
# "default-engine",
# "developer-visibility",
# ] }
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "caeb70ab78e4d5f3b56b5105fd3587c1046d1e1b", features = [
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "023abf1ee604b77bbaa5efec97e043fc4bdf220b", features = [
"default-engine",
"developer-visibility",
] }
Expand Down
20 changes: 16 additions & 4 deletions crates/core/src/kernel/snapshot_next/eager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use arrow::compute::{concat_batches, filter_record_batch};
use arrow_array::{BooleanArray, RecordBatch};
use chrono::format::Item;
use delta_kernel::actions::set_transaction::SetTransactionMap;
use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction};
Expand All @@ -11,14 +10,15 @@ use delta_kernel::log_segment::LogSegment;
use delta_kernel::scan::log_replay::scan_action_iter;
use delta_kernel::schema::Schema;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{EngineData, Expression, Table, Version};
use delta_kernel::{Engine, EngineData, Expression, Table, Version};
use itertools::Itertools;
use object_store::ObjectStore;
use url::Url;

use super::iterators::{AddIterator, AddView, AddViewItem};
use super::lazy::LazySnapshot;
use super::{Snapshot, SnapshotError};
use crate::kernel::CommitInfo;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

/// An eager snapshot of a Delta Table at a specific version.
Expand Down Expand Up @@ -77,6 +77,14 @@ impl Snapshot for EagerSnapshot {
) -> DeltaResult<Option<SetTransaction>> {
self.snapshot.application_transaction(app_id)
}

/// Returns the commit infos of the table by delegating to the inner snapshot.
///
/// `start_version` and `limit` are forwarded unchanged; ordering and
/// skipping behaviour are whatever the inner snapshot implements.
fn commit_infos(
    &self,
    start_version: impl Into<Option<Version>>,
    limit: impl Into<Option<usize>>,
) -> DeltaResult<impl Iterator<Item = (Version, CommitInfo)>> {
    let inner = &self.snapshot;
    inner.commit_infos(start_version, limit)
}
}

impl EagerSnapshot {
Expand All @@ -92,7 +100,7 @@ impl EagerSnapshot {
LazySnapshot::try_new(Table::try_from_uri(table_root)?, store, version).await?;
let files = config
.require_files
.then(|| -> DeltaResult<_> { Ok(replay_file_actions(&snapshot)?) })
.then(|| -> DeltaResult<_> { replay_file_actions(&snapshot) })
.transpose()?;
Ok(Self {
snapshot,
Expand All @@ -101,6 +109,10 @@ impl EagerSnapshot {
})
}

/// A shared reference to the engine held by the inner snapshot.
pub(crate) fn engine_ref(&self) -> &Arc<dyn Engine> {
    let inner = &self.snapshot;
    inner.engine_ref()
}

pub fn file_data(&self) -> DeltaResult<&RecordBatch> {
Ok(self
.files
Expand All @@ -122,7 +134,7 @@ impl EagerSnapshot {
.files
.as_ref()
.map(|f| f.num_rows())
.ok_or_else(|| SnapshotError::FilesNotInitialized)?)
.ok_or(SnapshotError::FilesNotInitialized)?)
}

pub(crate) fn update(&mut self) -> DeltaResult<()> {
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot_next/iterators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct AddIterator<'a> {
}

impl AddIterator<'_> {
pub fn try_new<'a>(actions: &'a RecordBatch) -> DeltaResult<AddIterator<'a>> {
pub fn try_new(actions: &RecordBatch) -> DeltaResult<AddIterator<'_>> {
validate_column::<StringArray>(actions, &[ADD_NAME, "path"])?;
validate_column::<Int64Array>(actions, &[ADD_NAME, "size"])?;
validate_column::<Int64Array>(actions, &[ADD_NAME, "modificationTime"])?;
Expand Down Expand Up @@ -108,7 +108,7 @@ pub struct AddViewItem {
}

impl AddViewItem {
pub fn path<T: Array>(&self) -> &str {
pub fn path(&self) -> &str {
extract_column(&self.actions, &[ADD_NAME, "path"])
.unwrap()
.as_string::<i32>()
Expand Down Expand Up @@ -273,7 +273,7 @@ fn validate_column<'a, T: Array + 'static>(
}
} else {
return Err(DeltaTableError::from(
crate::protocol::ProtocolError::InvalidField(format!("Column not found",)),
crate::protocol::ProtocolError::InvalidField("Column not found".to_string()),
));
}
Ok(())
Expand Down
61 changes: 58 additions & 3 deletions crates/core/src/kernel/snapshot_next/lazy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Snapshot of a Delta Table at a specific version.
//!
use std::io::{BufRead, BufReader, Cursor};
use std::sync::{Arc, LazyLock};

use arrow::compute::filter_record_batch;
Expand All @@ -12,6 +13,7 @@ use delta_kernel::engine::default::executor::tokio::{
TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::log_segment::LogSegment;
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot as SnapshotInner;
use delta_kernel::table_properties::TableProperties;
Expand All @@ -23,6 +25,7 @@ use url::Url;

use super::cache::CommitCacheObjectStore;
use super::Snapshot;
use crate::kernel::{Action, CommitInfo};
use crate::{DeltaResult, DeltaTableError};

// TODO: avoid repetitive parsing of json stats
Expand All @@ -35,7 +38,7 @@ pub struct LazySnapshot {

impl Snapshot for LazySnapshot {
fn table_root(&self) -> &Url {
&self.inner.table_root()
self.inner.table_root()
}

fn version(&self) -> Version {
Expand All @@ -55,7 +58,7 @@ impl Snapshot for LazySnapshot {
}

fn table_properties(&self) -> &TableProperties {
&self.inner.table_properties()
self.inner.table_properties()
}

fn files(&self) -> DeltaResult<impl Iterator<Item = DeltaResult<RecordBatch>>> {
Expand Down Expand Up @@ -96,6 +99,58 @@ impl Snapshot for LazySnapshot {
let scanner = SetTransactionScanner::new(self.inner.clone());
Ok(scanner.application_transaction(self.engine.as_ref(), app_id.as_ref())?)
}

/// Read the commit infos of the table, most recent version first.
///
/// Commit files for the inclusive version range ending at `start_version`
/// are requested from the file system client and scanned line by line; the
/// first `commitInfo` action found in a commit is returned for that version.
/// Reading is best effort: a commit whose bytes cannot be fetched, that
/// contains an unreadable or unparsable line before its commit info, or that
/// carries no commit info at all is silently skipped.
///
/// # Parameters
/// - `start_version`: The newest version to report. Defaults to the version
///   of this snapshot.
/// - `limit`: The maximum number of commit infos to yield. `None` reads back
///   to version 0; `Some(0)` yields nothing.
///
/// # Errors
/// Fails if the log segment for the version range cannot be built or if the
/// read of the commit files cannot be started.
fn commit_infos(
    &self,
    start_version: impl Into<Option<Version>>,
    limit: impl Into<Option<usize>>,
) -> DeltaResult<impl Iterator<Item = (Version, CommitInfo)>> {
    let fs_client = self.engine.get_file_system_client();

    // The caller's "start" is the *newest* version; we walk backwards from it.
    let requested: Option<Version> = start_version.into();
    let end_version = requested.unwrap_or_else(|| self.version());

    let limit: Option<usize> = limit.into();
    // Oldest version to read so that at most `limit` versions are covered.
    let start_version = match limit {
        Some(limit) => end_version.saturating_sub(limit.saturating_sub(1) as u64),
        None => 0,
    };
    // The version window above can never be narrower than one commit, so the
    // limit is additionally enforced on the iterator (relevant for `Some(0)`).
    let max_items = limit.unwrap_or(usize::MAX);

    // NOTE(review): panics if the table root cannot be joined with a relative
    // path - presumably impossible for a valid table URL; confirm.
    let log_root = self.inner.table_root().join("_delta_log").unwrap();
    let mut log_segment = LogSegment::for_table_changes(
        fs_client.as_ref(),
        log_root,
        start_version,
        end_version,
    )?;
    // Newest commit first, matching the documented output order.
    log_segment.ascending_commit_files.reverse();
    let files = log_segment
        .ascending_commit_files
        .iter()
        .map(|commit_file| (commit_file.location.location.clone(), None))
        .collect_vec();

    Ok(fs_client
        .read_files(files)?
        // Pair every read result with the commit file it was requested for.
        .zip(log_segment.ascending_commit_files)
        .filter_map(|(data, commit_file)| {
            // Best effort: a commit whose bytes could not be read is skipped.
            let bytes = data.ok()?;
            for line in BufReader::new(Cursor::new(bytes)).lines() {
                // JSON errors are converted into `std::io::Error`, so I/O and
                // parse failures are handled by the same `Err` arm below.
                match line.and_then(|l| Ok(serde_json::from_str::<Action>(&l)?)) {
                    Ok(Action::CommitInfo(commit_info)) => {
                        return Some((commit_file.version, commit_info));
                    }
                    // A broken line abandons this commit entirely.
                    Err(_) => return None,
                    // Any other action: keep looking for the commit info.
                    Ok(_) => continue,
                }
            }
            None
        })
        .take(max_items))
}
}

impl LazySnapshot {
Expand Down Expand Up @@ -138,7 +193,7 @@ impl LazySnapshot {
}

/// A shared reference to the engine used for interacting with the Delta Table.
pub(super) fn engine_ref(&self) -> &Arc<dyn Engine> {
pub(crate) fn engine_ref(&self) -> &Arc<dyn Engine> {
&self.engine
}

Expand Down
98 changes: 95 additions & 3 deletions crates/core/src/kernel/snapshot_next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use std::sync::Arc;

use arrow_array::RecordBatch;
use delta_kernel::actions::visitors::SetTransactionMap;
use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction};
use delta_kernel::actions::{Metadata, Protocol, SetTransaction};
use delta_kernel::expressions::{Scalar, StructData};
use delta_kernel::schema::Schema;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::Version;
use iterators::{AddIterator, AddView, AddViewItem};
use iterators::{AddView, AddViewItem};
use url::Url;

use crate::kernel::actions::CommitInfo;
use crate::{DeltaResult, DeltaTableError};

pub use eager::EagerSnapshot;
Expand Down Expand Up @@ -77,7 +78,7 @@ pub trait Snapshot {
fn files_view(
&self,
) -> DeltaResult<impl Iterator<Item = DeltaResult<impl Iterator<Item = AddViewItem>>>> {
Ok(self.files()?.map(|r| r.and_then(|b| AddView::try_new(b))))
Ok(self.files()?.map(|r| r.and_then(AddView::try_new)))
}

fn tombstones(&self) -> DeltaResult<impl Iterator<Item = DeltaResult<RecordBatch>>>;
Expand All @@ -93,10 +94,40 @@ pub trait Snapshot {
///
/// Initiates a log scan, but terminates as soon as the transaction
/// for the given application is found.
///
/// # Parameters
/// - `app_id`: The application id for which to fetch the transaction.
///
/// # Returns
/// The latest transaction for the given application id, if it exists.
fn application_transaction(
&self,
app_id: impl AsRef<str>,
) -> DeltaResult<Option<SetTransaction>>;

/// Get commit info for the table.
///
/// The [`CommitInfo`]s are returned in descending order of version,
/// with the most recent commit first, starting from the `start_version`.
///
/// [`CommitInfo`]s are read on a best-effort basis. If the action
/// for a version is not available or cannot be parsed, it is skipped.
///
/// # Parameters
/// - `start_version`: The version from which to start fetching commit info.
/// Defaults to the latest version.
/// - `limit`: The maximum number of commit infos to fetch.
///
/// # Returns
/// An iterator of commit info tuples. The first element of the tuple is the version
/// of the commit, the second element is the corresponding commit info.
// TODO(roeap): this is currently using our commit info, we should be using
// the definition from kernel, once handling over there has matured.
fn commit_infos(
&self,
start_version: impl Into<Option<Version>>,
limit: impl Into<Option<usize>>,
) -> DeltaResult<impl Iterator<Item = (Version, CommitInfo)>>;
}

impl<T: Snapshot> Snapshot for Arc<T> {
Expand Down Expand Up @@ -142,6 +173,67 @@ impl<T: Snapshot> Snapshot for Arc<T> {
) -> DeltaResult<Option<SetTransaction>> {
self.as_ref().application_transaction(app_id)
}

fn commit_infos(
&self,
start_version: impl Into<Option<Version>>,
limit: impl Into<Option<usize>>,
) -> DeltaResult<impl Iterator<Item = (Version, CommitInfo)>> {
self.as_ref().commit_infos(start_version, limit)
}
}

/// A boxed snapshot behaves exactly like the snapshot it owns: every method
/// dereferences the box and forwards the call to the boxed value.
impl<T: Snapshot> Snapshot for Box<T> {
    fn table_root(&self) -> &Url {
        (**self).table_root()
    }

    fn version(&self) -> Version {
        (**self).version()
    }

    fn schema(&self) -> &Schema {
        (**self).schema()
    }

    fn metadata(&self) -> &Metadata {
        (**self).metadata()
    }

    fn protocol(&self) -> &Protocol {
        (**self).protocol()
    }

    fn table_properties(&self) -> &TableProperties {
        (**self).table_properties()
    }

    fn files(&self) -> DeltaResult<impl Iterator<Item = DeltaResult<RecordBatch>>> {
        (**self).files()
    }

    fn tombstones(&self) -> DeltaResult<impl Iterator<Item = DeltaResult<RecordBatch>>> {
        (**self).tombstones()
    }

    fn application_transactions(&self) -> DeltaResult<SetTransactionMap> {
        (**self).application_transactions()
    }

    fn application_transaction(
        &self,
        app_id: impl AsRef<str>,
    ) -> DeltaResult<Option<SetTransaction>> {
        (**self).application_transaction(app_id)
    }

    fn commit_infos(
        &self,
        start_version: impl Into<Option<Version>>,
        limit: impl Into<Option<usize>>,
    ) -> DeltaResult<impl Iterator<Item = (Version, CommitInfo)>> {
        (**self).commit_infos(start_version, limit)
    }
}

#[cfg(test)]
Expand Down

0 comments on commit e7c7766

Please sign in to comment.