[WIP] delta sharing hackathon 2024Q1 #144

Closed · wants to merge 7 commits
4 changes: 3 additions & 1 deletion kernel/Cargo.toml
@@ -39,6 +39,7 @@ arrow-ord = { version = "^49.0", optional = true }
arrow-schema = { version = "^49.0", optional = true }
futures = { version = "0.3", optional = true }
object_store = { version = "^0.8.0", optional = true }
reqwest = { version = "^0.11.0", optional = true }
# Used in default and simple client
parquet = { version = "^49.0", optional = true }

@@ -60,7 +61,8 @@ default-client = [
"object_store",
"parquet/async",
"parquet/object_store",
"tokio"
"reqwest",
"tokio",
]

developer-visibility = []
12 changes: 8 additions & 4 deletions kernel/src/client/filesystem.rs
@@ -97,11 +97,15 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
let path = Path::from(url.path());
let store = store.clone();
async move {
if let Some(rng) = range {
store.get_range(&path, rng).await
if url.scheme() == "https" {
Ok(reqwest::get(url).await.unwrap().bytes().await.unwrap())
} else {
let result = store.get(&path).await?;
result.bytes().await
if let Some(rng) = range {
store.get_range(&path, rng).await
} else {
let result = store.get(&path).await?;
result.bytes().await
}
}
}
})
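The `https` branch above panics on any network failure, since both reqwest calls are unwrapped. A minimal sketch of the same fetch with errors mapped into object_store's error type instead — the `fetch_presigned` helper and the `"presigned-url"` store label are hypothetical, assuming reqwest ^0.11 and object_store ^0.8:

```rust
// Hypothetical helper: fetch a presigned HTTPS URL and surface failures as
// object_store errors instead of panicking. Not part of this PR.
use bytes::Bytes;
use object_store::Error as ObjectStoreError;
use url::Url;

async fn fetch_presigned(url: Url) -> Result<Bytes, ObjectStoreError> {
    let wrap = |e: reqwest::Error| ObjectStoreError::Generic {
        store: "presigned-url",
        source: Box::new(e),
    };
    // One GET per file; a shared reqwest::Client would reuse connections.
    let response = reqwest::get(url).await.map_err(wrap)?;
    response.bytes().await.map_err(wrap)
}
```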
7 changes: 7 additions & 0 deletions kernel/src/client/mod.rs
@@ -7,6 +7,7 @@
//! the [executor] module.

use std::sync::Arc;
// use std::collections::HashMap;

use object_store::{parse_url_opts, path::Path, DynObjectStore};
use url::Url;
@@ -29,6 +30,7 @@ pub mod parquet;

#[derive(Debug)]
pub struct DefaultTableClient<E: TaskExecutor> {
// stores: HashMap<String, Arc<DynObjectStore>>,
store: Arc<DynObjectStore>,
file_system: Arc<ObjectStoreFileSystemClient<E>>,
json: Arc<DefaultJsonHandler<E>>,
@@ -48,6 +50,10 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
K: AsRef<str>,
V: Into<String>,
{
// path to table (with a _delta_log) used to determine the kind of ObjectStore to
// initialize to read the log.

// let store_scheme = path.scheme();
let (store, prefix) = parse_url_opts(path, options)?;
let store = Arc::new(store);
Ok(Self {
@@ -61,6 +67,7 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
task_executor.clone(),
)),
parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)),
// stores: HashMap::from([(store_scheme.to_string(), store)]),
store,
expression: Arc::new(DefaultExpressionHandler {}),
})
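The commented-out `stores` map sketches where this is headed: one `ObjectStore` per URL scheme, so a log read from the local store can still reference data behind `https` URLs. A minimal sketch of such a registry — `StoreRegistry` is a hypothetical name, not part of this PR:

```rust
// Sketch of the commented-out idea: cache one ObjectStore per URL scheme.
// This PR keeps the single `store` field; the registry is illustrative.
use std::collections::HashMap;
use std::sync::Arc;

use object_store::DynObjectStore;

#[derive(Default)]
struct StoreRegistry {
    stores: HashMap<String, Arc<DynObjectStore>>,
}

impl StoreRegistry {
    fn insert(&mut self, scheme: impl Into<String>, store: Arc<DynObjectStore>) {
        self.stores.insert(scheme.into(), store);
    }

    /// Look up the store registered for a URL's scheme, e.g. "s3" or "file".
    fn for_url(&self, url: &url::Url) -> Option<Arc<DynObjectStore>> {
        self.stores.get(url.scheme()).cloned()
    }
}
```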
78 changes: 74 additions & 4 deletions kernel/src/client/parquet.rs
@@ -7,7 +7,7 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::DynObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use super::file_handler::{FileOpenFuture, FileOpener};
@@ -55,8 +55,28 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
}

let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?);
let file_reader = ParquetOpener::new(1024, schema.clone(), self.store.clone());
let mut stream = FileStream::new(files.to_vec(), schema, file_reader)?;
// get the first FileMeta to decide how to fetch the file
// s3:// -> aws (ParquetOpener)
// nothing -> local (ParquetOpener)
// https:// -> assume presigned URL (and fetch without object_store)
// -> reqwest to get data
// -> parse to parquet
// SAFETY: we did the is_empty check above, so indexing files[0] is OK.

let mut stream: std::pin::Pin<
Box<dyn futures::Stream<Item = DeltaResult<arrow_array::RecordBatch>> + Send>,
> = match files[0].location.scheme() {
"https" => Box::pin(FileStream::new(
files.to_vec(),
schema.clone(),
PresignedUrlOpener::new(1024, schema.clone()),
)?),
_ => Box::pin(FileStream::new(
files.to_vec(),
schema.clone(),
ParquetOpener::new(1024, schema.clone(), self.store.clone()),
)?),
};

// This channel will become the output iterator.
// The stream will execute in the background and send results to this channel.
@@ -89,7 +109,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
}
}

/// Implements [`FileOpener`] for a parquet file
/// Implements [`FileOpener`] for opening a parquet file from object storage
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
@@ -147,6 +167,56 @@ impl FileOpener for ParquetOpener {
}
}

/// Implements [`FileOpener`] for opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
limit: Option<usize>,
table_schema: ArrowSchemaRef,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: ArrowSchemaRef) -> Self {
Self {
batch_size,
table_schema: schema,
limit: None,
}
}
}

impl FileOpener for PresignedUrlOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let _table_schema = self.table_schema.clone();
let limit = self.limit;

Ok(Box::pin(async move {
// fetch the file from the interweb FIXME use client
let reader = reqwest::get(file_meta.location)
.await
.unwrap()
.bytes()
.await
.unwrap();

let options = ArrowReaderOptions::new();
let mut builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(reader, options)?;

if let Some(limit) = limit {
builder = builder.with_limit(limit)
}

let reader = builder.with_batch_size(batch_size).build()?;

let stream = futures::stream::iter(reader);

let adapted = stream.map_err(Error::generic_err);
Ok(adapted.boxed())
}))
}
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;
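`PresignedUrlOpener` works because the buffered response body (`bytes::Bytes`) implements parquet's `ChunkReader`, so the whole download can be handed to the synchronous `ParquetRecordBatchReaderBuilder`. A standalone sketch of that flow with the unwraps replaced by error propagation — `read_presigned_parquet` is a hypothetical helper, assuming parquet ^49 and reqwest ^0.11:

```rust
// Hypothetical standalone version of the PresignedUrlOpener flow: buffer the
// whole object over HTTPS, then decode it with the synchronous reader.
use arrow_array::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

async fn read_presigned_parquet(
    url: url::Url,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error + Send + Sync>> {
    // Bytes implements ChunkReader, so no temp file is needed.
    let body = reqwest::get(url).await?.bytes().await?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(body)?
        .with_batch_size(batch_size)
        .build()?;
    // The reader is a blocking Iterator<Item = Result<RecordBatch, ArrowError>>.
    Ok(reader.collect::<Result<Vec<_>, _>>()?)
}
```

The tradeoff versus `ParquetOpener` is memory: the async object_store path can read ranges, while this path holds the entire file in RAM before decoding.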
49 changes: 45 additions & 4 deletions kernel/src/scan/mod.rs
@@ -14,6 +14,7 @@ pub mod file_stream;

// TODO projection: something like fn select(self, columns: &[&str])
/// Builder to scan a snapshot of a table.
#[derive(Clone)]
pub struct ScanBuilder {
snapshot: Arc<Snapshot>,
schema: Option<SchemaRef>,
@@ -219,15 +220,17 @@ impl Scan {
read_result
} else {
let mut fields = Vec::with_capacity(partition_fields.len() + len);
fields.extend(select_fields.clone());
for field in &partition_fields {
let value_expression = parse_partition_value(
add.partition_values.get(field.name()),
field.data_type(),
)?;
fields.push(Expression::Literal(value_expression));
let field_position = self.schema().field_index(&field.name).unwrap();
// need to ensure the field goes in the expected location in the output
// schema
fields.insert(field_position, Expression::Literal(value_expression));
}
fields.extend(select_fields.clone());

let evaluator = engine_interface.get_expression_handler().get_evaluator(
read_schema.clone(),
Expression::Struct(fields),
@@ -239,7 +242,21 @@

// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results
let rest = dv_mask.as_mut().map(|mask| mask.split_off(len));
let rest = dv_mask.as_mut().and_then(|mask| {
if mask.len() >= len {
Some(mask.split_off(len))
} else {
None
}
});

if let Some(ref mut mask) = dv_mask {
if mask.len() < len {
// we need to extend the mask with `true` to fulfill the contract
let to_add = len - mask.len();
mask.extend(itertools::repeat_n(true, to_add));
}
}

let scan_result = ScanResult {
raw_data: read_result,
@@ -395,4 +412,28 @@ mod tests {
);
Ok(())
}

#[test_log::test]
fn test_scan_with_presigned_data_urls() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/local_log_with_presigned_data_urls",
))?;

let url = url::Url::from_directory_path(path).unwrap();
let engine_interface = SimpleClient::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine_interface, None)?;
let scan = ScanBuilder::new(snapshot).build();
let files: Vec<DeltaResult<Add>> = scan.files(&engine_interface)?.collect();

assert_eq!(
files
.into_iter()
.map(|file| file.unwrap().path)
.collect::<Vec<_>>(),
vec!["https://fixme"]
);
Ok(())
}
}
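The mask bookkeeping above encodes a contract worth spelling out: a deletion-vector mask can span several result batches, so each batch takes the first `len` entries, carries the tail forward, and pads with `true` (keep the row) when the mask runs short. A self-contained illustration on a plain `Vec<bool>` — `split_mask` is a hypothetical standalone helper:

```rust
// Illustration of the split-and-pad contract from Scan::execute: `mask` ends
// up covering exactly `len` rows; the returned tail covers later batches.
fn split_mask(mask: &mut Vec<bool>, len: usize) -> Option<Vec<bool>> {
    let rest = (mask.len() >= len).then(|| mask.split_off(len));
    // Rows past the end of the deletion vector are not deleted, so pad
    // with `true` (keep) until the mask covers the whole batch.
    mask.resize(len, true);
    rest
}

fn main() {
    let mut mask = vec![true, false, true, false, true, true, false];
    let rest = split_mask(&mut mask, 5);
    assert_eq!(mask.len(), 5);
    assert_eq!(rest, Some(vec![true, false]));

    let mut short = vec![false, true];
    assert_eq!(split_mask(&mut short, 4), None);
    assert_eq!(short, vec![false, true, true, true]); // padded with `true`
}
```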
4 changes: 4 additions & 0 deletions kernel/src/schema.rs
@@ -147,6 +147,10 @@ impl StructType {
pub fn fields(&self) -> impl Iterator<Item = &StructField> {
self.fields.values()
}

pub fn field_index(&self, name: impl AsRef<str>) -> Option<usize> {
self.fields.get_index_of(name.as_ref())
}
}

#[derive(Debug, Deserialize, Serialize)]
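`StructType` stores its fields in an `IndexMap`, so `field_index` is an O(1) positional lookup via `get_index_of`, which is what lets the scan insert partition values at the right output position. A hedged usage sketch with the letter/int/date schema from the new test table — the import path and constructor signatures are assumed from kernel/src/schema.rs:

```rust
// Usage sketch for the new field_index helper. The module path and the
// StructType::new / StructField::new constructors are assumptions here.
use delta_kernel::schema::{DataType, PrimitiveType, StructField, StructType};

fn main() {
    let schema = StructType::new(vec![
        StructField::new("letter", DataType::Primitive(PrimitiveType::String), true),
        StructField::new("int", DataType::Primitive(PrimitiveType::Long), true),
        StructField::new("date", DataType::Primitive(PrimitiveType::Date), true),
    ]);
    // Positions follow declaration order, backed by IndexMap::get_index_of.
    assert_eq!(schema.field_index("date"), Some(2));
    assert_eq!(schema.field_index("missing"), None);
}
```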
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"84b09beb-329c-4b5e-b493-f58c6c78b8fd","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"2"},"createdTime":1674611455081}}
{"commitInfo":{"timestamp":1674611455099,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"d87e63fb-7388-4b1c-9afc-750a561012b7"}}
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","partitionValues":{},"size":965,"modificationTime":1674611456921,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"b\",\"int\":288,\"date\":\"1978-02-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":988,\"date\":\"2020-05-01\"},\"nullCount\":{\"letter\":3,\"int\":0,\"date\":0}}"}}
{"commitInfo":{"timestamp":1674611457269,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"965"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"71d9bcd1-7f2b-46f8-bd1f-e0a8e872f3c3"}}
Binary file not shown.
@@ -0,0 +1,3 @@
{"add":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","partitionValues":{},"size":976,"modificationTime":1674611458901,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":120,\"date\":\"1971-07-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":667,\"date\":\"2018-02-01\"},\"nullCount\":{\"letter\":2,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","deletionTimestamp":1674611459307,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":965}}
{"commitInfo":{"timestamp":1674611459307,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"976"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"b08f5758-a8e9-4dd1-af7e-7b6e53928d7a"}}
@@ -0,0 +1,3 @@
{"add":{"path":"https://fixme","partitionValues":{},"size":1010,"modificationTime":1674611461541,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":93,\"date\":\"1975-06-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":753,\"date\":\"2013-03-01\"},\"nullCount\":{\"letter\":1,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","deletionTimestamp":1674611461982,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":976}}
{"commitInfo":{"timestamp":1674611461982,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"1010"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"0403bbaf-a6f2-4543-9e6c-bd068e76670f"}}
16 changes: 16 additions & 0 deletions kernel/tests/read.rs
@@ -440,3 +440,19 @@ fn data() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[test]
fn presigned_urls() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec!["+----+", "| id |", "+----+", "| 2 |", "+----+"];
read_table_data("./tests/data/local_log_with_presigned_data_urls", expected)?;

Ok(())
}

#[test]
fn presigned_dvs() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec!["+----+", "| id |", "+----+", "| 2 |", "+----+"];
read_table_data("./tests/data/table-with-presigned-dv", expected)?;

Ok(())
}