feat(FileScanTask): partial execute impl for parquet
sdd committed Feb 20, 2024
1 parent 07c9e0f commit 3733c0a
Showing 5 changed files with 59 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -37,6 +37,7 @@ arrow-schema = { version = ">=46" }
async-trait = "0.1"
bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.5"
chrono = "0.4"
derive_builder = "0.13.0"
either = "1"
@@ -52,6 +53,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
parquet = "50"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
3 changes: 3 additions & 0 deletions crates/iceberg/Cargo.toml
@@ -37,6 +37,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
either = { workspace = true }
@@ -48,6 +49,7 @@ murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
@@ -56,6 +58,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
8 changes: 6 additions & 2 deletions crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;

/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -215,9 +216,12 @@ pub struct InputFile {
}

/// Trait for reading file.
pub trait FileRead: AsyncRead + AsyncSeek {}
pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}

impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
impl<T> FileRead for T where
T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
{
}

impl InputFile {
/// Absolute path to root uri.
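
The widened FileRead bounds are what make the parquet integration in scan.rs possible: parquet's async reader is generic over Tokio's I/O traits, and the parquet crate provides a blanket AsyncFileReader impl for any type that is tokio AsyncRead + AsyncSeek + Unpin + Send. A minimal sketch of a helper built on that blanket impl (the open_parquet name is hypothetical, not part of this commit):

use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Any FileRead now satisfies parquet's AsyncFileReader bounds, so a reader
// can be handed straight to the async record batch stream builder.
async fn open_parquet<R: FileRead + 'static>(
    reader: R,
) -> parquet::errors::Result<ParquetRecordBatchStreamBuilder<R>> {
    // The builder reads the parquet footer via tokio::io::AsyncRead + AsyncSeek.
    ParquetRecordBatchStreamBuilder::new(reader).await
}
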
46 changes: 43 additions & 3 deletions crates/iceberg/src/scan.rs
@@ -18,12 +18,15 @@
//! Table scan api.

use crate::io::FileIO;
use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef};
use crate::spec::{
DataContentType, DataFileFormat, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use parquet::arrow::ParquetRecordBatchStreamBuilder;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
@@ -152,6 +155,7 @@ impl TableScan {
}
DataContentType::Data => {
file_scan_tasks.push(Ok(FileScanTask {
file_io: self.file_io.clone(),
data_file: manifest_entry.clone(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
@@ -169,6 +173,7 @@
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileScanTask {
file_io: FileIO,
data_file: ManifestEntryRef,
start: u64,
length: u64,
@@ -180,7 +185,42 @@ pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result<RecordBatch>>
impl FileScanTask {
/// Returns a stream of arrow record batches.
pub async fn execute(&self) -> crate::Result<ArrowRecordBatchStream> {
todo!()
match self.data_file.content_type() {
DataContentType::Data => match self.data_file.file_format() {
DataFileFormat::Parquet => self.execute_parquet_data().await,
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Only Parquet data files are currently supported.",
)),
},
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Delete files are not supported yet.",
)),
}
}

async fn execute_parquet_data(&self) -> crate::Result<ArrowRecordBatchStream> {
debug_assert_eq!(self.data_file.content_type(), DataContentType::Data);
debug_assert_eq!(self.data_file.file_format(), DataFileFormat::Parquet);

let parquet_reader = self
.file_io
.new_input(self.data_file.file_path())?
.reader()
.await?;

let stream = ParquetRecordBatchStreamBuilder::new(parquet_reader)
.await
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err)
})?
.build()
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "failed to build record batch stream").with_source(err)
})?
.map_err(|err| Error::new(ErrorKind::Unexpected, "failed to read data").with_source(err))
.boxed();

Ok(stream)
}
}

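With plan-time code producing FileScanTasks and execute() streaming batches, a table read can now be driven end to end. A rough usage sketch, assuming TableScan exposes its tasks through a planning method such as plan_files() (implied by the code above but not shown in this diff):

use futures::TryStreamExt;

// Hypothetical driver; `table` is an iceberg Table.
let scan = table.scan().build()?;
let mut tasks = scan.plan_files().await?; // assumed planning entry point
while let Some(task) = tasks.try_next().await? {
    // Each FileScanTask streams Arrow RecordBatches from one Parquet data file.
    let mut batches = task.execute().await?;
    while let Some(batch) = batches.try_next().await? {
        println!("read a batch of {} rows", batch.num_rows());
    }
}
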
5 changes: 5 additions & 0 deletions crates/iceberg/src/spec/manifest.rs
@@ -855,6 +855,11 @@ impl ManifestEntry {
self.data_file.content
}

/// Data file format of this manifest entry.
pub fn file_format(&self) -> DataFileFormat {
self.data_file.file_format
}

/// Data file path of this manifest entry.
pub fn file_path(&self) -> &str {
&self.data_file.file_path
