FilePartition and PartitionedFile for scanning flexibility (#932)
* FilePartition and PartitionedFile for scanning flexibility

* clippy

* remove schema from partitioned file

* ballista logical parquet table

* ballista physical parquet exec

* resolve comments

* resolve comments
yjshen authored Aug 30, 2021
1 parent 775477f commit 8a085fc
Showing 10 changed files with 869 additions and 494 deletions.
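In short, the change stops serializing a Parquet scan as a bare list of file paths: a table is now described by a descriptor whose files carry per-file statistics, and the physical scan groups those files into partitions. Below is a rough Rust sketch of the shapes involved, read off the proto definitions and conversion code in the diff; the names ending in `Sketch` are illustrative only, and the field types mirror what the conversions construct rather than the authoritative DataFusion definitions.

    use std::sync::Arc;
    use datafusion::arrow::datatypes::Schema;

    // Table/file statistics; unknown values stay None (column-level stats omitted here).
    struct StatisticsSketch {
        num_rows: Option<usize>,
        total_byte_size: Option<usize>,
    }

    // One Parquet file plus its statistics (mirrors `PartitionedFile`).
    struct PartitionedFileSketch {
        path: String,
        statistics: StatisticsSketch,
    }

    // All files of a table plus its resolved schema (mirrors `TableDescriptor`).
    struct TableDescriptorSketch {
        path: String,
        partition_files: Vec<PartitionedFileSketch>,
        schema: Arc<Schema>,
    }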
33 changes: 27 additions & 6 deletions ballista/rust/core/proto/ballista.proto
@@ -271,12 +271,28 @@ message CsvTableScanNode {
   repeated LogicalExprNode filters = 8;
 }
 
+message Statistics {
+  int64 num_rows = 1;
+  int64 total_byte_size = 2;
+  repeated ColumnStats column_stats = 3;
+}
+
+message PartitionedFile {
+  string path = 1;
+  Statistics statistics = 2;
+}
+
+message TableDescriptor {
+  string path = 1;
+  repeated PartitionedFile partition_files = 2;
+  Schema schema = 3;
+}
+
 message ParquetTableScanNode {
   string table_name = 1;
-  string path = 2;
+  TableDescriptor table_desc = 2;
   ProjectionColumns projection = 3;
-  Schema schema = 4;
-  repeated LogicalExprNode filters = 5;
+  repeated LogicalExprNode filters = 4;
 }
 
 message ProjectionNode {
@@ -567,10 +583,15 @@ message FilterExecNode {
   PhysicalExprNode expr = 2;
 }
 
+message ParquetPartition {
+  uint32 index = 1;
+  repeated PartitionedFile files = 2;
+}
+
 message ParquetScanExecNode {
-  repeated string filename = 1;
-  repeated uint32 projection = 2;
-  uint32 num_partitions = 3;
+  repeated ParquetPartition partitions = 1;
+  Schema schema = 2;
+  repeated uint32 projection = 3;
   uint32 batch_size = 4;
 }

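On the execution side, files are grouped into `ParquetPartition`s, one per scan partition. A hedged sketch of how the prost-generated messages nest, assuming it lives inside `ballista/rust/core` where the generated `protobuf` module is available; the path, counts, and batch size are made-up values.

    use crate::serde::protobuf;

    fn example_scan_node(schema: protobuf::Schema) -> protobuf::ParquetScanExecNode {
        // One file with its statistics attached.
        let file = protobuf::PartitionedFile {
            path: "/data/part-0.parquet".to_string(), // hypothetical path
            statistics: Some(protobuf::Statistics {
                num_rows: 1_000,
                total_byte_size: 64 * 1024,
                column_stats: vec![],
            }),
        };

        // Files are grouped per scan partition; `index` identifies the partition.
        let partition = protobuf::ParquetPartition {
            index: 0,
            files: vec![file],
        };

        protobuf::ParquetScanExecNode {
            partitions: vec![partition],
            schema: Some(schema),
            projection: vec![0, 1],
            batch_size: 8192,
        }
    }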
72 changes: 68 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -21,6 +21,8 @@ use crate::error::BallistaError;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use crate::{convert_box_required, convert_required};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
use datafusion::datasource::{PartitionedFile, TableDescriptor};
use datafusion::logical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
Expand Down Expand Up @@ -134,10 +136,11 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.map_err(|e| e.into())
}
LogicalPlanType::ParquetScan(scan) => {
let descriptor: TableDescriptor = convert_required!(scan.table_desc)?;
let projection = match scan.projection.as_ref() {
None => None,
Some(columns) => {
let schema: Schema = convert_required!(scan.schema)?;
let schema = descriptor.schema.clone();
let r: Result<Vec<usize>, _> = columns
.columns
.iter()
Expand All @@ -154,11 +157,16 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
Some(r?)
}
};
LogicalPlanBuilder::scan_parquet_with_name(
&scan.path,
projection,

let parquet_table = ParquetTable::try_new_with_desc(
Arc::new(ParquetTableDescriptor { descriptor }),
24,
true,
)?;
LogicalPlanBuilder::scan(
&scan.table_name,
Arc::new(parquet_table),
projection,
)? //TODO remove hard-coded max_partitions
.build()
.map_err(|e| e.into())
@@ -301,6 +309,60 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
}

impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
    type Error = BallistaError;

    fn try_into(self) -> Result<TableDescriptor, Self::Error> {
        let partition_files = self
            .partition_files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<PartitionedFile>, _>>()?;
        let schema = convert_required!(self.schema)?;
        Ok(TableDescriptor {
            path: self.path.to_owned(),
            partition_files,
            schema: Arc::new(schema),
        })
    }
}

impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
    type Error = BallistaError;

    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
        let statistics = convert_required!(self.statistics)?;
        Ok(PartitionedFile {
            path: self.path.clone(),
            statistics,
        })
    }
}

impl From<&protobuf::ColumnStats> for ColumnStatistics {
    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Some(cs.null_count as usize),
            max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
            min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
            distinct_count: Some(cs.distinct_count as usize),
        }
    }
}

impl TryInto<Statistics> for &protobuf::Statistics {
    type Error = BallistaError;

    fn try_into(self) -> Result<Statistics, Self::Error> {
        let column_statistics = self.column_stats.iter().map(|s| s.into()).collect();
        Ok(Statistics {
            num_rows: Some(self.num_rows as usize),
            total_byte_size: Some(self.total_byte_size as usize),
            column_statistics: Some(column_statistics),
        })
    }
}

impl From<&protobuf::Column> for Column {
fn from(c: &protobuf::Column) -> Column {
let c = c.clone();
Expand Down Expand Up @@ -1114,6 +1176,8 @@ impl TryInto<Field> for &protobuf::Field {
}
}

use crate::serde::protobuf::ColumnStats;
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
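For orientation, here is a hedged sketch of the deserialization path these impls enable: rebuild the DataFusion `TableDescriptor` from its protobuf form and hand it to `ParquetTable::try_new_with_desc`, as the `ParquetScan` arm above does. The helper name is hypothetical, and the meaning of the two literals (`24` as a max-partitions/concurrency value, `true` as a collect-statistics flag) is an assumption carried over from the hard-coded values in the scan arm.

    use std::convert::TryInto;
    use std::sync::Arc;

    use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
    use datafusion::datasource::TableDescriptor;

    use crate::error::BallistaError;
    use crate::serde::protobuf;

    // Hypothetical helper: rebuild a ParquetTable provider from its serialized descriptor.
    fn parquet_table_from_proto(
        proto: &protobuf::TableDescriptor,
    ) -> Result<ParquetTable, BallistaError> {
        let descriptor: TableDescriptor = proto.try_into()?;
        let table = ParquetTable::try_new_with_desc(
            Arc::new(ParquetTableDescriptor { descriptor }),
            24,   // assumed: max partitions / concurrency, mirroring the hard-coded value above
            true, // assumed: collect statistics
        )?;
        Ok(table)
    }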
77 changes: 73 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,8 +22,11 @@
 use super::super::proto_error;
 use crate::datasource::DfTableAdapter;
 use crate::serde::{protobuf, BallistaError};
-use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
-use datafusion::datasource::CsvFile;
+use datafusion::arrow::datatypes::{
+    DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
+};
+use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
+use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
 use datafusion::logical_plan::{
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
     Column, Expr, JoinConstraint, JoinType, LogicalPlan,
@@ -253,6 +256,58 @@ impl TryInto<DataType> for &protobuf::ArrowType {
}
}

impl From<&ColumnStatistics> for protobuf::ColumnStats {
    fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
        protobuf::ColumnStats {
            min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
            max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
            null_count: cs.null_count.map(|n| n as u32).unwrap_or(0),
            distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0),
        }
    }
}

impl From<&Statistics> for protobuf::Statistics {
    fn from(s: &Statistics) -> protobuf::Statistics {
        let none_value = -1_i64;
        let column_stats = match &s.column_statistics {
            None => vec![],
            Some(column_stats) => column_stats.iter().map(|s| s.into()).collect(),
        };
        protobuf::Statistics {
            num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value),
            total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
            column_stats,
        }
    }
}

impl From<&PartitionedFile> for protobuf::PartitionedFile {
    fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile {
        protobuf::PartitionedFile {
            path: pf.path.clone(),
            statistics: Some((&pf.statistics).into()),
        }
    }
}

impl TryFrom<TableDescriptor> for protobuf::TableDescriptor {
    type Error = BallistaError;

    fn try_from(desc: TableDescriptor) -> Result<protobuf::TableDescriptor, Self::Error> {
        let partition_files: Vec<protobuf::PartitionedFile> =
            desc.partition_files.iter().map(|pf| pf.into()).collect();

        let schema: protobuf::Schema = desc.schema.into();

        Ok(protobuf::TableDescriptor {
            path: desc.path,
            partition_files,
            schema: Some(schema),
        })
    }
}

impl TryInto<DataType> for &Box<protobuf::List> {
type Error = BallistaError;
fn try_into(self) -> Result<DataType, Self::Error> {
@@ -706,13 +761,14 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     .collect::<Result<Vec<_>, _>>()?;
 
                 if let Some(parquet) = source.downcast_ref::<ParquetTable>() {
+                    let table_desc: protobuf::TableDescriptor =
+                        parquet.desc.descriptor.clone().try_into()?;
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ParquetScan(
                             protobuf::ParquetTableScanNode {
                                 table_name: table_name.to_owned(),
-                                path: parquet.path().to_owned(),
+                                table_desc: Some(table_desc),
                                 projection,
-                                schema: Some(schema),
                                 filters,
                             },
                         )),
@@ -1262,6 +1318,19 @@ impl Into<protobuf::Schema> for &Schema {
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::Schema> for SchemaRef {
    fn into(self) -> protobuf::Schema {
        protobuf::Schema {
            columns: self
                .fields()
                .iter()
                .map(protobuf::Field::from)
                .collect::<Vec<_>>(),
        }
    }
}

impl From<&datafusion::logical_plan::DFField> for protobuf::DfField {
fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField {
protobuf::DfField {
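Going the other direction, the new `From` impls make serialization a plain `.into()`. A small usage sketch under the same assumptions (the helper and its values are made up; unknown counts serialize as the -1/0/empty defaults chosen by the impls above):

    use datafusion::datasource::datasource::Statistics;
    use datafusion::datasource::PartitionedFile;

    use crate::serde::protobuf;

    // Hypothetical helper: turn a DataFusion PartitionedFile into its protobuf counterpart.
    fn file_to_proto(path: &str) -> protobuf::PartitionedFile {
        let pf = PartitionedFile {
            path: path.to_string(),
            statistics: Statistics {
                num_rows: Some(1_000),   // made-up value
                total_byte_size: None,   // serialized as -1 by the impl above
                column_statistics: None, // serialized as an empty list
            },
        };
        (&pf).into()
    }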
40 changes: 34 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::datasource::Statistics;
use datafusion::datasource::FilePartition;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
@@ -44,6 +46,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::parquet::ParquetPartition;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
Expand Down Expand Up @@ -129,17 +133,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)?))
}
PhysicalPlanType::ParquetScan(scan) => {
let partitions = scan
.partitions
.iter()
.map(|p| p.try_into())
.collect::<Result<Vec<ParquetPartition>, _>>()?;
let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let filenames: Vec<&str> =
scan.filename.iter().map(|s| s.as_str()).collect();
Ok(Arc::new(ParquetExec::try_from_files(
&filenames,
Ok(Arc::new(ParquetExec::new(
partitions,
schema,
Some(projection),
Statistics::default(),
ExecutionPlanMetricsSet::new(),
None,
scan.batch_size as usize,
scan.num_partitions as usize,
None,
)?))
)))
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
@@ -470,6 +480,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
}

impl TryInto<ParquetPartition> for &protobuf::ParquetPartition {
    type Error = BallistaError;

    fn try_into(self) -> Result<ParquetPartition, Self::Error> {
        let files = self
            .files
            .iter()
            .map(|f| f.try_into())
            .collect::<Result<Vec<_>, _>>()?;
        Ok(ParquetPartition::new(
            files,
            self.index as usize,
            ExecutionPlanMetricsSet::new(),
        ))
    }
}

impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
Column::new(&c.name, c.index as usize)
Expand Down Expand Up @@ -620,6 +647,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;

let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
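The positional arguments in the `ParquetExec::new` call above are easy to misread, so here is a hedged, self-contained sketch that labels them. The roles of the two `None` arguments (an optional pruning predicate and an optional row limit) are inferred from DataFusion's API of that era and should be treated as assumptions, as should the helper name.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::datasource::Statistics;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition};
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper mirroring the ParquetScan arm above, with each argument labeled.
    fn build_parquet_exec(
        partitions: Vec<ParquetPartition>,
        schema: SchemaRef,
        projection: Vec<usize>,
        batch_size: usize,
    ) -> Arc<dyn ExecutionPlan> {
        Arc::new(ParquetExec::new(
            partitions,                     // files grouped per scan partition
            schema,                         // schema decoded from the proto message
            Some(projection),               // columns to read
            Statistics::default(),          // plan-level statistics (empty here)
            ExecutionPlanMetricsSet::new(), // metrics registry for the operator
            None,                           // assumed: optional pruning predicate
            batch_size,                     // record batch size
            None,                           // assumed: optional row-count limit
        ))
    }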
