Skip to content

Commit

Permalink
make parquet table support projection
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Nov 15, 2021
1 parent c9ad990 commit 80602f6
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 13 deletions.
5 changes: 4 additions & 1 deletion common/dal/src/impls/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ impl Local {
// TODO customize error code
Err(ErrorCode::from(Error::new(
ErrorKind::Other,
format!("please dont play with me, malicious path {:?}", path),
format!(
"please dont play with me, malicious path {:?}, root path {:?}",
path, self.root
),
)))
}
}
Expand Down
11 changes: 10 additions & 1 deletion common/datavalues/src/data_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,16 @@ impl DataSchema {
}

/// project will do column pruning.
pub fn project(&self, fields: Vec<DataField>) -> Self {
pub fn project(&self, projection: Vec<usize>) -> Self {
    // Resolve each projected column index to its field, cloning only
    // the surviving fields. Panics (like any slice index) if an index
    // is out of range — callers pass planner-validated projections.
    let mut fields = Vec::with_capacity(projection.len());
    for &idx in &projection {
        fields.push(self.fields()[idx].clone());
    }
    Self::new_from(fields, self.meta().clone())
}

/// Build a schema containing exactly the given fields while keeping
/// this schema's metadata. The caller has already performed the
/// column pruning and supplies the pruned field list directly.
pub fn project_by_fields(&self, fields: Vec<DataField>) -> Self {
    let metadata = self.meta().clone();
    Self::new_from(fields, metadata)
}

Expand Down
2 changes: 1 addition & 1 deletion common/planners/src/plan_read_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ReadDataSourcePlan {
.clone()
.map(|x| {
let fields: Vec<_> = x.iter().map(|(_, f)| f.clone()).collect();
Arc::new(self.table_info.schema().project(fields))
Arc::new(self.table_info.schema().project_by_fields(fields))
})
.unwrap_or_else(|| self.table_info.schema())
}
Expand Down
16 changes: 9 additions & 7 deletions common/streams/src/sources/source_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use common_dal::DataAccessor;
use common_datablocks::DataBlock;
use common_datavalues::prelude::DataColumn;
use common_datavalues::series::IntoSeries;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
Expand All @@ -37,7 +36,9 @@ use crate::Source;
pub struct ParquetSource {
data_accessor: Arc<dyn DataAccessor>,
path: String,
schema: ArrowSchema,

block_schema: DataSchemaRef,
arrow_table_schema: ArrowSchema,
projection: Vec<usize>,
row_group: usize,
row_groups: usize,
Expand All @@ -48,13 +49,15 @@ impl ParquetSource {
pub fn new(
data_accessor: Arc<dyn DataAccessor>,
path: String,
schema: DataSchemaRef,
table_schema: DataSchemaRef,
projection: Vec<usize>,
) -> Self {
let block_schema = Arc::new(table_schema.project(projection.clone()));
Self {
data_accessor,
path,
schema: schema.to_arrow(),
block_schema,
arrow_table_schema: table_schema.to_arrow(),
projection,
row_group: 0,
row_groups: 0,
Expand Down Expand Up @@ -85,7 +88,6 @@ impl Source for ParquetSource {
if self.row_group >= self.row_groups {
return Ok(None);
}

let col_num = self.projection.len();
let row_group = self.row_group;
let cols = self
Expand All @@ -94,7 +96,7 @@ impl Source for ParquetSource {
.into_iter()
.map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx));

let fields = self.schema.fields();
let fields = self.arrow_table_schema.fields();

let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
let column_chunk_meta = (metadata.row_groups[row_group].columns()[idx]).clone();
Expand Down Expand Up @@ -124,7 +126,7 @@ impl Source for ParquetSource {
let data_cols = stream.buffered(n).try_collect().await?;

self.row_group += 1;
let block = DataBlock::create(Arc::new(DataSchema::from(self.schema.clone())), data_cols);
let block = DataBlock::create(self.block_schema.clone(), data_cols);
Ok(Some(block))
}
}
1 change: 1 addition & 0 deletions query/src/datasources/table/fuse/table_test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl TestFixture {
config.storage.storage_type = "Disk".to_string();
// use `TempDir` as root path (auto clean)
config.storage.disk.data_path = tmp_dir.path().to_str().unwrap().to_string();
config.storage.disk.temp_data_path = tmp_dir.path().to_str().unwrap().to_string();
let ctx = crate::tests::try_create_context_with_config(config).unwrap();

let random_prefix: String = Uuid::new_v4().to_simple().to_string();
Expand Down
8 changes: 6 additions & 2 deletions query/src/datasources/table/parquet/parquet_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ impl Table for ParquetTable {
&self.table_info
}

/// Tell the planner this table benefits from column pruning: the read
/// path honors `plan.projections()` and only fetches the projected
/// column chunks, so pruning reduces I/O.
fn benefit_column_prune(&self) -> bool {
    true
}

fn read_partitions(
&self,
_io_ctx: Arc<TableIOContext>,
Expand All @@ -93,7 +97,7 @@ impl Table for ParquetTable {
.get_user_data()?
.expect("DatabendQueryContext should not be None");
let ctx_clone = ctx.clone();
let schema = plan.schema();
let table_schema = self.get_table_info().schema();
let projection = plan.projections();
let conf = ctx.get_config().storage;
let dal = Arc::new(Local::new(conf.disk.temp_data_path.as_str()));
Expand All @@ -108,7 +112,7 @@ impl Table for ParquetTable {
}
let part = partitions.get(0).unwrap();

let mut source = ParquetSource::new(dal.clone(), part.name.clone(), schema.clone(), projection.clone());
let mut source = ParquetSource::new(dal.clone(), part.name.clone(), table_schema.clone(), projection.clone());

loop {
let block = source.read().await;
Expand Down
1 change: 0 additions & 1 deletion tests/data/sample.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ create table test_parquet (
double_col double,
date_string_col varchar(255),
string_col varchar(255),
timestamp_col Timestamp
) Engine = Parquet location = 'tests/data/alltypes_plain.parquet';

select * from system.tables;

0 comments on commit 80602f6

Please sign in to comment.