refactor: move file readers to sync APIs
wjones127 committed Sep 17, 2023
1 parent c65c6cc commit 5b85fce
Showing 14 changed files with 336 additions and 322 deletions.
10 changes: 5 additions & 5 deletions acceptance/src/meta.rs
@@ -78,16 +78,16 @@ impl TestCaseInfo {
Ok((latest, cases))
}

async fn assert_snapshot_meta<JRC: Send, PRC: Send + Sync>(
fn assert_snapshot_meta<JRC: Send, PRC: Send + Sync>(
&self,
case: &TableVersionMetaData,
snapshot: &Snapshot<JRC, PRC>,
) -> TestResult<()> {
assert_eq!(snapshot.version(), case.version);

// assert correct metadata is read
let metadata = snapshot.metadata().await?;
let protocol = snapshot.protocol().await?;
let metadata = snapshot.metadata()?;
let protocol = snapshot.protocol()?;
let tvm = TableVersionMetaData {
version: snapshot.version(),
properties: metadata
@@ -111,11 +111,11 @@ impl TestCaseInfo {
let (latest, versions) = self.versions().await?;

let snapshot = table.snapshot(None)?;
self.assert_snapshot_meta(&latest, &snapshot).await?;
self.assert_snapshot_meta(&latest, &snapshot)?;

for table_version in versions {
let snapshot = table.snapshot(Some(table_version.version))?;
self.assert_snapshot_meta(&table_version, &snapshot).await?;
self.assert_snapshot_meta(&table_version, &snapshot)?;
}

Ok(())
11 changes: 6 additions & 5 deletions kernel/Cargo.toml
@@ -15,18 +15,15 @@ arrow-json = { version = "^46.0" }
arrow-ord = { version = "^46.0" }
arrow-schema = { version = "^46.0" }
arrow-select = { version = "^46.0" }
async-trait = "0.1"
bytes = "1.4"
chrono = { version = "0.4", optional = true }
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
futures = "0.3"
itertools = "0.11"
lazy_static = "1.4"
# used for providing a storage abstraction layer
object_store = "^0.7.0"
# need to generalize over arrow, arrow2 and diff parquet etc. (BYOP)
parquet = "^46.0"
regex = "1.8"
roaring = "0.10.1"
serde = { version = "1", features = ["derive"] }
@@ -38,12 +35,16 @@ url = "2"
uuid = "1.3.0"
z85 = "3.0.5"

# optionally used with default client
# Used in default client
futures = { version = "0.3", optional = true }
parquet = { version = "^46.0", optional = true, features=["async", "object_store"]}

# optionally used with default client (though not required)
tokio = { version = "1", optional = true, features=["rt-multi-thread"] }

[features]
default = ["default-client"]
default-client = ["chrono", "parquet/async", "parquet/object_store"]
default-client = ["chrono", "parquet", "futures"]

[dev-dependencies]
arrow = { version = "^46.0", features = ["json", "prettyprint"] }
5 changes: 3 additions & 2 deletions kernel/src/actions/mod.rs
@@ -529,11 +529,12 @@ mod tests {
use super::*;
use crate::actions::Protocol;
use crate::client::json::DefaultJsonHandler;
use crate::executor::tokio::TokioBackgroundExecutor;
use crate::JsonHandler;

fn action_batch() -> RecordBatch {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
@@ -596,7 +597,7 @@ mod tests {
#[test]
fn test_parse_add_partitioned() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
5 changes: 2 additions & 3 deletions kernel/src/actions/types.rs
@@ -226,9 +226,8 @@ impl DeletionVectorDescriptor {

let dv_data = fs_client
.read_files(vec![(path, None)])?
.first()
.ok_or(Error::MissingData("No deletion Vecor data".to_string()))?
.clone();
.next()
.ok_or(Error::MissingData("No deletion Vector data".to_string()))??;

let mut cursor = Cursor::new(dv_data);
if let Some(offset) = offset {
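The deletion-vector change above shows the new calling convention for read_files: it now returns an iterator of per-file results rather than a Vec, so there are two error layers to unwrap, which is why the diff ends in a double `?`. A self-contained sketch of the same shape, with a plain String error and std::fs standing in for the kernel's FileSystemClient and error types:

type Result<T> = std::result::Result<T, String>;

// Stand-in for FileSystemClient::read_files: setting up the reads can fail
// (outer Result), and so can each individual read (inner Result per item).
fn read_files(paths: Vec<String>) -> Result<Box<dyn Iterator<Item = Result<Vec<u8>>>>> {
    let reads: Vec<Result<Vec<u8>>> = paths
        .into_iter()
        .map(|p| std::fs::read(&p).map_err(|e| e.to_string()))
        .collect();
    Ok(Box::new(reads.into_iter()))
}

fn first_file(paths: Vec<String>) -> Result<Vec<u8>> {
    // First `?`: the item was missing. Second `?`: this particular file's
    // read failed. Together they mirror the `??` in the diff above.
    let data = read_files(paths)?
        .next()
        .ok_or_else(|| "no deletion vector data".to_string())??;
    Ok(data)
}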
66 changes: 46 additions & 20 deletions kernel/src/client/filesystem.rs
@@ -1,18 +1,19 @@
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::{StreamExt, TryStreamExt};
use futures::stream::StreamExt;
use object_store::path::Path;
use object_store::DynObjectStore;
use url::Url;

use crate::{executor::TaskExecutor, DeltaResult, FileMeta, FileSlice, FileSystemClient};
use crate::{executor::TaskExecutor, DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};

#[derive(Debug)]
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}

impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
@@ -21,8 +22,15 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
inner: store,
table_root,
task_executor,
readahead: 10,
}
}

/// Set the maximum number of files to read in parallel.
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
}
}

impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
@@ -82,26 +90,42 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
}

/// Read data specified by the start and end offset from the file.
fn read_files(&self, files: Vec<FileSlice>) -> DeltaResult<Vec<Bytes>> {
///
/// This will return the data in the same order as the provided file slices.
///
/// Multiple reads may occur in parallel, depending on the configured readahead.
/// See [`Self::with_readahead`].
fn read_files(
&self,
files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>> {
let store = self.inner.clone();
let fut = futures::stream::iter(files)
.map(move |(url, range)| {
let path = Path::from(url.path());
let store = store.clone();
async move {
if let Some(rng) = range {
store.get_range(&path, rng).await
} else {
let result = store.get(&path).await?;
result.bytes().await

// This channel will become the iterator
let (sender, receiver) = std::sync::mpsc::sync_channel(20);

self.task_executor.spawn(
futures::stream::iter(files)
.map(move |(url, range)| {
let path = Path::from(url.path());
let store = store.clone();
async move {
if let Some(rng) = range {
store.get_range(&path, rng).await
} else {
let result = store.get(&path).await?;
result.bytes().await
}
}
}
})
// Read up to 10 files concurrently.
.buffered(10)
.try_collect::<Vec<Bytes>>();
})
.buffered(self.readahead)
.for_each(move |res| {
sender.send(res.map_err(Error::from)).ok();
futures::future::ready(())
}),
);

Ok(self.task_executor.block_on(fut)?)
Ok(Box::new(receiver.into_iter()))
}
}

@@ -113,6 +137,8 @@ mod tests {

use crate::executor::tokio::TokioBackgroundExecutor;

use itertools::Itertools;

use super::*;

#[tokio::test]
@@ -145,7 +171,7 @@ mod tests {
url.set_path(&format!("{}/c", url.path()));
slices.push((url, Some(Range { start: 4, end: 9 })));

let data = client.read_files(slices).unwrap();
let data: Vec<Bytes> = client.read_files(slices).unwrap().try_collect().unwrap();

assert_eq!(data.len(), 3);
assert_eq!(data[0], Bytes::from("kernel"));
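The heart of this refactor is visible in read_files above: instead of block_on collecting a Vec, the async object_store stream is drained into a bounded std::sync::mpsc channel by a future handed to the task executor, and the channel's receiver is returned to the caller as a plain iterator. A minimal, self-contained sketch of that bridge, with a plain thread plus a tokio current-thread runtime standing in for the kernel's TaskExecutor (tokio and futures are assumed as dependencies here, and the parallel .buffered(readahead) step is omitted for brevity):

use std::sync::mpsc;

use futures::stream::{self, StreamExt};

/// Drive an async stream on a background runtime and expose it as a sync iterator.
fn stream_to_iter<T: Send + 'static>(items: Vec<T>, bound: usize) -> impl Iterator<Item = T> {
    let (sender, receiver) = mpsc::sync_channel(bound);
    std::thread::spawn(move || {
        // Stand-in for TaskExecutor::spawn.
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("failed to build tokio runtime");
        rt.block_on(stream::iter(items).for_each(move |item| {
            // If the receiver is dropped, `send` fails; ignoring the error just
            // stops feeding the channel, mirroring the `.ok()` in the diff.
            sender.send(item).ok();
            futures::future::ready(())
        }));
    });
    receiver.into_iter()
}

fn main() {
    // The caller iterates synchronously; the async work happens in the background.
    for word in stream_to_iter(vec!["hello", "kernel"], 2) {
        println!("{word}");
    }
}

The sync_channel bound provides backpressure: once `bound` items sit unconsumed, the producing future stalls until the caller pulls more, which is what keeps readahead memory bounded in the real implementation.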
59 changes: 39 additions & 20 deletions kernel/src/client/json.rs
@@ -10,15 +10,16 @@ use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use bytes::{Buf, Bytes};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};

use super::file_handler::{FileOpenFuture, FileOpener};
use crate::executor::TaskExecutor;
use crate::file_handler::FileStream;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, Error, Expression, FileDataReadResultStream, FileHandler, FileMeta, JsonHandler,
DeltaResult, Error, Expression, FileDataReadResultIterator, FileHandler, FileMeta, JsonHandler,
};

#[derive(Debug)]
@@ -28,17 +29,28 @@ pub struct JsonReadContext {
}

#[derive(Debug)]
pub struct DefaultJsonHandler {
pub struct DefaultJsonHandler<E: TaskExecutor> {
store: Arc<DynObjectStore>,
task_executor: Arc<E>,
readahead: usize,
}

impl DefaultJsonHandler {
pub fn new(store: Arc<DynObjectStore>) -> Self {
Self { store }
impl<E: TaskExecutor> DefaultJsonHandler<E> {
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
Self {
store,
task_executor,
readahead: 10,
}
}

pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
}
}

impl FileHandler for DefaultJsonHandler {
impl<E: TaskExecutor> FileHandler for DefaultJsonHandler<E> {
type FileReadContext = JsonReadContext;

fn contextualize_file_reads(
@@ -56,8 +68,7 @@ impl FileHandler for DefaultJsonHandler {
}
}

#[async_trait::async_trait]
impl JsonHandler for DefaultJsonHandler {
impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn parse_json(
&self,
json_strings: StringArray,
@@ -88,9 +99,9 @@ impl JsonHandler for DefaultJsonHandler {
&self,
files: Vec<<Self as FileHandler>::FileReadContext>,
physical_schema: SchemaRef,
) -> DeltaResult<FileDataReadResultStream> {
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(futures::stream::empty().boxed());
return Ok(Box::new(std::iter::empty()));
}

let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?);
@@ -99,7 +110,16 @@ impl JsonHandler for DefaultJsonHandler {

let files = files.into_iter().map(|f| f.meta).collect::<Vec<_>>();
let stream = FileStream::new(files, schema, file_reader)?;
Ok(stream.boxed())

// This channel will become the iterator
let (sender, receiver) = std::sync::mpsc::sync_channel(self.readahead);

self.task_executor.spawn(stream.for_each(move |res| {
sender.send(res).ok();
futures::future::ready(())
}));

Ok(Box::new(receiver.into_iter()))
}
}

@@ -186,15 +206,15 @@ impl FileOpener for JsonOpener {
mod tests {
use std::path::PathBuf;

use object_store::{local::LocalFileSystem, ObjectStore};

use super::*;
use crate::actions::get_log_schema;
use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};

#[test]
fn test_parse_json() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
@@ -227,14 +247,13 @@ mod tests {
size: meta.size,
}];

let handler = DefaultJsonHandler::new(store);
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let physical_schema = Arc::new(get_log_schema());
let context = handler.contextualize_file_reads(files, None).unwrap();
let data = handler
let data: Vec<RecordBatch> = handler
.read_json_files(context, Arc::new(physical_schema.try_into().unwrap()))
.unwrap()
.try_collect::<Vec<_>>()
.await
.try_collect()
.unwrap();

assert_eq!(data.len(), 1);
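Both default handlers are now generic over E: TaskExecutor and call self.task_executor.spawn(...) to drive the draining future. The trait itself is not part of this diff; judging only from how it is used here (spawn for fire-and-forget futures, block_on elsewhere in the client), a plausible shape would be something like the following — an inferred sketch, not the kernel's actual definition:

use std::future::Future;

/// Inferred sketch of the executor abstraction the default client is generic over.
pub trait TaskExecutor: Send + Sync + 'static {
    /// Drive `task` to completion in the background (used by the file readers
    /// to fill the mpsc channels that back the returned iterators).
    fn spawn<F>(&self, task: F)
    where
        F: Future<Output = ()> + Send + 'static;

    /// Block the current (synchronous) caller until `task` completes.
    fn block_on<F>(&self, task: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send;
}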
22 changes: 14 additions & 8 deletions kernel/src/client/mod.rs
@@ -24,8 +24,8 @@ pub mod parquet;
pub struct DefaultTableClient<E: TaskExecutor> {
store: Arc<DynObjectStore>,
file_system: Arc<ObjectStoreFileSystemClient<E>>,
json: Arc<DefaultJsonHandler>,
parquet: Arc<DefaultParquetHandler>,
json: Arc<DefaultJsonHandler<E>>,
parquet: Arc<DefaultParquetHandler<E>>,
}

impl<E: TaskExecutor> DefaultTableClient<E> {
@@ -46,10 +46,13 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
prefix,
task_executor,
task_executor.clone(),
)),
json: Arc::new(DefaultJsonHandler::new(store.clone())),
parquet: Arc::new(DefaultParquetHandler::new(store.clone())),
json: Arc::new(DefaultJsonHandler::new(
store.clone(),
task_executor.clone(),
)),
parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)),
store,
})
}
@@ -59,10 +62,13 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
prefix,
task_executor,
task_executor.clone(),
)),
json: Arc::new(DefaultJsonHandler::new(
store.clone(),
task_executor.clone(),
)),
json: Arc::new(DefaultJsonHandler::new(store.clone())),
parquet: Arc::new(DefaultParquetHandler::new(store.clone())),
parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)),
store,
}
}
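Taken together, the async plumbing now lives entirely behind the executor and the public read path is synchronous. A hedged end-to-end sketch using only pieces visible in this diff (ObjectStoreFileSystemClient, TokioBackgroundExecutor, with_readahead, and FileSlice as a (Url, Option<Range>) pair); the crate name deltakernel, the module paths, the local file URL, and the availability of bytes, object_store, and url as dependencies are assumptions for illustration:

use std::sync::Arc;

use bytes::Bytes;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use url::Url;

// Module paths and the crate name are assumed to match the layout shown above.
use deltakernel::client::filesystem::ObjectStoreFileSystemClient;
use deltakernel::executor::tokio::TokioBackgroundExecutor;
use deltakernel::DeltaResult;

fn main() -> DeltaResult<()> {
    let store = Arc::new(LocalFileSystem::new());
    let client = ObjectStoreFileSystemClient::new(
        store,
        Path::from("delta-table"),
        Arc::new(TokioBackgroundExecutor::new()),
    )
    .with_readahead(4);

    // A FileSlice pairs a file URL with an optional byte range; None reads the whole file.
    let url = Url::parse("file:///tmp/delta-table/part-00000.snappy.parquet").unwrap();

    // No async runtime is needed at the call site: read_files hands back a
    // plain iterator that the executor feeds in the background.
    for chunk in client.read_files(vec![(url, None)])? {
        let bytes: Bytes = chunk?;
        println!("read {} bytes", bytes.len());
    }
    Ok(())
}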