Skip to content

Commit

Permalink
add test, read presigned urls - and remove actual presigned urls
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Mar 15, 2024
1 parent 518bfe0 commit 1166576
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 5 deletions.
4 changes: 3 additions & 1 deletion kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arrow-ord = { version = "^49.0", optional = true }
arrow-schema = { version = "^49.0", optional = true }
futures = { version = "0.3", optional = true }
object_store = { version = "^0.8.0", optional = true }
reqwest = { version = "^0.11.0", optional = true }
# Used in default and simple client
parquet = { version = "^49.0", optional = true }

Expand All @@ -60,7 +61,8 @@ default-client = [
"object_store",
"parquet/async",
"parquet/object_store",
"tokio"
"reqwest",
"tokio",
]

developer-visibility = []
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! the [executor] module.

use std::sync::Arc;
// use std::collections::HashMap;

use object_store::{parse_url_opts, path::Path, DynObjectStore};
use url::Url;
Expand All @@ -29,6 +30,7 @@ pub mod parquet;

#[derive(Debug)]
pub struct DefaultTableClient<E: TaskExecutor> {
// stores: HashMap<String, Arc<DynObjectStore>>,
store: Arc<DynObjectStore>,
file_system: Arc<ObjectStoreFileSystemClient<E>>,
json: Arc<DefaultJsonHandler<E>>,
Expand All @@ -48,6 +50,10 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
K: AsRef<str>,
V: Into<String>,
{
// path to table (with a _delta_log) used to determine the kind of ObjectStore to
// initialize to read the log.

// let store_scheme = path.scheme();
let (store, prefix) = parse_url_opts(path, options)?;
let store = Arc::new(store);
Ok(Self {
Expand All @@ -61,6 +67,7 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
task_executor.clone(),
)),
parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)),
// stores: HashMap::from([(store_scheme.to_string(), store)]),
store,
expression: Arc::new(DefaultExpressionHandler {}),
})
Expand Down
78 changes: 74 additions & 4 deletions kernel/src/client/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::DynObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use super::file_handler::{FileOpenFuture, FileOpener};
Expand Down Expand Up @@ -55,8 +55,28 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
}

let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?);
let file_reader = ParquetOpener::new(1024, schema.clone(), self.store.clone());
let mut stream = FileStream::new(files.to_vec(), schema, file_reader)?;
// get the first FileMeta to decide how to fetch the file
// s3:// -> aws (ParquetOpener)
// nothing -> local (ParquetOpener)
// https:// -> assume presigned URL (and fetch without object_store)
// -> reqwest to get data
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.

let mut stream: std::pin::Pin<
Box<dyn futures::Stream<Item = DeltaResult<arrow_array::RecordBatch>> + Send>,
> = match files[0].location.scheme() {
"https" => Box::pin(FileStream::new(
files.to_vec(),
schema.clone(),
PresignedUrlOpener::new(1024, schema.clone()),
)?),
_ => Box::pin(FileStream::new(
files.to_vec(),
schema.clone(),
ParquetOpener::new(1024, schema.clone(), self.store.clone()),
)?),
};

// This channel will become the output iterator.
// The stream will execute in the background and send results to this channel.
Expand Down Expand Up @@ -89,7 +109,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
}
}

/// Implements [`FileOpener`] for a parquet file
/// Implements [`FileOpener`] for a opening a parquet file from object storage
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
Expand Down Expand Up @@ -147,6 +167,56 @@ impl FileOpener for ParquetOpener {
}
}

/// Implements [`FileOpener`] for a opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
limit: Option<usize>,
table_schema: ArrowSchemaRef,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: ArrowSchemaRef) -> Self {
Self {
batch_size,
table_schema: schema,
limit: None,
}
}
}

impl FileOpener for PresignedUrlOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let _table_schema = self.table_schema.clone();
let limit = self.limit;

Ok(Box::pin(async move {
// fetch the file from the interweb FIXME use client
let reader = reqwest::get(file_meta.location)
.await
.unwrap()
.bytes()
.await
.unwrap();

let options = ArrowReaderOptions::new();
let mut builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(reader, options)?;

if let Some(limit) = limit {
builder = builder.with_limit(limit)
}

let reader = builder.with_batch_size(batch_size).build()?;

let stream = futures::stream::iter(reader);

let adapted = stream.map_err(Error::generic_err);
Ok(adapted.boxed())
}))
}
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;
Expand Down
24 changes: 24 additions & 0 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,28 @@ mod tests {
);
Ok(())
}

#[test_log::test]
fn test_scan_with_presigned_data_urls() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/local_log_with_presigned_data_urls",
))?;

let url = url::Url::from_directory_path(path).unwrap();
let engine_interface = SimpleClient::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine_interface, None)?;
let scan = ScanBuilder::new(snapshot).build();
let files: Vec<DeltaResult<Add>> = scan.files(&engine_interface)?.collect();

assert_eq!(
files
.into_iter()
.map(|file| file.unwrap().path)
.collect::<Vec<_>>(),
vec!["https://blahblah"]
);
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"84b09beb-329c-4b5e-b493-f58c6c78b8fd","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"2"},"createdTime":1674611455081}}
{"commitInfo":{"timestamp":1674611455099,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"d87e63fb-7388-4b1c-9afc-750a561012b7"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","partitionValues":{},"size":965,"modificationTime":1674611456921,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"b\",\"int\":288,\"date\":\"1978-02-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":988,\"date\":\"2020-05-01\"},\"nullCount\":{\"letter\":3,\"int\":0,\"date\":0}}"}}
{"commitInfo":{"timestamp":1674611457269,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"965"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"71d9bcd1-7f2b-46f8-bd1f-e0a8e872f3c3"}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"add":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","partitionValues":{},"size":976,"modificationTime":1674611458901,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":120,\"date\":\"1971-07-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":667,\"date\":\"2018-02-01\"},\"nullCount\":{\"letter\":2,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","deletionTimestamp":1674611459307,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":965}}
{"commitInfo":{"timestamp":1674611459307,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"976"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"b08f5758-a8e9-4dd1-af7e-7b6e53928d7a"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"add":{"path":"https://fixme","partitionValues":{},"size":1010,"modificationTime":1674611461541,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":93,\"date\":\"1975-06-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":753,\"date\":\"2013-03-01\"},\"nullCount\":{\"letter\":1,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","deletionTimestamp":1674611461982,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":976}}
{"commitInfo":{"timestamp":1674611461982,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"1010"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"0403bbaf-a6f2-4543-9e6c-bd068e76670f"}}
8 changes: 8 additions & 0 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,11 @@ fn data() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[test]
fn presigned_urls() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec!["+----+", "| id |", "+----+", "| 2 |", "+----+"];
read_table_data("./tests/data/local_log_with_presigned_data_urls", expected)?;

Ok(())
}

0 comments on commit 1166576

Please sign in to comment.