refactor: move file readers to sync APIs #38

Merged
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
@@ -32,6 +32,8 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: build and lint with clippy
run: cargo clippy --tests -- -D warnings
- name: lint without default features
run: cargo clippy --no-default-features -- -D warnings
test:
runs-on: ubuntu-latest
steps:
10 changes: 5 additions & 5 deletions acceptance/src/meta.rs
@@ -78,16 +78,16 @@ impl TestCaseInfo {
Ok((latest, cases))
}

async fn assert_snapshot_meta<JRC: Send, PRC: Send + Sync>(
fn assert_snapshot_meta<JRC: Send, PRC: Send + Sync>(
&self,
case: &TableVersionMetaData,
snapshot: &Snapshot<JRC, PRC>,
) -> TestResult<()> {
assert_eq!(snapshot.version(), case.version);

// assert correct metadata is read
let metadata = snapshot.metadata().await?;
let protocol = snapshot.protocol().await?;
let metadata = snapshot.metadata()?;
let protocol = snapshot.protocol()?;
let tvm = TableVersionMetaData {
version: snapshot.version(),
properties: metadata
@@ -111,11 +111,11 @@ impl TestCaseInfo {
let (latest, versions) = self.versions().await?;

let snapshot = table.snapshot(None)?;
self.assert_snapshot_meta(&latest, &snapshot).await?;
self.assert_snapshot_meta(&latest, &snapshot)?;

for table_version in versions {
let snapshot = table.snapshot(Some(table_version.version))?;
self.assert_snapshot_meta(&table_version, &snapshot).await?;
self.assert_snapshot_meta(&table_version, &snapshot)?;
}

Ok(())
14 changes: 7 additions & 7 deletions kernel/Cargo.toml
@@ -15,18 +15,13 @@ arrow-json = { version = "^46.0" }
arrow-ord = { version = "^46.0" }
arrow-schema = { version = "^46.0" }
arrow-select = { version = "^46.0" }
async-trait = "0.1"
bytes = "1.4"
chrono = { version = "0.4", optional = true }
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
futures = "0.3"
itertools = "0.11"
lazy_static = "1.4"
# used for providing a storage abstraction layer
object_store = "^0.7.0"
# need to generalize over arrow, arrow2 and diff parquet etc. (BYOP)
parquet = "^46.0"
regex = "1.8"
roaring = "0.10.1"
serde = { version = "1", features = ["derive"] }
@@ -38,12 +33,17 @@ url = "2"
uuid = "1.3.0"
z85 = "3.0.5"

# optionally used with default client
# Used in default client
futures = { version = "0.3", optional = true }
object_store = { version = "^0.7.0", optional = true }
parquet = { version = "^46.0", optional = true, features=["async", "object_store"]}

# optionally used with default client (though not required)
tokio = { version = "1", optional = true, features=["rt-multi-thread"] }

[features]
default = ["default-client"]
default-client = ["chrono", "parquet/async", "parquet/object_store"]
default-client = ["chrono", "futures", "object_store", "parquet"]

[dev-dependencies]
arrow = { version = "^46.0", features = ["json", "prettyprint"] }
File renamed without changes.
6 changes: 4 additions & 2 deletions kernel/src/actions/mod.rs
@@ -10,6 +10,7 @@ use itertools::izip;

use crate::{DeltaResult, Error};

pub(crate) mod arrow;
pub(crate) mod schemas;
pub(crate) mod types;

@@ -529,11 +530,12 @@ mod tests {
use super::*;
use crate::actions::Protocol;
use crate::client::json::DefaultJsonHandler;
use crate::executor::tokio::TokioBackgroundExecutor;
use crate::JsonHandler;

fn action_batch() -> RecordBatch {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
@@ -596,7 +598,7 @@ mod tests {
#[test]
fn test_parse_add_partitioned() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
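The constructors above now take a task executor alongside the object store, so the handlers can drive async `object_store` calls from synchronous methods. The executor trait itself is outside this diff; below is a rough sketch of the shape these call sites appear to assume. The `TaskExecutor` and `TokioBackgroundExecutor` names come from the diff, but the exact signatures here are guesses, not the kernel's definition.

```rust
use std::future::Future;

// Guessed shape of the executor abstraction used by DefaultJsonHandler and
// ObjectStoreFileSystemClient; the real trait lives in the kernel's executor
// module and may differ in bounds or method names.
pub trait TaskExecutor: Send + Sync + 'static {
    /// Fire a future off to run in the background (used to drive read streams
    /// while the caller consumes results through a channel).
    fn spawn<F>(&self, task: F)
    where
        F: Future<Output = ()> + Send + 'static;

    /// Run a future to completion, blocking the calling thread.
    fn block_on<F>(&self, task: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send;
}
```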
5 changes: 2 additions & 3 deletions kernel/src/actions/types.rs
@@ -226,9 +226,8 @@ impl DeletionVectorDescriptor {

let dv_data = fs_client
.read_files(vec![(path, None)])?
.first()
.ok_or(Error::MissingData("No deletion Vecor data".to_string()))?
.clone();
.next()
.ok_or(Error::MissingData("No deletion Vector data".to_string()))??;

let mut cursor = Cursor::new(dv_data);
if let Some(offset) = offset {
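The `??` above follows from the new return type: `read_files` now hands back an iterator of `DeltaResult<Bytes>` instead of a collected `Vec<Bytes>`, so the caller first turns "no first element" into an error and then unwraps that element's own read result. A minimal standalone illustration of the same pattern, using a stand-in error type rather than the kernel's `Error`:

```rust
// Stand-in error type; the kernel uses its own Error::MissingData variant.
#[derive(Debug)]
struct MissingData(String);

fn first_chunk(
    mut chunks: impl Iterator<Item = Result<Vec<u8>, MissingData>>,
) -> Result<Vec<u8>, MissingData> {
    // First `?`: error out if the iterator was empty.
    // Second `?`: error out if reading that first chunk itself failed.
    let bytes = chunks
        .next()
        .ok_or(MissingData("no deletion vector data".to_string()))??;
    Ok(bytes)
}

fn main() {
    assert_eq!(
        first_chunk(vec![Ok(b"dv".to_vec())].into_iter()).unwrap(),
        b"dv".to_vec()
    );
    assert!(first_chunk(std::iter::empty()).is_err());
}
```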
71 changes: 51 additions & 20 deletions kernel/src/client/filesystem.rs
@@ -1,18 +1,19 @@
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::{StreamExt, TryStreamExt};
use futures::stream::StreamExt;
use object_store::path::Path;
use object_store::DynObjectStore;
use url::Url;

use crate::{executor::TaskExecutor, DeltaResult, FileMeta, FileSlice, FileSystemClient};
use crate::{executor::TaskExecutor, DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};

#[derive(Debug)]
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}

impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
@@ -21,8 +22,15 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
inner: store,
table_root,
task_executor,
readahead: 10,
}
}

/// Set the maximum number of files to read in parallel.
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
}
}

impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
@@ -82,26 +90,47 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
}

/// Read data specified by the start and end offset from the file.
fn read_files(&self, files: Vec<FileSlice>) -> DeltaResult<Vec<Bytes>> {
///
/// This will return the data in the same order as the provided file slices.
///
/// Multiple reads may occur in parallel, depending on the configured readahead.
/// See [`Self::with_readahead`].
fn read_files(
&self,
files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
let store = self.inner.clone();
let fut = futures::stream::iter(files)
.map(move |(url, range)| {
let path = Path::from(url.path());
let store = store.clone();
async move {
if let Some(rng) = range {
store.get_range(&path, rng).await
} else {
let result = store.get(&path).await?;
result.bytes().await

// This channel will become the output iterator.
// Because there will already be buffering in the stream, we set the
// buffer size to 0.
let (sender, receiver) = std::sync::mpsc::sync_channel(0);

self.task_executor.spawn(
futures::stream::iter(files)
.map(move |(url, range)| {
let path = Path::from(url.path());
let store = store.clone();
async move {
if let Some(rng) = range {
store.get_range(&path, rng).await
} else {
let result = store.get(&path).await?;
result.bytes().await
}
}
}
})
// Read up to 10 files concurrently.
.buffered(10)
.try_collect::<Vec<Bytes>>();
})
// We allow executing up to `readahead` futures concurrently and
// buffer the results. This allows us to achieve async concurrency
// within a synchronous method.
.buffered(self.readahead)
.for_each(move |res| {
sender.send(res.map_err(Error::from)).ok();
futures::future::ready(())
}),
);

Ok(self.task_executor.block_on(fut)?)
Ok(Box::new(receiver.into_iter()))
}
}

@@ -113,6 +142,8 @@ mod tests {

use crate::executor::tokio::TokioBackgroundExecutor;

use itertools::Itertools;

use super::*;

#[tokio::test]
@@ -145,7 +176,7 @@
url.set_path(&format!("{}/c", url.path()));
slices.push((url, Some(Range { start: 4, end: 9 })));

let data = client.read_files(slices).unwrap();
let data: Vec<Bytes> = client.read_files(slices).unwrap().try_collect().unwrap();

assert_eq!(data.len(), 3);
assert_eq!(data[0], Bytes::from("kernel"));
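The new `read_files` above is an instance of a general bridge from async to sync: run the stream on a background executor and hand each result to the caller over a bounded `std::sync::mpsc` channel, whose receiver doubles as a blocking iterator. Below is a stripped-down sketch of that pattern, with a plain background thread plus a Tokio runtime standing in for the kernel's `TaskExecutor`, and a doubling step standing in for the object-store read.

```rust
use std::sync::mpsc;
use std::thread;

use futures::stream::{self, StreamExt};

/// Drive an async pipeline in the background and expose it as a blocking iterator.
fn stream_to_iter(inputs: Vec<u64>, readahead: usize) -> impl Iterator<Item = u64> {
    // Bounded channel: the producer can run at most `readahead` items ahead of
    // the consumer, which gives us back-pressure for free.
    let (sender, receiver) = mpsc::sync_channel(readahead);

    thread::spawn(move || {
        // A small current-thread runtime stands in for TokioBackgroundExecutor.
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("failed to build runtime");

        rt.block_on(
            stream::iter(inputs)
                .map(|x| async move { x * 2 }) // pretend this is an object-store get
                .buffered(readahead)           // up to `readahead` reads in flight, order preserved
                .for_each(move |item| {
                    // If the consumer hung up, stop sending quietly.
                    sender.send(item).ok();
                    futures::future::ready(())
                }),
        );
        // Sender is dropped here, which ends the receiver's iteration.
    });

    receiver.into_iter()
}

fn main() {
    let doubled: Vec<u64> = stream_to_iter(vec![1, 2, 3], 2).collect();
    assert_eq!(doubled, vec![2, 4, 6]);
}
```

Because `buffered` preserves input order, the consumer sees results in the same order as the requests, matching the guarantee documented on `read_files`.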
64 changes: 44 additions & 20 deletions kernel/src/client/json.rs
@@ -10,15 +10,16 @@ use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use bytes::{Buf, Bytes};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};

use super::file_handler::{FileOpenFuture, FileOpener};
use crate::executor::TaskExecutor;
use crate::file_handler::FileStream;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, JsonHandler,
DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, JsonHandler,
};

#[derive(Debug)]
@@ -28,17 +29,31 @@ pub struct JsonReadContext {
}

#[derive(Debug)]
pub struct DefaultJsonHandler {
pub struct DefaultJsonHandler<E: TaskExecutor> {
store: Arc<DynObjectStore>,
task_executor: Arc<E>,
readahead: usize,
}

impl DefaultJsonHandler {
pub fn new(store: Arc<DynObjectStore>) -> Self {
Self { store }
impl<E: TaskExecutor> DefaultJsonHandler<E> {
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
Self {
store,
task_executor,
readahead: 10,
}
}

/// Set the maximum number of batches to read ahead during [Self::read_json_files()].
///
/// Defaults to 10.
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
}
}

impl FileHandler for DefaultJsonHandler {
impl<E: TaskExecutor> FileHandler for DefaultJsonHandler<E> {
type FileReadContext = JsonReadContext;

fn contextualize_file_reads(
@@ -56,8 +71,7 @@ impl FileHandler for DefaultJsonHandler {
}
}

#[async_trait::async_trait]
impl JsonHandler for DefaultJsonHandler {
impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn parse_json(
&self,
json_strings: StringArray,
@@ -88,9 +102,9 @@
&self,
files: Vec<<Self as FileHandler>::FileReadContext>,
physical_schema: SchemaRef,
) -> DeltaResult<FileDataReadResultStream> {
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(futures::stream::empty().boxed());
return Ok(Box::new(std::iter::empty()));
}

let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?);
@@ -99,7 +113,18 @@

let files = files.into_iter().map(|f| f.meta).collect::<Vec<_>>();
let stream = FileStream::new(files, schema, file_reader)?;
Ok(stream.boxed())

// This channel will become the output iterator
// The stream will execute in the background, and we allow up to `readahead`
// batches to be buffered in the channel.
let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead);

self.task_executor.spawn(stream.for_each(move |res| {
sender.send(res).ok();
futures::future::ready(())
}));

Ok(Box::new(receiver.into_iter()))
}
}

@@ -186,15 +211,15 @@ impl FileOpener for JsonOpener {
mod tests {
use std::path::PathBuf;

use object_store::{local::LocalFileSystem, ObjectStore};

use super::*;
use crate::actions::get_log_schema;
use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};

#[test]
fn test_parse_json() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
Expand Down Expand Up @@ -227,14 +252,13 @@ mod tests {
size: meta.size,
}];

let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let physical_schema = Arc::new(get_log_schema());
let context = handler.contextualize_file_reads(files, None).unwrap();
let data = handler
let data: Vec<RecordBatch> = handler
.read_json_files(context, Arc::new(physical_schema.try_into().unwrap()))
.unwrap()
.try_collect::<Vec<_>>()
.await
.try_collect()
.unwrap();

assert_eq!(data.len(), 1);
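One detail worth noting across the two handlers: `filesystem.rs` creates its channel with `sync_channel(0)` because the `buffered(readahead)` combinator already holds the in-flight results, while `read_json_files` sizes the channel itself with `sync_channel(self.readahead)`. A capacity of 0 is a rendezvous channel where every `send` waits for a matching `recv`; a capacity of n lets the producer run up to n items ahead of the consumer. A tiny illustration of that bound (the values and sleep are arbitrary, just to make the back-pressure visible):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // Capacity 2: the producer may queue two unconsumed items before blocking.
    let (tx, rx) = sync_channel(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap(); // blocks once two items sit unconsumed
            println!("sent {i}");
        }
        // tx dropped here, which ends the receiver's iteration below
    });

    thread::sleep(Duration::from_millis(50)); // let the producer run ahead
    for received in rx.iter() {
        println!("received {received}");
    }
    producer.join().unwrap();
}
```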