From 3387aebd767ae8079e3db5cc2376c28323c726e0 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 12:38:30 +0800 Subject: [PATCH 1/8] add block reader --- query/src/storages/fuse/io/block_reader.rs | 155 +++++++++++++++++++++ query/src/storages/fuse/io/meta_reader.rs | 3 + query/src/storages/fuse/io/mod.rs | 1 + 3 files changed, 159 insertions(+) create mode 100644 query/src/storages/fuse/io/block_reader.rs diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs new file mode 100644 index 000000000000..9a6a3fee43f5 --- /dev/null +++ b/query/src/storages/fuse/io/block_reader.rs @@ -0,0 +1,155 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_arrow::arrow::datatypes::Schema as ArrowSchema; +use common_arrow::arrow::io::parquet::read::decompress; +use common_arrow::arrow::io::parquet::read::page_stream_to_array; +use common_arrow::arrow::io::parquet::read::read_metadata_async; +use common_arrow::arrow::io::parquet::read::schema::FileMetaData; +use common_arrow::parquet::read::get_page_stream; +use common_dal::DataAccessor; +use common_datablocks::DataBlock; +use common_datavalues::prelude::DataColumn; +use common_datavalues::series::IntoSeries; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_tracing::tracing; +use common_tracing::tracing::debug_span; +use common_tracing::tracing::Instrument; +use futures::io::BufReader; +use futures::StreamExt; +use futures::TryStreamExt; + +/// default buffer size of BufferedReader, 1MB +const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024; + +use crate::Source; + +pub struct BlockReader { + data_accessor: Arc, + path: String, + + block_schema: DataSchemaRef, + arrow_table_schema: ArrowSchema, + projection: Vec, + row_group: usize, + row_groups: usize, + metadata: Option, + file_len: Option, + read_buffer_size: Option, +} + +impl BlockReader { + pub fn with_hints( + data_accessor: Arc, + path: String, + table_schema: DataSchemaRef, + projection: Vec, + metadata: Option, + file_len: Option, + read_buffer_size: Option, + ) -> Self { + let block_schema = Arc::new(table_schema.project(projection.clone())); + Self { + data_accessor, + path, + block_schema, + arrow_table_schema: table_schema.to_arrow(), + projection, + row_group: 0, + row_groups: 0, + metadata, + file_len, + read_buffer_size, + } + } +} + +#[async_trait] +impl Source for BlockReader { + #[tracing::instrument(level = "debug", skip_all)] + async fn read(&mut self) -> Result { + let fetched_metadata; + let metadata = match &self.metadata { + Some(m) => m, + None => { + let mut reader = self + .data_accessor + .get_input_stream(self.path.as_str(), None)?; + fetched_metadata = read_metadata_async(&mut reader) + .instrument(debug_span!("parquet_source_read_meta")) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + &fetched_metadata + } + }; + + self.row_groups = metadata.row_groups.len(); + self.row_group = 0; + + if self.row_group >= self.row_groups { + return Ok(None); + } + let col_num = self.projection.len(); + let row_group = self.row_group; + let cols = self + .projection + .clone() + .into_iter() + .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx)); + + let fields = self.arrow_table_schema.fields(); + let stream_len = self.file_len; + let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE); + + let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { + let data_accessor = self.data_accessor.clone(); + let path = self.path.clone(); + + async move { + let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?; + let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader); + // TODO cache block column + let col_pages = + get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true)) + .instrument(debug_span!("parquet_source_get_column_page")) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let pages = col_pages.map(|compressed_page| { + debug_span!("parquet_source_decompress_page") + .in_scope(|| decompress(compressed_page?, &mut vec![])) + }); + let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone()) + .instrument(debug_span!("parquet_source_page_stream_to_array")) + .await?; + let array: Arc = array.into(); + Ok::<_, ErrorCode>(DataColumn::Array(array.into_series())) + } + .instrument(debug_span!("parquet_source_read_column").or_current()) + }); + + // TODO configuration of the buffer size + let buffer_size = 10; + let n = std::cmp::min(buffer_size, col_num); + let data_cols = stream.buffered(n).try_collect().await?; + + self.row_group += 1; + let block = DataBlock::create(self.block_schema.clone(), data_cols); + Ok(block) + } +} diff --git a/query/src/storages/fuse/io/meta_reader.rs b/query/src/storages/fuse/io/meta_reader.rs index 53e9aaeda0e8..523bfa0bc151 100644 --- a/query/src/storages/fuse/io/meta_reader.rs +++ b/query/src/storages/fuse/io/meta_reader.rs @@ -89,3 +89,6 @@ impl SegmentReader { Ok(segment_info) } } + + +? diff --git a/query/src/storages/fuse/io/mod.rs b/query/src/storages/fuse/io/mod.rs index 062b1cac56d8..68bea875cc2f 100644 --- a/query/src/storages/fuse/io/mod.rs +++ b/query/src/storages/fuse/io/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod block_reader; mod block_stream_writer; mod block_writer; mod locations; From 580d980de6d2417909e2cd5d1dc26e5e744bca5b Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 13:14:15 +0800 Subject: [PATCH 2/8] Table::read switch to BlockReader --- query/src/storages/fuse/io/block_reader.rs | 45 ++++++++-------------- query/src/storages/fuse/io/meta_reader.rs | 3 -- query/src/storages/fuse/io/mod.rs | 1 + query/src/storages/fuse/operations/read.rs | 30 +++++---------- 4 files changed, 26 insertions(+), 53 deletions(-) diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs index 9a6a3fee43f5..73257315b22c 100644 --- a/query/src/storages/fuse/io/block_reader.rs +++ b/query/src/storages/fuse/io/block_reader.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use async_trait::async_trait; use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::read::decompress; use common_arrow::arrow::io::parquet::read::page_stream_to_array; @@ -35,34 +34,26 @@ use futures::io::BufReader; use futures::StreamExt; use futures::TryStreamExt; -/// default buffer size of BufferedReader, 1MB -const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024; - -use crate::Source; - pub struct BlockReader { data_accessor: Arc, path: String, - block_schema: DataSchemaRef, arrow_table_schema: ArrowSchema, projection: Vec, - row_group: usize, - row_groups: usize, + file_len: u64, + read_buffer_size: u64, metadata: Option, - file_len: Option, - read_buffer_size: Option, } impl BlockReader { - pub fn with_hints( + pub fn new( data_accessor: Arc, path: String, table_schema: DataSchemaRef, projection: Vec, + file_len: u64, + read_buffer_size: u64, metadata: Option, - file_len: Option, - read_buffer_size: Option, ) -> Self { let block_schema = Arc::new(table_schema.project(projection.clone())); Self { @@ -71,19 +62,14 @@ impl BlockReader { block_schema, arrow_table_schema: table_schema.to_arrow(), projection, - row_group: 0, - row_groups: 0, metadata, file_len, read_buffer_size, } } -} -#[async_trait] -impl Source for BlockReader { #[tracing::instrument(level = "debug", skip_all)] - async fn read(&mut self) -> Result { + pub async fn read(&mut self) -> Result { let fetched_metadata; let metadata = match &self.metadata { Some(m) => m, @@ -99,30 +85,30 @@ impl Source for BlockReader { } }; - self.row_groups = metadata.row_groups.len(); - self.row_group = 0; + let num_row_groups = metadata.row_groups.len(); + let row_group = if num_row_groups != 1 { + return Err(ErrorCode::LogicalError("invalid parquet file")); + } else { + &metadata.row_groups[0] + }; - if self.row_group >= self.row_groups { - return Ok(None); - } let col_num = self.projection.len(); - let row_group = self.row_group; let cols = self .projection .clone() .into_iter() - .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx)); + .map(|idx| (row_group.column(idx).clone(), idx)); let fields = self.arrow_table_schema.fields(); let stream_len = self.file_len; - let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE); + let read_buffer_size = self.read_buffer_size; let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { let data_accessor = self.data_accessor.clone(); let path = self.path.clone(); async move { - let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?; + let reader = data_accessor.get_input_stream(path.as_str(), Some(stream_len))?; let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader); // TODO cache block column let col_pages = @@ -148,7 +134,6 @@ impl Source for BlockReader { let n = std::cmp::min(buffer_size, col_num); let data_cols = stream.buffered(n).try_collect().await?; - self.row_group += 1; let block = DataBlock::create(self.block_schema.clone(), data_cols); Ok(block) } diff --git a/query/src/storages/fuse/io/meta_reader.rs b/query/src/storages/fuse/io/meta_reader.rs index 523bfa0bc151..53e9aaeda0e8 100644 --- a/query/src/storages/fuse/io/meta_reader.rs +++ b/query/src/storages/fuse/io/meta_reader.rs @@ -89,6 +89,3 @@ impl SegmentReader { Ok(segment_info) } } - - -? diff --git a/query/src/storages/fuse/io/mod.rs b/query/src/storages/fuse/io/mod.rs index 68bea875cc2f..f3b9443b9377 100644 --- a/query/src/storages/fuse/io/mod.rs +++ b/query/src/storages/fuse/io/mod.rs @@ -18,6 +18,7 @@ mod block_writer; mod locations; mod meta_reader; +pub use block_reader::BlockReader; pub use block_stream_writer::BlockStreamWriter; pub use block_stream_writer::SegmentInfoStream; pub use locations::gen_segment_info_location; diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs index 7c2994fafd89..94ef71b06fa4 100644 --- a/query/src/storages/fuse/operations/read.rs +++ b/query/src/storages/fuse/operations/read.rs @@ -19,14 +19,13 @@ use common_datavalues::DataSchema; use common_exception::ErrorCode; use common_exception::Result; use common_planners::Extras; -use common_streams::ParquetSource; use common_streams::SendableDataBlockStream; -use common_streams::Source; use common_tracing::tracing_futures::Instrument; use futures::StreamExt; use super::part_info::PartInfo; use crate::sessions::QueryContext; +use crate::storages::fuse::io::BlockReader; use crate::storages::fuse::FuseTable; impl FuseTable { @@ -76,30 +75,21 @@ impl FuseTable { let part_location = part_info.location(); let part_len = part_info.length(); - let mut source = ParquetSource::with_hints( + let mut source = BlockReader::new( da, part_info.location().to_owned(), table_schema, projection, + part_len, + read_buffer_size, None, // TODO cache parquet meta - Some(part_len), - Some(read_buffer_size), ); - source - .read() - .await - .map_err(|e| { - ErrorCode::ParquetError(format!( - "fail to read block {}, {}", - part_location, e - )) - })? - .ok_or_else(|| { - ErrorCode::ParquetError(format!( - "reader returns None for block {}", - part_location, - )) - }) + source.read().await.map_err(|e| { + ErrorCode::ParquetError(format!( + "fail to read block {}, {}", + part_location, e + )) + }) } }) .buffer_unordered(bite_size as usize) From 2c7afdf3e1cb157d5b66975ff7577551a5dc73de Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 14:26:58 +0800 Subject: [PATCH 3/8] decouple common-stream from dal --- common/streams/Cargo.toml | 4 +- common/streams/src/sources/source_factory.rs | 20 ++-- common/streams/src/sources/source_parquet.rs | 115 ++++++------------- query/src/interpreters/interpreter_copy.rs | 6 +- 4 files changed, 54 insertions(+), 91 deletions(-) diff --git a/common/streams/Cargo.toml b/common/streams/Cargo.toml index ae37fe212a3e..6db193527c5f 100644 --- a/common/streams/Cargo.toml +++ b/common/streams/Cargo.toml @@ -14,7 +14,6 @@ test = false # Workspace dependencies common-arrow = { path = "../arrow" } common-base = { path = "../base" } -common-dal = { path = "../dal" } common-datablocks = { path = "../datablocks" } common-datavalues = { path = "../datavalues" } common-exception = { path = "../exception" } @@ -33,3 +32,6 @@ futures = "0.3.18" pin-project-lite = "0.2.7" tempfile = "3.2.0" tokio-stream = { version = "0.1.8", features = ["net"] } + +[dev-dependencies] +common-dal = {path = "../dal"} \ No newline at end of file diff --git a/common/streams/src/sources/source_factory.rs b/common/streams/src/sources/source_factory.rs index 47f716e57b1b..0f4384cbdfde 100644 --- a/common/streams/src/sources/source_factory.rs +++ b/common/streams/src/sources/source_factory.rs @@ -13,12 +13,12 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; -use common_dal::DataAccessor; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; +use futures::AsyncRead; +use futures::AsyncSeek; use crate::CsvSource; use crate::ParquetSource; @@ -26,8 +26,10 @@ use crate::Source; pub struct SourceFactory {} -pub struct SourceParams<'a> { - pub acc: Arc, +pub struct SourceParams<'a, R> +where R: AsyncRead + Unpin + Send +{ + pub reader: R, pub path: &'a str, pub format: &'a str, pub schema: DataSchemaRef, @@ -37,8 +39,10 @@ pub struct SourceParams<'a> { } impl SourceFactory { - pub fn try_get(params: SourceParams) -> Result> { + pub fn try_get(params: SourceParams) -> Result> + where R: AsyncRead + AsyncSeek + Unpin + Send + 'static { let format = params.format.to_lowercase(); + // let reader = params.acc.get_input_stream(params.path, None)?; match format.as_str() { "csv" => { let has_header = params @@ -65,9 +69,8 @@ impl SourceFactory { }) .unwrap_or(b'\n'); - let reader = params.acc.get_input_stream(params.path, None)?; Ok(Box::new(CsvSource::try_create( - reader, + params.reader, params.schema, has_header.eq_ignore_ascii_case("1"), field_delimitor, @@ -76,8 +79,7 @@ impl SourceFactory { )?)) } "parquet" => Ok(Box::new(ParquetSource::new( - params.acc, - params.path.to_owned(), + params.reader, params.schema, params.projection, ))), diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs index 4c56faabd41e..4785eded0965 100644 --- a/common/streams/src/sources/source_parquet.rs +++ b/common/streams/src/sources/source_parquet.rs @@ -21,7 +21,6 @@ use common_arrow::arrow::io::parquet::read::page_stream_to_array; use common_arrow::arrow::io::parquet::read::read_metadata_async; use common_arrow::arrow::io::parquet::read::schema::FileMetaData; use common_arrow::parquet::read::get_page_stream; -use common_dal::DataAccessor; use common_datablocks::DataBlock; use common_datavalues::prelude::DataColumn; use common_datavalues::series::IntoSeries; @@ -31,84 +30,58 @@ use common_exception::Result; use common_tracing::tracing; use common_tracing::tracing::debug_span; use common_tracing::tracing::Instrument; -use futures::io::BufReader; +use futures::AsyncRead; +use futures::AsyncSeek; use futures::StreamExt; -use futures::TryStreamExt; - -/// default buffer size of BufferedReader, 1MB -const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024; use crate::Source; -pub struct ParquetSource { - data_accessor: Arc, - path: String, +pub struct ParquetSource { + reader: R, block_schema: DataSchemaRef, arrow_table_schema: ArrowSchema, projection: Vec, row_group: usize, - row_groups: usize, metadata: Option, - file_len: Option, - read_buffer_size: Option, } -impl ParquetSource { - pub fn new( - data_accessor: Arc, - path: String, - table_schema: DataSchemaRef, - projection: Vec, - ) -> Self { - Self::with_hints( - data_accessor, - path, - table_schema, - projection, - None, - None, - None, - ) +impl ParquetSource +where R: AsyncRead + AsyncSeek + Unpin + Send +{ + pub fn new(reader: R, table_schema: DataSchemaRef, projection: Vec) -> Self { + Self::with_hints(reader, table_schema, projection, None) } pub fn with_hints( - data_accessor: Arc, - path: String, + reader: R, table_schema: DataSchemaRef, projection: Vec, metadata: Option, - file_len: Option, - read_buffer_size: Option, ) -> Self { let block_schema = Arc::new(table_schema.project(projection.clone())); - Self { - data_accessor, - path, + ParquetSource { + reader, block_schema, arrow_table_schema: table_schema.to_arrow(), projection, row_group: 0, - row_groups: 0, metadata, - file_len, - read_buffer_size, } } } #[async_trait] -impl Source for ParquetSource { +impl Source for ParquetSource +where R: AsyncRead + AsyncSeek + Unpin + Send +{ #[tracing::instrument(level = "debug", skip_all)] async fn read(&mut self) -> Result> { let fetched_metadata; let metadata = match &self.metadata { Some(m) => m, None => { - let mut reader = self - .data_accessor - .get_input_stream(self.path.as_str(), None)?; - fetched_metadata = read_metadata_async(&mut reader) + fetched_metadata = read_metadata_async(&mut self.reader) .instrument(debug_span!("parquet_source_read_meta")) .await .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; @@ -116,13 +89,11 @@ impl Source for ParquetSource { } }; - self.row_groups = metadata.row_groups.len(); + let row_groups = metadata.row_groups.len(); self.row_group = 0; - - if self.row_group >= self.row_groups { + if self.row_group >= row_groups { return Ok(None); } - let col_num = self.projection.len(); let row_group = self.row_group; let cols = self .projection @@ -131,39 +102,23 @@ impl Source for ParquetSource { .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx)); let fields = self.arrow_table_schema.fields(); - let stream_len = self.file_len; - let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE); - - let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { - let data_accessor = self.data_accessor.clone(); - let path = self.path.clone(); - - async move { - let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?; - let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader); - // TODO cache block column - let col_pages = - get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true)) - .instrument(debug_span!("parquet_source_get_column_page")) - .await - .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; - let pages = col_pages.map(|compressed_page| { - debug_span!("parquet_source_decompress_page") - .in_scope(|| decompress(compressed_page?, &mut vec![])) - }); - let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone()) - .instrument(debug_span!("parquet_source_page_stream_to_array")) - .await?; - let array: Arc = array.into(); - Ok::<_, ErrorCode>(DataColumn::Array(array.into_series())) - } - .instrument(debug_span!("parquet_source_read_column").or_current()) - }); - - // TODO configuration of the buffer size - let buffer_size = 10; - let n = std::cmp::min(buffer_size, col_num); - let data_cols = stream.buffered(n).try_collect().await?; + let mut data_cols = Vec::with_capacity(cols.len()); + for (col_meta, idx) in cols { + let col_pages = + get_page_stream(&col_meta, &mut self.reader, vec![], Arc::new(|_, _| true)) + .instrument(debug_span!("parquet_source_get_column_page")) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let pages = col_pages.map(|compressed_page| { + debug_span!("parquet_source_decompress_page") + .in_scope(|| decompress(compressed_page?, &mut vec![])) + }); + let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone()) + .instrument(debug_span!("parquet_source_page_stream_to_array")) + .await?; + let array: Arc = array.into(); + data_cols.push(DataColumn::Array(array.into_series())) + } self.row_group += 1; let block = DataBlock::create(self.block_schema.clone(), data_cols); diff --git a/query/src/interpreters/interpreter_copy.rs b/query/src/interpreters/interpreter_copy.rs index 6ed66d571486..a88facde3cc0 100644 --- a/query/src/interpreters/interpreter_copy.rs +++ b/query/src/interpreters/interpreter_copy.rs @@ -25,6 +25,7 @@ use common_streams::SendableDataBlockStream; use common_streams::SourceFactory; use common_streams::SourceParams; use common_streams::SourceStream; +use futures::io::BufReader; use futures::TryStreamExt; use nom::bytes::complete::tag; use nom::bytes::complete::take_until; @@ -71,8 +72,11 @@ impl Interpreter for CopyInterpreter { let acc = get_dal_by_stage(self.ctx.clone(), stage)?; let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize; + let input_stream = acc.get_input_stream(path, None)?; + let read_buffer_size = self.ctx.get_settings().get_storage_read_buffer_size()?; + let reader = BufReader::with_capacity(read_buffer_size as usize, input_stream); let source_params = SourceParams { - acc, + reader, path, format: self.plan.format.as_str(), schema: self.plan.schema.clone(), From 1d0fb22e5878b0a3389d42de16cfc796ea11e8a5 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 14:33:32 +0800 Subject: [PATCH 4/8] add newline at the end of file --- common/streams/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/streams/Cargo.toml b/common/streams/Cargo.toml index 6db193527c5f..2fae12f581af 100644 --- a/common/streams/Cargo.toml +++ b/common/streams/Cargo.toml @@ -34,4 +34,4 @@ tempfile = "3.2.0" tokio-stream = { version = "0.1.8", features = ["net"] } [dev-dependencies] -common-dal = {path = "../dal"} \ No newline at end of file +common-dal = {path = "../dal"} From 2609f2511296a74d085cdcb8608c8db3d240ec8c Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 16:11:37 +0800 Subject: [PATCH 5/8] refactoring ParquetSource and BlockReader --- common/streams/src/sources/source_parquet.rs | 35 ++++++------- query/src/storages/fuse/io/block_reader.rs | 54 ++++++++++++-------- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs index 4785eded0965..61ccbb652f12 100644 --- a/common/streams/src/sources/source_parquet.rs +++ b/common/streams/src/sources/source_parquet.rs @@ -38,22 +38,21 @@ use crate::Source; pub struct ParquetSource { reader: R, - block_schema: DataSchemaRef, arrow_table_schema: ArrowSchema, projection: Vec, - row_group: usize, metadata: Option, + current_row_group: usize, } impl ParquetSource where R: AsyncRead + AsyncSeek + Unpin + Send { pub fn new(reader: R, table_schema: DataSchemaRef, projection: Vec) -> Self { - Self::with_hints(reader, table_schema, projection, None) + Self::with_meta(reader, table_schema, projection, None) } - pub fn with_hints( + pub fn with_meta( reader: R, table_schema: DataSchemaRef, projection: Vec, @@ -65,8 +64,8 @@ where R: AsyncRead + AsyncSeek + Unpin + Send block_schema, arrow_table_schema: table_schema.to_arrow(), projection, - row_group: 0, metadata, + current_row_group: 0, } } } @@ -85,23 +84,25 @@ where R: AsyncRead + AsyncSeek + Unpin + Send .instrument(debug_span!("parquet_source_read_meta")) .await .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; - &fetched_metadata + self.metadata = Some(fetched_metadata); + match self.metadata.as_ref() { + Some(m) => m, + _ => unreachable!(), + } } }; - let row_groups = metadata.row_groups.len(); - self.row_group = 0; - if self.row_group >= row_groups { + if self.current_row_group >= metadata.row_groups.len() { return Ok(None); } - let row_group = self.row_group; + + let fields = self.arrow_table_schema.fields(); + let row_grp = &metadata.row_groups[self.current_row_group]; let cols = self .projection .clone() .into_iter() - .map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx)); - - let fields = self.arrow_table_schema.fields(); + .map(|idx| (row_grp.column(idx).clone(), idx)); let mut data_cols = Vec::with_capacity(cols.len()); for (col_meta, idx) in cols { let col_pages = @@ -109,18 +110,14 @@ where R: AsyncRead + AsyncSeek + Unpin + Send .instrument(debug_span!("parquet_source_get_column_page")) .await .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; - let pages = col_pages.map(|compressed_page| { - debug_span!("parquet_source_decompress_page") - .in_scope(|| decompress(compressed_page?, &mut vec![])) - }); + let pages = col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![])); let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone()) .instrument(debug_span!("parquet_source_page_stream_to_array")) .await?; let array: Arc = array.into(); data_cols.push(DataColumn::Array(array.into_series())) } - - self.row_group += 1; + self.current_row_group += 1; let block = DataBlock::create(self.block_schema.clone(), data_cols); Ok(Some(block)) } diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs index 73257315b22c..7b023314765d 100644 --- a/query/src/storages/fuse/io/block_reader.rs +++ b/query/src/storages/fuse/io/block_reader.rs @@ -14,13 +14,16 @@ use std::sync::Arc; +use common_arrow::arrow::datatypes::DataType; use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::read::decompress; use common_arrow::arrow::io::parquet::read::page_stream_to_array; use common_arrow::arrow::io::parquet::read::read_metadata_async; use common_arrow::arrow::io::parquet::read::schema::FileMetaData; +use common_arrow::parquet::metadata::ColumnChunkMetaData; use common_arrow::parquet::read::get_page_stream; use common_dal::DataAccessor; +use common_dal::InputStream; use common_datablocks::DataBlock; use common_datavalues::prelude::DataColumn; use common_datavalues::series::IntoSeries; @@ -68,8 +71,29 @@ impl BlockReader { } } + async fn read_col( + mut reader: BufReader, + col_meta: &ColumnChunkMetaData, + data_type: DataType, + ) -> Result { + let col_pages = get_page_stream(col_meta, &mut reader, vec![], Arc::new(|_, _| true)) + .instrument(debug_span!("block_reader_get_column_page")) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let pages = col_pages.map(|compressed_page| { + debug_span!("block_reader_decompress_page") + .in_scope(|| decompress(compressed_page?, &mut vec![])) + }); + let array = page_stream_to_array(pages, col_meta, data_type) + .instrument(debug_span!("block_reader_page_stream_to_array")) + .await?; + let array: Arc = array.into(); + Ok::<_, ErrorCode>(DataColumn::Array(array.into_series())) + } + #[tracing::instrument(level = "debug", skip_all)] pub async fn read(&mut self) -> Result { + // TODO cache parquet meta, and remove the following `let match clause` let fetched_metadata; let metadata = match &self.metadata { Some(m) => m, @@ -78,16 +102,20 @@ impl BlockReader { .data_accessor .get_input_stream(self.path.as_str(), None)?; fetched_metadata = read_metadata_async(&mut reader) - .instrument(debug_span!("parquet_source_read_meta")) + .instrument(debug_span!("block_reader_read_meta")) .await .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; &fetched_metadata } }; + // FUSE uses exact one "row group" only let num_row_groups = metadata.row_groups.len(); let row_group = if num_row_groups != 1 { - return Err(ErrorCode::LogicalError("invalid parquet file")); + return Err(ErrorCode::LogicalError(format!( + "invalid parquet file, expect exact one row group insides, but got {}", + num_row_groups + ))); } else { &metadata.row_groups[0] }; @@ -106,27 +134,13 @@ impl BlockReader { let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { let data_accessor = self.data_accessor.clone(); let path = self.path.clone(); - async move { let reader = data_accessor.get_input_stream(path.as_str(), Some(stream_len))?; - let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader); - // TODO cache block column - let col_pages = - get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true)) - .instrument(debug_span!("parquet_source_get_column_page")) - .await - .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; - let pages = col_pages.map(|compressed_page| { - debug_span!("parquet_source_decompress_page") - .in_scope(|| decompress(compressed_page?, &mut vec![])) - }); - let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone()) - .instrument(debug_span!("parquet_source_page_stream_to_array")) - .await?; - let array: Arc = array.into(); - Ok::<_, ErrorCode>(DataColumn::Array(array.into_series())) + let reader = BufReader::with_capacity(read_buffer_size as usize, reader); + let data_type = fields[idx].data_type.clone(); + Self::read_col(reader, &col_meta, data_type).await } - .instrument(debug_span!("parquet_source_read_column").or_current()) + .instrument(debug_span!("block_reader_read_column").or_current()) }); // TODO configuration of the buffer size From d1cae1c505b485b3ef65e5127a09a0643a922bd9 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 17:44:40 +0800 Subject: [PATCH 6/8] add unit test --- common/streams/src/sources/source_parquet.rs | 1 + common/streams/tests/it/source.rs | 94 ++++++++++++++++++++ query/src/storages/fuse/io/block_reader.rs | 21 +++-- query/src/storages/fuse/operations/read.rs | 4 +- 4 files changed, 110 insertions(+), 10 deletions(-) diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs index 61ccbb652f12..7e6693401449 100644 --- a/common/streams/src/sources/source_parquet.rs +++ b/common/streams/src/sources/source_parquet.rs @@ -92,6 +92,7 @@ where R: AsyncRead + AsyncSeek + Unpin + Send } }; + eprintln!("row group len {}", metadata.row_groups.len()); if self.current_row_group >= metadata.row_groups.len() { return Ok(None); } diff --git a/common/streams/tests/it/source.rs b/common/streams/tests/it/source.rs index af7e7fe216dc..71f454904444 100644 --- a/common/streams/tests/it/source.rs +++ b/common/streams/tests/it/source.rs @@ -19,10 +19,16 @@ use common_base::tokio; use common_dal::DataAccessor; use common_dal::Local; use common_datablocks::assert_blocks_eq; +use common_datablocks::DataBlock; +use common_datavalues::prelude::DataColumn; +use common_datavalues::series::Series; use common_datavalues::DataField; use common_datavalues::DataSchemaRefExt; use common_datavalues::DataType; +use common_exception::ErrorCode; +use common_exception::Result; use common_streams::CsvSource; +use common_streams::ParquetSource; use common_streams::Source; use common_streams::ValueSource; @@ -113,3 +119,91 @@ async fn test_parse_csvs() { } } } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_source_parquet() -> Result<()> { + use common_datavalues::DataType; + let schema = DataSchemaRefExt::create(vec![ + DataField::new("a", DataType::Int8, false), + DataField::new("b", DataType::String, false), + ]); + + let arrow_schema = schema.to_arrow(); + + use common_arrow::arrow::io::parquet::write::*; + let options = WriteOptions { + write_statistics: true, + compression: Compression::Lz4, // let's begin with lz4 + version: Version::V2, + }; + + use common_datavalues::prelude::SeriesFrom; + let col_a = Series::new(vec![1i8, 1, 2, 1, 2, 3]); + let col_b = Series::new(vec!["1", "1", "2", "1", "2", "3"]); + let sample_block = DataBlock::create(schema.clone(), vec![ + DataColumn::Array(col_a), + DataColumn::Array(col_b), + ]); + + use common_arrow::arrow::record_batch::RecordBatch; + let batch = RecordBatch::try_from(sample_block)?; + use common_arrow::parquet::encoding::Encoding; + let encodings = std::iter::repeat(Encoding::Plain) + .take(arrow_schema.fields.len()) + .collect::>(); + + let page_nums_expects = 3; + let name = "test-parquet"; + let dir = tempfile::tempdir().unwrap(); + + // write test parquet + let len = { + let rg_iter = std::iter::repeat(batch).map(Ok).take(page_nums_expects); + let row_groups = RowGroupIterator::try_new(rg_iter, &arrow_schema, options, encodings)?; + let parquet_schema = row_groups.parquet_schema().clone(); + let path = dir.path().join(name); + let mut writer = File::create(path).unwrap(); + common_arrow::parquet::write::write_file( + &mut writer, + row_groups, + parquet_schema, + options, + None, + None, + ) + .map_err(|e| ErrorCode::ParquetError(e.to_string()))? + }; + + let local = Local::with_path(dir.path().to_path_buf()); + let stream = local.get_input_stream(name, Some(len)).unwrap(); + + let default_proj = (0..schema.fields().len()) + .into_iter() + .collect::>(); + + let mut page_nums = 0; + let mut parquet_source = ParquetSource::new(stream, schema, default_proj); + // expects `page_nums_expects` blocks, and + while let Some(block) = parquet_source.read().await? { + page_nums += 1; + // for each block, the content is the same of `sample_block` + assert_blocks_eq( + vec![ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 1 |", + "| 1 | 1 |", + "| 2 | 2 |", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | 3 |", + "+---+---+", + ], + &[block], + ); + } + + assert_eq!(page_nums_expects, page_nums); + Ok(()) +} diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs index 7b023314765d..59c430203efd 100644 --- a/query/src/storages/fuse/io/block_reader.rs +++ b/query/src/storages/fuse/io/block_reader.rs @@ -71,20 +71,25 @@ impl BlockReader { } } - async fn read_col( + async fn read_column( mut reader: BufReader, - col_meta: &ColumnChunkMetaData, + column_chunk_meta: &ColumnChunkMetaData, data_type: DataType, ) -> Result { - let col_pages = get_page_stream(col_meta, &mut reader, vec![], Arc::new(|_, _| true)) - .instrument(debug_span!("block_reader_get_column_page")) - .await - .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let col_pages = get_page_stream( + column_chunk_meta, + &mut reader, + vec![], + Arc::new(|_, _| true), + ) + .instrument(debug_span!("block_reader_get_column_page")) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; let pages = col_pages.map(|compressed_page| { debug_span!("block_reader_decompress_page") .in_scope(|| decompress(compressed_page?, &mut vec![])) }); - let array = page_stream_to_array(pages, col_meta, data_type) + let array = page_stream_to_array(pages, column_chunk_meta, data_type) .instrument(debug_span!("block_reader_page_stream_to_array")) .await?; let array: Arc = array.into(); @@ -138,7 +143,7 @@ impl BlockReader { let reader = data_accessor.get_input_stream(path.as_str(), Some(stream_len))?; let reader = BufReader::with_capacity(read_buffer_size as usize, reader); let data_type = fields[idx].data_type.clone(); - Self::read_col(reader, &col_meta, data_type).await + Self::read_column(reader, &col_meta, data_type).await } .instrument(debug_span!("block_reader_read_column").or_current()) }); diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs index 94ef71b06fa4..e7be3ecf1e27 100644 --- a/query/src/storages/fuse/operations/read.rs +++ b/query/src/storages/fuse/operations/read.rs @@ -75,7 +75,7 @@ impl FuseTable { let part_location = part_info.location(); let part_len = part_info.length(); - let mut source = BlockReader::new( + let mut block_reader = BlockReader::new( da, part_info.location().to_owned(), table_schema, @@ -84,7 +84,7 @@ impl FuseTable { read_buffer_size, None, // TODO cache parquet meta ); - source.read().await.map_err(|e| { + block_reader.read().await.map_err(|e| { ErrorCode::ParquetError(format!( "fail to read block {}, {}", part_location, e From 47ecb44d33b566667fa9512df169616642b644fa Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 20:18:56 +0800 Subject: [PATCH 7/8] minor code gc --- common/streams/src/sources/source_factory.rs | 1 - common/streams/src/sources/source_parquet.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/common/streams/src/sources/source_factory.rs b/common/streams/src/sources/source_factory.rs index 0f4384cbdfde..6c1629f4ee01 100644 --- a/common/streams/src/sources/source_factory.rs +++ b/common/streams/src/sources/source_factory.rs @@ -42,7 +42,6 @@ impl SourceFactory { pub fn try_get(params: SourceParams) -> Result> where R: AsyncRead + AsyncSeek + Unpin + Send + 'static { let format = params.format.to_lowercase(); - // let reader = params.acc.get_input_stream(params.path, None)?; match format.as_str() { "csv" => { let has_header = params diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs index 7e6693401449..61ccbb652f12 100644 --- a/common/streams/src/sources/source_parquet.rs +++ b/common/streams/src/sources/source_parquet.rs @@ -92,7 +92,6 @@ where R: AsyncRead + AsyncSeek + Unpin + Send } }; - eprintln!("row group len {}", metadata.row_groups.len()); if self.current_row_group >= metadata.row_groups.len() { return Ok(None); } From d6163b0046cab02983f43b5308de90e640fc5f37 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 4 Jan 2022 20:58:02 +0800 Subject: [PATCH 8/8] tidy up --- query/src/storages/fuse/io/block_reader.rs | 77 +++------------------- query/src/storages/fuse/operations/read.rs | 5 +- 2 files changed, 13 insertions(+), 69 deletions(-) diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs index 0c55e30b8a20..d8601817c1de 100644 --- a/query/src/storages/fuse/io/block_reader.rs +++ b/query/src/storages/fuse/io/block_reader.rs @@ -18,6 +18,7 @@ use common_arrow::arrow::datatypes::DataType; use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::read::decompress; use common_arrow::arrow::io::parquet::read::page_stream_to_array; +use common_arrow::arrow::io::parquet::read::read_metadata; use common_arrow::arrow::io::parquet::read::read_metadata_async; use common_arrow::arrow::io::parquet::read::schema::FileMetaData; use common_arrow::parquet::metadata::ColumnChunkMetaData; @@ -37,6 +38,8 @@ use futures::io::BufReader; use futures::StreamExt; use futures::TryStreamExt; +use crate::storages::cache::StorageCache; + pub struct BlockReader { data_accessor: Arc, path: String, @@ -45,42 +48,17 @@ pub struct BlockReader { projection: Vec, file_len: u64, read_buffer_size: u64, - metadata: Option, + metadata: FileMetaData, } impl BlockReader { - pub fn new( + pub async fn new( data_accessor: Arc, path: String, table_schema: DataSchemaRef, projection: Vec, file_len: u64, read_buffer_size: u64, - metadata: Option, - ) -> Self { - let block_schema = Arc::new(table_schema.project(projection.clone())); - Self { - cache: Arc>>, - ) -> Result { - Self::with_hints( - data_accessor, - path, - table_schema, - projection, - None, - None, - cache, - ) - .await - } - - pub async fn with_hints( - data_accessor: Arc, - path: String, - table_schema: DataSchemaRef, - projection: Vec, - file_len: Option, - read_buffer_size: Option, cache: Arc>>, ) -> Result { let block_schema = Arc::new(table_schema.project(projection.clone())); @@ -103,10 +81,10 @@ impl BlockReader { block_schema, arrow_table_schema: table_schema.to_arrow(), projection, - metadata, file_len, read_buffer_size, - } + metadata, + }) } async fn read_column( @@ -136,23 +114,9 @@ impl BlockReader { #[tracing::instrument(level = "debug", skip_all)] pub async fn read(&mut self) -> Result { - // TODO cache parquet meta, and remove the following `let match clause` - let fetched_metadata; - let metadata = match &self.metadata { - Some(m) => m, - None => { - let mut reader = self - .data_accessor - .get_input_stream(self.path.as_str(), None)?; - fetched_metadata = read_metadata_async(&mut reader) - .instrument(debug_span!("block_reader_read_meta")) - .await - .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; - &fetched_metadata - } - }; + let metadata = &self.metadata; - // FUSE uses exact one "row group" only + // FUSE uses exact one "row group" let num_row_groups = metadata.row_groups.len(); let row_group = if num_row_groups != 1 { return Err(ErrorCode::LogicalError(format!( @@ -164,27 +128,6 @@ impl BlockReader { }; let col_num = self.projection.len(); - row_group: 0, - row_groups: 0, - metadata, - file_len, - read_buffer_size, - cache, - }) - } - - #[tracing::instrument(level = "debug", skip_all)] - pub async fn read(&mut self) -> Result> { - let metadata = &self.metadata; - - self.row_groups = metadata.row_groups.len(); - self.row_group = 0; - - if self.row_group >= self.row_groups { - return Ok(None); - } - let col_num = self.projection.len(); - let row_group = self.row_group; let cols = self .projection .clone() @@ -195,7 +138,7 @@ impl BlockReader { let stream_len = self.file_len; let read_buffer_size = self.read_buffer_size; - let streamsou = futures::stream::iter(cols).map(|(col_meta, idx)| { + let stream = futures::stream::iter(cols).map(|(col_meta, idx)| { let data_accessor = self.data_accessor.clone(); let path = self.path.clone(); async move { diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs index d66afccbd06a..6122b8823fda 100644 --- a/query/src/storages/fuse/operations/read.rs +++ b/query/src/storages/fuse/operations/read.rs @@ -83,8 +83,9 @@ impl FuseTable { projection, part_len, read_buffer_size, - None, // TODO cache parquet meta - ); + cache, + ) + .await?; block_reader.read().await.map_err(|e| { ErrorCode::ParquetError(format!( "fail to read block {}, {}",