From 5b85fce9a14e2c6de89cd534500b42e0e7568299 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Sep 2023 16:12:52 -0700 Subject: [PATCH 1/4] refactor: move file readers to sync APIs --- acceptance/src/meta.rs | 10 +- kernel/Cargo.toml | 11 +- kernel/src/actions/mod.rs | 5 +- kernel/src/actions/types.rs | 5 +- kernel/src/client/filesystem.rs | 66 ++++++++---- kernel/src/client/json.rs | 59 +++++++---- kernel/src/client/mod.rs | 22 ++-- kernel/src/client/parquet.rs | 58 ++++++++--- kernel/src/lib.rs | 15 ++- kernel/src/scan/file_stream.rs | 179 ++++++++++++-------------------- kernel/src/scan/mod.rs | 109 ++++++++----------- kernel/src/snapshot.rs | 87 ++++++++-------- kernel/tests/dv.rs | 16 +-- kernel/tests/read.rs | 16 +-- 14 files changed, 336 insertions(+), 322 deletions(-) diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index c0deead0..405a2331 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -78,7 +78,7 @@ impl TestCaseInfo { Ok((latest, cases)) } - async fn assert_snapshot_meta( + fn assert_snapshot_meta( &self, case: &TableVersionMetaData, snapshot: &Snapshot, @@ -86,8 +86,8 @@ impl TestCaseInfo { assert_eq!(snapshot.version(), case.version); // assert correct metadata is read - let metadata = snapshot.metadata().await?; - let protocol = snapshot.protocol().await?; + let metadata = snapshot.metadata()?; + let protocol = snapshot.protocol()?; let tvm = TableVersionMetaData { version: snapshot.version(), properties: metadata @@ -111,11 +111,11 @@ impl TestCaseInfo { let (latest, versions) = self.versions().await?; let snapshot = table.snapshot(None)?; - self.assert_snapshot_meta(&latest, &snapshot).await?; + self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { let snapshot = table.snapshot(Some(table_version.version))?; - self.assert_snapshot_meta(&table_version, &snapshot).await?; + self.assert_snapshot_meta(&table_version, &snapshot)?; } Ok(()) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 643b28b0..760321cf 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -15,18 +15,15 @@ arrow-json = { version = "^46.0" } arrow-ord = { version = "^46.0" } arrow-schema = { version = "^46.0" } arrow-select = { version = "^46.0" } -async-trait = "0.1" bytes = "1.4" chrono = { version = "0.4", optional = true } either = "1.8" fix-hidden-lifetime-bug = "0.2" -futures = "0.3" itertools = "0.11" lazy_static = "1.4" # used for providing a storage abstraction layer object_store = "^0.7.0" # need to generalize over arrow, arrow2 and diff parquet etc. 
(BYOP) -parquet = "^46.0" regex = "1.8" roaring = "0.10.1" serde = { version = "1", features = ["derive"] } @@ -38,12 +35,16 @@ url = "2" uuid = "1.3.0" z85 = "3.0.5" -# optionally used with default client +# Used in default client +futures = { version = "0.3", optional = true } +parquet = { version = "^46.0", optional = true, features=["async", "object_store"]} + +# optionally used with default client (though not required) tokio = { version = "1", optional = true, features=["rt-multi-thread"] } [features] default = ["default-client"] -default-client = ["chrono", "parquet/async", "parquet/object_store"] +default-client = ["chrono", "parquet", "futures"] [dev-dependencies] arrow = { version = "^46.0", features = ["json", "prettyprint"] } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 9efab307..5cd7fd0e 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -529,11 +529,12 @@ mod tests { use super::*; use crate::actions::Protocol; use crate::client::json::DefaultJsonHandler; + use crate::executor::tokio::TokioBackgroundExecutor; use crate::JsonHandler; fn action_batch() -> RecordBatch { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, @@ -596,7 +597,7 @@ mod tests { #[test] fn test_parse_add_partitioned() { let store = Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#, diff --git a/kernel/src/actions/types.rs b/kernel/src/actions/types.rs index 761f06e1..66d24b7b 100644 --- a/kernel/src/actions/types.rs +++ b/kernel/src/actions/types.rs @@ -226,9 +226,8 @@ impl DeletionVectorDescriptor { let dv_data = fs_client .read_files(vec![(path, None)])? - .first() - .ok_or(Error::MissingData("No deletion Vecor data".to_string()))? 
- .clone(); + .next() + .ok_or(Error::MissingData("No deletion Vector data".to_string()))??; let mut cursor = Cursor::new(dv_data); if let Some(offset) = offset { diff --git a/kernel/src/client/filesystem.rs b/kernel/src/client/filesystem.rs index 7c97ddaa..925681a2 100644 --- a/kernel/src/client/filesystem.rs +++ b/kernel/src/client/filesystem.rs @@ -1,18 +1,19 @@ use std::sync::Arc; use bytes::Bytes; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::stream::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; use url::Url; -use crate::{executor::TaskExecutor, DeltaResult, FileMeta, FileSlice, FileSystemClient}; +use crate::{executor::TaskExecutor, DeltaResult, Error, FileMeta, FileSlice, FileSystemClient}; #[derive(Debug)] pub struct ObjectStoreFileSystemClient { inner: Arc, table_root: Path, task_executor: Arc, + readahead: usize, } impl ObjectStoreFileSystemClient { @@ -21,8 +22,15 @@ impl ObjectStoreFileSystemClient { inner: store, table_root, task_executor, + readahead: 10, } } + + /// Set the maximum number of files to read in parallel. + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self + } } impl FileSystemClient for ObjectStoreFileSystemClient { @@ -82,26 +90,42 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } /// Read data specified by the start and end offset from the file. - fn read_files(&self, files: Vec) -> DeltaResult> { + /// + /// This will return the data in the same order as the provided file slices. + /// + /// Multiple reads may occur in parallel, depending on the configured readahead. + /// See [`Self::with_readahead`]. + fn read_files( + &self, + files: Vec, + ) -> DeltaResult>>> { let store = self.inner.clone(); - let fut = futures::stream::iter(files) - .map(move |(url, range)| { - let path = Path::from(url.path()); - let store = store.clone(); - async move { - if let Some(rng) = range { - store.get_range(&path, rng).await - } else { - let result = store.get(&path).await?; - result.bytes().await + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(20); + + self.task_executor.spawn( + futures::stream::iter(files) + .map(move |(url, range)| { + let path = Path::from(url.path()); + let store = store.clone(); + async move { + if let Some(rng) = range { + store.get_range(&path, rng).await + } else { + let result = store.get(&path).await?; + result.bytes().await + } } - } - }) - // Read up to 10 files concurrently. - .buffered(10) - .try_collect::>(); + }) + .buffered(self.readahead) + .for_each(move |res| { + sender.send(res.map_err(Error::from)).ok(); + futures::future::ready(()) + }), + ); - Ok(self.task_executor.block_on(fut)?) 
+ Ok(Box::new(receiver.into_iter())) } } @@ -113,6 +137,8 @@ mod tests { use crate::executor::tokio::TokioBackgroundExecutor; + use itertools::Itertools; + use super::*; #[tokio::test] @@ -145,7 +171,7 @@ mod tests { url.set_path(&format!("{}/c", url.path())); slices.push((url, Some(Range { start: 4, end: 9 }))); - let data = client.read_files(slices).unwrap(); + let data: Vec = client.read_files(slices).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 3); assert_eq!(data[0], Bytes::from("kernel")); diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index c54c0371..7f86f26e 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -10,15 +10,16 @@ use arrow_json::ReaderBuilder; use arrow_schema::SchemaRef as ArrowSchemaRef; use arrow_select::concat::concat_batches; use bytes::{Buf, Bytes}; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; use super::file_handler::{FileOpenFuture, FileOpener}; +use crate::executor::TaskExecutor; use crate::file_handler::FileStream; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, JsonHandler, + DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, JsonHandler, }; #[derive(Debug)] @@ -28,17 +29,28 @@ pub struct JsonReadContext { } #[derive(Debug)] -pub struct DefaultJsonHandler { +pub struct DefaultJsonHandler { store: Arc, + task_executor: Arc, + readahead: usize, } -impl DefaultJsonHandler { - pub fn new(store: Arc) -> Self { - Self { store } +impl DefaultJsonHandler { + pub fn new(store: Arc, task_executor: Arc) -> Self { + Self { + store, + task_executor, + readahead: 10, + } + } + + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self } } -impl FileHandler for DefaultJsonHandler { +impl FileHandler for DefaultJsonHandler { type FileReadContext = JsonReadContext; fn contextualize_file_reads( @@ -56,8 +68,7 @@ impl FileHandler for DefaultJsonHandler { } } -#[async_trait::async_trait] -impl JsonHandler for DefaultJsonHandler { +impl JsonHandler for DefaultJsonHandler { fn parse_json( &self, json_strings: StringArray, @@ -88,9 +99,9 @@ impl JsonHandler for DefaultJsonHandler { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult { + ) -> DeltaResult { if files.is_empty() { - return Ok(futures::stream::empty().boxed()); + return Ok(Box::new(std::iter::empty())); } let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); @@ -99,7 +110,16 @@ impl JsonHandler for DefaultJsonHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - Ok(stream.boxed()) + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); + + self.task_executor.spawn(stream.for_each(move |res| { + sender.send(res).ok(); + futures::future::ready(()) + })); + + Ok(Box::new(receiver.into_iter())) } } @@ -186,15 +206,15 @@ impl FileOpener for JsonOpener { mod tests { use std::path::PathBuf; - use object_store::{local::LocalFileSystem, ObjectStore}; - use super::*; - use crate::actions::get_log_schema; + use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor}; + use itertools::Itertools; + use object_store::{local::LocalFileSystem, ObjectStore}; #[test] fn test_parse_json() { let store 
= Arc::new(LocalFileSystem::new()); - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let json_strings: StringArray = vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, @@ -227,14 +247,13 @@ mod tests { size: meta.size, }]; - let handler = DefaultJsonHandler::new(store); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let physical_schema = Arc::new(get_log_schema()); let context = handler.contextualize_file_reads(files, None).unwrap(); - let data = handler + let data: Vec = handler .read_json_files(context, Arc::new(physical_schema.try_into().unwrap())) .unwrap() - .try_collect::>() - .await + .try_collect() .unwrap(); assert_eq!(data.len(), 1); diff --git a/kernel/src/client/mod.rs b/kernel/src/client/mod.rs index 7b3237b1..a88d47f4 100644 --- a/kernel/src/client/mod.rs +++ b/kernel/src/client/mod.rs @@ -24,8 +24,8 @@ pub mod parquet; pub struct DefaultTableClient { store: Arc, file_system: Arc>, - json: Arc, - parquet: Arc, + json: Arc>, + parquet: Arc>, } impl DefaultTableClient { @@ -46,10 +46,13 @@ impl DefaultTableClient { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), prefix, - task_executor, + task_executor.clone(), )), - json: Arc::new(DefaultJsonHandler::new(store.clone())), - parquet: Arc::new(DefaultParquetHandler::new(store.clone())), + json: Arc::new(DefaultJsonHandler::new( + store.clone(), + task_executor.clone(), + )), + parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)), store, }) } @@ -59,10 +62,13 @@ impl DefaultTableClient { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), prefix, - task_executor, + task_executor.clone(), + )), + json: Arc::new(DefaultJsonHandler::new( + store.clone(), + task_executor.clone(), )), - json: Arc::new(DefaultJsonHandler::new(store.clone())), - parquet: Arc::new(DefaultParquetHandler::new(store.clone())), + parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)), store, } } diff --git a/kernel/src/client/parquet.rs b/kernel/src/client/parquet.rs index 58053a5f..24dd5d0b 100644 --- a/kernel/src/client/parquet.rs +++ b/kernel/src/client/parquet.rs @@ -4,17 +4,19 @@ use std::ops::Range; use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::DynObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use super::file_handler::{FileOpenFuture, FileOpener}; +use crate::executor::TaskExecutor; use crate::file_handler::FileStream; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, ParquetHandler, + DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, + ParquetHandler, }; #[derive(Debug)] @@ -24,17 +26,29 @@ pub struct ParquetReadContext { } #[derive(Debug)] -pub 
struct DefaultParquetHandler { +pub struct DefaultParquetHandler { store: Arc, + task_executor: Arc, + readahead: usize, } -impl DefaultParquetHandler { - pub fn new(store: Arc) -> Self { - Self { store } +impl DefaultParquetHandler { + pub fn new(store: Arc, task_executor: Arc) -> Self { + Self { + store, + task_executor, + readahead: 10, + } + } + + /// Max number of batches to read asynchronously. + pub fn with_readahead(mut self, readahead: usize) -> Self { + self.readahead = readahead; + self } } -impl FileHandler for DefaultParquetHandler { +impl FileHandler for DefaultParquetHandler { type FileReadContext = ParquetReadContext; fn contextualize_file_reads( @@ -49,16 +63,15 @@ impl FileHandler for DefaultParquetHandler { } } -#[async_trait::async_trait] -impl ParquetHandler for DefaultParquetHandler { +impl ParquetHandler for DefaultParquetHandler { fn read_parquet_files( &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult { + ) -> DeltaResult { // TODO at the very least load only required columns ... if files.is_empty() { - return Ok(futures::stream::empty().boxed()); + return Ok(Box::new(std::iter::empty())); } let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); @@ -66,7 +79,16 @@ impl ParquetHandler for DefaultParquetHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - Ok(stream.boxed()) + + // This channel will become the iterator + let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); + + self.task_executor.spawn(stream.for_each(move |res| { + sender.send(res).ok(); + futures::future::ready(()) + })); + + Ok(Box::new(receiver.into_iter())) } } @@ -135,8 +157,13 @@ impl FileOpener for ParquetOpener { mod tests { use std::path::PathBuf; + use arrow_array::RecordBatch; use object_store::{local::LocalFileSystem, ObjectStore}; + use crate::executor::tokio::TokioBackgroundExecutor; + + use itertools::Itertools; + use super::*; #[tokio::test] @@ -163,14 +190,13 @@ mod tests { size: meta.size, }]; - let handler = DefaultParquetHandler::new(store); + let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); let context = handler.contextualize_file_reads(files, None).unwrap(); - let data = handler + let data: Vec = handler .read_parquet_files(context, Arc::new(physical_schema.try_into().unwrap())) .unwrap() - .try_collect::>() - .await + .try_collect() .unwrap(); assert_eq!(data.len(), 1); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 1b544c00..601827a8 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -38,13 +38,11 @@ )] use std::ops::Range; -use std::pin::Pin; use std::sync::Arc; use arrow_array::{RecordBatch, StringArray}; use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; -use futures::Stream; use url::Url; use self::schema::SchemaRef; @@ -75,7 +73,7 @@ pub type FileSlice = (Url, Option>); /// Data read from a Delta table file and the corresponding scan file information. pub type FileDataReadResult = (FileMeta, RecordBatch); -pub type FileDataReadResultStream = Pin> + Send>>; +pub type FileDataReadResultIterator = Box> + Send>; /// The metadata that describes an object. #[derive(Debug, Clone, PartialEq, Eq)] @@ -134,7 +132,10 @@ pub trait FileSystemClient: Send + Sync { -> DeltaResult>>>; /// Read data specified by the start and end offset from the file. 
- fn read_files(&self, files: Vec) -> DeltaResult>; + fn read_files( + &self, + files: Vec, + ) -> DeltaResult>>>; } /// Provides file handling functionality to Delta Kernel. @@ -173,7 +174,6 @@ pub trait FileHandler { /// Delta Kernel can use this client to parse JSON strings into Row or read content from JSON files. /// Connectors can leverage this interface to provide their best implementation of the JSON parsing /// capability to Delta Kernel. -#[async_trait::async_trait] pub trait JsonHandler: FileHandler { /// Parse the given json strings and return the fields requested by output schema as columns in a [`RecordBatch`]. fn parse_json( @@ -193,14 +193,13 @@ pub trait JsonHandler: FileHandler { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult; + ) -> DeltaResult; } /// Provides Parquet file related functionalities to Delta Kernel. /// /// Connectors can leverage this interface to provide their own custom /// implementation of Parquet data file functionalities to Delta Kernel. -#[async_trait::async_trait] pub trait ParquetHandler: FileHandler + Send + Sync { /// Read and parse the JSON format file at given locations and return /// the data as a RecordBatch with the columns requested by physical schema. @@ -213,7 +212,7 @@ pub trait ParquetHandler: FileHandler + Send + Sync { &self, files: Vec<::FileReadContext>, physical_schema: SchemaRef, - ) -> DeltaResult; + ) -> DeltaResult; } /// Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table. diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 091e5913..a52e0a5e 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -1,132 +1,89 @@ use std::collections::HashSet; -use std::pin::Pin; -use std::sync::Arc; - -use arrow_arith::boolean::{is_not_null, or}; -use arrow_array::{BooleanArray, RecordBatch}; -use arrow_select::filter::filter_record_batch; -use futures::stream::{BoxStream, Stream}; -use futures::task::{Context, Poll}; -use roaring::RoaringTreemap; -use url::Url; use super::data_skipping::data_skipping_filter; use crate::actions::{parse_actions, Action, ActionType, Add}; use crate::expressions::Expression; -use crate::{DeltaResult, Error, FileSystemClient}; +use crate::DeltaResult; +use arrow_array::RecordBatch; -/// A stream of [`RecordBatch`]es that represent actions in the delta log. -pub struct LogReplayStream { - stream: BoxStream<'static, DeltaResult>, +struct LogReplayScanner { predicate: Option, + /// A set of (data file path, dv_unique_id) pairs that have been seen thus + /// far in the log. This is used to filter out files with Remove actions as + /// well as duplicate entries in the log. 
seen: HashSet<(String, Option)>, - // ages: HashMap> - fs_client: Arc, - table_root: Url, -} - -impl std::fmt::Debug for LogReplayStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - f.debug_struct("LogStream") - .field("predicate", &self.predicate) - .finish() - } } -impl LogReplayStream { +impl LogReplayScanner { /// Create a new [`LogReplayStream`] instance - pub(crate) fn new( - stream: BoxStream<'static, DeltaResult>, - predicate: Option, - fs_client: Arc, - table_root: Url, - ) -> DeltaResult { - Ok(Self { + fn new(predicate: Option) -> Self { + Self { predicate, - stream, - fs_client, - table_root, seen: Default::default(), - }) + } } -} - -#[allow(missing_debug_implementations)] -pub struct DataFile { - pub add: Add, - pub dv: Option, -} -impl Stream for LogReplayStream { - type Item = DeltaResult>; + /// Extract Add actions from a single batch. This will filter out rows that + /// don't match the predicate and Add actions that have corresponding Remove + /// actions in the log. + fn process_batch(&mut self, actions: RecordBatch) -> DeltaResult> { + let actions = if let Some(predicate) = &self.predicate { + data_skipping_filter(actions, predicate)? + } else { + actions + }; - fn poll_next( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll::Item>> { - let stream = Pin::new(&mut self.stream); - match stream.poll_next(ctx) { - futures::task::Poll::Ready(value) => match value { - Some(Ok(actions)) => { - let skipped = if let Some(predicate) = &self.predicate { - data_skipping_filter(actions, predicate)? - } else { - let predicate = filter_nulls(&actions)?; - filter_record_batch(&actions, &predicate)? - }; - let filtered_actions = - parse_actions(&skipped, &[ActionType::Remove, ActionType::Add])? - .filter_map(|action| match action { - Action::Add(add) - // TODO right now this may not work as expected if we have the same add - // file with different deletion vectors in the log - i.e. additional - // rows were deleted in separate op. Is this a case to consider? - if !self - .seen - .contains(&(add.path.clone(), add.dv_unique_id())) => - { - self.seen.insert((add.path.clone(), add.dv_unique_id())); - let dvv = add.deletion_vector.clone(); - let dv = if let Some(dv_def) = dvv { - let dv_def = dv_def.clone(); - let fut = dv_def.read(self.fs_client.clone(), self.table_root.clone()).unwrap(); - Some(fut) - } else { - None - }; - Some(DataFile { - add: add.clone(), - dv, - }) - } - Action::Add(add) => { - self.seen.insert((add.path.clone(), add.dv_unique_id())); - None - } - Action::Remove(remove) => { - self.seen - .insert((remove.path.clone(), remove.dv_unique_id())); - None - } - _ => None, - }) - .collect(); - futures::task::Poll::Ready(Some(Ok(filtered_actions))) + let adds: Vec = parse_actions(&actions, &[ActionType::Remove, ActionType::Add])? + .filter_map(|action| match action { + Action::Add(add) + // Note: each (add.path + add.dv_unique_id()) pair has a + // unique Add + Remove pair in the log. 
For example: + // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json + if !self + .seen + .contains(&(add.path.clone(), add.dv_unique_id())) => + { + self.seen.insert((add.path.clone(), add.dv_unique_id())); + Some(add) } - Some(Err(err)) => futures::task::Poll::Ready(Some(Err(err))), - None => futures::task::Poll::Ready(None), - }, - _ => futures::task::Poll::Pending, - } + Action::Add(add) => { + self.seen.insert((add.path.clone(), add.dv_unique_id())); + None + } + Action::Remove(remove) => { + self.seen + .insert((remove.path.clone(), remove.dv_unique_id())); + None + } + _ => None, + }) + .collect(); + + Ok(adds) } } -fn filter_nulls(batch: &RecordBatch) -> DeltaResult { - let add_array = batch - .column_by_name("add") - .ok_or(Error::MissingData("expected add column".into()))?; - let remove_array = batch - .column_by_name("remove") - .ok_or(Error::MissingData("expected remove column".into()))?; - Ok(or(&is_not_null(add_array)?, &is_not_null(remove_array)?)?) +/// Given an iterator of actions and a predicate, returns a stream of [Add] +pub fn log_replay_iter( + action_iter: impl Iterator>, + predicate: Option, +) -> impl Iterator> { + let mut log_scanner = LogReplayScanner::new(predicate); + + action_iter.flat_map( + move |actions| -> Box>> { + match actions { + Ok(actions) => match log_scanner.process_batch(actions) { + Ok(adds) => Box::new(adds.into_iter().map(Ok)), + Err(err) => Box::new(std::iter::once(Err(err))), + }, + Err(err) => Box::new(std::iter::once(Err(err))), + } + }, + ) +} + +#[cfg(test)] +mod tests { + // TODO: add tests } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index ddf661b9..c29e3e6a 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -4,15 +4,15 @@ use arrow_array::{BooleanArray, RecordBatch}; use arrow_schema::{Fields, Schema as ArrowSchema}; use arrow_select::concat::concat_batches; use arrow_select::filter::filter_record_batch; -use futures::stream::{StreamExt, TryStreamExt}; +use itertools::Itertools; use url::Url; -use self::file_stream::LogReplayStream; +use self::file_stream::log_replay_iter; use crate::actions::ActionType; use crate::expressions::Expression; -use crate::schema::{Schema, SchemaRef}; +use crate::schema::SchemaRef; use crate::snapshot::LogSegment; -use crate::{DeltaResult, FileMeta, TableClient}; +use crate::{Add, DeltaResult, FileMeta, TableClient}; mod data_skipping; pub mod file_stream; @@ -126,76 +126,55 @@ impl Scan { /// which yields record batches of scan files and their associated metadata. Rows of the scan /// files batches correspond to data reads, and the DeltaReader is used to materialize the scan /// files into actual table data. - pub fn files(&self) -> DeltaResult { - // TODO use LogSegmentNEw replay ... 
- // TODO create function to generate native schema - let schema = ArrowSchema { + pub fn files(&self) -> DeltaResult>> { + let schema = Arc::new(ArrowSchema { fields: Fields::from_iter([ActionType::Add.field(), ActionType::Remove.field()]), metadata: Default::default(), - }; - let schema = Arc::new(Schema::try_from(&schema).unwrap()); + }); - let json_handler = self.table_client.get_json_handler(); - let commit_reads = json_handler.contextualize_file_reads( - // NOTE commit files are sorted in reverse on creations - self.log_segment.commit_files.clone(), - self.predicate.clone(), - )?; + let log_iter = + self.log_segment + .replay(self.table_client.as_ref(), schema, self.predicate.clone())?; - let parquet_handler = self.table_client.get_parquet_handler(); - let checkpoint_reads = parquet_handler.contextualize_file_reads( - self.log_segment.checkpoint_files.clone(), - self.predicate.clone(), - )?; - - let stream = json_handler - .read_json_files(commit_reads, schema.clone())? - .chain(parquet_handler.read_parquet_files(checkpoint_reads, schema.clone())?) - .boxed(); - - LogReplayStream::new( - stream, - self.predicate.clone(), - self.table_client.get_file_system_client(), - self.table_root.clone(), - ) + Ok(log_replay_iter(log_iter, self.predicate.clone())) } - pub async fn execute(&self) -> DeltaResult> { + pub fn execute(&self) -> DeltaResult> { let parquet_handler = self.table_client.get_parquet_handler(); - let mut stream = self.files()?.boxed(); - let mut results = Vec::new(); - while let Some(Ok(data)) = stream.next().await { - for file in data { + self.files()? + .map(|res| { + let add = res?; let meta = FileMeta { - last_modified: file.add.modification_time, - size: file.add.size as usize, - location: self.table_root.join(&file.add.path)?, + last_modified: add.modification_time, + size: add.size as usize, + location: self.table_root.join(&add.path)?, }; let context = parquet_handler.contextualize_file_reads(vec![meta], None)?; let batches = parquet_handler .read_parquet_files(context, self.schema.clone())? 
- .try_collect::>() - .await?; + .collect::>>()?; + if batches.is_empty() { - continue; + return Ok(None); } + let schema = batches[0].schema(); let batch = concat_batches(&schema, &batches)?; - if let Some(dv) = file.dv { - let vec: Vec<_> = (0..batch.num_rows()) + + if let Some(dv_descriptor) = add.deletion_vector { + let fs_client = self.table_client.get_file_system_client(); + let dv = dv_descriptor.read(fs_client, self.table_root.clone())?; + let mask: BooleanArray = (0..batch.num_rows()) .map(|i| Some(!dv.contains(i.try_into().expect("fit into u32")))) .collect(); - let dv = BooleanArray::from(vec); - results.push(filter_record_batch(&batch, &dv)?); + Ok(Some(filter_record_batch(&batch, &mask)?)) } else { - results.push(batch); + Ok(Some(batch)) } - } - } - - Ok(results) + }) + .filter_map_ok(|batch| batch) + .collect() } } @@ -203,15 +182,13 @@ impl Scan { mod tests { use std::path::PathBuf; - use futures::TryStreamExt; - use super::*; use crate::client::DefaultTableClient; use crate::executor::tokio::TokioBackgroundExecutor; use crate::Table; - #[tokio::test] - async fn test_scan_files() { + #[test] + fn test_scan_files() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -226,15 +203,19 @@ mod tests { let table = Table::new(url, table_client); let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await.unwrap().build(); - let files = scan.files().unwrap().try_collect::>().await.unwrap(); + let scan = snapshot.scan().unwrap().build(); + let files: Vec = scan.files().unwrap().try_collect().unwrap(); assert_eq!(files.len(), 1); - assert_eq!(files[0].len(), 1) + assert_eq!( + &files[0].path, + "part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet" + ); + assert!(&files[0].deletion_vector.is_none()); } - #[tokio::test] - async fn test_scan_data() { + #[test] + fn test_scan_data() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -249,8 +230,8 @@ mod tests { let table = Table::new(url, table_client); let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await.unwrap().build(); - let files = scan.execute().await.unwrap(); + let scan = snapshot.scan().unwrap().build(); + let files = scan.execute().unwrap(); assert_eq!(files.len(), 1); assert_eq!(files[0].num_rows(), 10) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index b923e457..ad175227 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,8 +8,6 @@ use std::sync::RwLock; use arrow_array::RecordBatch; use arrow_schema::{Fields, Schema as ArrowSchema}; -use futures::StreamExt; -use futures::TryStreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; @@ -18,6 +16,7 @@ use crate::actions::{parse_action, Action, ActionType, Metadata, Protocol}; use crate::path::LogPath; use crate::scan::ScanBuilder; use crate::schema::Schema; +use crate::Expression; use crate::{DeltaResult, Error, FileMeta, FileSystemClient, TableClient, Version}; const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; @@ -36,46 +35,47 @@ impl LogSegment { self.commit_files.iter() } - // TODO just a stop gap implementation, eventually we likely want a stream of batches... 
- async fn replay( + pub fn replay( &self, table_client: &dyn TableClient, - ) -> DeltaResult> { - let read_schema = Arc::new(ArrowSchema { - fields: Fields::from_iter([ActionType::Metadata.field(), ActionType::Protocol.field()]), - metadata: Default::default(), - }); - + read_schema: Arc, + predicate: Option, + ) -> DeltaResult>> { let mut commit_files: Vec<_> = self.commit_files().cloned().collect(); + // NOTE this will already sort in reverse order commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location)); let json_client = table_client.get_json_handler(); - let read_contexts = json_client.contextualize_file_reads(commit_files, None)?; + let read_contexts = + json_client.contextualize_file_reads(commit_files, predicate.clone())?; let commit_stream = json_client - .read_json_files(read_contexts, Arc::new(read_schema.clone().try_into()?))?; + .read_json_files(read_contexts, Arc::new(read_schema.as_ref().try_into()?))?; let parquet_client = table_client.get_parquet_handler(); let read_contexts = - parquet_client.contextualize_file_reads(self.checkpoint_files.clone(), None)?; + parquet_client.contextualize_file_reads(self.checkpoint_files.clone(), predicate)?; let checkpoint_stream = parquet_client - .read_parquet_files(read_contexts, Arc::new(read_schema.clone().try_into()?))?; + .read_parquet_files(read_contexts, Arc::new(read_schema.as_ref().try_into()?))?; - let batches = commit_stream - .chain(checkpoint_stream) - .try_collect::>() - .await?; + let batches = commit_stream.chain(checkpoint_stream); Ok(batches) } - async fn read_metadata( + fn read_metadata( &self, table_client: &dyn TableClient, ) -> DeltaResult> { - let batches = self.replay(table_client).await?; + let read_schema = Arc::new(ArrowSchema { + fields: Fields::from_iter([ActionType::Metadata.field(), ActionType::Protocol.field()]), + metadata: Default::default(), + }); + + let batches = self.replay(table_client, read_schema, None)?; let mut metadata_opt = None; let mut protocol_opt = None; for batch in batches { + let batch = batch?; if let Ok(mut metas) = parse_action(&batch, &ActionType::Metadata) { if let Some(Action::Metadata(meta)) = metas.next() { metadata_opt = Some(meta.clone()); @@ -214,7 +214,7 @@ impl Snapshot { self.version } - async fn get_or_insert_metadata(&self) -> DeltaResult<(Metadata, Protocol)> { + fn get_or_insert_metadata(&self) -> DeltaResult<(Metadata, Protocol)> { { let read_lock = self .metadata @@ -227,8 +227,7 @@ impl Snapshot { let (metadata, protocol) = self .log_segment - .read_metadata(self.table_client.as_ref()) - .await? + .read_metadata(self.table_client.as_ref())? .ok_or(Error::MissingMetadata)?; let mut meta = self .metadata @@ -240,23 +239,23 @@ impl Snapshot { } /// Table [`Schema`] at this [`Snapshot`]s version. - pub async fn schema(&self) -> DeltaResult { - self.metadata().await?.schema() + pub fn schema(&self) -> DeltaResult { + self.metadata()?.schema() } /// Table [`Metadata`] at this [`Snapshot`]s version. 
- pub async fn metadata(&self) -> DeltaResult { - let (metadata, _) = self.get_or_insert_metadata().await?; + pub fn metadata(&self) -> DeltaResult { + let (metadata, _) = self.get_or_insert_metadata()?; Ok(metadata) } - pub async fn protocol(&self) -> DeltaResult { - let (_, protocol) = self.get_or_insert_metadata().await?; + pub fn protocol(&self) -> DeltaResult { + let (_, protocol) = self.get_or_insert_metadata()?; Ok(protocol) } - pub async fn scan(self) -> DeltaResult> { - let schema = Arc::new(self.schema().await?); + pub fn scan(self) -> DeltaResult> { + let schema = Arc::new(self.schema()?); Ok(ScanBuilder::new( self.table_root, schema, @@ -293,11 +292,11 @@ fn read_last_checkpoint( log_root: &Url, ) -> DeltaResult> { let file_path = LogPath(log_root).child(LAST_CHECKPOINT_FILE_NAME)?; - match fs_client.read_files(vec![(file_path, None)]) { - Ok(data) => match data.first() { - Some(data) => Ok(Some(serde_json::from_slice(data)?)), - None => Ok(None), - }, + match fs_client + .read_files(vec![(file_path, None)]) + .and_then(|mut data| data.next().expect("read_files should return one file")) + { + Ok(data) => Ok(Some(serde_json::from_slice(&data)?)), Err(Error::FileNotFound(_)) => Ok(None), Err(err) => Err(err), } @@ -419,8 +418,8 @@ mod tests { ) } - #[tokio::test] - async fn test_snapshot_read_metadata() { + #[test] + fn test_snapshot_read_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -428,7 +427,7 @@ mod tests { let client = default_table_client(&url); let snapshot = Snapshot::try_new(url, client, Some(1)).unwrap(); - let protocol = snapshot.protocol().await.unwrap(); + let protocol = snapshot.protocol().unwrap(); let expected = Protocol { min_reader_version: 3, min_writer_version: 7, @@ -439,12 +438,12 @@ mod tests { let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#; let expected: StructType = serde_json::from_str(schema_string).unwrap(); - let schema = snapshot.schema().await.unwrap(); + let schema = snapshot.schema().unwrap(); assert_eq!(schema, expected); } - #[tokio::test] - async fn test_new_snapshot() { + #[test] + fn test_new_snapshot() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -452,7 +451,7 @@ mod tests { let client = default_table_client(&url); let snapshot = Snapshot::try_new(url, client, None).unwrap(); - let protocol = snapshot.protocol().await.unwrap(); + let protocol = snapshot.protocol().unwrap(); let expected = Protocol { min_reader_version: 3, min_writer_version: 7, @@ -463,7 +462,7 @@ mod tests { let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#; let expected: StructType = serde_json::from_str(schema_string).unwrap(); - let schema = snapshot.schema().await.unwrap(); + let schema = snapshot.schema().unwrap(); assert_eq!(schema, expected); } diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index 344b5e12..848bbcce 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -7,8 +7,8 @@ use deltakernel::client::DefaultTableClient; use deltakernel::executor::tokio::TokioBackgroundExecutor; use deltakernel::Table; -#[tokio::test] -async fn dv_table() -> Result<(), Box> { +#[test] +fn dv_table() -> Result<(), Box> { let path = 
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let table_client = Arc::new(DefaultTableClient::try_new( @@ -19,9 +19,9 @@ async fn dv_table() -> Result<(), Box> { let table = Table::new(url, table_client); let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter(); + let stream = scan.execute()?; for batch in stream { let rows = batch.num_rows(); arrow::util::pretty::print_batches(&[batch])?; @@ -30,8 +30,8 @@ async fn dv_table() -> Result<(), Box> { Ok(()) } -#[tokio::test] -async fn non_dv_table() -> Result<(), Box> { +#[test] +fn non_dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); let table_client = Arc::new(DefaultTableClient::try_new( @@ -42,9 +42,9 @@ async fn non_dv_table() -> Result<(), Box> { let table = Table::new(url, table_client); let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter(); + let stream = scan.execute()?; for batch in stream { let rows = batch.num_rows(); arrow::util::pretty::print_batches(&[batch]).unwrap(); diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index a73b15b9..94aaa5c9 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -97,10 +97,10 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1; @@ -147,10 +147,10 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; let snapshot = table.snapshot(None).unwrap(); - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1; @@ -201,9 +201,9 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; let snapshot = table.snapshot(None)?; - let scan = snapshot.scan().await?.build(); + let scan = snapshot.scan()?.build(); - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); let mut files = 0; for (data, expected) in stream { @@ -266,10 +266,10 @@ async fn stats() -> Result<(), Box> { Box::new(Expression::Column(String::from("ids"))), Box::new(Expression::Literal(2)), ); - let scan = snapshot.scan().await?.with_predicate(predicate).build(); + let scan = snapshot.scan()?.with_predicate(predicate).build(); let mut files = 0; - let stream = scan.execute().await?.into_iter().zip(expected_data); + let stream = scan.execute()?.into_iter().zip(expected_data); for (data, expected) in stream { files += 1; From 61ae63d031f40fb5d96a5b3a5e5979181748a77d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Sep 2023 16:24:33 -0700 Subject: [PATCH 2/4] refactor: make object_store optional --- .github/workflows/build.yml | 2 + kernel/Cargo.toml | 5 +- 
kernel/src/actions/mod.rs | 1 + kernel/src/client/arrow.rs | 275 ------------------------------------ kernel/src/client/mod.rs | 1 - kernel/src/error.rs | 3 + 6 files changed, 8 insertions(+), 279 deletions(-) delete mode 100644 kernel/src/client/arrow.rs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 773598b4..392d713b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,6 +32,8 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: build and lint with clippy run: cargo clippy --tests -- -D warnings + - name: lint without default features + run: cargo clippy --no-default--features -- -D warnings test: runs-on: ubuntu-latest steps: diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 760321cf..1ba893a5 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -21,8 +21,6 @@ either = "1.8" fix-hidden-lifetime-bug = "0.2" itertools = "0.11" lazy_static = "1.4" -# used for providing a storage abstraction layer -object_store = "^0.7.0" # need to generalize over arrow, arrow2 and diff parquet etc. (BYOP) regex = "1.8" roaring = "0.10.1" @@ -37,6 +35,7 @@ z85 = "3.0.5" # Used in default client futures = { version = "0.3", optional = true } +object_store = { version = "^0.7.0", optional = true } parquet = { version = "^46.0", optional = true, features=["async", "object_store"]} # optionally used with default client (though not required) @@ -44,7 +43,7 @@ tokio = { version = "1", optional = true, features=["rt-multi-thread"] } [features] default = ["default-client"] -default-client = ["chrono", "parquet", "futures"] +default-client = ["chrono", "futures", "object_store", "parquet"] [dev-dependencies] arrow = { version = "^46.0", features = ["json", "prettyprint"] } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 5cd7fd0e..f22da04a 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -10,6 +10,7 @@ use itertools::izip; use crate::{DeltaResult, Error}; +pub(crate) mod arrow; pub(crate) mod schemas; pub(crate) mod types; diff --git a/kernel/src/client/arrow.rs b/kernel/src/client/arrow.rs deleted file mode 100644 index 5408eb8c..00000000 --- a/kernel/src/client/arrow.rs +++ /dev/null @@ -1,275 +0,0 @@ -use std::sync::Arc; - -use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, TimeUnit, -}; - -use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType}; - -impl TryFrom<&StructType> for ArrowSchema { - type Error = ArrowError; - - fn try_from(s: &StructType) -> Result { - let fields = s - .fields() - .iter() - .map(|f| >::try_from(*f)) - .collect::, ArrowError>>()?; - - Ok(ArrowSchema::new(fields)) - } -} - -impl TryFrom<&StructField> for ArrowField { - type Error = ArrowError; - - fn try_from(f: &StructField) -> Result { - let metadata = f - .metadata() - .iter() - .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) - .collect::>() - .map_err(|err| ArrowError::JsonError(err.to_string()))?; - - let field = ArrowField::new( - f.name(), - ArrowDataType::try_from(f.data_type())?, - f.is_nullable(), - ) - .with_metadata(metadata); - - Ok(field) - } -} - -impl TryFrom<&ArrayType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &ArrayType) -> Result { - Ok(ArrowField::new( - "item", - ArrowDataType::try_from(a.element_type())?, - a.contains_null(), - )) - } -} - -impl TryFrom<&MapType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &MapType) -> Result 
{ - Ok(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), - ArrowField::new( - "value", - ArrowDataType::try_from(a.value_type())?, - a.value_contains_null(), - ), - ] - .into(), - ), - false, // always non-null - )) - } -} - -impl TryFrom<&DataType> for ArrowDataType { - type Error = ArrowError; - - fn try_from(t: &DataType) -> Result { - match t { - DataType::Primitive(p) => { - match p { - PrimitiveType::String => Ok(ArrowDataType::Utf8), - PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type - PrimitiveType::Integer => Ok(ArrowDataType::Int32), - PrimitiveType::Short => Ok(ArrowDataType::Int16), - PrimitiveType::Byte => Ok(ArrowDataType::Int8), - PrimitiveType::Float => Ok(ArrowDataType::Float32), - PrimitiveType::Double => Ok(ArrowDataType::Float64), - PrimitiveType::Boolean => Ok(ArrowDataType::Boolean), - PrimitiveType::Binary => Ok(ArrowDataType::Binary), - PrimitiveType::Decimal(precision, scale) => { - let precision = u8::try_from(*precision).map_err(|_| { - ArrowError::SchemaError(format!( - "Invalid precision for decimal: {}", - precision - )) - })?; - let scale = i8::try_from(*scale).map_err(|_| { - ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale)) - })?; - - if precision <= 38 { - Ok(ArrowDataType::Decimal128(precision, scale)) - } else if precision <= 76 { - Ok(ArrowDataType::Decimal256(precision, scale)) - } else { - Err(ArrowError::SchemaError(format!( - "Precision too large to be represented in Arrow: {}", - precision - ))) - } - } - PrimitiveType::Date => { - // A calendar date, represented as a year-month-day triple without a - // timezone. Stored as 4 bytes integer representing days since 1970-01-01 - Ok(ArrowDataType::Date32) - } - PrimitiveType::Timestamp => { - // Issue: https://github.com/delta-io/delta/issues/643 - Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) - } - } - } - DataType::Struct(s) => Ok(ArrowDataType::Struct( - s.fields() - .iter() - .map(|f| >::try_from(*f)) - .collect::, ArrowError>>()? 
- .into(), - )), - DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), - DataType::Map(m) => Ok(ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new( - "key", - >::try_from(m.key_type())?, - false, - ), - ArrowField::new( - "value", - >::try_from(m.value_type())?, - m.value_contains_null(), - ), - ] - .into(), - ), - true, - )), - false, - )), - } - } -} - -impl TryFrom<&ArrowSchema> for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: &ArrowSchema) -> Result { - let new_fields: Result, _> = arrow_schema - .fields() - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(StructType::new(new_fields?)) - } -} - -impl TryFrom for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: ArrowSchemaRef) -> Result { - arrow_schema.as_ref().try_into() - } -} - -impl TryFrom<&ArrowField> for StructField { - type Error = ArrowError; - - fn try_from(arrow_field: &ArrowField) -> Result { - Ok(StructField::new( - arrow_field.name().clone(), - arrow_field.data_type().try_into()?, - arrow_field.is_nullable(), - ) - .with_metadata(arrow_field.metadata().iter().map(|(k, v)| (k.clone(), v)))) - } -} - -impl TryFrom<&ArrowDataType> for DataType { - type Error = ArrowError; - - fn try_from(arrow_datatype: &ArrowDataType) -> Result { - match arrow_datatype { - ArrowDataType::Utf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::LargeUtf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::Int64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::Int32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::Int16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::Int8 => Ok(DataType::Primitive(PrimitiveType::Byte)), - ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), - ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), - ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) - if tz.eq_ignore_ascii_case("utc") => - { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Struct(fields) => { - let converted_fields: Result, _> = fields - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(DataType::Struct(Box::new(StructType::new( - converted_fields?, - )))) - } - ArrowDataType::List(field) => 
Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::LargeList(field) => Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::FixedSizeList(field, _) => Ok(DataType::Array(Box::new( - ArrayType::new((*field).data_type().try_into()?, (*field).is_nullable()), - ))), - ArrowDataType::Map(field, _) => { - if let ArrowDataType::Struct(struct_fields) = field.data_type() { - let key_type = struct_fields[0].data_type().try_into()?; - let value_type = struct_fields[1].data_type().try_into()?; - let value_type_nullable = struct_fields[1].is_nullable(); - Ok(DataType::Map(Box::new(MapType::new( - key_type, - value_type, - value_type_nullable, - )))) - } else { - panic!("DataType::Map should contain a struct field child"); - } - } - s => Err(ArrowError::SchemaError(format!( - "Invalid data type for Delta Lake: {s}" - ))), - } - } -} diff --git a/kernel/src/client/mod.rs b/kernel/src/client/mod.rs index a88d47f4..a4a5fc70 100644 --- a/kernel/src/client/mod.rs +++ b/kernel/src/client/mod.rs @@ -13,7 +13,6 @@ use crate::{ DeltaResult, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, TableClient, }; -pub mod arrow; pub mod executor; pub mod file_handler; pub mod filesystem; diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 4c172103..89442127 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -14,9 +14,11 @@ pub enum Error { source: Box, }, + #[cfg(feature = "parquet")] #[error("Arrow error: {0}")] Parquet(#[from] parquet::errors::ParquetError), + #[cfg(feature = "object_store")] #[error("Error interacting with object store: {0}")] ObjectStore(object_store::Error), @@ -48,6 +50,7 @@ pub enum Error { MissingMetadata, } +#[cfg(feature = "object_store")] impl From for Error { fn from(value: object_store::Error) -> Self { match value { From 63fe9af7e318ffc11b05d513e3a3b3cbe19574c0 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Sep 2023 16:29:22 -0700 Subject: [PATCH 3/4] cleanup --- .github/workflows/build.yml | 2 +- kernel/src/actions/arrow.rs | 275 +++++++++++++++++++++++++++++++++ kernel/src/scan/file_stream.rs | 22 +-- 3 files changed, 283 insertions(+), 16 deletions(-) create mode 100644 kernel/src/actions/arrow.rs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 392d713b..d408147c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,7 +33,7 @@ jobs: - name: build and lint with clippy run: cargo clippy --tests -- -D warnings - name: lint without default features - run: cargo clippy --no-default--features -- -D warnings + run: cargo clippy --no-default-features -- -D warnings test: runs-on: ubuntu-latest steps: diff --git a/kernel/src/actions/arrow.rs b/kernel/src/actions/arrow.rs new file mode 100644 index 00000000..5408eb8c --- /dev/null +++ b/kernel/src/actions/arrow.rs @@ -0,0 +1,275 @@ +use std::sync::Arc; + +use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + SchemaRef as ArrowSchemaRef, TimeUnit, +}; + +use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType}; + +impl TryFrom<&StructType> for ArrowSchema { + type Error = ArrowError; + + fn try_from(s: &StructType) -> Result { + let fields = s + .fields() + .iter() + .map(|f| >::try_from(*f)) + .collect::, ArrowError>>()?; + + Ok(ArrowSchema::new(fields)) + } +} + +impl TryFrom<&StructField> for ArrowField { 
+ type Error = ArrowError; + + fn try_from(f: &StructField) -> Result { + let metadata = f + .metadata() + .iter() + .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) + .collect::>() + .map_err(|err| ArrowError::JsonError(err.to_string()))?; + + let field = ArrowField::new( + f.name(), + ArrowDataType::try_from(f.data_type())?, + f.is_nullable(), + ) + .with_metadata(metadata); + + Ok(field) + } +} + +impl TryFrom<&ArrayType> for ArrowField { + type Error = ArrowError; + + fn try_from(a: &ArrayType) -> Result { + Ok(ArrowField::new( + "item", + ArrowDataType::try_from(a.element_type())?, + a.contains_null(), + )) + } +} + +impl TryFrom<&MapType> for ArrowField { + type Error = ArrowError; + + fn try_from(a: &MapType) -> Result { + Ok(ArrowField::new( + "entries", + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), + ArrowField::new( + "value", + ArrowDataType::try_from(a.value_type())?, + a.value_contains_null(), + ), + ] + .into(), + ), + false, // always non-null + )) + } +} + +impl TryFrom<&DataType> for ArrowDataType { + type Error = ArrowError; + + fn try_from(t: &DataType) -> Result { + match t { + DataType::Primitive(p) => { + match p { + PrimitiveType::String => Ok(ArrowDataType::Utf8), + PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type + PrimitiveType::Integer => Ok(ArrowDataType::Int32), + PrimitiveType::Short => Ok(ArrowDataType::Int16), + PrimitiveType::Byte => Ok(ArrowDataType::Int8), + PrimitiveType::Float => Ok(ArrowDataType::Float32), + PrimitiveType::Double => Ok(ArrowDataType::Float64), + PrimitiveType::Boolean => Ok(ArrowDataType::Boolean), + PrimitiveType::Binary => Ok(ArrowDataType::Binary), + PrimitiveType::Decimal(precision, scale) => { + let precision = u8::try_from(*precision).map_err(|_| { + ArrowError::SchemaError(format!( + "Invalid precision for decimal: {}", + precision + )) + })?; + let scale = i8::try_from(*scale).map_err(|_| { + ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale)) + })?; + + if precision <= 38 { + Ok(ArrowDataType::Decimal128(precision, scale)) + } else if precision <= 76 { + Ok(ArrowDataType::Decimal256(precision, scale)) + } else { + Err(ArrowError::SchemaError(format!( + "Precision too large to be represented in Arrow: {}", + precision + ))) + } + } + PrimitiveType::Date => { + // A calendar date, represented as a year-month-day triple without a + // timezone. Stored as 4 bytes integer representing days since 1970-01-01 + Ok(ArrowDataType::Date32) + } + PrimitiveType::Timestamp => { + // Issue: https://github.com/delta-io/delta/issues/643 + Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) + } + } + } + DataType::Struct(s) => Ok(ArrowDataType::Struct( + s.fields() + .iter() + .map(|f| >::try_from(*f)) + .collect::, ArrowError>>()? 
+ .into(), + )), + DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), + DataType::Map(m) => Ok(ArrowDataType::Map( + Arc::new(ArrowField::new( + "entries", + ArrowDataType::Struct( + vec![ + ArrowField::new( + "key", + >::try_from(m.key_type())?, + false, + ), + ArrowField::new( + "value", + >::try_from(m.value_type())?, + m.value_contains_null(), + ), + ] + .into(), + ), + true, + )), + false, + )), + } + } +} + +impl TryFrom<&ArrowSchema> for StructType { + type Error = ArrowError; + + fn try_from(arrow_schema: &ArrowSchema) -> Result { + let new_fields: Result, _> = arrow_schema + .fields() + .iter() + .map(|field| field.as_ref().try_into()) + .collect(); + Ok(StructType::new(new_fields?)) + } +} + +impl TryFrom for StructType { + type Error = ArrowError; + + fn try_from(arrow_schema: ArrowSchemaRef) -> Result { + arrow_schema.as_ref().try_into() + } +} + +impl TryFrom<&ArrowField> for StructField { + type Error = ArrowError; + + fn try_from(arrow_field: &ArrowField) -> Result { + Ok(StructField::new( + arrow_field.name().clone(), + arrow_field.data_type().try_into()?, + arrow_field.is_nullable(), + ) + .with_metadata(arrow_field.metadata().iter().map(|(k, v)| (k.clone(), v)))) + } +} + +impl TryFrom<&ArrowDataType> for DataType { + type Error = ArrowError; + + fn try_from(arrow_datatype: &ArrowDataType) -> Result { + match arrow_datatype { + ArrowDataType::Utf8 => Ok(DataType::Primitive(PrimitiveType::String)), + ArrowDataType::LargeUtf8 => Ok(DataType::Primitive(PrimitiveType::String)), + ArrowDataType::Int64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type + ArrowDataType::Int32 => Ok(DataType::Primitive(PrimitiveType::Integer)), + ArrowDataType::Int16 => Ok(DataType::Primitive(PrimitiveType::Short)), + ArrowDataType::Int8 => Ok(DataType::Primitive(PrimitiveType::Byte)), + ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type + ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), + ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), + ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), + ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), + ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), + ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), + ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)), + ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)), + ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)), + ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( + *p as i32, *s as i32, + ))), + ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( + *p as i32, *s as i32, + ))), + ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)), + ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { + Ok(DataType::Primitive(PrimitiveType::Timestamp)) + } + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) + if tz.eq_ignore_ascii_case("utc") => + { + Ok(DataType::Primitive(PrimitiveType::Timestamp)) + } + ArrowDataType::Struct(fields) => { + let converted_fields: Result, _> = fields + .iter() + .map(|field| field.as_ref().try_into()) + .collect(); + Ok(DataType::Struct(Box::new(StructType::new( + converted_fields?, + )))) + } + ArrowDataType::List(field) => 
Ok(DataType::Array(Box::new(ArrayType::new( + (*field).data_type().try_into()?, + (*field).is_nullable(), + )))), + ArrowDataType::LargeList(field) => Ok(DataType::Array(Box::new(ArrayType::new( + (*field).data_type().try_into()?, + (*field).is_nullable(), + )))), + ArrowDataType::FixedSizeList(field, _) => Ok(DataType::Array(Box::new( + ArrayType::new((*field).data_type().try_into()?, (*field).is_nullable()), + ))), + ArrowDataType::Map(field, _) => { + if let ArrowDataType::Struct(struct_fields) = field.data_type() { + let key_type = struct_fields[0].data_type().try_into()?; + let value_type = struct_fields[1].data_type().try_into()?; + let value_type_nullable = struct_fields[1].is_nullable(); + Ok(DataType::Map(Box::new(MapType::new( + key_type, + value_type, + value_type_nullable, + )))) + } else { + panic!("DataType::Map should contain a struct field child"); + } + } + s => Err(ArrowError::SchemaError(format!( + "Invalid data type for Delta Lake: {s}" + ))), + } + } +} diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index a52e0a5e..a47c8782 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -5,6 +5,7 @@ use crate::actions::{parse_actions, Action, ActionType, Add}; use crate::expressions::Expression; use crate::DeltaResult; use arrow_array::RecordBatch; +use either::Either; struct LogReplayScanner { predicate: Option, @@ -70,20 +71,11 @@ pub fn log_replay_iter( ) -> impl Iterator> { let mut log_scanner = LogReplayScanner::new(predicate); - action_iter.flat_map( - move |actions| -> Box>> { - match actions { - Ok(actions) => match log_scanner.process_batch(actions) { - Ok(adds) => Box::new(adds.into_iter().map(Ok)), - Err(err) => Box::new(std::iter::once(Err(err))), - }, - Err(err) => Box::new(std::iter::once(Err(err))), - } + action_iter.flat_map(move |actions| match actions { + Ok(actions) => match log_scanner.process_batch(actions) { + Ok(adds) => Either::Left(adds.into_iter().map(Ok)), + Err(err) => Either::Right(std::iter::once(Err(err))), }, - ) -} - -#[cfg(test)] -mod tests { - // TODO: add tests + Err(err) => Either::Right(std::iter::once(Err(err))), + }) } From ca2f1e71f0a21696af9b071cf54af35c1cadc6c9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Sep 2023 16:49:28 -0700 Subject: [PATCH 4/4] docs: better docs and comments --- kernel/src/client/filesystem.rs | 9 +++++++-- kernel/src/client/json.rs | 7 ++++++- kernel/src/client/mod.rs | 8 ++++++++ kernel/src/client/parquet.rs | 9 +++++++-- kernel/src/snapshot.rs | 8 ++++++++ 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/kernel/src/client/filesystem.rs b/kernel/src/client/filesystem.rs index 925681a2..9ec64e6a 100644 --- a/kernel/src/client/filesystem.rs +++ b/kernel/src/client/filesystem.rs @@ -101,8 +101,10 @@ impl FileSystemClient for ObjectStoreFileSystemClient { ) -> DeltaResult>>> { let store = self.inner.clone(); - // This channel will become the iterator - let (sender, receiver) = std::sync::mpsc::sync_channel(20); + // This channel will become the output iterator. + // Because there will already be buffering in the stream, we set the + // buffer size to 0. + let (sender, receiver) = std::sync::mpsc::sync_channel(0); self.task_executor.spawn( futures::stream::iter(files) @@ -118,6 +120,9 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } } }) + // We allow executing up to `readahead` futures concurrently and + // buffer the results. This allows us to achieve async concurrency + // within a synchronous method. 
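
The `sync_channel` plus `buffered(readahead)` combination here is the heart of the sync-API refactor: the object store reads stay asynchronous, but they are driven on the executor's thread pool and handed to the caller through a blocking channel. Below is a minimal standalone sketch of the same pattern; the names `fetch_bytes` and `read_all_blocking`, and the plain `thread::spawn` standing in for a `TaskExecutor`, are illustrative only and not kernel APIs.

    use std::sync::mpsc::sync_channel;
    use std::thread;

    use futures::stream::{self, StreamExt};

    // Stand-in for an async read such as an object store GET (illustrative only).
    async fn fetch_bytes(path: String) -> Result<Vec<u8>, String> {
        Ok(path.into_bytes())
    }

    // Returns a blocking iterator over results while the reads run concurrently
    // on a background thread (a plain thread plays the role of the TaskExecutor).
    fn read_all_blocking(
        paths: Vec<String>,
        readahead: usize,
    ) -> impl Iterator<Item = Result<Vec<u8>, String>> {
        // Rendezvous channel (capacity 0): the stream side already buffers up to
        // `readahead` results, so the channel adds no extra layer of buffering.
        let (sender, receiver) = sync_channel(0);

        thread::spawn(move || {
            futures::executor::block_on(
                stream::iter(paths)
                    .map(fetch_bytes)
                    // Drive up to `readahead` reads concurrently, yielding
                    // results in the original order.
                    .buffered(readahead)
                    .for_each(|res| {
                        // The consumer may drop the iterator early; ignore send errors.
                        sender.send(res).ok();
                        futures::future::ready(())
                    }),
            );
        });

        // Each call to `next()` on this iterator blocks until a result is ready.
        receiver.into_iter()
    }

    fn main() {
        for bytes in read_all_blocking(vec!["a".into(), "b".into(), "c".into()], 2) {
            println!("{:?}", bytes);
        }
    }

The capacity-0 channel together with `buffered(readahead)` gives exactly the behavior the comments in this hunk describe: at most `readahead` requests in flight, and no second buffer between the stream and the consumer.
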
.buffered(self.readahead) .for_each(move |res| { sender.send(res.map_err(Error::from)).ok(); diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index 7f86f26e..7bd406fb 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -44,6 +44,9 @@ impl DefaultJsonHandler { } } + /// Set the maximum number of batches to read ahead during [Self::read_json_files()]. + /// + /// Defaults to 10. pub fn with_readahead(mut self, readahead: usize) -> Self { self.readahead = readahead; self @@ -111,7 +114,9 @@ impl JsonHandler for DefaultJsonHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - // This channel will become the iterator + // This channel will become the output iterator + // The stream will execute in the background, and we allow up to `readahead` + // batches to be buffered in the channel. let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); self.task_executor.spawn(stream.for_each(move |res| { diff --git a/kernel/src/client/mod.rs b/kernel/src/client/mod.rs index a4a5fc70..0a84e1c9 100644 --- a/kernel/src/client/mod.rs +++ b/kernel/src/client/mod.rs @@ -1,4 +1,12 @@ //! # Default TableClient +//! +//! The default implementation of [`TableClient`] is [`DefaultTableClient`]. +//! This uses the [object_store], [parquet][::parquet], and [arrow_json] crates +//! to read and write data. +//! +//! The underlying implementations use asynchronous IO. Async tasks are run on +//! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in +//! the [executor] module. use std::sync::Arc; diff --git a/kernel/src/client/parquet.rs b/kernel/src/client/parquet.rs index 24dd5d0b..dcdc4a7b 100644 --- a/kernel/src/client/parquet.rs +++ b/kernel/src/client/parquet.rs @@ -41,7 +41,9 @@ impl DefaultParquetHandler { } } - /// Max number of batches to read asynchronously. + /// Max number of batches to read ahead while executing [Self::read_parquet_files()]. + /// + /// Defaults to 10. pub fn with_readahead(mut self, readahead: usize) -> Self { self.readahead = readahead; self @@ -80,7 +82,10 @@ impl ParquetHandler for DefaultParquetHandler { let files = files.into_iter().map(|f| f.meta).collect::>(); let stream = FileStream::new(files, schema, file_reader)?; - // This channel will become the iterator + // This channel will become the output iterator. + // The stream will execute in the background and send results to this channel. + // The channel will buffer up to `readahead` results, allowing the background + // stream to get ahead of the consumer. let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead); self.task_executor.spawn(stream.for_each(move |res| { diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index ad175227..9b945356 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -35,6 +35,14 @@ impl LogSegment { self.commit_files.iter() } + /// Read a stream of log data from this log segment. + /// + /// The log files will be read from most recent to oldest. + /// + /// `read_schema` is the schema to read the log files with. This can be used + /// to project the log files to a subset of the columns. + /// + /// `predicate` is an optional expression to filter the log files with. pub fn replay( &self, table_client: &dyn TableClient,
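
Stepping back to the schema conversions introduced in kernel/src/actions/arrow.rs earlier in this series: since they are all `TryFrom` impls, callers move between kernel and Arrow schemas with `try_from`/`try_into`. The sketch below shows a round trip as it might appear inside the crate (for example in a unit test); the exact `StructField::new` and `StructType::new` signatures are assumed from how they are used in arrow.rs, and `roundtrip_example` is an illustrative name, not part of the API.

    use arrow_schema::{ArrowError, Schema as ArrowSchema};

    use crate::schema::{DataType, PrimitiveType, StructField, StructType};

    fn roundtrip_example() -> Result<(), ArrowError> {
        // Assumed constructors: StructField::new(name, data_type, nullable)
        // and StructType::new(fields), mirroring their use in arrow.rs.
        let delta_schema = StructType::new(vec![
            StructField::new("id".to_string(), DataType::Primitive(PrimitiveType::Long), false),
            StructField::new("value".to_string(), DataType::Primitive(PrimitiveType::String), true),
        ]);

        // Delta -> Arrow; unsupported types surface as ArrowError::SchemaError.
        let arrow_schema = ArrowSchema::try_from(&delta_schema)?;

        // Arrow -> Delta via the reverse impl.
        let delta_again = StructType::try_from(&arrow_schema)?;
        assert_eq!(delta_again.fields().iter().count(), 2);

        Ok(())
    }
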