Prepare to support parquet row group skipping (#381)
In preparation for
#362, which actually implements parquet row group skipping, here we make
several preparatory changes that can stand on their own:
* Plumb the predicates through to the parquet readers, so that they can
easily start using them
* Add and use a new `Expression::is_not_null` helper that does what its
name says (see the usage sketch after this list)
* Factor out `replay_for_XXX` methods, so that log replay involving
push-down predicates can be tested independently.
* Don't involve <n>.json in log replay if <n>.checkpoint.parquet is
available

This should make both changes easier to review.
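
For context, here is a minimal sketch of the kind of predicate this enables. The module path and the free function below are assumptions for illustration, not part of this commit; the expression itself mirrors the metadata-filtering predicate that `LogSegment::replay_for_metadata` builds in the diff below, and it is the same `Expression` shape that the parquet readers now receive via `read_parquet_files`.

```rust
// Illustrative only: assumes `Expression` is importable from the crate's
// `expressions` module; the helper function name is hypothetical.
use delta_kernel::expressions::Expression as Expr;

fn metadata_replay_predicate() -> Expr {
    // Keep only log batches that could contain a metaData or protocol action,
    // mirroring LogSegment::replay_for_metadata below.
    Expr::or(
        Expr::column("metaData.id").is_not_null(),
        Expr::column("protocol.minReaderVersion").is_not_null(),
    )
}
```

Once #362 lands, a predicate of this shape could let the parquet readers skip row groups whose statistics rule out any matching rows.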
scovich authored Oct 9, 2024
1 parent 340c5e4 commit 4b602ae
Showing 16 changed files with 157 additions and 41 deletions.
1 change: 1 addition & 0 deletions ffi/src/engine_funcs.rs
@@ -122,6 +122,7 @@ fn read_parquet_file_impl(
last_modified: file.last_modified,
size: file.size,
};
// TODO: Plumb the predicate through the FFI?
let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
let res = Box::new(FileReadResultIterator {
data,
18 changes: 14 additions & 4 deletions kernel/src/engine/default/parquet.rs
@@ -47,7 +47,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
_predicate: Option<Expression>,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
@@ -62,10 +62,15 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
"http" | "https" => Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
predicate,
self.store.clone(),
)),
};
@@ -83,20 +88,23 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
table_schema: SchemaRef,
_predicate: Option<Expression>,
limit: Option<usize>,
store: Arc<DynObjectStore>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
predicate: Option<Expression>,
store: Arc<DynObjectStore>,
) -> Self {
Self {
batch_size,
table_schema,
_predicate: predicate,
limit: None,
store,
}
@@ -153,16 +161,18 @@ impl FileOpener for ParquetOpener {
/// Implements [`FileOpener`] for opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
_predicate: Option<Expression>,
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
_predicate: predicate,
limit: None,
client: reqwest::Client::new(),
}
5 changes: 5 additions & 0 deletions kernel/src/expressions/mod.rs
@@ -280,6 +280,11 @@ impl Expression {
Self::unary(UnaryOperator::IsNull, self)
}

/// Create a new expression `self IS NOT NULL`
pub fn is_not_null(self) -> Self {
!Self::is_null(self)
}

/// Create a new expression `self == other`
pub fn eq(self, other: Self) -> Self {
Self::binary(BinaryOperator::Equal, self, other)
46 changes: 35 additions & 11 deletions kernel/src/scan/mod.rs
@@ -217,22 +217,27 @@ impl Scan {
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>>> {
Ok(scan_action_iter(
engine,
self.replay_for_scan_data(engine)?,
&self.logical_schema,
&self.predicate,
))
}

// Factored out to facilitate testing
fn replay_for_scan_data(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

let log_iter = self.snapshot.log_segment.replay(
self.snapshot.log_segment.replay(
engine,
commit_read_schema,
checkpoint_read_schema,
self.predicate.clone(),
)?;

Ok(scan_action_iter(
engine,
log_iter,
&self.logical_schema,
&self.predicate,
))
)
}

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
@@ -312,7 +317,7 @@ impl Scan {
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
None,
self.predicate().clone(),
)?;
let gs = global_state.clone(); // Arc clone
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
@@ -714,6 +719,25 @@ mod tests {
}
}

#[test]
fn test_replay_for_scan_data() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let data: Vec<_> = scan
.replay_for_scan_data(&engine)
.unwrap()
.try_collect()
.unwrap();
// No predicate pushdown attempted, because at most one part of a multi-part checkpoint
// could be skipped when looking for adds/removes.
assert_eq!(data.len(), 5);
}

#[test_log::test]
fn test_scan_with_checkpoint() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
53 changes: 43 additions & 10 deletions kernel/src/snapshot.rs
@@ -3,7 +3,6 @@
//!

use std::cmp::Ordering;
use std::ops::Not;
use std::sync::Arc;

use itertools::Itertools;
@@ -71,15 +70,7 @@ impl LogSegment {
}

fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<Option<(Metadata, Protocol)>> {
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
// filter out log files that do not contain metadata or protocol information
use Expression as Expr;
let filter = Some(Expr::or(
Expr::not(Expr::is_null(Expr::column("metaData.id"))),
Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
));
// read the same protocol and metadata schema for both commits and checkpoints
let data_batches = self.replay(engine, schema.clone(), schema, filter)?;
let data_batches = self.replay_for_metadata(engine)?;
let mut metadata_opt: Option<Metadata> = None;
let mut protocol_opt: Option<Protocol> = None;
for batch in data_batches {
@@ -102,6 +93,22 @@ impl LogSegment {
_ => Err(Error::MissingMetadataAndProtocol),
}
}

// Factored out to facilitate testing
fn replay_for_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
// filter out log files that do not contain metadata or protocol information
use Expression as Expr;
let meta_predicate = Expr::or(
Expr::column("metaData.id").is_not_null(),
Expr::column("protocol.minReaderVersion").is_not_null(),
);
// read the same protocol and metadata schema for both commits and checkpoints
self.replay(engine, schema.clone(), schema, Some(meta_predicate))
}
}

// TODO expose methods for accessing the files of a table (with file pruning).
@@ -168,6 +175,10 @@ impl Snapshot {
if let Some(version) = version {
commit_files.retain(|log_path| log_path.version <= version);
}
// only keep commit files above the checkpoint we found
if let Some(checkpoint_file) = checkpoint_files.first() {
commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

// get the effective version from chosen files
let version_eff = commit_files
@@ -445,6 +456,7 @@ mod tests {
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::schema::StructType;
use crate::Table;

#[test]
fn test_snapshot_read_metadata() {
@@ -616,6 +628,27 @@ mod tests {
assert!(invalid.is_none())
}

#[test]
fn test_replay_for_metadata() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let data: Vec<_> = snapshot
.log_segment
.replay_for_metadata(&engine)
.unwrap()
.try_collect()
.unwrap();
// The checkpoint has five parts, each containing one action. The P&M come from first and
// third parts, respectively. The actual `read_metadata` will also skip the last two parts
// because it terminates the iteration immediately after finding both P&M.
// TODO: Implement parquet row group skipping so we filter out all but two files.
assert_eq!(data.len(), 5);
}

#[test_log::test]
fn test_read_table_with_checkpoint() {
let path = std::fs::canonicalize(PathBuf::from(
57 changes: 44 additions & 13 deletions kernel/src/transaction.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;

use crate::actions::visitors::TransactionVisitor;
use crate::actions::{get_log_schema, TRANSACTION_NAME};
use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME};
use crate::snapshot::Snapshot;
use crate::Engine;
use crate::{actions::Transaction, DeltaResult};
use crate::{DeltaResult, Engine, EngineData, SchemaRef};

pub use crate::actions::visitors::TransactionMap;
pub struct TransactionScanner {
@@ -22,17 +21,11 @@ impl TransactionScanner {
engine: &dyn Engine,
application_id: Option<&str>,
) -> DeltaResult<TransactionMap> {
let schema = get_log_schema().project(&[TRANSACTION_NAME])?;

let schema = Self::get_txn_schema()?;
let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned()));

// when all ids are requested then a full scan of the log to the latest checkpoint is required
let iter =
self.snapshot
.log_segment
.replay(engine, schema.clone(), schema.clone(), None)?;

for maybe_data in iter {
// If a specific id is requested then we can terminate log replay early as soon as it was
// found. If all ids are requested then we are forced to replay the entire log.
for maybe_data in self.replay_for_app_ids(engine, schema.clone())? {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
// if a specific id is requested and a transaction was found, then return
@@ -44,6 +37,22 @@ impl TransactionScanner {
Ok(visitor.transactions)
}

// Factored out to facilitate testing
fn get_txn_schema() -> DeltaResult<SchemaRef> {
get_log_schema().project(&[TRANSACTION_NAME])
}

// Factored out to facilitate testing
fn replay_for_app_ids(
&self,
engine: &dyn Engine,
schema: SchemaRef,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
self.snapshot
.log_segment
.replay(engine, schema.clone(), schema, None)
}

/// Scan the Delta Log for the latest transaction entry of an application
pub fn application_transaction(
&self,
@@ -67,6 +76,7 @@ mod tests {
use super::*;
use crate::engine::sync::SyncEngine;
use crate::Table;
use itertools::Itertools;

fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
@@ -117,4 +127,25 @@
.as_ref()
);
}

#[test]
fn test_replay_for_app_ids() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let txn = TransactionScanner::new(snapshot.into());
let txn_schema = TransactionScanner::get_txn_schema().unwrap();

// The checkpoint has five parts, each containing one action. There are two app ids.
// TODO: Implement parquet row group skipping so we only read two files.
let data: Vec<_> = txn
.replay_for_app_ids(&engine, txn_schema.clone())
.unwrap()
.try_collect()
.unwrap();
assert_eq!(data.len(), 5);
}
}
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1728065840472,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.checkpoint.writeStatsAsStruct\":\"false\",\"delta.dataSkippingNumIndexedCols\":\"0\",\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpointInterval\":\"1\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.2.1","txnId":"aef9df5a-e8a9-4d36-af75-2ffd4dc6b6cf"}}
{"metaData":{"id":"fd39678a-d482-4fe2-99d3-52732e7fbb09","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"chrono\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"date32\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"numeric\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"decimals\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"decimal128\",\"type\":\"decimal(32,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal32\",\"type\":\"decimal(8,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal64\",\"type\":\"decimal(16,3)\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"floats\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"float32\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float64\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"ints\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"int16\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int32\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int64\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int8\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"varlen\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"utf8\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"false","delta.dataSkippingNumIndexedCols":"0","delta.checkpoint.writeStatsAsJson":"false","delta.checkpointInterval":"1"},"createdTime":1728065840373}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1728065844007,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"4959"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.2.1","txnId":"d46d4bca-ab50-4075-977f-80a5b3844afa"}}
{"add":{"path":"part-00000-b92e017a-50ba-4676-8322-48fc371c2b59-c000.snappy.parquet","partitionValues":{},"size":4959,"modificationTime":1728065843972,"dataChange":true,"stats":"{\"numRecords\":5}"}}
{"txn":{"appId":"3ae45b72-24e1-865a-a211-34987ae02f2a","version":4390}}
{"txn":{"appId":"b42b951f-f5d1-4f6e-be2a-0d11d1543029","version":1235}}