feat: add owned file view
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
roeap committed Jan 16, 2025
1 parent 09c04e3 commit 3c7db81
Showing 3 changed files with 196 additions and 44 deletions.
3 changes: 1 addition & 2 deletions crates/core/Cargo.toml
@@ -74,7 +74,7 @@ tokio = { workspace = true, features = [
] }

# cache
quick_cache = { version = "0.6.9", optional = true }
quick_cache = { version = "0.6.9" }

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
@@ -132,4 +132,3 @@ datafusion = [
datafusion-ext = ["datafusion"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
log-cache = ["quick_cache"]
221 changes: 185 additions & 36 deletions crates/core/src/kernel/snapshot/next.rs
@@ -1,43 +1,46 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::collections::HashSet;
use std::sync::Arc;

use ::serde::{Deserialize, Serialize};
use arrow::compute::{concat_batches, filter_record_batch};
use arrow_arith::boolean::{and, is_null, not};
use arrow_array::{BooleanArray, RecordBatch};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{Array, BooleanArray, RecordBatch};
use chrono::{DateTime, Utc};
use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner};
use delta_kernel::actions::{
get_log_add_schema, get_log_schema, Add, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME,
get_log_add_schema, get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME,
REMOVE_NAME, SET_TRANSACTION_NAME,
};
use delta_kernel::actions::{Metadata, Protocol};
use delta_kernel::actions::{Metadata, Protocol, SetTransaction};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::{
TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _};
use delta_kernel::expressions::ColumnName;
use delta_kernel::expressions::{Scalar, StructData};
use delta_kernel::scan::log_replay::scan_action_iter;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::{scan_row_schema, PhysicalPredicate, ScanBuilder, ScanData};
use delta_kernel::schema::{ColumnNamesAndTypes, DataType, Schema};
use delta_kernel::scan::{scan_row_schema, PhysicalPredicate};
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot as SnapshotInner;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{
DeltaResult as KernelResult, Engine, EngineData, Error, Expression, Table, Version,
};
use futures::{StreamExt, TryStreamExt};
use delta_kernel::{DeltaResult as KernelResult, Engine, EngineData, Expression, Table, Version};
use itertools::Itertools;
use object_store::path::Path;
use object_store::ObjectStore;
use tracing::warn;
use url::Url;

use crate::kernel::scalars::ScalarExt;
use crate::kernel::ActionType;
use crate::storage::cache::CommitCacheObjectStore;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

type ReplayIter = Box<dyn Iterator<Item = KernelResult<(Box<dyn EngineData>, Vec<bool>)>>>;

type LocalFileSystem = CommitCacheObjectStore;

impl ActionType {
pub(self) fn field_name_unckecked(&self) -> &'static str {
match self {
@@ -76,28 +79,42 @@ pub struct Snapshot {
}

impl Snapshot {
/// Create a new [`Snapshot`] instance.
pub fn new(inner: Arc<SnapshotInner>, engine: Arc<dyn Engine>) -> Self {
Self { inner, engine }
}

/// Create a new [`Snapshot`] instance for a table.
pub async fn try_new(
table: Table,
store: Arc<dyn ObjectStore>,
version: Option<i64>,
) -> DeltaResult<Self> {
// let executor = Arc::new(TokioMultiThreadExecutor::new(
// config
// .io_runtime
// .map(|rt| rt.get_handle())
// .unwrap_or(tokio::runtime::Handle::current()),
// ));
let executor = Arc::new(TokioMultiThreadExecutor::new(
tokio::runtime::Handle::current(),
));
// TODO: how to deal with the dedicated IO runtime? Would this already be covered by the
// object store implementation passed to this?
let table_root = Path::from_url_path(table.location().path())?;
let engine = DefaultEngine::new(store, table_root, executor);
let snapshot = table.snapshot(&engine, version.map(|v| v as u64))?;
Ok(Self::new(Arc::new(snapshot), Arc::new(engine)))
let store_str = format!("{}", store);
let is_local = store_str.starts_with("LocalFileSystem");
let store = Arc::new(CommitCacheObjectStore::new(store));
let handle = tokio::runtime::Handle::current();
let engine: Arc<dyn Engine> = match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => Arc::new(DefaultEngine::new_with_opts(
store,
table_root,
Arc::new(TokioMultiThreadExecutor::new(handle)),
!is_local,
)),
tokio::runtime::RuntimeFlavor::CurrentThread => Arc::new(DefaultEngine::new_with_opts(
store,
table_root,
Arc::new(TokioBackgroundExecutor::new()),
!is_local,
)),
_ => return Err(DeltaTableError::generic("unsupported runtime flavor")),
};

let snapshot = table.snapshot(engine.as_ref(), version.map(|v| v as u64))?;
Ok(Self::new(Arc::new(snapshot), engine))
}
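
To make the new constructor concrete, here is a hedged usage sketch (not part of this commit). The in-memory store, table URI, and the `Snapshot` import path are assumptions for illustration; `try_new` itself selects the executor from the current tokio runtime flavor, as shown above.

```rust
use std::sync::Arc;

use delta_kernel::Table;
use object_store::memory::InMemory;
use object_store::ObjectStore;

// Assumed import path for the wrapper defined in this file.
use deltalake_core::kernel::snapshot::next::Snapshot;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore works; an in-memory store keeps the sketch self-contained.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let table = Table::try_from_uri("memory:///my-table")?;
    // On a multi-thread runtime this resolves to TokioMultiThreadExecutor;
    // on a current-thread runtime, to TokioBackgroundExecutor.
    let snapshot = Snapshot::try_new(table, store, None).await?;
    println!("loaded table at version {}", snapshot.version());
    Ok(())
}
```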

pub(crate) fn engine_ref(&self) -> &Arc<dyn Engine> {
Expand All @@ -120,7 +137,7 @@ impl Snapshot {
self.inner.protocol()
}

pub fn metadata(&self) -> &delta_kernel::actions::Metadata {
pub fn metadata(&self) -> &Metadata {
self.inner.metadata()
}

Expand All @@ -142,6 +159,28 @@ impl Snapshot {
.map(|f| f.location.last_modified)
}

/// Scan the Delta Log to obtain the latest transaction for all applications
///
/// This method requires a full scan of the log to find all transactions.
/// When a specific application id is requested, it is much more efficient to use
/// [`application_transaction`](Self::application_transaction) instead.
pub fn application_transactions(&self) -> DeltaResult<SetTransactionMap> {
let scanner = SetTransactionScanner::new(self.inner.clone());
Ok(scanner.application_transactions(self.engine.as_ref())?)
}

/// Scan the Delta Log for the latest transaction entry for a specific application.
///
/// Initiates a log scan, but terminates as soon as the transaction
/// for the given application is found.
pub fn application_transaction(
&self,
app_id: impl AsRef<str>,
) -> DeltaResult<Option<SetTransaction>> {
let scanner = SetTransactionScanner::new(self.inner.clone());
Ok(scanner.application_transaction(self.engine.as_ref(), app_id.as_ref())?)
}
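
A hedged sketch of both lookups (the application id is an assumption; `snapshot` is a `Snapshot` as constructed above):

```rust
// Targeted lookup: log replay stops as soon as the app id is found.
if let Some(txn) = snapshot.application_transaction("my-writer-app")? {
    println!("app {} last committed txn version {}", txn.app_id, txn.version);
}

// Full scan: one log replay, yielding the latest transaction per application.
for (app_id, txn) in snapshot.application_transactions()? {
    println!("{app_id}: {}", txn.version);
}
```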

fn log_data(
&self,
types: &[ActionType],
@@ -170,12 +209,6 @@ impl Snapshot {
))
})
.flatten())
// let it = scan_action_iter(
// engine,
// self.replay_for_scan_data(engine)?,
// physical_predicate,
// );
// Ok(Some(it).into_iter().flatten())
}
}

@@ -309,6 +342,109 @@ impl EagerSnapshot {
pub fn table_properties(&self) -> &TableProperties {
&self.snapshot.table_properties()
}

pub fn files(&self) -> impl Iterator<Item = LogicalFileView> {
LogicalFileView {
files: self.files.clone(),
index: 0,
}
}

/// Get the number of files in the current snapshot
pub fn files_count(&self) -> usize {
self.files.num_rows()
}
}

/// Helper trait to extract individual values from a `StructData`.
pub trait StructDataExt {
fn get(&self, key: &str) -> Option<&Scalar>;
}

impl StructDataExt for StructData {
fn get(&self, key: &str) -> Option<&Scalar> {
self.fields()
.iter()
.zip(self.values().iter())
.find(|(k, _)| k.name() == key)
.map(|(_, v)| v)
}
}
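
For example (a sketch; `file_view` is a `LogicalFileView` as introduced below, and the partition column name is hypothetical):

```rust
// Look up a single partition value by column name; the Scalar is printed
// via its Display impl.
if let Some(partitions) = file_view.partition_values() {
    if let Some(year) = partitions.get("year") {
        println!("year = {year}");
    }
}
```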

#[derive(Clone)]
pub struct LogicalFileView {
files: RecordBatch,
index: usize,
}

impl LogicalFileView {
/// Path of the file.
pub fn path(&self) -> &str {
self.files.column(0).as_string::<i32>().value(self.index)
}

/// Size of the file in bytes.
pub fn size(&self) -> i64 {
self.files
.column(1)
.as_primitive::<Int64Type>()
.value(self.index)
}

/// Modification time of the file in milliseconds since epoch.
pub fn modification_time(&self) -> i64 {
self.files
.column(2)
.as_primitive::<Int64Type>()
.value(self.index)
}

/// Datetime of the last modification time of the file.
pub fn modification_datetime(&self) -> DeltaResult<chrono::DateTime<Utc>> {
DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from(
crate::protocol::ProtocolError::InvalidField(format!(
"invalid modification_time: {:?}",
self.modification_time()
)),
))
}

pub fn stats(&self) -> Option<&str> {
let col = self.files.column(3).as_string::<i32>();
col.is_valid(self.index).then(|| col.value(self.index))
}

pub fn partition_values(&self) -> Option<StructData> {
self.files
.column_by_name("fileConstantValues")
.and_then(|col| col.as_struct_opt())
.and_then(|s| s.column_by_name("partitionValues"))
.and_then(|arr| {
arr.is_valid(self.index)
.then(|| match Scalar::from_array(arr, self.index) {
Some(Scalar::Struct(s)) => Some(s),
_ => None,
})
.flatten()
})
}
}
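
Since `stats` surfaces the raw per-file statistics as a JSON string, a hedged sketch of reading the record count (`numRecords` per the Delta stats schema; `serde_json` assumed available):

```rust
// Parse the stats JSON and read the record count, if present.
if let Some(raw) = file_view.stats() {
    let stats: serde_json::Value = serde_json::from_str(raw)?;
    if let Some(num_records) = stats.get("numRecords").and_then(|v| v.as_i64()) {
        println!("{}: {} records", file_view.path(), num_records);
    }
}
```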

impl Iterator for LogicalFileView {
type Item = LogicalFileView;

fn next(&mut self) -> Option<Self::Item> {
if self.index < self.files.num_rows() {
let file = LogicalFileView {
files: self.files.clone(),
index: self.index,
};
self.index += 1;
Some(file)
} else {
None
}
}
}
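
Taken together, a sketch of consuming the owned view (`eager` is assumed to be a loaded `EagerSnapshot`). Each yielded item clones the Arc-backed `RecordBatch` handle plus an index, so iteration stays cheap:

```rust
for file in eager.files() {
    println!(
        "{} ({} bytes, modified {})",
        file.path(),
        file.size(),
        file.modification_datetime()?,
    );
}
println!("{} files in snapshot", eager.files_count());
```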

#[cfg(test)]
Expand All @@ -331,11 +467,11 @@ mod tests {
rep_root
}

#[tokio::test(flavor = "multi_thread")]
async fn load_snapshot() -> TestResult<()> {
// some comment
let mut dat_dir = get_dat_dir();
dat_dir.push("multi_partitioned");

let dat_info: TestCaseInfo = read_dat_case(dat_dir)?;
let table_info = dat_info.table_summary()?;

Expand All @@ -350,14 +486,27 @@ mod tests {

assert_eq!(snapshot.version(), table_info.version);
assert_eq!(
snapshot.protocol().min_reader_version(),
table_info.min_reader_version
(
snapshot.protocol().min_reader_version(),
snapshot.protocol().min_writer_version()
),
(table_info.min_reader_version, table_info.min_writer_version)
);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn load_snapshot_multi() -> TestResult<()> {
load_snapshot().await
}

#[tokio::test(flavor = "current_thread")]
async fn load_snapshot_current() -> TestResult<()> {
load_snapshot().await
}

#[tokio::test]
async fn load_eager_snapshot() -> TestResult<()> {
// some comment
let mut dat_dir = get_dat_dir();
16 changes: 10 additions & 6 deletions crates/core/src/storage/cache.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{
Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
@@ -46,21 +46,22 @@ impl Entry {
/// the object store are immutable and no attempt is made to invalidate the cache
/// when files are updated in the remote object store.
#[derive(Clone)]
pub(crate) struct ConditionallyCachedObjectStore {
pub(crate) struct CommitCacheObjectStore {
inner: Arc<dyn ObjectStore>,
check: Arc<dyn Fn(&Path) -> bool + Send + Sync>,
cache: Arc<Cache<Path, Entry>>,
has_ordered_listing: bool,
}

impl std::fmt::Debug for ConditionallyCachedObjectStore {
impl std::fmt::Debug for CommitCacheObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConditionallyCachedObjectStore")
.field("object_store", &self.inner)
.finish()
}
}

impl std::fmt::Display for ConditionallyCachedObjectStore {
impl std::fmt::Display for CommitCacheObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CommitCacheObjectStore({})", self.inner)
}
Expand All @@ -71,13 +72,16 @@ fn cache_json(path: &Path) -> bool {
.map_or(false, |ext| ext.eq_ignore_ascii_case("json"))
}

impl ConditionallyCachedObjectStore {
impl CommitCacheObjectStore {
/// Create a new conditionally cached object store.
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
let store_str = format!("{}", inner);
let is_local = store_str.starts_with("LocalFileSystem");
Self {
inner,
check: Arc::new(cache_json),
cache: Arc::new(Cache::new(100)),
has_ordered_listing: !is_local,
}
}
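
A minimal sketch of wrapping a base store (the type is crate-internal; the in-memory backend is an assumption for illustration):

```rust
use std::sync::Arc;

use object_store::memory::InMemory;
use object_store::ObjectStore;

// Reads of `.json` paths (Delta commit files) are served from the cache after
// the first fetch; all other paths pass straight through to the inner store.
// For a LocalFileSystem base store, the Display-string check above also sets
// has_ordered_listing to false.
let base: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let cached = CommitCacheObjectStore::new(base);
```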

@@ -121,7 +125,7 @@
}

#[async_trait::async_trait]
impl ObjectStore for ConditionallyCachedObjectStore {
impl ObjectStore for CommitCacheObjectStore {
async fn put_opts(
&self,
location: &Path,
