From 5b85fce9a14e2c6de89cd534500b42e0e7568299 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Sep 2023 16:12:52 -0700 Subject: [PATCH] refactor: move file readers to sync APIs --- acceptance/src/meta.rs | 10 +- kernel/Cargo.toml | 11 +- kernel/src/actions/mod.rs | 5 +- kernel/src/actions/types.rs | 5 +- kernel/src/client/filesystem.rs | 66 ++++++++---- kernel/src/client/json.rs | 59 +++++++---- kernel/src/client/mod.rs | 22 ++-- kernel/src/client/parquet.rs | 58 ++++++++--- kernel/src/lib.rs | 15 ++- kernel/src/scan/file_stream.rs | 179 ++++++++++++-------------------- kernel/src/scan/mod.rs | 109 ++++++++----------- kernel/src/snapshot.rs | 87 ++++++++-------- kernel/tests/dv.rs | 16 +-- kernel/tests/read.rs | 16 +-- 14 files changed, 336 insertions(+), 322 deletions(-) diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index c0deead0..405a2331 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -78,7 +78,7 @@ impl TestCaseInfo { Ok((latest, cases)) } - async fn assert_snapshot_meta( + fn assert_snapshot_meta( &self, case: &TableVersionMetaData, snapshot: &Snapshot, @@ -86,8 +86,8 @@ impl TestCaseInfo { assert_eq!(snapshot.version(), case.version); // assert correct metadata is read - let metadata = snapshot.metadata().await?; - let protocol = snapshot.protocol().await?; + let metadata = snapshot.metadata()?; + let protocol = snapshot.protocol()?; let tvm = TableVersionMetaData { version: snapshot.version(), properties: metadata @@ -111,11 +111,11 @@ impl TestCaseInfo { let (latest, versions) = self.versions().await?; let snapshot = table.snapshot(None)?; - self.assert_snapshot_meta(&latest, &snapshot).await?; + self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { let snapshot = table.snapshot(Some(table_version.version))?; - self.assert_snapshot_meta(&table_version, &snapshot).await?; + self.assert_snapshot_meta(&table_version, &snapshot)?; } Ok(()) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 643b28b0..760321cf 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -15,18 +15,15 @@ arrow-json = { version = "^46.0" } arrow-ord = { version = "^46.0" } arrow-schema = { version = "^46.0" } arrow-select = { version = "^46.0" } -async-trait = "0.1" bytes = "1.4" chrono = { version = "0.4", optional = true } either = "1.8" fix-hidden-lifetime-bug = "0.2" -futures = "0.3" itertools = "0.11" lazy_static = "1.4" # used for providing a storage abstraction layer object_store = "^0.7.0" # need to generalize over arrow, arrow2 and diff parquet etc. 
(BYOP) -parquet = "^46.0" regex = "1.8" roaring = "0.10.1" serde = { version = "1", features = ["derive"] } @@ -38,12 +35,16 @@ url = "2" uuid = "1.3.0" z85 = "3.0.5" -# optionally used with default client +# Used in default client +futures = { version = "0.3", optional = true } +parquet = { version = "^46.0", optional = true, features=["async", "object_store"]} + +# optionally used with default client (though not required) tokio = { version = "1", optional = true, features=["rt-multi-thread"] } [features] default = ["default-client"] -default-client = ["chrono", "parquet/async", "parquet/object_store"] +default-client = ["chrono", "parquet", "futures"] [dev-dependencies] arrow = { version = "^46.0", features = ["json", "prettyprint"] } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 9efab307..5cd7fd0e 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -529,11 +529,12 @@ mod tests { use super::*; use crate::actions::Protocol; use crate::client::json::DefaultJsonHandler; + use crate::executor::tokio::TokioBackgroundExecutor; use crate::JsonHandler; fn action_batch() -> RecordBatch { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, @@ -596,7 +597,7 @@ mod tests { #[test] fn test_parse_add_partitioned() { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#, diff --git a/kernel/src/actions/types.rs b/kernel/src/actions/types.rs index 761f06e1..66d24b7b 100644 --- a/kernel/src/actions/types.rs +++ b/kernel/src/actions/types.rs @@ -226,9 +226,8 @@ impl DeletionVectorDescriptor { let dv_data = fs_client .read_files(vec![(path, None)])? - .first() - .ok_or(Error::MissingData("No deletion Vecor data".to_string()))? 
- .clone(); + .next() + .ok_or(Error::MissingData("No deletion Vector data".to_string()))??; let mut cursor = Cursor::new(dv_data); if let Some(offset) = offset { diff --git a/kernel/src/client/filesystem.rs b/kernel/src/client/filesystem.rs index 7c97ddaa..925681a2 100644 --- a/kernel/src/client/filesystem.rs +++ b/kernel/src/client/filesystem.rs @@ -1,18 +1,19 @@ use std::sync::Arc; use bytes::Bytes; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::stream::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; use url::Url; -use crate::{executor::TaskExecutor, DeltaResult, FileMeta, FileSlice, FileSystemClient}; +use crate::{executor::TaskExecutor, DeltaResult, Error, FileMeta, FileSlice, FileSystemClient}; #[derive(Debug)] pub struct ObjectStoreFileSystemClient { inner: Arc, table_root: Path, task_executor: Arc, + readahead: usize, } impl ObjectStoreFileSystemClient { @@ -21,8 +22,15 @@ impl ObjectStoreFileSystemClient { inner: store, table_root, task_executor, + readahead: 10, } } + + /// Set the maximum number of files to read in parallel. + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self + } } impl FileSystemClient for ObjectStoreFileSystemClient { @@ -82,26 +90,42 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } /// Read data specified by the start and end offset from the file. - fn read_files(&self, files: Vec) -> DeltaResult> { + /// + /// This will return the data in the same order as the provided file slices. + /// + /// Multiple reads may occur in parallel, depending on the configured readahead. + /// See [`Self::with_readahead`]. + fn read_files( + &self, + files: Vec, + ) -> DeltaResult>>> { let store = self.inner.clone(); - let fut = futures::stream::iter(files) - .map(move |(url, range)| { - let path = Path::from(url.path()); - let store = store.clone(); - async move { - if let Some(rng) = range { - store.get_range(&path, rng).await - } else { - let result = store.get(&path).await?; - result.bytes().await + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(20); + + self.task_executor.spawn( + futures::stream::iter(files) + .map(move |(url, range)| { + let path = Path::from(url.path()); + let store = store.clone(); + async move { + if let Some(rng) = range { + store.get_range(&path, rng).await + } else { + let result = store.get(&path).await?; + result.bytes().await + } } - } - }) - // Read up to 10 files concurrently. - .buffered(10) - .try_collect::>(); + }) + .buffered(self.readahead) + .for_each(move |res| { + sender.send(res.map_err(Error::from)).ok(); + futures::future::ready(()) + }), + ); - Ok(self.task_executor.block_on(fut)?) 
+ Ok(Box::new(receiver.into_iter())) } } @@ -113,6 +137,8 @@ mod tests { use crate::executor::tokio::TokioBackgroundExecutor; + use itertools::Itertools; + use super::*; #[tokio::test] @@ -145,7 +171,7 @@ mod tests { url.set_path(&format!("{}/c", url.path())); slices.push((url, Some(Range { start: 4, end: 9 }))); - let data = client.read_files(slices).unwrap(); + let data: Vec = client.read_files(slices).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 3); assert_eq!(data[0], Bytes::from("kernel")); diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index c54c0371..7f86f26e 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -10,15 +10,16 @@ use arrow_json::ReaderBuilder; use arrow_schema::SchemaRef as ArrowSchemaRef; use arrow_select::concat::concat_batches; use bytes::{Buf, Bytes}; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; use super::file_handler::{FileOpenFuture, FileOpener}; +use crate::executor::TaskExecutor; use crate::file_handler::FileStream; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, JsonHandler, + DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, JsonHandler, }; #[derive(Debug)] @@ -28,17 +29,28 @@ pub struct JsonReadContext { } #[derive(Debug)] -pub struct DefaultJsonHandler { +pub struct DefaultJsonHandler { store: Arc, + task_executor: Arc, + readahead: usize, } -impl DefaultJsonHandler { - pub fn new(store: Arc) -> Self { - Self { store } +impl DefaultJsonHandler { + pub fn new(store: Arc, task_executor: Arc) -> Self { + Self { + store, + task_executor, + readahead: 10, + } + } + + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self } } -impl FileHandler for DefaultJsonHandler { +impl FileHandler for DefaultJsonHandler { type FileReadContext = JsonReadContext; fn contextualize_file_reads( @@ -56,8 +68,7 @@ impl FileHandler for DefaultJsonHandler { } } -#[async_trait::async_trait] -impl JsonHandler for DefaultJsonHandler { +impl JsonHandler for DefaultJsonHandler { fn parse_json( &self, json_strings: StringArray, @@ -88,9 +99,9 @@ impl JsonHandler for DefaultJsonHandler { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult { + ) -> DeltaResult { if files.is_empty() { - return Ok(futures::stream::empty().boxed()); + return Ok(Box::new(std::iter::empty())); } let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); @@ -99,7 +110,16 @@ impl JsonHandler for DefaultJsonHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - Ok(stream.boxed()) + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); + + self.task_executor.spawn(stream.for_each(move |res| { + sender.send(res).ok(); + futures::future::ready(()) + })); + + Ok(Box::new(receiver.into_iter())) } } @@ -186,15 +206,15 @@ impl FileOpener for JsonOpener { mod tests { use std::path::PathBuf; - use object_store::{local::LocalFileSystem, ObjectStore}; - use super::*; - use crate::actions::get_log_schema; + use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor}; + use itertools::Itertools; + use object_store::{local::LocalFileSystem, ObjectStore}; #[test] fn test_parse_json() { let store 
= Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, @@ -227,14 +247,13 @@ mod tests { size: meta.size, }]; - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let physical_schema = Arc::new(get_log_schema()); let context = handler.contextualize_file_reads(files, None).unwrap(); - let data = handler + let data: Vec = handler .read_json_files(context, Arc::new(physical_schema.try_into().unwrap())) .unwrap() - .try_collect::>() - .await + .try_collect() .unwrap(); assert_eq!(data.len(), 1); diff --git a/kernel/src/client/mod.rs b/kernel/src/client/mod.rs index 7b3237b1..a88d47f4 100644 --- a/kernel/src/client/mod.rs +++ b/kernel/src/client/mod.rs @@ -24,8 +24,8 @@ pub mod parquet; pub struct DefaultTableClient { store: Arc, file_system: Arc>, - json: Arc, - parquet: Arc, + json: Arc>, + parquet: Arc>, } impl DefaultTableClient { @@ -46,10 +46,13 @@ impl DefaultTableClient { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), prefix, - task_executor, + task_executor.clone(), )), - json: Arc::new(DefaultJsonHandler::new(store.clone())), - parquet: Arc::new(DefaultParquetHandler::new(store.clone())), + json: Arc::new(DefaultJsonHandler::new( + store.clone(), + task_executor.clone(), + )), + parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)), store, }) } @@ -59,10 +62,13 @@ impl DefaultTableClient { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), prefix, - task_executor, + task_executor.clone(), + )), + json: Arc::new(DefaultJsonHandler::new( + store.clone(), + task_executor.clone(), )), - json: Arc::new(DefaultJsonHandler::new(store.clone())), - parquet: Arc::new(DefaultParquetHandler::new(store.clone())), + parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)), store, } } diff --git a/kernel/src/client/parquet.rs b/kernel/src/client/parquet.rs index 58053a5f..24dd5d0b 100644 --- a/kernel/src/client/parquet.rs +++ b/kernel/src/client/parquet.rs @@ -4,17 +4,19 @@ use std::ops::Range; use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::DynObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use super::file_handler::{FileOpenFuture, FileOpener}; +use crate::executor::TaskExecutor; use crate::file_handler::FileStream; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, ParquetHandler, + DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, + ParquetHandler, }; #[derive(Debug)] @@ -24,17 +26,29 @@ pub struct ParquetReadContext { } #[derive(Debug)] -pub 
struct DefaultParquetHandler { +pub struct DefaultParquetHandler { store: Arc, + task_executor: Arc, + readahead: usize, } -impl DefaultParquetHandler { - pub fn new(store: Arc) -> Self { - Self { store } +impl DefaultParquetHandler { + pub fn new(store: Arc, task_executor: Arc) -> Self { + Self { + store, + task_executor, + readahead: 10, + } + } + + /// Max number of batches to read asynchronously. + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self } } -impl FileHandler for DefaultParquetHandler { +impl FileHandler for DefaultParquetHandler { type FileReadContext = ParquetReadContext; fn contextualize_file_reads( @@ -49,16 +63,15 @@ impl FileHandler for DefaultParquetHandler { } } -#[async_trait::async_trait] -impl ParquetHandler for DefaultParquetHandler { +impl ParquetHandler for DefaultParquetHandler { fn read_parquet_files( &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult { + ) -> DeltaResult { // TODO at the very least load only required columns ... if files.is_empty() { - return Ok(futures::stream::empty().boxed()); + return Ok(Box::new(std::iter::empty())); } let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); @@ -66,7 +79,16 @@ impl ParquetHandler for DefaultParquetHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - Ok(stream.boxed()) + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); + + self.task_executor.spawn(stream.for_each(move |res| { + sender.send(res).ok(); + futures::future::ready(()) + })); + + Ok(Box::new(receiver.into_iter())) } } @@ -135,8 +157,13 @@ impl FileOpener for ParquetOpener { mod tests { use std::path::PathBuf; + use arrow_array::RecordBatch; use object_store::{local::LocalFileSystem, ObjectStore}; + use crate::executor::tokio::TokioBackgroundExecutor; + + use itertools::Itertools; + use super::*; #[tokio::test] @@ -163,14 +190,13 @@ mod tests { size: meta.size, }]; - let handler = DefaultParquetHandler::new(store); + let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let context = handler.contextualize_file_reads(files, None).unwrap(); - let data = handler + let data: Vec = handler .read_parquet_files(context, Arc::new(physical_schema.try_into().unwrap())) .unwrap() - .try_collect::>() - .await + .try_collect() .unwrap(); assert_eq!(data.len(), 1); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 1b544c00..601827a8 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -38,13 +38,11 @@ )] use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; use arrow_array::{RecordBatch, StringArray}; use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; -use futures::Stream; use url::Url; use self::schema::SchemaRef; @@ -75,7 +73,7 @@ pub type FileSlice = (Url, Option>); /// Data read from a Delta table file and the corresponding scan file information. pub type FileDataReadResult = (FileMeta, RecordBatch); -pub type FileDataReadResultStream = Pin> + Send>>; +pub type FileDataReadResultIterator = Box> + Send>; /// The metadata that describes an object. #[derive(Debug, Clone, PartialEq, Eq)] @@ -134,7 +132,10 @@ pub trait FileSystemClient: Send + Sync { -> DeltaResult>>>; /// Read data specified by the start and end offset from the file. 
- fn read_files(&self, files: Vec) -> DeltaResult>; + fn read_files( + &self, + files: Vec, + ) -> DeltaResult>>>; } /// Provides file handling functionality to Delta Kernel. @@ -173,7 +174,6 @@ pub trait FileHandler { /// Delta Kernel can use this client to parse JSON strings into Row or read content from JSON files. /// Connectors can leverage this interface to provide their best implementation of the JSON parsing /// capability to Delta Kernel. -#[async_trait::async_trait] pub trait JsonHandler: FileHandler { /// Parse the given json strings and return the fields requested by output schema as columns in a [`RecordBatch`]. fn parse_json( @@ -193,14 +193,13 @@ pub trait JsonHandler: FileHandler { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult; + ) -> DeltaResult; } /// Provides Parquet file related functionalities to Delta Kernel. /// /// Connectors can leverage this interface to provide their own custom /// implementation of Parquet data file functionalities to Delta Kernel. -#[async_trait::async_trait] pub trait ParquetHandler: FileHandler + Send + Sync { /// Read and parse the JSON format file at given locations and return /// the data as a RecordBatch with the columns requested by physical schema. @@ -213,7 +212,7 @@ pub trait ParquetHandler: FileHandler + Send + Sync { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult; + ) -> DeltaResult; } /// Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table. diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 091e5913..a52e0a5e 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -1,132 +1,89 @@ use std::collections::HashSet; -use std::pin::Pin; -use std::sync::Arc; - -use arrow_arith::boolean::{is_not_null, or}; -use arrow_array::{BooleanArray, RecordBatch}; -use arrow_select::filter::filter_record_batch; -use futures::stream::{BoxStream, Stream}; -use futures::task::{Context, Poll}; -use roaring::RoaringTreemap; -use url::Url; use super::data_skipping::data_skipping_filter; use crate::actions::{parse_actions, Action, ActionType, Add}; use crate::expressions::Expression; -use crate::{DeltaResult, Error, FileSystemClient}; +use crate::DeltaResult; +use arrow_array::RecordBatch; -/// A stream of [`RecordBatch`]es that represent actions in the delta log. -pub struct LogReplayStream { - stream: BoxStream<'static, DeltaResult>, +struct LogReplayScanner { predicate: Option, + /// A set of (data file path, dv_unique_id) pairs that have been seen thus + /// far in the log. This is used to filter out files with Remove actions as + /// well as duplicate entries in the log. 
seen: HashSet<(String, Option)>, - // ages: HashMap> - fs_client: Arc, - table_root: Url, -} - -impl std::fmt::Debug for LogReplayStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - f.debug_struct("LogStream") - .field("predicate", &self.predicate) - .finish() - } } -impl LogReplayStream { +impl LogReplayScanner { /// Create a new [`LogReplayStream`] instance - pub(crate) fn new( - stream: BoxStream<'static, DeltaResult>, - predicate: Option, - fs_client: Arc, - table_root: Url, - ) -> DeltaResult { - Ok(Self { + fn new(predicate: Option) -> Self { + Self { predicate, - stream, - fs_client, - table_root, seen: Default::default(), - }) + } } -} - -#[allow(missing_debug_implementations)] -pub struct DataFile { - pub add: Add, - pub dv: Option, -} -impl Stream for LogReplayStream { - type Item = DeltaResult>; + /// Extract Add actions from a single batch. This will filter out rows that + /// don't match the predicate and Add actions that have corresponding Remove + /// actions in the log. + fn process_batch(&mut self, actions: RecordBatch) -> DeltaResult> { + let actions = if let Some(predicate) = &self.predicate { + data_skipping_filter(actions, predicate)? + } else { + actions + }; - fn poll_next( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll::Item>> { - let stream = Pin::new(&mut self.stream); - match stream.poll_next(ctx) { - futures::task::Poll::Ready(value) => match value { - Some(Ok(actions)) => { - let skipped = if let Some(predicate) = &self.predicate { - data_skipping_filter(actions, predicate)? - } else { - let predicate = filter_nulls(&actions)?; - filter_record_batch(&actions, &predicate)? - }; - let filtered_actions = - parse_actions(&skipped, &[ActionType::Remove, ActionType::Add])? - .filter_map(|action| match action { - Action::Add(add) - // TODO right now this may not work as expected if we have the same add - // file with different deletion vectors in the log - i.e. additional - // rows were deleted in separate op. Is this a case to consider? - if !self - .seen - .contains(&(add.path.clone(), add.dv_unique_id())) => - { - self.seen.insert((add.path.clone(), add.dv_unique_id())); - let dvv = add.deletion_vector.clone(); - let dv = if let Some(dv_def) = dvv { - let dv_def = dv_def.clone(); - let fut = dv_def.read(self.fs_client.clone(), self.table_root.clone()).unwrap(); - Some(fut) - } else { - None - }; - Some(DataFile { - add: add.clone(), - dv, - }) - } - Action::Add(add) => { - self.seen.insert((add.path.clone(), add.dv_unique_id())); - None - } - Action::Remove(remove) => { - self.seen - .insert((remove.path.clone(), remove.dv_unique_id())); - None - } - _ => None, - }) - .collect(); - futures::task::Poll::Ready(Some(Ok(filtered_actions))) + let adds: Vec = parse_actions(&actions, &[ActionType::Remove, ActionType::Add])? + .filter_map(|action| match action { + Action::Add(add) + // Note: each (add.path + add.dv_unique_id()) pair has a + // unique Add + Remove pair in the log. 
For example: + // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json + if !self + .seen + .contains(&(add.path.clone(), add.dv_unique_id())) => + { + self.seen.insert((add.path.clone(), add.dv_unique_id())); + Some(add) } - Some(Err(err)) => futures::task::Poll::Ready(Some(Err(err))), - None => futures::task::Poll::Ready(None), - }, - _ => futures::task::Poll::Pending, - } + Action::Add(add) => { + self.seen.insert((add.path.clone(), add.dv_unique_id())); + None + } + Action::Remove(remove) => { + self.seen + .insert((remove.path.clone(), remove.dv_unique_id())); + None + } + _ => None, + }) + .collect(); + + Ok(adds) } } -fn filter_nulls(batch: &RecordBatch) -> DeltaResult { - let add_array = batch - .column_by_name("add") - .ok_or(Error::MissingData("expected add column".into()))?; - let remove_array = batch - .column_by_name("remove") - .ok_or(Error::MissingData("expected remove column".into()))?; - Ok(or(&is_not_null(add_array)?, &is_not_null(remove_array)?)?) +/// Given an iterator of actions and a predicate, returns a stream of [Add] +pub fn log_replay_iter( + action_iter: impl Iterator>, + predicate: Option, +) -> impl Iterator> { + let mut log_scanner = LogReplayScanner::new(predicate); + + action_iter.flat_map( + move |actions| -> Box>> { + match actions { + Ok(actions) => match log_scanner.process_batch(actions) { + Ok(adds) => Box::new(adds.into_iter().map(Ok)), + Err(err) => Box::new(std::iter::once(Err(err))), + }, + Err(err) => Box::new(std::iter::once(Err(err))), + } + }, + ) +} + +#[cfg(test)] +mod tests { + // TODO: add tests } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index ddf661b9..c29e3e6a 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -4,15 +4,15 @@ use arrow_array::{BooleanArray, RecordBatch}; use arrow_schema::{Fields, Schema as ArrowSchema}; use arrow_select::concat::concat_batches; use arrow_select::filter::filter_record_batch; -use futures::stream::{StreamExt, TryStreamExt}; +use itertools::Itertools; use url::Url; -use self::file_stream::LogReplayStream; +use self::file_stream::log_replay_iter; use crate::actions::ActionType; use crate::expressions::Expression; -use crate::schema::{Schema, SchemaRef}; +use crate::schema::SchemaRef; use crate::snapshot::LogSegment; -use crate::{DeltaResult, FileMeta, TableClient}; +use crate::{Add, DeltaResult, FileMeta, TableClient}; mod data_skipping; pub mod file_stream; @@ -126,76 +126,55 @@ impl Scan { /// which yields record batches of scan files and their associated metadata. Rows of the scan /// files batches correspond to data reads, and the DeltaReader is used to materialize the scan /// files into actual table data. - pub fn files(&self) -> DeltaResult { - // TODO use LogSegmentNEw replay ... 
- // TODO create function to generate native schema - let schema = ArrowSchema { + pub fn files(&self) -> DeltaResult>> { + let schema = Arc::new(ArrowSchema { fields: Fields::from_iter([ActionType::Add.field(), ActionType::Remove.field()]), metadata: Default::default(), - }; - let schema = Arc::new(Schema::try_from(&schema).unwrap()); + }); - let json_handler = self.table_client.get_json_handler(); - let commit_reads = json_handler.contextualize_file_reads( - // NOTE commit files are sorted in reverse on creations - self.log_segment.commit_files.clone(), - self.predicate.clone(), - )?; + let log_iter = + self.log_segment + .replay(self.table_client.as_ref(), schema, self.predicate.clone())?; - let parquet_handler = self.table_client.get_parquet_handler(); - let checkpoint_reads = parquet_handler.contextualize_file_reads( - self.log_segment.checkpoint_files.clone(), - self.predicate.clone(), - )?; - - let stream = json_handler - .read_json_files(commit_reads, schema.clone())? - .chain(parquet_handler.read_parquet_files(checkpoint_reads, schema.clone())?) - .boxed(); - - LogReplayStream::new( - stream, - self.predicate.clone(), - self.table_client.get_file_system_client(), - self.table_root.clone(), - ) + Ok(log_replay_iter(log_iter, self.predicate.clone())) } - pub async fn execute(&self) -> DeltaResult> { + pub fn execute(&self) -> DeltaResult> { let parquet_handler = self.table_client.get_parquet_handler(); - let mut stream = self.files()?.boxed(); - let mut results = Vec::new(); - while let Some(Ok(data)) = stream.next().await { - for file in data { + self.files()? + .map(|res| { + let add = res?; let meta = FileMeta { - last_modified: file.add.modification_time, - size: file.add.size as usize, - location: self.table_root.join(&file.add.path)?, + last_modified: add.modification_time, + size: add.size as usize, + location: self.table_root.join(&add.path)?, }; let context = parquet_handler.contextualize_file_reads(vec![meta], None)?; let batches = parquet_handler .read_parquet_files(context, self.schema.clone())? 
- .try_collect::>() - .await?; + .collect::>>()?; + if batches.is_empty() { - continue; + return Ok(None); } + let schema = batches[0].schema(); let batch = concat_batches(&schema, &batches)?; - if let Some(dv) = file.dv { - let vec: Vec<_> = (0..batch.num_rows()) + + if let Some(dv_descriptor) = add.deletion_vector { + let fs_client = self.table_client.get_file_system_client(); + let dv = dv_descriptor.read(fs_client, self.table_root.clone())?; + let mask: BooleanArray = (0..batch.num_rows()) .map(|i| Some(!dv.contains(i.try_into().expect("fit into u32")))) .collect(); - let dv = BooleanArray::from(vec); - results.push(filter_record_batch(&batch, &dv)?); + Ok(Some(filter_record_batch(&batch, &mask)?)) } else { - results.push(batch); + Ok(Some(batch)) } - } - } - - Ok(results) + }) + .filter_map_ok(|batch| batch) + .collect() } } @@ -203,15 +182,13 @@ impl Scan { mod tests { use std::path::PathBuf; - use futures::TryStreamExt; - use super::*; use crate::client::DefaultTableClient; use crate::executor::tokio::TokioBackgroundExecutor; use crate::Table; - #[tokio::test] - async fn test_scan_files() { + #[test] + fn test_scan_files() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -226,15 +203,19 @@ mod tests { let table = Table::new(url, table_client); let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await.unwrap().build(); - let files = scan.files().unwrap().try_collect::>().await.unwrap(); + let scan = snapshot.scan().unwrap().build(); + let files: Vec = scan.files().unwrap().try_collect().unwrap(); assert_eq!(files.len(), 1); - assert_eq!(files[0].len(), 1) + assert_eq!( + &files[0].path, + "part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet" + ); + assert!(&files[0].deletion_vector.is_none()); } - #[tokio::test] - async fn test_scan_data() { + #[test] + fn test_scan_data() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -249,8 +230,8 @@ mod tests { let table = Table::new(url, table_client); let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await.unwrap().build(); - let files = scan.execute().await.unwrap(); + let scan = snapshot.scan().unwrap().build(); + let files = scan.execute().unwrap(); assert_eq!(files.len(), 1); assert_eq!(files[0].num_rows(), 10) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index b923e457..ad175227 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,8 +8,6 @@ use std::sync::RwLock; use arrow_array::RecordBatch; use arrow_schema::{Fields, Schema as ArrowSchema}; -use futures::StreamExt; -use futures::TryStreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; @@ -18,6 +16,7 @@ use crate::actions::{parse_action, Action, ActionType, Metadata, Protocol}; use crate::path::LogPath; use crate::scan::ScanBuilder; use crate::schema::Schema; +use crate::Expression; use crate::{DeltaResult, Error, FileMeta, FileSystemClient, TableClient, Version}; const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; @@ -36,46 +35,47 @@ impl LogSegment { self.commit_files.iter() } - // TODO just a stop gap implementation, eventually we likely want a stream of batches... 
- async fn replay( + pub fn replay( &self, table_client: &dyn TableClient, - ) -> DeltaResult> { - let read_schema = Arc::new(ArrowSchema { - fields: Fields::from_iter([ActionType::Metadata.field(), ActionType::Protocol.field()]), - metadata: Default::default(), - }); - + read_schema: Arc, + predicate: Option, + ) -> DeltaResult>> { let mut commit_files: Vec<_> = self.commit_files().cloned().collect(); + // NOTE this will already sort in reverse order commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location)); let json_client = table_client.get_json_handler(); - let read_contexts = json_client.contextualize_file_reads(commit_files, None)?; + let read_contexts = + json_client.contextualize_file_reads(commit_files, predicate.clone())?; let commit_stream = json_client - .read_json_files(read_contexts, Arc::new(read_schema.clone().try_into()?))?; + .read_json_files(read_contexts, Arc::new(read_schema.as_ref().try_into()?))?; let parquet_client = table_client.get_parquet_handler(); let read_contexts = - parquet_client.contextualize_file_reads(self.checkpoint_files.clone(), None)?; + parquet_client.contextualize_file_reads(self.checkpoint_files.clone(), predicate)?; let checkpoint_stream = parquet_client - .read_parquet_files(read_contexts, Arc::new(read_schema.clone().try_into()?))?; + .read_parquet_files(read_contexts, Arc::new(read_schema.as_ref().try_into()?))?; - let batches = commit_stream - .chain(checkpoint_stream) - .try_collect::>() - .await?; + let batches = commit_stream.chain(checkpoint_stream); Ok(batches) } - async fn read_metadata( + fn read_metadata( &self, table_client: &dyn TableClient, ) -> DeltaResult> { - let batches = self.replay(table_client).await?; + let read_schema = Arc::new(ArrowSchema { + fields: Fields::from_iter([ActionType::Metadata.field(), ActionType::Protocol.field()]), + metadata: Default::default(), + }); + + let batches = self.replay(table_client, read_schema, None)?; let mut metadata_opt = None; let mut protocol_opt = None; for batch in batches { + let batch = batch?; if let Ok(mut metas) = parse_action(&batch, &ActionType::Metadata) { if let Some(Action::Metadata(meta)) = metas.next() { metadata_opt = Some(meta.clone()); @@ -214,7 +214,7 @@ impl Snapshot { self.version } - async fn get_or_insert_metadata(&self) -> DeltaResult<(Metadata, Protocol)> { + fn get_or_insert_metadata(&self) -> DeltaResult<(Metadata, Protocol)> { { let read_lock = self .metadata @@ -227,8 +227,7 @@ impl Snapshot { let (metadata, protocol) = self .log_segment - .read_metadata(self.table_client.as_ref()) - .await? + .read_metadata(self.table_client.as_ref())? .ok_or(Error::MissingMetadata)?; let mut meta = self .metadata @@ -240,23 +239,23 @@ impl Snapshot { } /// Table [`Schema`] at this [`Snapshot`]s version. - pub async fn schema(&self) -> DeltaResult { - self.metadata().await?.schema() + pub fn schema(&self) -> DeltaResult { + self.metadata()?.schema() } /// Table [`Metadata`] at this [`Snapshot`]s version. 
- pub async fn metadata(&self) -> DeltaResult { - let (metadata, _) = self.get_or_insert_metadata().await?; + pub fn metadata(&self) -> DeltaResult { + let (metadata, _) = self.get_or_insert_metadata()?; Ok(metadata) } - pub async fn protocol(&self) -> DeltaResult { - let (_, protocol) = self.get_or_insert_metadata().await?; + pub fn protocol(&self) -> DeltaResult { + let (_, protocol) = self.get_or_insert_metadata()?; Ok(protocol) } - pub async fn scan(self) -> DeltaResult> { - let schema = Arc::new(self.schema().await?); + pub fn scan(self) -> DeltaResult> { + let schema = Arc::new(self.schema()?); Ok(ScanBuilder::new( self.table_root, schema, @@ -293,11 +292,11 @@ fn read_last_checkpoint( log_root: &Url, ) -> DeltaResult> { let file_path = LogPath(log_root).child(LAST_CHECKPOINT_FILE_NAME)?; - match fs_client.read_files(vec![(file_path, None)]) { - Ok(data) => match data.first() { - Some(data) => Ok(Some(serde_json::from_slice(data)?)), - None => Ok(None), - }, + match fs_client + .read_files(vec![(file_path, None)]) + .and_then(|mut data| data.next().expect("read_files should return one file")) + { + Ok(data) => Ok(Some(serde_json::from_slice(&data)?)), Err(Error::FileNotFound(_)) => Ok(None), Err(err) => Err(err), } @@ -419,8 +418,8 @@ mod tests { ) } - #[tokio::test] - async fn test_snapshot_read_metadata() { + #[test] + fn test_snapshot_read_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -428,7 +427,7 @@ mod tests { let client = default_table_client(&url); let snapshot = Snapshot::try_new(url, client, Some(1)).unwrap(); - let protocol = snapshot.protocol().await.unwrap(); + let protocol = snapshot.protocol().unwrap(); let expected = Protocol { min_reader_version: 3, min_writer_version: 7, @@ -439,12 +438,12 @@ mod tests { let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#; let expected: StructType = serde_json::from_str(schema_string).unwrap(); - let schema = snapshot.schema().await.unwrap(); + let schema = snapshot.schema().unwrap(); assert_eq!(schema, expected); } - #[tokio::test] - async fn test_new_snapshot() { + #[test] + fn test_new_snapshot() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -452,7 +451,7 @@ mod tests { let client = default_table_client(&url); let snapshot = Snapshot::try_new(url, client, None).unwrap(); - let protocol = snapshot.protocol().await.unwrap(); + let protocol = snapshot.protocol().unwrap(); let expected = Protocol { min_reader_version: 3, min_writer_version: 7, @@ -463,7 +462,7 @@ mod tests { let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#; let expected: StructType = serde_json::from_str(schema_string).unwrap(); - let schema = snapshot.schema().await.unwrap(); + let schema = snapshot.schema().unwrap(); assert_eq!(schema, expected); } diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index 344b5e12..848bbcce 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -7,8 +7,8 @@ use deltakernel::client::DefaultTableClient; use deltakernel::executor::tokio::TokioBackgroundExecutor; use deltakernel::Table; -#[tokio::test] -async fn dv_table() -> Result<(), Box> { +#[test] +fn dv_table() -> Result<(), Box> { let path = 
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let table_client = Arc::new(DefaultTableClient::try_new( @@ -19,9 +19,9 @@ async fn dv_table() -> Result<(), Box> { let table = Table::new(url, table_client); let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter(); + let stream = scan.execute()?; for batch in stream { let rows = batch.num_rows(); arrow::util::pretty::print_batches(&[batch])?; @@ -30,8 +30,8 @@ async fn dv_table() -> Result<(), Box> { Ok(()) } -#[tokio::test] -async fn non_dv_table() -> Result<(), Box> { +#[test] +fn non_dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let table_client = Arc::new(DefaultTableClient::try_new( @@ -42,9 +42,9 @@ async fn non_dv_table() -> Result<(), Box> { let table = Table::new(url, table_client); let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter(); + let stream = scan.execute()?; for batch in stream { let rows = batch.num_rows(); arrow::util::pretty::print_batches(&[batch]).unwrap(); diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index a73b15b9..94aaa5c9 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -97,10 +97,10 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1; @@ -147,10 +147,10 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1; @@ -201,9 +201,9 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); let mut files = 0; for (data, expected) in stream { @@ -266,10 +266,10 @@ async fn stats() -> Result<(), Box> { Box::new(Expression::Column(String::from("ids"))), Box::new(Expression::Literal(2)), ); - let scan = snapshot.scan().await?.with_predicate(predicate).build(); + let scan = snapshot.scan()?.with_predicate(predicate).build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1;
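
Note on the pattern used throughout this patch: `filesystem.rs`, `json.rs`, and `parquet.rs` all bridge the async `object_store`/`parquet` streams to the new sync API the same way. The stream is driven to completion on the background `TaskExecutor`, each result is pushed into a bounded `std::sync::mpsc::sync_channel`, and the receiver half is handed back to the caller as a plain blocking iterator. The sketch below is a minimal, self-contained version of that bridge; the name `bridge_stream`, the `u64` items, and the inline Tokio runtime are illustrative stand-ins (the real handlers spawn onto the crate's `TaskExecutor` and yield `DeltaResult<Bytes>` or `DeltaResult<RecordBatch>`), and it assumes the `futures` and `tokio` dependencies already declared in Cargo.toml.

use futures::stream::{self, StreamExt};
use std::sync::mpsc;
use std::thread;

fn bridge_stream() -> impl Iterator<Item = Result<u64, String>> {
    // Bounded channel: once the buffer is full, the background task blocks on
    // `send`, so it cannot run arbitrarily far ahead of the consumer.
    let (sender, receiver) = mpsc::sync_channel(10);

    // Stand-in for `task_executor.spawn(...)`: drive the stream on a
    // background Tokio runtime and push every result into the channel.
    thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("build tokio runtime");
        rt.block_on(
            stream::iter(0u64..100)
                .map(|n| async move { Ok::<_, String>(n * 2) }) // fake async "read"
                .buffered(10) // up to 10 reads in flight, like `readahead`
                .for_each(|res| {
                    // A send error means the consumer dropped the iterator;
                    // ignoring it just lets the task wind down early.
                    sender.send(res).ok();
                    futures::future::ready(())
                }),
        );
    });

    // The receiver half is a plain blocking iterator that ends once the
    // background task finishes and drops the sender.
    receiver.into_iter()
}

fn main() {
    let total: u64 = bridge_stream().map(|r| r.unwrap()).sum();
    println!("sum of doubled 0..100 = {total}");
}

The bounded channel is what gives the sync API backpressure: the channel capacity, together with the `buffered(readahead)` combinator, limits how much read data sits in memory when the caller consumes results slowly.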
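
The new `LogReplayScanner` in `scan/file_stream.rs` trades the hand-written `Stream` implementation for a small piece of state (the `seen` set) threaded through an iterator. The sketch below shows only that dedup logic, with plain stand-in types: `Action`, `FileKey`, and the `Vec<Action>` batches are simplifications of the Arrow-backed `Add`/`Remove` actions the real scanner parses out of record batches.

use std::collections::HashSet;

type FileKey = (String, Option<String>);

enum Action {
    Add(FileKey),
    Remove(FileKey),
}

struct LogReplayScanner {
    seen: HashSet<FileKey>,
}

impl LogReplayScanner {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns the Add keys from one batch that survive replay so far.
    fn process_batch(&mut self, batch: Vec<Action>) -> Vec<FileKey> {
        batch
            .into_iter()
            .filter_map(|action| match action {
                Action::Add(key) if !self.seen.contains(&key) => {
                    self.seen.insert(key.clone());
                    Some(key)
                }
                // Already seen, or removed later in table time (earlier in
                // replay order): remember the key so older Adds get skipped.
                Action::Add(key) | Action::Remove(key) => {
                    self.seen.insert(key);
                    None
                }
            })
            .collect()
    }
}

fn main() {
    let mut scanner = LogReplayScanner::new();
    // Newest commit first: "a" was removed in the latest commit, "b" is live.
    let commit_1 = vec![
        Action::Remove(("a".into(), None)),
        Action::Add(("b".into(), None)),
    ];
    let commit_0 = vec![Action::Add(("a".into(), None))];

    let mut live = Vec::new();
    for batch in [commit_1, commit_0] {
        live.extend(scanner.process_batch(batch));
    }
    assert_eq!(live, vec![("b".to_string(), None)]);
    println!("live files after replay: {live:?}");
}

Because commits are replayed newest-first, recording Removes (and already-seen Adds) in `seen` is enough to drop older entries for the same (path, dv_unique_id) pair.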
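
One small idiom from the `types.rs` hunk worth spelling out: `read_files` now returns an iterator of results, so taking the first payload becomes `.next().ok_or(...)??`. A toy version with `String` errors and byte vectors standing in for `DeltaResult` and `Bytes`:

// The first `?` turns an empty iterator into an error; the second propagates a
// failed read. `read_files` here is a toy stand-in, not the crate's API.
fn read_files(
    paths: Vec<String>,
) -> Result<impl Iterator<Item = Result<Vec<u8>, String>>, String> {
    Ok(paths.into_iter().map(|p| {
        if p.is_empty() {
            Err("empty path".to_string()) // contrived failure for the sketch
        } else {
            Ok(p.into_bytes())
        }
    }))
}

fn first_file(paths: Vec<String>) -> Result<Vec<u8>, String> {
    let mut files = read_files(paths)?;
    // `next()` yields `Option<Result<_, _>>`: `ok_or_else` handles "no files
    // at all", then the trailing `?` unwraps the read result itself.
    let bytes = files
        .next()
        .ok_or_else(|| "no file data returned".to_string())??;
    Ok(bytes)
}

fn main() {
    let data = first_file(vec!["dv/data.bin".to_string()]).unwrap();
    assert_eq!(data, b"dv/data.bin".to_vec());
    assert!(first_file(vec![]).is_err());
}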