From f01c062d212277fb0146895ebea210e452c40107 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 29 Feb 2024 22:48:10 +0000 Subject: [PATCH] refactor: extract record batch reader --- .../iceberg/src/file_record_batch_reader.rs | 94 ++++++++++++++++++ crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/scan.rs | 99 ++++--------------- 3 files changed, 115 insertions(+), 79 deletions(-) create mode 100644 crates/iceberg/src/file_record_batch_reader.rs diff --git a/crates/iceberg/src/file_record_batch_reader.rs b/crates/iceberg/src/file_record_batch_reader.rs new file mode 100644 index 000000000..8547e826f --- /dev/null +++ b/crates/iceberg/src/file_record_batch_reader.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Parquet file data reader + +use crate::{Error, ErrorKind}; +use async_stream::try_stream; +use futures::stream::StreamExt; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; + +use crate::io::FileIO; +use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; +use crate::spec::SchemaRef; + +/// Default arrow record batch size +const DEFAULT_BATCH_SIZE: usize = 1024; + +/// Reads data from Parquet files +pub struct FileRecordBatchReader { + batch_size: Option, + #[allow(dead_code)] + schema: SchemaRef, + file_io: FileIO, +} + +impl FileRecordBatchReader { + /// Constructs a new FileRecordBatchReader + pub fn new(file_io: FileIO, schema: SchemaRef, batch_size: Option) -> Self { + FileRecordBatchReader { + batch_size, + file_io, + schema, + } + } + + /// Take a stream of FileScanTasks and reads all the files. + /// Returns a stream of Arrow RecordBatches containing the data from the files + pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result { + let file_io = self.file_io.clone(); + let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + Ok( + try_stream! { + while let Some(Ok(task)) = tasks.next().await { + + let projection_mask = self.get_arrow_projection_mask(&task); + + let parquet_reader = file_io + .new_input(task.data_file().file_path())? + .reader() + .await?; + + let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader) + .await + .map_err(|err| map_parquet_error(err, "Failed create record batch stream builder", task.data_file().file_path()))? + .with_batch_size(batch_size) + .with_projection(projection_mask) + .build() + .map_err(|err| map_parquet_error(err, "Fail to build record batch stream builder", task.data_file().file_path()))?; + + while let Some(batch) = batch_stream.next().await { + yield batch + .map_err(|err| map_parquet_error(err, "Fail to read record batch", task.data_file().file_path()))?; + } + } + }.boxed() + ) + } + + fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask { + // TODO: full implementation + ProjectionMask::all() + } +} + +fn map_parquet_error(err: parquet::errors::ParquetError, message: &str, file_path: &str) -> Error { + Error::new(ErrorKind::Unexpected, message) + .with_source(err) + .with_context("filename", file_path) +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 9ceadcac8..563498f38 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -52,4 +52,5 @@ pub mod expr; pub mod transaction; pub mod transform; +pub mod file_record_batch_reader; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index fed0f9797..00f36137a 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -17,19 +17,14 @@ //! Table scan api. +use crate::file_record_batch_reader::FileRecordBatchReader; use crate::io::FileIO; use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; -use async_stream::try_stream; use futures::stream::{iter, BoxStream}; -use futures::{StreamExt, TryStreamExt}; -use parquet::arrow::arrow_reader::RowSelection; -use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; - -/// Default arrow record batch size -const DEFAULT_BATCH_SIZE: usize = 1024; +use futures::StreamExt; /// Builder to create table scan. pub struct TableScanBuilder<'a> { @@ -37,7 +32,6 @@ pub struct TableScanBuilder<'a> { // Empty column names means to select all columns column_names: Vec, snapshot_id: Option, - batch_size: Option, } impl<'a> TableScanBuilder<'a> { @@ -46,7 +40,6 @@ impl<'a> TableScanBuilder<'a> { table, column_names: vec![], snapshot_id: None, - batch_size: None, } } @@ -71,11 +64,6 @@ impl<'a> TableScanBuilder<'a> { self } - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = Some(batch_size); - self - } - /// Build the table scan. pub fn build(self) -> crate::Result { let snapshot = match self.snapshot_id { @@ -123,7 +111,6 @@ impl<'a> TableScanBuilder<'a> { table_metadata: self.table.metadata_ref(), column_names: self.column_names, schema, - batch_size: self.batch_size, }) } } @@ -137,7 +124,6 @@ pub struct TableScan { file_io: FileIO, column_names: Vec, schema: SchemaRef, - batch_size: Option, } /// A stream of [`FileScanTask`]. @@ -179,61 +165,22 @@ impl TableScan { Ok(iter(file_scan_tasks).boxed()) } - /// Transforms a stream of FileScanTasks from plan_files into a stream of - /// Arrow RecordBatches. - pub fn open(&self, mut tasks: FileScanTaskStream) -> crate::Result { - let file_io = self.file_io.clone(); - let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); - let projection_mask = self.get_arrow_projection_mask(); - let row_selection = self.get_arrow_row_selection(); - - Ok( - try_stream! { - while let Some(Ok(task)) = tasks.next().await { - let parquet_reader = file_io - .new_input(task.data_file().file_path())? - .reader() - .await?; - - let mut batch_stream = ParquetRecordBatchStreamBuilder::new(parquet_reader) - .await - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "failed to load parquet file").with_source(err) - })? - .with_batch_size(batch_size) - .with_offset(task.start() as usize) - .with_limit(task.length() as usize) - .with_projection(projection_mask.clone()) - .with_row_selection(row_selection.clone()) - .build() - .unwrap() - .map_err(|err| Error::new(ErrorKind::Unexpected, "Fail to read data").with_source(err)); - - while let Some(batch) = batch_stream.next().await { - yield batch?; - } - } - }.boxed() - ) - } - - fn get_arrow_projection_mask(&self) -> ProjectionMask { - // TODO, dummy implementation - todo!() - } - - fn get_arrow_row_selection(&self) -> RowSelection { - // TODO, dummy implementation - todo!() + pub async fn execute( + &self, + batch_size: Option, + ) -> crate::Result { + FileRecordBatchReader::new(self.file_io.clone(), self.schema.clone(), batch_size) + .read(self.plan_files().await?) } } /// A task to scan part of file. #[derive(Debug)] -#[allow(dead_code)] pub struct FileScanTask { data_file: ManifestEntryRef, + #[allow(dead_code)] start: u64, + #[allow(dead_code)] length: u64, } @@ -241,17 +188,9 @@ pub struct FileScanTask { pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result>; impl FileScanTask { - pub fn data_file(&self) -> ManifestEntryRef { + pub(crate) fn data_file(&self) -> ManifestEntryRef { self.data_file.clone() } - - pub fn start(&self) -> u64 { - self.start - } - - pub fn length(&self) -> u64 { - self.length - } } #[cfg(test)] @@ -445,13 +384,16 @@ mod tests { .set_compression(Compression::SNAPPY) .build(); - let file = File::create(format!("{}/1.parquet", &self.table_location)).unwrap(); - let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + for n in 1..=3 { + let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); + let mut writer = + ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); - writer.write(&to_write).expect("Writing batch"); + writer.write(&to_write).expect("Writing batch"); - // writer must be closed to write footer - writer.close().unwrap(); + // writer must be closed to write footer + writer.close().unwrap(); + } } } @@ -554,7 +496,6 @@ mod tests { } #[tokio::test] - #[ignore = "won't work yet as there are still some unimplemented methods"] async fn test_open_parquet_no_deletions() { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; @@ -563,7 +504,7 @@ mod tests { let table_scan = fixture.table.scan().build().unwrap(); let tasks = table_scan.plan_files().await.unwrap(); - let batch_stream = table_scan.open(tasks).unwrap(); + let batch_stream = table_scan.execute(None).await.unwrap(); let batches: Vec<_> = batch_stream.try_collect().await.unwrap();