Skip to content

Commit

Permalink
docs: better docs and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Sep 17, 2023
1 parent 63fe9af commit ca2f1e7
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 5 deletions.
9 changes: 7 additions & 2 deletions kernel/src/client/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
let store = self.inner.clone();

// This channel will become the iterator
let (sender, receiver) = std::sync::mpsc::sync_channel(20);
// This channel will become the output iterator.
// Because there will already be buffering in the stream, we set the
// buffer size to 0.
let (sender, receiver) = std::sync::mpsc::sync_channel(0);

self.task_executor.spawn(
futures::stream::iter(files)
Expand All @@ -118,6 +120,9 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
}
}
})
// We allow executing up to `readahead` futures concurrently and
// buffer the results. This allows us to achieve async concurrency
// within a synchronous method.
.buffered(self.readahead)
.for_each(move |res| {
sender.send(res.map_err(Error::from)).ok();
Expand Down
7 changes: 6 additions & 1 deletion kernel/src/client/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ impl<E: TaskExecutor> DefaultJsonHandler<E> {
}
}

/// Set the maximum number of batches to read ahead during [Self::read_json_files()].
///
/// Defaults to 10.
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
Expand Down Expand Up @@ -111,7 +114,9 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
let files = files.into_iter().map(|f| f.meta).collect::<Vec<_>>();
let stream = FileStream::new(files, schema, file_reader)?;

// This channel will become the iterator
// This channel will become the output iterator.
// The stream will execute in the background, and we allow up to `readahead`
// batches to be buffered in the channel.
let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead);

self.task_executor.spawn(stream.for_each(move |res| {
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
//! # Default TableClient
//!
//! The default implementation of [`TableClient`] is [`DefaultTableClient`].
//! This uses the [object_store], [parquet][::parquet], and [arrow_json] crates
//! to read and write data.
//!
//! The underlying implementations use asynchronous IO. Async tasks are run on
//! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in
//! the [executor] module.

use std::sync::Arc;

Expand Down
9 changes: 7 additions & 2 deletions kernel/src/client/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
}
}

/// Max number of batches to read asynchronously.
/// Max number of batches to read ahead while executing [Self::read_parquet_files()].
///
/// Defaults to 10.
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
Expand Down Expand Up @@ -80,7 +82,10 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
let files = files.into_iter().map(|f| f.meta).collect::<Vec<_>>();
let stream = FileStream::new(files, schema, file_reader)?;

// This channel will become the iterator
// This channel will become the output iterator.
// The stream will execute in the background and send results to this channel.
// The channel will buffer up to `readahead` results, allowing the background
// stream to get ahead of the consumer.
let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead);

self.task_executor.spawn(stream.for_each(move |res| {
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ impl LogSegment {
self.commit_files.iter()
}

/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
///
/// `read_schema` is the schema to read the log files with. This can be used
/// to project the log files to a subset of the columns.
///
/// `predicate` is an optional expression to filter the log files with.
pub fn replay<JRC: Send, PRC: Send>(
&self,
table_client: &dyn TableClient<JsonReadContext = JRC, ParquetReadContext = PRC>,
Expand Down

0 comments on commit ca2f1e7

Please sign in to comment.