From 8a085fc2701f04381cf26bc8ea925339c68cf6e4 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 30 Aug 2021 19:33:50 +0800 Subject: [PATCH] FilePartition and PartitionedFile for scanning flexibility (#932) * FilePartition and partitionedFile for scanning flexibility * clippy * remove schema from partitioned file * ballista logical parquet table * ballista physical parquet exec * resolve comments * resolve comments --- ballista/rust/core/proto/ballista.proto | 33 +- .../core/src/serde/logical_plan/from_proto.rs | 72 ++- .../core/src/serde/logical_plan/to_proto.rs | 77 ++- .../src/serde/physical_plan/from_proto.rs | 40 +- .../core/src/serde/physical_plan/to_proto.rs | 23 +- ballista/rust/scheduler/src/lib.rs | 33 +- datafusion/src/datasource/mod.rs | 256 +++++++++ datafusion/src/datasource/parquet.rs | 330 ++++++++++-- .../src/physical_optimizer/repartition.rs | 11 +- datafusion/src/physical_plan/parquet.rs | 488 +++--------------- 10 files changed, 869 insertions(+), 494 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index c184de3d1329..45ff6c5984ca 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -271,12 +271,28 @@ message CsvTableScanNode { repeated LogicalExprNode filters = 8; } +message Statistics { + int64 num_rows = 1; + int64 total_byte_size = 2; + repeated ColumnStats column_stats = 3; +} + +message PartitionedFile { + string path = 1; + Statistics statistics = 2; +} + +message TableDescriptor { + string path = 1; + repeated PartitionedFile partition_files = 2; + Schema schema = 3; +} + message ParquetTableScanNode { string table_name = 1; - string path = 2; + TableDescriptor table_desc = 2; ProjectionColumns projection = 3; - Schema schema = 4; - repeated LogicalExprNode filters = 5; + repeated LogicalExprNode filters = 4; } message ProjectionNode { @@ -567,10 +583,15 @@ message FilterExecNode { PhysicalExprNode expr = 2; } +message ParquetPartition { + uint32 index = 1; + repeated PartitionedFile files = 2; +} + message ParquetScanExecNode { - repeated string filename = 1; - repeated uint32 projection = 2; - uint32 num_partitions = 3; + repeated ParquetPartition partitions = 1; + Schema schema = 2; + repeated uint32 projection = 3; uint32 batch_size = 4; } diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 14fba0646ec1..fc4ac2c9076c 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -21,6 +21,8 @@ use crate::error::BallistaError; use crate::serde::{from_proto_binary_op, proto_error, protobuf}; use crate::{convert_box_required, convert_required}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor}; +use datafusion::datasource::{PartitionedFile, TableDescriptor}; use datafusion::logical_plan::window_frames::{ WindowFrame, WindowFrameBound, WindowFrameUnits, }; @@ -134,10 +136,11 @@ impl TryInto for &protobuf::LogicalPlanNode { .map_err(|e| e.into()) } LogicalPlanType::ParquetScan(scan) => { + let descriptor: TableDescriptor = convert_required!(scan.table_desc)?; let projection = match scan.projection.as_ref() { None => None, Some(columns) => { - let schema: Schema = convert_required!(scan.schema)?; + let schema = descriptor.schema.clone(); let r: Result, _> = columns .columns .iter() @@ -154,11 +157,16 @@ impl TryInto for &protobuf::LogicalPlanNode { Some(r?) } }; - LogicalPlanBuilder::scan_parquet_with_name( - &scan.path, - projection, + + let parquet_table = ParquetTable::try_new_with_desc( + Arc::new(ParquetTableDescriptor { descriptor }), 24, + true, + )?; + LogicalPlanBuilder::scan( &scan.table_name, + Arc::new(parquet_table), + projection, )? //TODO remove hard-coded max_partitions .build() .map_err(|e| e.into()) @@ -301,6 +309,60 @@ impl TryInto for &protobuf::LogicalPlanNode { } } +impl TryInto for &protobuf::TableDescriptor { + type Error = BallistaError; + + fn try_into(self) -> Result { + let partition_files = self + .partition_files + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + let schema = convert_required!(self.schema)?; + Ok(TableDescriptor { + path: self.path.to_owned(), + partition_files, + schema: Arc::new(schema), + }) + } +} + +impl TryInto for &protobuf::PartitionedFile { + type Error = BallistaError; + + fn try_into(self) -> Result { + let statistics = convert_required!(self.statistics)?; + Ok(PartitionedFile { + path: self.path.clone(), + statistics, + }) + } +} + +impl From<&protobuf::ColumnStats> for ColumnStatistics { + fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics { + ColumnStatistics { + null_count: Some(cs.null_count as usize), + max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()), + min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()), + distinct_count: Some(cs.distinct_count as usize), + } + } +} + +impl TryInto for &protobuf::Statistics { + type Error = BallistaError; + + fn try_into(self) -> Result { + let column_statistics = self.column_stats.iter().map(|s| s.into()).collect(); + Ok(Statistics { + num_rows: Some(self.num_rows as usize), + total_byte_size: Some(self.total_byte_size as usize), + column_statistics: Some(column_statistics), + }) + } +} + impl From<&protobuf::Column> for Column { fn from(c: &protobuf::Column) -> Column { let c = c.clone(); @@ -1114,6 +1176,8 @@ impl TryInto for &protobuf::Field { } } +use crate::serde::protobuf::ColumnStats; +use datafusion::datasource::datasource::{ColumnStatistics, Statistics}; use datafusion::physical_plan::{aggregates, windows}; use datafusion::prelude::{ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256, diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 5877ced5f561..aa7a973dd340 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -22,8 +22,11 @@ use super::super::proto_error; use crate::datasource::DfTableAdapter; use crate::serde::{protobuf, BallistaError}; -use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; -use datafusion::datasource::CsvFile; +use datafusion::arrow::datatypes::{ + DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, +}; +use datafusion::datasource::datasource::{ColumnStatistics, Statistics}; +use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor}; use datafusion::logical_plan::{ window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, Column, Expr, JoinConstraint, JoinType, LogicalPlan, @@ -253,6 +256,58 @@ impl TryInto for &protobuf::ArrowType { } } +impl From<&ColumnStatistics> for protobuf::ColumnStats { + fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats { + protobuf::ColumnStats { + min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()), + max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()), + null_count: cs.null_count.map(|n| n as u32).unwrap_or(0), + distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0), + } + } +} + +impl From<&Statistics> for protobuf::Statistics { + fn from(s: &Statistics) -> protobuf::Statistics { + let none_value = -1_i64; + let column_stats = match &s.column_statistics { + None => vec![], + Some(column_stats) => column_stats.iter().map(|s| s.into()).collect(), + }; + protobuf::Statistics { + num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value), + total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value), + column_stats, + } + } +} + +impl From<&PartitionedFile> for protobuf::PartitionedFile { + fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile { + protobuf::PartitionedFile { + path: pf.path.clone(), + statistics: Some((&pf.statistics).into()), + } + } +} + +impl TryFrom for protobuf::TableDescriptor { + type Error = BallistaError; + + fn try_from(desc: TableDescriptor) -> Result { + let partition_files: Vec = + desc.partition_files.iter().map(|pf| pf.into()).collect(); + + let schema: protobuf::Schema = desc.schema.into(); + + Ok(protobuf::TableDescriptor { + path: desc.path, + partition_files, + schema: Some(schema), + }) + } +} + impl TryInto for &Box { type Error = BallistaError; fn try_into(self) -> Result { @@ -706,13 +761,14 @@ impl TryInto for &LogicalPlan { .collect::, _>>()?; if let Some(parquet) = source.downcast_ref::() { + let table_desc: protobuf::TableDescriptor = + parquet.desc.descriptor.clone().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ParquetScan( protobuf::ParquetTableScanNode { table_name: table_name.to_owned(), - path: parquet.path().to_owned(), + table_desc: Some(table_desc), projection, - schema: Some(schema), filters, }, )), @@ -1262,6 +1318,19 @@ impl Into for &Schema { } } +#[allow(clippy::from_over_into)] +impl Into for SchemaRef { + fn into(self) -> protobuf::Schema { + protobuf::Schema { + columns: self + .fields() + .iter() + .map(protobuf::Field::from) + .collect::>(), + } + } +} + impl From<&datafusion::logical_plan::DFField> for protobuf::DfField { fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField { protobuf::DfField { diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d37940e71a49..522bac2e2ca4 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; +use datafusion::datasource::datasource::Statistics; +use datafusion::datasource::FilePartition; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, }; @@ -44,6 +46,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::PartitionMode; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::parquet::ParquetPartition; use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::window_functions::{ BuiltInWindowFunction, WindowFunction, @@ -129,17 +133,23 @@ impl TryInto> for &protobuf::PhysicalPlanNode { )?)) } PhysicalPlanType::ParquetScan(scan) => { + let partitions = scan + .partitions + .iter() + .map(|p| p.try_into()) + .collect::, _>>()?; + let schema = Arc::new(convert_required!(scan.schema)?); let projection = scan.projection.iter().map(|i| *i as usize).collect(); - let filenames: Vec<&str> = - scan.filename.iter().map(|s| s.as_str()).collect(); - Ok(Arc::new(ParquetExec::try_from_files( - &filenames, + Ok(Arc::new(ParquetExec::new( + partitions, + schema, Some(projection), + Statistics::default(), + ExecutionPlanMetricsSet::new(), None, scan.batch_size as usize, - scan.num_partitions as usize, None, - )?)) + ))) } PhysicalPlanType::CoalesceBatches(coalesce_batches) => { let input: Arc = @@ -470,6 +480,23 @@ impl TryInto> for &protobuf::PhysicalPlanNode { } } +impl TryInto for &protobuf::ParquetPartition { + type Error = BallistaError; + + fn try_into(self) -> Result { + let files = self + .files + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + Ok(ParquetPartition::new( + files, + self.index as usize, + ExecutionPlanMetricsSet::new(), + )) + } +} + impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { Column::new(&c.name, c.index as usize) @@ -620,6 +647,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; + let ctx_state = ExecutionContextState { catalog_list, scalar_functions: Default::default(), diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 8d8f917461a9..e7d4ac652874 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -38,7 +38,7 @@ use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::AggregateMode; use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sort::SortExec; use datafusion::{ @@ -268,22 +268,19 @@ impl TryInto for Arc { )), }) } else if let Some(exec) = plan.downcast_ref::() { - let filenames = exec - .partitions() - .iter() - .flat_map(|part| part.filenames().to_owned()) - .collect(); + let partitions = exec.partitions().iter().map(|p| p.into()).collect(); + Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( protobuf::ParquetScanExecNode { - filename: filenames, + partitions, + schema: Some(exec.schema.as_ref().into()), projection: exec .projection() .as_ref() .iter() .map(|n| *n as u32) .collect(), - num_partitions: exec.partitions().len() as u32, batch_size: exec.batch_size() as u32, }, )), @@ -621,6 +618,16 @@ impl TryFrom> for protobuf::PhysicalExprNode { } } +impl From<&ParquetPartition> for protobuf::ParquetPartition { + fn from(p: &ParquetPartition) -> protobuf::ParquetPartition { + let files = p.file_partition.files.iter().map(|f| f.into()).collect(); + protobuf::ParquetPartition { + index: p.file_partition.index as u32, + files, + } + } +} + fn try_parse_when_then_expr( when_expr: &Arc, then_expr: &Arc, diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 00f2c98b5c2a..f03d08b1b0ed 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::datasource::parquet::ParquetTableDescriptor; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -282,24 +282,25 @@ impl SchedulerGrpc for SchedulerServer { match file_type { FileType::Parquet => { - let parquet_exec = - ParquetExec::try_from_path(&path, None, None, 1024, 1, None) - .map_err(|e| { - let msg = format!("Error opening parquet files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; + let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| { + let msg = format!("Error opening parquet files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + + let partitions = parquet_desc + .descriptor + .partition_files + .iter() + .map(|pf| FilePartitionMetadata { + filename: vec![pf.path.clone()], + }) + .collect(); //TODO include statistics and any other info needed to reconstruct ParquetExec Ok(Response::new(GetFileMetadataResult { - schema: Some(parquet_exec.schema().as_ref().into()), - partitions: parquet_exec - .partitions() - .iter() - .map(|part| FilePartitionMetadata { - filename: part.filenames().to_vec(), - }) - .collect(), + schema: Some(parquet_desc.schema().as_ref().into()), + partitions, })) } //TODO implement for CSV diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9699a997caa1..d5e29522f6ba 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -27,6 +27,13 @@ pub mod parquet; pub use self::csv::{CsvFile, CsvReadOptions}; pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; +use crate::arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::common::build_file_list; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::Accumulator; +use std::sync::Arc; /// Source for table input data pub(crate) enum Source> { @@ -36,3 +43,252 @@ pub(crate) enum Source> { /// Read data from a reader Reader(std::sync::Mutex>), } + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub path: String, + /// Statistics of the file + pub statistics: Statistics, + // Values of partition columns to be appended to each row + // pub partition_value: Option>, + // We may include row group range here for a more fine-grained parallel execution +} + +impl From for PartitionedFile { + fn from(path: String) -> Self { + Self { + path, + statistics: Default::default(), + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.path) + } +} + +#[derive(Debug, Clone)] +/// A collection of files that should be read in a single task +pub struct FilePartition { + /// The index of the partition among all partitions + pub index: usize, + /// The contained files of the partition + pub files: Vec, +} + +impl std::fmt::Display for FilePartition { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let files: Vec = self.files.iter().map(|f| f.to_string()).collect(); + write!(f, "{}", files.join(", ")) + } +} + +#[derive(Debug, Clone)] +/// All source files with same schema exists in a path +pub struct TableDescriptor { + /// root path of the table + pub path: String, + /// All source files in the path + pub partition_files: Vec, + /// The schema of the files + pub schema: SchemaRef, +} + +/// Returned partitioned file with its schema +pub struct FileAndSchema { + file: PartitionedFile, + schema: Schema, +} + +/// Builder for ['TableDescriptor'] inside given path +pub trait TableDescriptorBuilder { + /// Construct a ['TableDescriptor'] from the provided path + fn build_table_desc( + path: &str, + ext: &str, + provided_schema: Option, + collect_statistics: bool, + ) -> Result { + let filenames = build_file_list(path, ext)?; + if filenames.is_empty() { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + // build a list of partitions with statistics and gather all unique schemas + // used in this data set + let mut schemas: Vec = vec![]; + let mut contains_file = false; + + let partitioned_files = filenames + .iter() + .map(|file_path| { + contains_file = true; + let result = if collect_statistics { + let FileAndSchema {file, schema} = Self::file_meta(file_path)?; + if schemas.is_empty() { + schemas.push(schema); + } else if schema != schemas[0] { + // we currently get the schema information from the first file rather than do + // schema merging and this is a limitation. + // See https://issues.apache.org/jira/browse/ARROW-11017 + return Err(DataFusionError::Plan(format!( + "The file {} have different schema from the first file and DataFusion does \ + not yet support schema merging", + file_path + ))); + } + file + } else { + PartitionedFile { + path: file_path.to_owned(), + statistics: Statistics::default(), + } + }; + + Ok(result) + }).collect::>>(); + + if !contains_file { + return Err(DataFusionError::Plan(format!( + "No file (with .{} extension) found at path {}", + ext, path + ))); + } + + let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap()); + + Ok(TableDescriptor { + path: path.to_string(), + partition_files: partitioned_files?, + schema: Arc::new(result_schema), + }) + } + + /// Get all metadata for a source file, including schema, statistics, partitions, etc. + fn file_meta(path: &str) -> Result; +} + +/// Get all files as well as the summary statistic +/// if the optional `limit` is provided, includes only sufficient files +/// needed to read up to `limit` number of rows +pub fn get_statistics_with_limit( + table_desc: &TableDescriptor, + limit: Option, +) -> (Vec, Statistics) { + let mut all_files = table_desc.partition_files.clone(); + let schema = table_desc.schema.clone(); + + let mut total_byte_size = 0; + let mut null_counts = vec![0; schema.fields().len()]; + let mut has_statistics = false; + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + let mut num_rows = 0; + let mut num_files = 0; + for file in &all_files { + num_files += 1; + let file_stats = &file.statistics; + num_rows += file_stats.num_rows.unwrap_or(0); + total_byte_size += file_stats.total_byte_size.unwrap_or(0); + if let Some(vec) = &file_stats.column_statistics { + has_statistics = true; + for (i, cs) in vec.iter().enumerate() { + null_counts[i] += cs.null_count.unwrap_or(0); + + if let Some(max_value) = &mut max_values[i] { + if let Some(file_max) = cs.max_value.clone() { + match max_value.update(&[file_max]) { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + } + + if let Some(min_value) = &mut min_values[i] { + if let Some(file_min) = cs.min_value.clone() { + match min_value.update(&[file_min]) { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + if num_rows > limit.unwrap_or(usize::MAX) { + break; + } + } + all_files.truncate(num_files); + + let column_stats = if has_statistics { + Some(get_col_stats( + &*schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + (all_files, statistics) +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| MinAccumulator::try_new(field.data_type()).ok()) + .collect::>(); + (max_values, min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec, + max_values: &mut Vec>, + min_values: &mut Vec>, +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match &max_values[i] { + Some(max_value) => max_value.evaluate().ok(), + None => None, + }; + let min_value = match &min_values[i] { + Some(min_value) => min_value.evaluate().ok(), + None => None, + }; + ColumnStatistics { + null_count: Some(null_counts[i] as usize), + max_value, + min_value, + distinct_count: None, + } + }) + .collect() +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index a9b4c0fef348..c11aadea9a64 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -18,25 +18,32 @@ //! Parquet data source use std::any::Any; -use std::string::String; +use std::fs::File; use std::sync::Arc; -use arrow::datatypes::*; +use parquet::arrow::ArrowReader; +use parquet::arrow::ParquetFileArrowReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics as ParquetStatistics; +use super::datasource::TableProviderFilterPushDown; +use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::datasource::Statistics; -use crate::datasource::TableProvider; +use crate::datasource::{ + create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema, + PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider, +}; use crate::error::Result; use crate::logical_plan::{combine_filters, Expr}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; -use crate::physical_plan::ExecutionPlan; - -use super::datasource::TableProviderFilterPushDown; +use crate::physical_plan::{Accumulator, ExecutionPlan}; +use crate::scalar::ScalarValue; /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { - path: String, - schema: SchemaRef, - statistics: Statistics, + /// Descriptor of the table, including schema, files, etc. + pub desc: Arc, max_partitions: usize, enable_pruning: bool, } @@ -45,12 +52,9 @@ impl ParquetTable { /// Attempt to initialize a new `ParquetTable` from a file path. pub fn try_new(path: impl Into, max_partitions: usize) -> Result { let path = path.into(); - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - let schema = parquet_exec.schema(); + let table_desc = ParquetTableDescriptor::new(path.as_str()); Ok(Self { - path, - schema, - statistics: parquet_exec.statistics().to_owned(), + desc: Arc::new(table_desc?), max_partitions, enable_pruning: true, }) @@ -65,29 +69,34 @@ impl ParquetTable { collect_statistics: bool, ) -> Result { let path = path.into(); - if collect_statistics { - let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?; - Ok(Self { - path, - schema: Arc::new(schema), - statistics: parquet_exec.statistics().to_owned(), - max_partitions, - enable_pruning: true, - }) - } else { - Ok(Self { - path, - schema: Arc::new(schema), - statistics: Statistics::default(), - max_partitions, - enable_pruning: true, - }) - } + let table_desc = ParquetTableDescriptor::new_with_schema( + path.as_str(), + Some(schema), + collect_statistics, + ); + Ok(Self { + desc: Arc::new(table_desc?), + max_partitions, + enable_pruning: true, + }) + } + + /// Attempt to initialize a new `ParquetTable` from a table descriptor. + pub fn try_new_with_desc( + desc: Arc, + max_partitions: usize, + enable_pruning: bool, + ) -> Result { + Ok(Self { + desc, + max_partitions, + enable_pruning, + }) } /// Get the path for the Parquet file(s) represented by this ParquetTable instance pub fn path(&self) -> &str { - &self.path + &self.desc.descriptor.path } /// Get parquet pruning option @@ -109,7 +118,7 @@ impl TableProvider for ParquetTable { /// Get the schema for this parquet file. fn schema(&self) -> SchemaRef { - self.schema.clone() + self.desc.schema() } fn supports_filter_pushdown( @@ -136,8 +145,8 @@ impl TableProvider for ParquetTable { } else { None }; - Ok(Arc::new(ParquetExec::try_from_path( - &self.path, + Ok(Arc::new(ParquetExec::try_new( + self.desc.clone(), projection.clone(), predicate, limit @@ -149,7 +158,7 @@ impl TableProvider for ParquetTable { } fn statistics(&self) -> Statistics { - self.statistics.clone() + self.desc.statistics() } fn has_exact_statistics(&self) -> bool { @@ -157,6 +166,253 @@ impl TableProvider for ParquetTable { } } +#[derive(Debug, Clone)] +/// Descriptor for a parquet root path +pub struct ParquetTableDescriptor { + /// metadata for files inside the root path + pub descriptor: TableDescriptor, +} + +impl ParquetTableDescriptor { + /// Construct a new parquet descriptor for a root path + pub fn new(root_path: &str) -> Result { + let table_desc = Self::build_table_desc(root_path, "parquet", None, true); + Ok(Self { + descriptor: table_desc?, + }) + } + + /// Construct a new parquet descriptor for a root path with known schema + pub fn new_with_schema( + root_path: &str, + schema: Option, + collect_statistics: bool, + ) -> Result { + let table_desc = + Self::build_table_desc(root_path, "parquet", schema, collect_statistics); + Ok(Self { + descriptor: table_desc?, + }) + } + + /// Get file schema for all parquet files + pub fn schema(&self) -> SchemaRef { + self.descriptor.schema.clone() + } + + /// Get the summary statistics for all parquet files + pub fn statistics(&self) -> Statistics { + get_statistics_with_limit(&self.descriptor, None).1 + } + + fn summarize_min_max( + max_values: &mut Vec>, + min_values: &mut Vec>, + fields: &[Field], + i: usize, + stat: &ParquetStatistics, + ) { + match stat { + ParquetStatistics::Boolean(s) => { + if let DataType::Boolean = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Boolean(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Boolean(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int32(s) => { + if let DataType::Int32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Int64(s) => { + if let DataType::Int64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value.update(&[ScalarValue::Int64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value.update(&[ScalarValue::Int64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Float(s) => { + if let DataType::Float32 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float32(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float32(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + ParquetStatistics::Double(s) => { + if let DataType::Float64 = fields[i].data_type() { + if s.has_min_max_set() { + if let Some(max_value) = &mut max_values[i] { + match max_value + .update(&[ScalarValue::Float64(Some(*s.max()))]) + { + Ok(_) => {} + Err(_) => { + max_values[i] = None; + } + } + } + if let Some(min_value) = &mut min_values[i] { + match min_value + .update(&[ScalarValue::Float64(Some(*s.min()))]) + { + Ok(_) => {} + Err(_) => { + min_values[i] = None; + } + } + } + } + } + } + _ => {} + } + } +} + +impl TableDescriptorBuilder for ParquetTableDescriptor { + fn file_meta(path: &str) -> Result { + let file = File::open(path)?; + let file_reader = Arc::new(SerializedFileReader::new(file)?); + let mut arrow_reader = ParquetFileArrowReader::new(file_reader); + let path = path.to_string(); + let schema = arrow_reader.get_schema()?; + let num_fields = schema.fields().len(); + let fields = schema.fields().to_vec(); + let meta_data = arrow_reader.get_metadata(); + + let mut num_rows = 0; + let mut total_byte_size = 0; + let mut null_counts = vec![0; num_fields]; + let mut has_statistics = false; + + let (mut max_values, mut min_values) = create_max_min_accs(&schema); + + for row_group_meta in meta_data.row_groups() { + num_rows += row_group_meta.num_rows(); + total_byte_size += row_group_meta.total_byte_size(); + + let columns_null_counts = row_group_meta + .columns() + .iter() + .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + + for (i, cnt) in columns_null_counts.enumerate() { + null_counts[i] += cnt as usize + } + + for (i, column) in row_group_meta.columns().iter().enumerate() { + if let Some(stat) = column.statistics() { + has_statistics = true; + ParquetTableDescriptor::summarize_min_max( + &mut max_values, + &mut min_values, + &fields, + i, + stat, + ) + } + } + } + + let column_stats = if has_statistics { + Some(get_col_stats( + &schema, + null_counts, + &mut max_values, + &mut min_values, + )) + } else { + None + }; + + let statistics = Statistics { + num_rows: Some(num_rows as usize), + total_byte_size: Some(total_byte_size as usize), + column_statistics: column_stats, + }; + + Ok(FileAndSchema { + file: PartitionedFile { path, statistics }, + schema, + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index a983af402826..fd8650411d71 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -110,6 +110,7 @@ mod tests { use super::*; use crate::datasource::datasource::Statistics; + use crate::datasource::PartitionedFile; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use crate::physical_plan::parquet::{ParquetExec, ParquetPartition}; use crate::physical_plan::projection::ProjectionExec; @@ -122,12 +123,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, metrics.clone(), )], schema, None, + Statistics::default(), metrics, None, 2048, @@ -162,12 +164,13 @@ mod tests { vec![], Arc::new(ParquetExec::new( vec![ParquetPartition::new( - vec!["x".to_string()], - Statistics::default(), + vec![PartitionedFile::from("x".to_string())], + 0, metrics.clone(), )], schema, None, + Statistics::default(), metrics, None, 2048, diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index ac1655b60762..eb8f927fc2ad 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -27,14 +27,14 @@ use crate::{ logical_plan::{Column, Expr}, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, physical_plan::{ - common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }, scalar::ScalarValue, }; use arrow::{ array::ArrayRef, - datatypes::{DataType, Schema, SchemaRef}, + datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; @@ -53,21 +53,21 @@ use tokio::{ task, }; -use crate::datasource::datasource::{ColumnStatistics, Statistics}; +use crate::datasource::datasource::Statistics; use async_trait::async_trait; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::stream::RecordBatchReceiverStream; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::Accumulator; +use crate::datasource::parquet::ParquetTableDescriptor; +use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile}; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { /// Parquet partitions to read - partitions: Vec, + pub partitions: Vec, /// Schema after projection is applied - schema: SchemaRef, + pub schema: SchemaRef, /// Projection for which columns to load projection: Vec, /// Batch size @@ -94,9 +94,7 @@ pub struct ParquetExec { #[derive(Debug, Clone)] pub struct ParquetPartition { /// The Parquet filename for this partition - pub filenames: Vec, - /// Statistics for this partition - pub statistics: Statistics, + pub file_partition: FilePartition, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -104,7 +102,7 @@ pub struct ParquetPartition { /// Stores metrics about the parquet execution for a particular parquet file #[derive(Debug, Clone)] struct ParquetFileMetrics { - /// Numer of times the predicate could not be evaluated + /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: metrics::Count, /// Number of row groups pruned using pub row_groups_pruned: metrics::Count, @@ -123,290 +121,42 @@ impl ParquetExec { ) -> Result { // build a list of filenames from the specified path, which could be a single file or // a directory containing one or more parquet files - let filenames = common::build_file_list(path, ".parquet")?; - if filenames.is_empty() { - Err(DataFusionError::Plan(format!( - "No Parquet files (with .parquet extension) found at path {}", - path - ))) - } else { - let filenames = filenames - .iter() - .map(|filename| filename.as_str()) - .collect::>(); - Self::try_from_files( - &filenames, - projection, - predicate, - batch_size, - max_partitions, - limit, - ) - } + let table_desc = ParquetTableDescriptor::new(path)?; + Self::try_new( + Arc::new(table_desc), + projection, + predicate, + batch_size, + max_partitions, + limit, + ) } - /// Create a new Parquet reader execution plan based on the specified list of Parquet - /// files - pub fn try_from_files( - filenames: &[&str], + /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema + pub fn try_new( + desc: Arc, projection: Option>, predicate: Option, batch_size: usize, max_partitions: usize, limit: Option, ) -> Result { - debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", - filenames, projection, predicate, limit); - // build a list of Parquet partitions with statistics and gather all unique schemas - // used in this data set - let metrics = ExecutionPlanMetricsSet::new(); - let mut schemas: Vec = vec![]; - let mut partitions = Vec::with_capacity(max_partitions); - let filenames: Vec = filenames.iter().map(|s| s.to_string()).collect(); - let chunks = split_files(&filenames, max_partitions); - let mut num_rows = 0; - let mut num_fields = 0; - let mut fields = Vec::new(); - let mut total_byte_size = 0; - let mut null_counts = Vec::new(); - let mut max_values: Vec> = Vec::new(); - let mut min_values: Vec> = Vec::new(); - let mut limit_exhausted = false; - for chunk in chunks { - let mut filenames: Vec = - chunk.iter().map(|x| x.to_string()).collect(); - let mut total_files = 0; - for filename in &filenames { - total_files += 1; - let file = File::open(filename)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); - let mut arrow_reader = ParquetFileArrowReader::new(file_reader); - let meta_data = arrow_reader.get_metadata(); - // collect all the unique schemas in this data set - let schema = arrow_reader.get_schema()?; - if schemas.is_empty() || schema != schemas[0] { - fields = schema.fields().to_vec(); - num_fields = schema.fields().len(); - null_counts = vec![0; num_fields]; - max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - schemas.push(schema); - } - - for row_group_meta in meta_data.row_groups() { - num_rows += row_group_meta.num_rows(); - total_byte_size += row_group_meta.total_byte_size(); + debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", + desc, projection, predicate, limit); - // Currently assumes every Parquet file has same schema - // https://issues.apache.org/jira/browse/ARROW-11017 - let columns_null_counts = row_group_meta - .columns() - .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); - - for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt - } - - for (i, column) in row_group_meta.columns().iter().enumerate() { - if let Some(stat) = column.statistics() { - match stat { - ParquetStatistics::Boolean(s) => { - if let DataType::Boolean = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Boolean(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Boolean(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int32(s) => { - if let DataType::Int32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Int64(s) => { - if let DataType::Int64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Int64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Int64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Float(s) => { - if let DataType::Float32 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float32(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float32(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - ParquetStatistics::Double(s) => { - if let DataType::Float64 = fields[i].data_type() { - if s.has_min_max_set() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[ - ScalarValue::Float64(Some(*s.max())), - ]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[ - ScalarValue::Float64(Some(*s.min())), - ]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - _ => {} - } - } - } + let metrics = ExecutionPlanMetricsSet::new(); + let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit); + let schema = desc.schema(); - if limit.map(|x| num_rows >= x as i64).unwrap_or(false) { - limit_exhausted = true; - break; - } - } - } - let column_stats = (0..num_fields) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(); - - let statistics = Statistics { - num_rows: Some(num_rows as usize), - total_byte_size: Some(total_byte_size as usize), - column_statistics: Some(column_stats), - }; - // remove files that are not needed in case of limit - filenames.truncate(total_files); + let mut partitions = Vec::with_capacity(max_partitions); + let chunked_files = split_files(&all_files, max_partitions); + for (index, group) in chunked_files.iter().enumerate() { partitions.push(ParquetPartition::new( - filenames, - statistics, + Vec::from(*group), + index, metrics.clone(), )); - if limit_exhausted { - break; - } - } - - // we currently get the schema information from the first file rather than do - // schema merging and this is a limitation. - // See https://issues.apache.org/jira/browse/ARROW-11017 - if schemas.len() > 1 { - return Err(DataFusionError::Plan(format!( - "The Parquet files have {} different schemas and DataFusion does \ - not yet support schema merging", - schemas.len() - ))); } - let schema = Arc::new(schemas.pop().unwrap()); let metrics = ExecutionPlanMetricsSet::new(); let predicate_creation_errors = @@ -430,6 +180,7 @@ impl ParquetExec { partitions, schema, projection, + statistics, metrics, predicate_builder, batch_size, @@ -438,10 +189,12 @@ impl ParquetExec { } /// Create a new Parquet reader execution plan with provided partitions and schema + #[allow(clippy::too_many_arguments)] pub fn new( partitions: Vec, schema: SchemaRef, projection: Option>, + statistics: Statistics, metrics: ExecutionPlanMetricsSet, predicate_builder: Option, batch_size: usize, @@ -459,94 +212,20 @@ impl ParquetExec { .collect(), ); - // sum the statistics - let mut num_rows: Option = None; - let mut total_byte_size: Option = None; - let mut null_counts: Vec = vec![0; schema.fields().len()]; - let mut has_statistics = false; - let mut max_values = schema - .fields() - .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - let mut min_values = schema - .fields() - .iter() - .map(|field| MinAccumulator::try_new(field.data_type()).ok()) - .collect::>(); - for part in &partitions { - if let Some(n) = part.statistics.num_rows { - num_rows = Some(num_rows.unwrap_or(0) + n) - } - if let Some(n) = part.statistics.total_byte_size { - total_byte_size = Some(total_byte_size.unwrap_or(0) + n) + let new_column_statistics = statistics.column_statistics.map(|stats| { + let mut projected_stats = Vec::with_capacity(projection.len()); + for proj in &projection { + projected_stats.push(stats[*proj].clone()); } - if let Some(x) = &part.statistics.column_statistics { - let part_nulls: Vec> = - x.iter().map(|c| c.null_count).collect(); - has_statistics = true; - - let part_max_values: Vec> = - x.iter().map(|c| c.max_value.clone()).collect(); - let part_min_values: Vec> = - x.iter().map(|c| c.min_value.clone()).collect(); - - for &i in projection.iter() { - null_counts[i] = part_nulls[i].unwrap_or(0); - if let Some(part_max_value) = part_max_values[i].clone() { - if let Some(max_value) = &mut max_values[i] { - match max_value.update(&[part_max_value]) { - Ok(_) => {} - Err(_) => { - max_values[i] = None; - } - } - } - } - if let Some(part_min_value) = part_min_values[i].clone() { - if let Some(min_value) = &mut min_values[i] { - match min_value.update(&[part_min_value]) { - Ok(_) => {} - Err(_) => { - min_values[i] = None; - } - } - } - } - } - } - } - - let column_stats = if has_statistics { - Some( - (0..schema.fields().len()) - .map(|i| { - let max_value = match &max_values[i] { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match &min_values[i] { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: Some(null_counts[i] as usize), - max_value, - min_value, - distinct_count: None, - } - }) - .collect(), - ) - } else { - None - }; + projected_stats + }); let statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: column_stats, + num_rows: statistics.num_rows, + total_byte_size: statistics.total_byte_size, + column_statistics: new_column_statistics, }; + Self { partitions, schema: Arc::new(projected_schema), @@ -583,26 +262,15 @@ impl ParquetExec { impl ParquetPartition { /// Create a new parquet partition pub fn new( - filenames: Vec, - statistics: Statistics, + files: Vec, + index: usize, metrics: ExecutionPlanMetricsSet, ) -> Self { Self { - filenames, - statistics, + file_partition: FilePartition { index, files }, metrics, } } - - /// The Parquet filename for this partition - pub fn filenames(&self) -> &[String] { - &self.filenames - } - - /// Statistics for this partition - pub fn statistics(&self) -> &Statistics { - &self.statistics - } } impl ParquetFileMetrics { @@ -662,7 +330,7 @@ impl ExecutionPlan for ParquetExec { } } - async fn execute(&self, partition: usize) -> Result { + async fn execute(&self, partition_index: usize) -> Result { // because the parquet implementation is not thread-safe, it is necessary to execute // on a thread and communicate with channels let (response_tx, response_rx): ( @@ -670,8 +338,7 @@ impl ExecutionPlan for ParquetExec { Receiver>, ) = channel(2); - let parquet_partition = &self.partitions[partition]; - let filenames = parquet_partition.filenames.clone(); + let partition = self.partitions[partition_index].clone(); let metrics = self.metrics.clone(); let projection = self.projection.clone(); let predicate_builder = self.predicate_builder.clone(); @@ -679,9 +346,9 @@ impl ExecutionPlan for ParquetExec { let limit = self.limit; task::spawn_blocking(move || { - if let Err(e) = read_files( + if let Err(e) = read_partition( + partition_index, partition, - &filenames, metrics, &projection, &predicate_builder, @@ -706,9 +373,7 @@ impl ExecutionPlan for ParquetExec { let files: Vec<_> = self .partitions .iter() - .map(|pp| pp.filenames.iter()) - .flatten() - .map(|s| s.as_str()) + .map(|pp| format!("{}", pp.file_partition)) .collect(); write!( @@ -838,7 +503,7 @@ fn build_row_group_predicate( match predicate_values { Ok(values) => { // NB: false means don't scan row group - let num_pruned = values.iter().filter(|&v| !v).count(); + let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); Box::new(move |_, i| values[i]) } @@ -853,9 +518,9 @@ fn build_row_group_predicate( } #[allow(clippy::too_many_arguments)] -fn read_files( - partition: usize, - filenames: &[String], +fn read_partition( + partition_index: usize, + partition: ParquetPartition, metrics: ExecutionPlanMetricsSet, projection: &[usize], predicate_builder: &Option, @@ -864,9 +529,11 @@ fn read_files( limit: Option, ) -> Result<()> { let mut total_rows = 0; - 'outer: for filename in filenames { - let file_metrics = ParquetFileMetrics::new(partition, filename, &metrics); - let file = File::open(&filename)?; + let all_files = partition.file_partition.files; + 'outer: for partitioned_file in all_files { + let file_metrics = + ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); + let file = File::open(partitioned_file.path.as_str())?; let mut file_reader = SerializedFileReader::new(file)?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( @@ -894,7 +561,7 @@ fn read_files( Some(Err(e)) => { let err_msg = format!( "Error reading batch from {}: {}", - filename, + partitioned_file, e.to_string() ); // send error to operator @@ -914,12 +581,15 @@ fn read_files( Ok(()) } -fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> { - let mut chunk_size = filenames.len() / n; - if filenames.len() % n > 0 { +fn split_files( + partitioned_files: &[PartitionedFile], + n: usize, +) -> Vec<&[PartitionedFile]> { + let mut chunk_size = partitioned_files.len() / n; + if partitioned_files.len() % n > 0 { chunk_size += 1; } - filenames.chunks(chunk_size).collect() + partitioned_files.chunks(chunk_size).collect() } #[cfg(test)] @@ -935,24 +605,24 @@ mod tests { #[test] fn test_split_files() { - let filenames = vec![ - "a".to_string(), - "b".to_string(), - "c".to_string(), - "d".to_string(), - "e".to_string(), + let files = vec![ + PartitionedFile::from("a".to_string()), + PartitionedFile::from("b".to_string()), + PartitionedFile::from("c".to_string()), + PartitionedFile::from("d".to_string()), + PartitionedFile::from("e".to_string()), ]; - let chunks = split_files(&filenames, 1); + let chunks = split_files(&files, 1); assert_eq!(1, chunks.len()); assert_eq!(5, chunks[0].len()); - let chunks = split_files(&filenames, 2); + let chunks = split_files(&files, 2); assert_eq!(2, chunks.len()); assert_eq!(3, chunks[0].len()); assert_eq!(2, chunks[1].len()); - let chunks = split_files(&filenames, 5); + let chunks = split_files(&files, 5); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len()); @@ -960,7 +630,7 @@ mod tests { assert_eq!(1, chunks[3].len()); assert_eq!(1, chunks[4].len()); - let chunks = split_files(&filenames, 123); + let chunks = split_files(&files, 123); assert_eq!(5, chunks.len()); assert_eq!(1, chunks[0].len()); assert_eq!(1, chunks[1].len());