Commit
Merge pull request #3757 from dantengsky/fix-3225
ISSUE-3225: Make parquet reader work with only one stream reader
sundy-li authored Jan 5, 2022
2 parents 7754e03 + d6163b0 commit b4c0215
Showing 7 changed files with 217 additions and 187 deletions.
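
At a glance, this change removes the `DataAccessor` + path pair from the stream sources and instead threads a single generic async reader (`R: AsyncRead + AsyncSeek + Unpin + Send`) through `SourceParams` and `ParquetSource`. A minimal sketch of that reader-generic pattern outside the Databend crates (the `read_magic` helper and its values are illustrative only, not part of this PR):

    use std::io::SeekFrom;

    use futures::io::{AsyncReadExt, AsyncSeekExt};
    use futures::{AsyncRead, AsyncSeek};

    // Any seekable async byte source can be passed in; the caller no longer
    // needs a DataAccessor plus a path in order to (re-)open the stream.
    async fn read_magic<R>(mut reader: R) -> std::io::Result<[u8; 4]>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send,
    {
        // Parquet files end with the 4-byte magic "PAR1"; seek there and read it.
        reader.seek(SeekFrom::End(-4)).await?;
        let mut magic = [0u8; 4];
        reader.read_exact(&mut magic).await?;
        Ok(magic)
    }

Any seekable source — a local file, an in-memory `futures::io::Cursor`, or a DAL input stream — satisfies the same bound, which is what lets the parquet source work with only one stream reader.
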
4 changes: 3 additions & 1 deletion common/streams/Cargo.toml
@@ -14,7 +14,6 @@ test = false
# Workspace dependencies
common-arrow = { path = "../arrow" }
common-base = { path = "../base" }
common-dal = { path = "../dal" }
common-datablocks = { path = "../datablocks" }
common-datavalues = { path = "../datavalues" }
common-exception = { path = "../exception" }
@@ -33,3 +32,6 @@ futures = "0.3.18"
pin-project-lite = "0.2.7"
tempfile = "3.2.0"
tokio-stream = { version = "0.1.8", features = ["net"] }

[dev-dependencies]
common-dal = {path = "../dal"}
19 changes: 10 additions & 9 deletions common/streams/src/sources/source_factory.rs
@@ -13,21 +13,23 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_dal::DataAccessor;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use futures::AsyncRead;
use futures::AsyncSeek;

use crate::CsvSource;
use crate::ParquetSource;
use crate::Source;

pub struct SourceFactory {}

pub struct SourceParams<'a> {
pub acc: Arc<dyn DataAccessor>,
pub struct SourceParams<'a, R>
where R: AsyncRead + Unpin + Send
{
pub reader: R,
pub path: &'a str,
pub format: &'a str,
pub schema: DataSchemaRef,
@@ -37,7 +39,8 @@ pub struct SourceParams<'a> {
}

impl SourceFactory {
pub fn try_get(params: SourceParams) -> Result<Box<dyn Source>> {
pub fn try_get<R>(params: SourceParams<R>) -> Result<Box<dyn Source>>
where R: AsyncRead + AsyncSeek + Unpin + Send + 'static {
let format = params.format.to_lowercase();
match format.as_str() {
"csv" => {
@@ -65,9 +68,8 @@ impl SourceFactory {
})
.unwrap_or(b'\n');

let reader = params.acc.get_input_stream(params.path, None)?;
Ok(Box::new(CsvSource::try_create(
reader,
params.reader,
params.schema,
has_header.eq_ignore_ascii_case("1"),
field_delimitor,
@@ -76,8 +78,7 @@
)?))
}
"parquet" => Ok(Box::new(ParquetSource::new(
params.acc,
params.path.to_owned(),
params.reader,
params.schema,
params.projection,
))),
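
The factory still hands back `Box<dyn Source>`, only now it is generic over the reader. A self-contained sketch of that shape, with simplified stand-ins for the real `Source` trait and source types (none of these definitions come from `common_streams`):

    use futures::{AsyncRead, AsyncSeek};

    // Simplified stand-ins for the real Source trait, sources, and error type.
    trait Source: Send {
        fn name(&self) -> &'static str;
    }

    struct CsvLike<R> { reader: R }
    struct ParquetLike<R> { reader: R }

    impl<R: Send> Source for CsvLike<R> { fn name(&self) -> &'static str { "csv" } }
    impl<R: Send> Source for ParquetLike<R> { fn name(&self) -> &'static str { "parquet" } }

    // The generic parameter is resolved at the call site; the boxed trait object
    // hides it from downstream code, which is why the `'static` bound is needed.
    fn try_get<R>(format: &str, reader: R) -> Result<Box<dyn Source>, String>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
    {
        match format.to_lowercase().as_str() {
            "csv" => Ok(Box::new(CsvLike { reader })),
            "parquet" => Ok(Box::new(ParquetLike { reader })),
            other => Err(format!("unsupported format {}", other)),
        }
    }

The `'static` bound is what allows the concrete reader type to be erased behind the boxed trait object.
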
136 changes: 44 additions & 92 deletions common/streams/src/sources/source_parquet.rs
@@ -21,7 +21,6 @@ use common_arrow::arrow::io::parquet::read::page_stream_to_array
use common_arrow::arrow::io::parquet::read::read_metadata_async;
use common_arrow::arrow::io::parquet::read::schema::FileMetaData;
use common_arrow::parquet::read::get_page_stream;
use common_dal::DataAccessor;
use common_datablocks::DataBlock;
use common_datavalues::prelude::DataColumn;
use common_datavalues::series::IntoSeries;
@@ -31,141 +30,94 @@ use common_exception::Result;
use common_tracing::tracing;
use common_tracing::tracing::debug_span;
use common_tracing::tracing::Instrument;
use futures::io::BufReader;
use futures::AsyncRead;
use futures::AsyncSeek;
use futures::StreamExt;
use futures::TryStreamExt;

/// default buffer size of BufferedReader, 1MB
const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;

use crate::Source;

pub struct ParquetSource {
data_accessor: Arc<dyn DataAccessor>,
path: String,

pub struct ParquetSource<R> {
reader: R,
block_schema: DataSchemaRef,
arrow_table_schema: ArrowSchema,
projection: Vec<usize>,
row_group: usize,
row_groups: usize,
metadata: Option<FileMetaData>,
file_len: Option<u64>,
read_buffer_size: Option<u64>,
current_row_group: usize,
}

impl ParquetSource {
pub fn new(
data_accessor: Arc<dyn DataAccessor>,
path: String,
table_schema: DataSchemaRef,
projection: Vec<usize>,
) -> Self {
Self::with_hints(
data_accessor,
path,
table_schema,
projection,
None,
None,
None,
)
impl<R> ParquetSource<R>
where R: AsyncRead + AsyncSeek + Unpin + Send
{
pub fn new(reader: R, table_schema: DataSchemaRef, projection: Vec<usize>) -> Self {
Self::with_meta(reader, table_schema, projection, None)
}

pub fn with_hints(
data_accessor: Arc<dyn DataAccessor>,
path: String,
pub fn with_meta(
reader: R,
table_schema: DataSchemaRef,
projection: Vec<usize>,
metadata: Option<FileMetaData>,
file_len: Option<u64>,
read_buffer_size: Option<u64>,
) -> Self {
let block_schema = Arc::new(table_schema.project(projection.clone()));
Self {
data_accessor,
path,
ParquetSource {
reader,
block_schema,
arrow_table_schema: table_schema.to_arrow(),
projection,
row_group: 0,
row_groups: 0,
metadata,
file_len,
read_buffer_size,
current_row_group: 0,
}
}
}

#[async_trait]
impl Source for ParquetSource {
impl<R> Source for ParquetSource<R>
where R: AsyncRead + AsyncSeek + Unpin + Send
{
#[tracing::instrument(level = "debug", skip_all)]
async fn read(&mut self) -> Result<Option<DataBlock>> {
let fetched_metadata;
let metadata = match &self.metadata {
Some(m) => m,
None => {
let mut reader = self
.data_accessor
.get_input_stream(self.path.as_str(), None)?;
fetched_metadata = read_metadata_async(&mut reader)
fetched_metadata = read_metadata_async(&mut self.reader)
.instrument(debug_span!("parquet_source_read_meta"))
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
&fetched_metadata
self.metadata = Some(fetched_metadata);
match self.metadata.as_ref() {
Some(m) => m,
_ => unreachable!(),
}
}
};

self.row_groups = metadata.row_groups.len();
self.row_group = 0;

if self.row_group >= self.row_groups {
if self.current_row_group >= metadata.row_groups.len() {
return Ok(None);
}
let col_num = self.projection.len();
let row_group = self.row_group;

let fields = self.arrow_table_schema.fields();
let row_grp = &metadata.row_groups[self.current_row_group];
let cols = self
.projection
.clone()
.into_iter()
.map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx));

let fields = self.arrow_table_schema.fields();
let stream_len = self.file_len;
let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE);

let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
let data_accessor = self.data_accessor.clone();
let path = self.path.clone();

async move {
let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?;
let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader);
// TODO cache block column
let col_pages =
get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true))
.instrument(debug_span!("parquet_source_get_column_page"))
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
let pages = col_pages.map(|compressed_page| {
debug_span!("parquet_source_decompress_page")
.in_scope(|| decompress(compressed_page?, &mut vec![]))
});
let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
.instrument(debug_span!("parquet_source_page_stream_to_array"))
.await?;
let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
Ok::<_, ErrorCode>(DataColumn::Array(array.into_series()))
}
.instrument(debug_span!("parquet_source_read_column").or_current())
});

// TODO configuration of the buffer size
let buffer_size = 10;
let n = std::cmp::min(buffer_size, col_num);
let data_cols = stream.buffered(n).try_collect().await?;

self.row_group += 1;
.map(|idx| (row_grp.column(idx).clone(), idx));
let mut data_cols = Vec::with_capacity(cols.len());
for (col_meta, idx) in cols {
let col_pages =
get_page_stream(&col_meta, &mut self.reader, vec![], Arc::new(|_, _| true))
.instrument(debug_span!("parquet_source_get_column_page"))
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
let pages = col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
.instrument(debug_span!("parquet_source_page_stream_to_array"))
.await?;
let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
data_cols.push(DataColumn::Array(array.into_series()))
}
self.current_row_group += 1;
let block = DataBlock::create(self.block_schema.clone(), data_cols);
Ok(Some(block))
}
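
With the accessor and path gone, `ParquetSource` owns a single seekable reader, reads metadata lazily on the first `read()`, and advances `current_row_group` on each call. A hedged usage sketch built on the `ParquetSource::new(reader, table_schema, projection)` signature from this diff (`read_all`, the in-memory buffer, and the full projection are assumptions for illustration):

    use common_datavalues::DataSchemaRef;
    use common_exception::Result;
    use common_streams::{ParquetSource, Source};
    use futures::io::Cursor;

    // Drain every row group from a single in-memory parquet buffer.
    // `parquet_bytes` is assumed to hold a complete parquet file.
    async fn read_all(parquet_bytes: Vec<u8>, schema: DataSchemaRef) -> Result<usize> {
        let projection: Vec<usize> = (0..schema.fields().len()).collect();
        let mut source = ParquetSource::new(Cursor::new(parquet_bytes), schema, projection);

        let mut blocks = 0;
        // Each call returns one row group as a DataBlock until the file is exhausted.
        while let Some(_block) = source.read().await? {
            blocks += 1;
        }
        Ok(blocks)
    }
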
94 changes: 94 additions & 0 deletions common/streams/tests/it/source.rs
@@ -19,10 +19,16 @@ use common_base::tokio;
use common_dal::DataAccessor;
use common_dal::Local;
use common_datablocks::assert_blocks_eq;
use common_datablocks::DataBlock;
use common_datavalues::prelude::DataColumn;
use common_datavalues::series::Series;
use common_datavalues::DataField;
use common_datavalues::DataSchemaRefExt;
use common_datavalues::DataType;
use common_exception::ErrorCode;
use common_exception::Result;
use common_streams::CsvSource;
use common_streams::ParquetSource;
use common_streams::Source;
use common_streams::ValueSource;

@@ -113,3 +119,91 @@
}
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_source_parquet() -> Result<()> {
use common_datavalues::DataType;
let schema = DataSchemaRefExt::create(vec![
DataField::new("a", DataType::Int8, false),
DataField::new("b", DataType::String, false),
]);

let arrow_schema = schema.to_arrow();

use common_arrow::arrow::io::parquet::write::*;
let options = WriteOptions {
write_statistics: true,
compression: Compression::Lz4, // let's begin with lz4
version: Version::V2,
};

use common_datavalues::prelude::SeriesFrom;
let col_a = Series::new(vec![1i8, 1, 2, 1, 2, 3]);
let col_b = Series::new(vec!["1", "1", "2", "1", "2", "3"]);
let sample_block = DataBlock::create(schema.clone(), vec![
DataColumn::Array(col_a),
DataColumn::Array(col_b),
]);

use common_arrow::arrow::record_batch::RecordBatch;
let batch = RecordBatch::try_from(sample_block)?;
use common_arrow::parquet::encoding::Encoding;
let encodings = std::iter::repeat(Encoding::Plain)
.take(arrow_schema.fields.len())
.collect::<Vec<_>>();

let page_nums_expects = 3;
let name = "test-parquet";
let dir = tempfile::tempdir().unwrap();

// write test parquet
let len = {
let rg_iter = std::iter::repeat(batch).map(Ok).take(page_nums_expects);
let row_groups = RowGroupIterator::try_new(rg_iter, &arrow_schema, options, encodings)?;
let parquet_schema = row_groups.parquet_schema().clone();
let path = dir.path().join(name);
let mut writer = File::create(path).unwrap();
common_arrow::parquet::write::write_file(
&mut writer,
row_groups,
parquet_schema,
options,
None,
None,
)
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?
};

let local = Local::with_path(dir.path().to_path_buf());
let stream = local.get_input_stream(name, Some(len)).unwrap();

let default_proj = (0..schema.fields().len())
.into_iter()
.collect::<Vec<usize>>();

let mut page_nums = 0;
let mut parquet_source = ParquetSource::new(stream, schema, default_proj);
// expects `page_nums_expects` blocks, and
while let Some(block) = parquet_source.read().await? {
page_nums += 1;
// for each block, the content is the same of `sample_block`
assert_blocks_eq(
vec![
"+---+---+",
"| a | b |",
"+---+---+",
"| 1 | 1 |",
"| 1 | 1 |",
"| 2 | 2 |",
"| 1 | 1 |",
"| 2 | 2 |",
"| 3 | 3 |",
"+---+---+",
],
&[block],
);
}

assert_eq!(page_nums_expects, page_nums);
Ok(())
}
6 changes: 5 additions & 1 deletion query/src/interpreters/interpreter_copy.rs
@@ -25,6 +25,7 @@ use common_streams::SendableDataBlockStream;
use common_streams::SourceFactory;
use common_streams::SourceParams;
use common_streams::SourceStream;
use futures::io::BufReader;
use futures::TryStreamExt;
use nom::bytes::complete::tag;
use nom::bytes::complete::take_until;
@@ -71,8 +72,11 @@

let acc = get_dal_by_stage(self.ctx.clone(), stage)?;
let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let input_stream = acc.get_input_stream(path, None)?;
let read_buffer_size = self.ctx.get_settings().get_storage_read_buffer_size()?;
let reader = BufReader::with_capacity(read_buffer_size as usize, input_stream);
let source_params = SourceParams {
acc,
reader,
path,
format: self.plan.format.as_str(),
schema: self.plan.schema.clone(),
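
The COPY interpreter now opens the DAL input stream once and wraps it in a `futures::io::BufReader` sized by the storage read buffer setting before passing it to the source factory. A minimal sketch of that wrapping step in isolation (the `Cursor` stands in for the DAL stream and the 1 MB capacity is an illustrative value):

    use futures::io::{AsyncReadExt, BufReader, Cursor};

    async fn buffered_read_demo() -> std::io::Result<Vec<u8>> {
        // Stand-in for `acc.get_input_stream(path, None)?` from the interpreter.
        let input_stream = Cursor::new(vec![0u8; 4096]);

        // Wrap once; subsequent reads go through the buffer instead of hitting
        // the underlying storage for each small request.
        let mut reader = BufReader::with_capacity(1024 * 1024, input_stream);

        let mut out = Vec::new();
        reader.read_to_end(&mut out).await?;
        Ok(out)
    }
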