diff --git a/common/streams/Cargo.toml b/common/streams/Cargo.toml
index ae37fe212a3e..2fae12f581af 100644
--- a/common/streams/Cargo.toml
+++ b/common/streams/Cargo.toml
@@ -14,7 +14,6 @@ test = false
 # Workspace dependencies
 common-arrow = { path = "../arrow" }
 common-base = { path = "../base" }
-common-dal = { path = "../dal" }
 common-datablocks = { path = "../datablocks" }
 common-datavalues = { path = "../datavalues" }
 common-exception = { path = "../exception" }
@@ -33,3 +32,6 @@ futures = "0.3.18"
 pin-project-lite = "0.2.7"
 tempfile = "3.2.0"
 tokio-stream = { version = "0.1.8", features = ["net"] }
+
+[dev-dependencies]
+common-dal = {path = "../dal"}
diff --git a/common/streams/src/sources/source_factory.rs b/common/streams/src/sources/source_factory.rs
index 47f716e57b1b..6c1629f4ee01 100644
--- a/common/streams/src/sources/source_factory.rs
+++ b/common/streams/src/sources/source_factory.rs
@@ -13,12 +13,12 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::sync::Arc;
 
-use common_dal::DataAccessor;
 use common_datavalues::DataSchemaRef;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use futures::AsyncRead;
+use futures::AsyncSeek;
 
 use crate::CsvSource;
 use crate::ParquetSource;
@@ -26,8 +26,10 @@ use crate::Source;
 
 pub struct SourceFactory {}
 
-pub struct SourceParams<'a> {
-    pub acc: Arc<dyn DataAccessor>,
+pub struct SourceParams<'a, R>
+where R: AsyncRead + Unpin + Send
+{
+    pub reader: R,
     pub path: &'a str,
     pub format: &'a str,
     pub schema: DataSchemaRef,
@@ -37,7 +39,8 @@ pub struct SourceParams<'a> {
 }
 
 impl SourceFactory {
-    pub fn try_get(params: SourceParams) -> Result<Box<dyn Source>> {
+    pub fn try_get<R>(params: SourceParams<R>) -> Result<Box<dyn Source>>
+    where R: AsyncRead + AsyncSeek + Unpin + Send + 'static {
         let format = params.format.to_lowercase();
         match format.as_str() {
             "csv" => {
@@ -65,9 +68,8 @@ impl SourceFactory {
                     })
                     .unwrap_or(b'\n');
 
-                let reader = params.acc.get_input_stream(params.path, None)?;
                 Ok(Box::new(CsvSource::try_create(
-                    reader,
+                    params.reader,
                     params.schema,
                     has_header.eq_ignore_ascii_case("1"),
                     field_delimitor,
@@ -76,8 +78,7 @@ impl SourceFactory {
                 )?))
             }
             "parquet" => Ok(Box::new(ParquetSource::new(
-                params.acc,
-                params.path.to_owned(),
+                params.reader,
                 params.schema,
                 params.projection,
             ))),
diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs
index 4c56faabd41e..61ccbb652f12 100644
--- a/common/streams/src/sources/source_parquet.rs
+++ b/common/streams/src/sources/source_parquet.rs
@@ -21,7 +21,6 @@ use common_arrow::arrow::io::parquet::read::page_stream_to_array;
 use common_arrow::arrow::io::parquet::read::read_metadata_async;
 use common_arrow::arrow::io::parquet::read::schema::FileMetaData;
 use common_arrow::parquet::read::get_page_stream;
-use common_dal::DataAccessor;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::DataColumn;
 use common_datavalues::series::IntoSeries;
@@ -31,141 +30,94 @@ use common_exception::Result;
 use common_tracing::tracing;
 use common_tracing::tracing::debug_span;
 use common_tracing::tracing::Instrument;
-use futures::io::BufReader;
+use futures::AsyncRead;
+use futures::AsyncSeek;
 use futures::StreamExt;
-use futures::TryStreamExt;
-
-/// default buffer size of BufferedReader, 1MB
-const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;
 
 use crate::Source;
 
-pub struct ParquetSource {
-    data_accessor: Arc<dyn DataAccessor>,
-    path: String,
-
+pub struct ParquetSource<R> {
+    reader: R,
     block_schema: DataSchemaRef,
     arrow_table_schema: ArrowSchema,
     projection: Vec<usize>,
-    row_group: usize,
-    row_groups: usize,
     metadata: Option<FileMetaData>,
-    file_len: Option<u64>,
-    read_buffer_size: Option<u64>,
+    current_row_group: usize,
 }
 
-impl ParquetSource {
-    pub fn new(
-        data_accessor: Arc<dyn DataAccessor>,
-        path: String,
-        table_schema: DataSchemaRef,
-        projection: Vec<usize>,
-    ) -> Self {
-        Self::with_hints(
-            data_accessor,
-            path,
-            table_schema,
-            projection,
-            None,
-            None,
-            None,
-        )
+impl<R> ParquetSource<R>
+where R: AsyncRead + AsyncSeek + Unpin + Send
+{
+    pub fn new(reader: R, table_schema: DataSchemaRef, projection: Vec<usize>) -> Self {
+        Self::with_meta(reader, table_schema, projection, None)
     }
 
-    pub fn with_hints(
-        data_accessor: Arc<dyn DataAccessor>,
-        path: String,
+    pub fn with_meta(
+        reader: R,
         table_schema: DataSchemaRef,
         projection: Vec<usize>,
         metadata: Option<FileMetaData>,
-        file_len: Option<u64>,
-        read_buffer_size: Option<u64>,
     ) -> Self {
         let block_schema = Arc::new(table_schema.project(projection.clone()));
-        Self {
-            data_accessor,
-            path,
+        ParquetSource {
+            reader,
             block_schema,
             arrow_table_schema: table_schema.to_arrow(),
             projection,
-            row_group: 0,
-            row_groups: 0,
             metadata,
-            file_len,
-            read_buffer_size,
+            current_row_group: 0,
         }
     }
 }
 
 #[async_trait]
-impl Source for ParquetSource {
+impl<R> Source for ParquetSource<R>
+where R: AsyncRead + AsyncSeek + Unpin + Send
+{
     #[tracing::instrument(level = "debug", skip_all)]
     async fn read(&mut self) -> Result<Option<DataBlock>> {
         let fetched_metadata;
         let metadata = match &self.metadata {
             Some(m) => m,
             None => {
-                let mut reader = self
-                    .data_accessor
-                    .get_input_stream(self.path.as_str(), None)?;
-                fetched_metadata = read_metadata_async(&mut reader)
+                fetched_metadata = read_metadata_async(&mut self.reader)
                     .instrument(debug_span!("parquet_source_read_meta"))
                     .await
                     .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-                &fetched_metadata
+                self.metadata = Some(fetched_metadata);
+                match self.metadata.as_ref() {
+                    Some(m) => m,
+                    _ => unreachable!(),
+                }
             }
         };
 
-        self.row_groups = metadata.row_groups.len();
-        self.row_group = 0;
-
-        if self.row_group >= self.row_groups {
+        if self.current_row_group >= metadata.row_groups.len() {
            return Ok(None);
         }
-        let col_num = self.projection.len();
-        let row_group = self.row_group;
+
+        let fields = self.arrow_table_schema.fields();
+        let row_grp = &metadata.row_groups[self.current_row_group];
         let cols = self
             .projection
             .clone()
             .into_iter()
-            .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx));
-
-        let fields = self.arrow_table_schema.fields();
-        let stream_len = self.file_len;
-        let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE);
-
-        let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
-            let data_accessor = self.data_accessor.clone();
-            let path = self.path.clone();
-
-            async move {
-                let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?;
-                let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader);
-                // TODO cache block column
-                let col_pages =
-                    get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true))
-                        .instrument(debug_span!("parquet_source_get_column_page"))
-                        .await
-                        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-                let pages = col_pages.map(|compressed_page| {
-                    debug_span!("parquet_source_decompress_page")
-                        .in_scope(|| decompress(compressed_page?, &mut vec![]))
-                });
-                let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
-                    .instrument(debug_span!("parquet_source_page_stream_to_array"))
-                    .await?;
-                let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
-                Ok::<_, ErrorCode>(DataColumn::Array(array.into_series()))
-            }
-            .instrument(debug_span!("parquet_source_read_column").or_current())
-        });
-
-        // TODO configuration of the buffer size
-        let buffer_size = 10;
-        let n = std::cmp::min(buffer_size, col_num);
-        let data_cols = stream.buffered(n).try_collect().await?;
-
-        self.row_group += 1;
+            .map(|idx| (row_grp.column(idx).clone(), idx));
+        let mut data_cols = Vec::with_capacity(cols.len());
+        for (col_meta, idx) in cols {
+            let col_pages =
+                get_page_stream(&col_meta, &mut self.reader, vec![], Arc::new(|_, _| true))
+                    .instrument(debug_span!("parquet_source_get_column_page"))
+                    .await
+                    .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+            let pages = col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
+            let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
+                .instrument(debug_span!("parquet_source_page_stream_to_array"))
+                .await?;
+            let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
+            data_cols.push(DataColumn::Array(array.into_series()))
+        }
+        self.current_row_group += 1;
         let block = DataBlock::create(self.block_schema.clone(), data_cols);
         Ok(Some(block))
     }
diff --git a/common/streams/tests/it/source.rs b/common/streams/tests/it/source.rs
index af7e7fe216dc..71f454904444 100644
--- a/common/streams/tests/it/source.rs
+++ b/common/streams/tests/it/source.rs
@@ -19,10 +19,16 @@ use common_base::tokio;
 use common_dal::DataAccessor;
 use common_dal::Local;
 use common_datablocks::assert_blocks_eq;
+use common_datablocks::DataBlock;
+use common_datavalues::prelude::DataColumn;
+use common_datavalues::series::Series;
 use common_datavalues::DataField;
 use common_datavalues::DataSchemaRefExt;
 use common_datavalues::DataType;
+use common_exception::ErrorCode;
+use common_exception::Result;
 use common_streams::CsvSource;
+use common_streams::ParquetSource;
 use common_streams::Source;
 use common_streams::ValueSource;
 
@@ -113,3 +119,91 @@ async fn test_parse_csvs() {
         }
     }
 }
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_source_parquet() -> Result<()> {
+    use common_datavalues::DataType;
+    let schema = DataSchemaRefExt::create(vec![
+        DataField::new("a", DataType::Int8, false),
+        DataField::new("b", DataType::String, false),
+    ]);
+
+    let arrow_schema = schema.to_arrow();
+
+    use common_arrow::arrow::io::parquet::write::*;
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: Compression::Lz4, // let's begin with lz4
+        version: Version::V2,
+    };
+
+    use common_datavalues::prelude::SeriesFrom;
+    let col_a = Series::new(vec![1i8, 1, 2, 1, 2, 3]);
+    let col_b = Series::new(vec!["1", "1", "2", "1", "2", "3"]);
+    let sample_block = DataBlock::create(schema.clone(), vec![
+        DataColumn::Array(col_a),
+        DataColumn::Array(col_b),
+    ]);
+
+    use common_arrow::arrow::record_batch::RecordBatch;
+    let batch = RecordBatch::try_from(sample_block)?;
+    use common_arrow::parquet::encoding::Encoding;
+    let encodings = std::iter::repeat(Encoding::Plain)
+        .take(arrow_schema.fields.len())
+        .collect::<Vec<_>>();
+
+    let page_nums_expects = 3;
+    let name = "test-parquet";
+    let dir = tempfile::tempdir().unwrap();
+
+    // write test parquet
+    let len = {
+        let rg_iter = std::iter::repeat(batch).map(Ok).take(page_nums_expects);
+        let row_groups = RowGroupIterator::try_new(rg_iter, &arrow_schema, options, encodings)?;
+        let parquet_schema = row_groups.parquet_schema().clone();
+        let path = dir.path().join(name);
+        let mut writer = File::create(path).unwrap();
+        common_arrow::parquet::write::write_file(
+            &mut writer,
+            row_groups,
+            parquet_schema,
+            options,
+            None,
+            None,
+        )
+        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?
+    };
+
+    let local = Local::with_path(dir.path().to_path_buf());
+    let stream = local.get_input_stream(name, Some(len)).unwrap();
+
+    let default_proj = (0..schema.fields().len())
+        .into_iter()
+        .collect::<Vec<usize>>();
+
+    let mut page_nums = 0;
+    let mut parquet_source = ParquetSource::new(stream, schema, default_proj);
+    // expects `page_nums_expects` blocks, and
+    while let Some(block) = parquet_source.read().await? {
+        page_nums += 1;
+        // for each block, the content is the same as `sample_block`
+        assert_blocks_eq(
+            vec![
+                "+---+---+",
+                "| a | b |",
+                "+---+---+",
+                "| 1 | 1 |",
+                "| 1 | 1 |",
+                "| 2 | 2 |",
+                "| 1 | 1 |",
+                "| 2 | 2 |",
+                "| 3 | 3 |",
+                "+---+---+",
+            ],
+            &[block],
+        );
+    }
+
+    assert_eq!(page_nums_expects, page_nums);
+    Ok(())
+}
diff --git a/query/src/interpreters/interpreter_copy.rs b/query/src/interpreters/interpreter_copy.rs
index 6ed66d571486..a88facde3cc0 100644
--- a/query/src/interpreters/interpreter_copy.rs
+++ b/query/src/interpreters/interpreter_copy.rs
@@ -25,6 +25,7 @@ use common_streams::SendableDataBlockStream;
 use common_streams::SourceFactory;
 use common_streams::SourceParams;
 use common_streams::SourceStream;
+use futures::io::BufReader;
 use futures::TryStreamExt;
 use nom::bytes::complete::tag;
 use nom::bytes::complete::take_until;
@@ -71,8 +72,11 @@ impl Interpreter for CopyInterpreter {
         let acc = get_dal_by_stage(self.ctx.clone(), stage)?;
         let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize;
 
+        let input_stream = acc.get_input_stream(path, None)?;
+        let read_buffer_size = self.ctx.get_settings().get_storage_read_buffer_size()?;
+        let reader = BufReader::with_capacity(read_buffer_size as usize, input_stream);
         let source_params = SourceParams {
-            acc,
+            reader,
             path,
             format: self.plan.format.as_str(),
             schema: self.plan.schema.clone(),
diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs
index 7438905c631c..d8601817c1de 100644
--- a/query/src/storages/fuse/io/block_reader.rs
+++ b/query/src/storages/fuse/io/block_reader.rs
@@ -14,14 +14,17 @@
 
 use std::sync::Arc;
 
+use common_arrow::arrow::datatypes::DataType;
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read::decompress;
 use common_arrow::arrow::io::parquet::read::page_stream_to_array;
 use common_arrow::arrow::io::parquet::read::read_metadata;
 use common_arrow::arrow::io::parquet::read::read_metadata_async;
 use common_arrow::arrow::io::parquet::read::schema::FileMetaData;
+use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::read::get_page_stream;
 use common_dal::DataAccessor;
+use common_dal::InputStream;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::DataColumn;
 use common_datavalues::series::IntoSeries;
@@ -37,24 +40,15 @@ use futures::TryStreamExt;
 
 use crate::storages::cache::StorageCache;
 
-/// default buffer size of BufferedReader, 1MB
-const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;
-
 pub struct BlockReader {
     data_accessor: Arc<dyn DataAccessor>,
     path: String,
-
     block_schema: DataSchemaRef,
     arrow_table_schema: ArrowSchema,
     projection: Vec<usize>,
-    row_group: usize,
-    row_groups: usize,
+    file_len: u64,
+    read_buffer_size: u64,
     metadata: FileMetaData,
-    file_len: Option<u64>,
-    read_buffer_size: Option<u64>,
-
-    #[allow(dead_code)]
-    cache: Arc<RwLock<Option<StorageCache>>>,
 }
 
 impl BlockReader {
@@ -63,27 +57,8 @@ impl BlockReader {
         path: String,
         table_schema: DataSchemaRef,
         projection: Vec<usize>,
-        cache: Arc<RwLock<Option<StorageCache>>>,
-    ) -> Result<Self> {
-        Self::with_hints(
-            data_accessor,
-            path,
-            table_schema,
-            projection,
-            None,
-            None,
-            cache,
-        )
-        .await
-    }
-
-    pub async fn with_hints(
-        data_accessor: Arc<dyn DataAccessor>,
-        path: String,
-        table_schema: DataSchemaRef,
-        projection: Vec<usize>,
-        file_len: Option<u64>,
-        read_buffer_size: Option<u64>,
+        file_len: u64,
+        read_buffer_size: u64,
         cache: Arc<RwLock<Option<StorageCache>>>,
     ) -> Result<Self> {
         let block_schema = Arc::new(table_schema.project(projection.clone()));
@@ -106,61 +81,73 @@ impl BlockReader {
             block_schema,
             arrow_table_schema: table_schema.to_arrow(),
             projection,
-            row_group: 0,
-            row_groups: 0,
-            metadata,
             file_len,
             read_buffer_size,
-            cache,
+            metadata,
         })
     }
 
+    async fn read_column(
+        mut reader: BufReader<InputStream>,
+        column_chunk_meta: &ColumnChunkMetaData,
+        data_type: DataType,
+    ) -> Result<DataColumn> {
+        let col_pages = get_page_stream(
+            column_chunk_meta,
+            &mut reader,
+            vec![],
+            Arc::new(|_, _| true),
+        )
+        .instrument(debug_span!("block_reader_get_column_page"))
+        .await
+        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+        let pages = col_pages.map(|compressed_page| {
+            debug_span!("block_reader_decompress_page")
+                .in_scope(|| decompress(compressed_page?, &mut vec![]))
+        });
+        let array = page_stream_to_array(pages, column_chunk_meta, data_type)
+            .instrument(debug_span!("block_reader_page_stream_to_array"))
+            .await?;
+        let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
+        Ok::<_, ErrorCode>(DataColumn::Array(array.into_series()))
+    }
+
     #[tracing::instrument(level = "debug", skip_all)]
-    pub async fn read(&mut self) -> Result<Option<DataBlock>> {
+    pub async fn read(&mut self) -> Result<DataBlock> {
         let metadata = &self.metadata;
 
-        self.row_groups = metadata.row_groups.len();
-        self.row_group = 0;
+        // FUSE uses exactly one "row group"
+        let num_row_groups = metadata.row_groups.len();
+        let row_group = if num_row_groups != 1 {
+            return Err(ErrorCode::LogicalError(format!(
+                "invalid parquet file, expect exactly one row group inside, but got {}",
+                num_row_groups
+            )));
+        } else {
+            &metadata.row_groups[0]
+        };
 
-        if self.row_group >= self.row_groups {
-            return Ok(None);
-        }
         let col_num = self.projection.len();
-        let row_group = self.row_group;
         let cols = self
             .projection
             .clone()
             .into_iter()
-            .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx));
+            .map(|idx| (row_group.column(idx).clone(), idx));
 
         let fields = self.arrow_table_schema.fields();
         let stream_len = self.file_len;
-        let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE);
+        let read_buffer_size = self.read_buffer_size;
 
         let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
             let data_accessor = self.data_accessor.clone();
             let path = self.path.clone();
-
             async move {
-                let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?;
-                let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader);
-                // TODO cache block column
-                let col_pages =
-                    get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true))
-                        .instrument(debug_span!("parquet_source_get_column_page"))
-                        .await
-                        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-                let pages = col_pages.map(|compressed_page| {
-                    debug_span!("parquet_source_decompress_page")
-                        .in_scope(|| decompress(compressed_page?, &mut vec![]))
-                });
-                let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
-                    .instrument(debug_span!("parquet_source_page_stream_to_array"))
-                    .await?;
-                let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
-                Ok::<_, ErrorCode>(DataColumn::Array(array.into_series()))
+                let reader = data_accessor.get_input_stream(path.as_str(), Some(stream_len))?;
+                let reader = BufReader::with_capacity(read_buffer_size as usize, reader);
+                let data_type = fields[idx].data_type.clone();
+                Self::read_column(reader, &col_meta, data_type).await
             }
-            .instrument(debug_span!("parquet_source_read_column").or_current())
+            .instrument(debug_span!("block_reader_read_column").or_current())
         });
 
         // TODO configuration of the buffer size
@@ -168,8 +155,7 @@ impl BlockReader {
         let n = std::cmp::min(buffer_size, col_num);
         let data_cols = stream.buffered(n).try_collect().await?;
 
-        self.row_group += 1;
         let block = DataBlock::create(self.block_schema.clone(), data_cols);
-        Ok(Some(block))
+        Ok(block)
     }
 }
diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs
index 045235d89734..6122b8823fda 100644
--- a/query/src/storages/fuse/operations/read.rs
+++ b/query/src/storages/fuse/operations/read.rs
@@ -76,31 +76,22 @@ impl FuseTable {
                     let part_location = part_info.location();
                     let part_len = part_info.length();
 
-                    let mut reader = BlockReader::with_hints(
+                    let mut block_reader = BlockReader::new(
                         da,
                         part_info.location().to_owned(),
                         table_schema,
                         projection,
-                        Some(part_len),
-                        Some(read_buffer_size),
+                        part_len,
+                        read_buffer_size,
                         cache,
                     )
                     .await?;
-                    reader
-                        .read()
-                        .await
-                        .map_err(|e| {
-                            ErrorCode::ParquetError(format!(
-                                "fail to read block {}, {}",
-                                part_location, e
-                            ))
-                        })?
-                        .ok_or_else(|| {
-                            ErrorCode::ParquetError(format!(
-                                "reader returns None for block {}",
-                                part_location,
-                            ))
-                        })
+                    block_reader.read().await.map_err(|e| {
+                        ErrorCode::ParquetError(format!(
+                            "fail to read block {}, {}",
+                            part_location, e
+                        ))
+                    })
                 }
             })
            .buffer_unordered(bite_size as usize)
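
A minimal usage sketch (not part of the patch): with ParquetSource now generic over any AsyncRead + AsyncSeek + Unpin + Send reader, a source can be fed straight from an in-memory buffer instead of a DataAccessor. The helper name read_all_blocks and the choice of futures::io::Cursor below are illustrative assumptions, not code from this change; the common_streams / common_datavalues items are the ones shown in the hunks above.

use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_streams::ParquetSource;
use common_streams::Source;
use futures::io::Cursor;

// Drain every row group of a parquet payload that already sits in memory.
// `Cursor<Vec<u8>>` satisfies the `AsyncRead + AsyncSeek + Unpin + Send`
// bound introduced above, so no DataAccessor or file path is involved.
async fn read_all_blocks(parquet_bytes: Vec<u8>, schema: DataSchemaRef) -> Result<Vec<DataBlock>> {
    // project all columns, mirroring the `default_proj` used in the new test
    let projection = (0..schema.fields().len()).collect::<Vec<usize>>();
    let mut source = ParquetSource::new(Cursor::new(parquet_bytes), schema, projection);

    let mut blocks = vec![];
    while let Some(block) = source.read().await? {
        blocks.push(block);
    }
    Ok(blocks)
}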