diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index cdd512a07634f..004570707a484 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -103,6 +103,7 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 /// `───────────────────'
 ///
 /// ```
+///
 /// # Features
 ///
 /// Supports the following optimizations:
@@ -118,36 +119,50 @@ pub use statistics::{RequestedStatistics, StatisticsConverter};
 ///
 /// * Limit pushdown: stop execution early after some number of rows are read.
 ///
-/// * Custom readers: controls I/O for accessing pages. See
-/// [`ParquetFileReaderFactory`] for more details.
+/// * Custom readers: controls I/O for accessing pages, and for reading
+/// [`ParquetMetaData`]. This can be used to implement custom I/O scheduling
+/// and to reuse parsed metadata. See [`ParquetFileReaderFactory`] for more
+/// details.
 ///
-/// * Schema adapters:read parquet files with different schemas into a unified
+/// * Schema adapters: read parquet files with different schemas into a unified
 /// table schema. This can be used to implement "schema evolution". See
 /// [`SchemaAdapterFactory`] for more details.
 ///
 /// * metadata_size_hint: controls the number of bytes read from the end of the
-/// file in the initial I/O.
+/// file in the initial I/O. See [`ParquetExecBuilder::with_metadata_size_hint`]
+/// for details.
 ///
 /// # Execution Overview
 ///
-/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// * Step 1: `ParquetExec::execute` is called, returning a [`FileStream`]
 /// configured to open parquet files with a [`ParquetOpener`].
 ///
-/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
+/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
 /// the file.
 ///
 /// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer,
 /// and applies any predicates and projections to determine what pages must be
 /// read.
 ///
-/// * Step 4: The stream begins reading data, fetching the required pages
+/// * Step 4: The stream begins reading data by fetching the required pages
 /// and incrementally decoding them.
 ///
-/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
-/// [`SchemaAdapter`] to match the table schema. By default missing columns are
-/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
-///
-/// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// # Example: Create a `ParquetExec`
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # use datafusion_physical_expr::expressions::lit;
+/// # let file_schema = Arc::new(Schema::empty());
+/// # let object_store_url = ObjectStoreUrl::local_filesystem();
+/// # let file_scan_config = FileScanConfig::new(object_store_url, file_schema);
+/// # let predicate = lit(true);
+/// let exec = ParquetExec::builder(file_scan_config)
+///     // Provide a predicate for filtering row groups
+///     .with_predicate(predicate)
+///     .build();
+/// ```
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
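For readers who want a runnable version of the new doc example, here is a slightly fuller sketch. The one-column schema, the file name, and the 1234-byte file size are placeholders, and `FileScanConfig::with_file` is assumed to be available in this release:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::lit;

fn main() {
    // Hypothetical one-column schema; a real scan would use the file's schema.
    let file_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));

    // "my_file.parquet" and the 1234-byte size are placeholders.
    let file_scan_config =
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
            .with_file(PartitionedFile::new("my_file.parquet", 1234));

    // `lit(true)` keeps every row group; a real predicate would reference columns.
    let exec = ParquetExec::builder(file_scan_config)
        .with_predicate(lit(true))
        .build();
    let _ = exec;
}
```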
@@ -173,14 +188,107 @@ pub struct ParquetExec {
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
-impl ParquetExec {
-    /// Create a new Parquet reader execution plan provided file list and schema.
-    pub fn new(
-        base_config: FileScanConfig,
-        predicate: Option<Arc<dyn PhysicalExpr>>,
-        metadata_size_hint: Option<usize>,
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
+///
+/// See example on [`ParquetExec`].
+pub struct ParquetExecBuilder {
+    file_scan_config: FileScanConfig,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    metadata_size_hint: Option<usize>,
+    table_parquet_options: TableParquetOptions,
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl ParquetExecBuilder {
+    /// Create a new builder to read the provided file scan configuration
+    pub fn new(file_scan_config: FileScanConfig) -> Self {
+        Self::new_with_options(file_scan_config, TableParquetOptions::default())
+    }
+
+    /// Create a new builder to read the provided file scan configuration
+    /// with the provided table parquet options
+    pub fn new_with_options(
+        file_scan_config: FileScanConfig,
         table_parquet_options: TableParquetOptions,
     ) -> Self {
+        Self {
+            file_scan_config,
+            predicate: None,
+            metadata_size_hint: None,
+            table_parquet_options,
+            parquet_file_reader_factory: None,
+            schema_adapter_factory: None,
+        }
+    }
+
+    /// Set the predicate for the scan. The ParquetExec will use this predicate
+    /// to filter row groups and data pages using the Parquet statistics and
+    /// bloom filters.
+    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    /// Set the metadata size hint
+    ///
+    /// This value determines how many bytes at the end of the file the
+    /// ParquetExec will request in the initial I/O. If this is too small, the
+    /// ParquetExec will need to make additional I/O requests to read the footer.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
+    /// Set the table parquet options that control how the ParquetExec reads
+    /// parquet files.
+    pub fn with_table_parquet_options(
+        mut self,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        self.table_parquet_options = table_parquet_options;
+        self
+    }
+
+    /// Optional user defined parquet file reader factory.
+    ///
+    /// `ParquetFileReaderFactory` complements `TableProvider`: it enables users to
+    /// provide custom implementations for data access operations.
+    ///
+    /// If a custom `ParquetFileReaderFactory` is provided, data access operations
+    /// will be routed to this factory instead of `ObjectStore`.
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+
+    /// Optional schema adapter factory.
+    ///
+    /// `SchemaAdapterFactory` allows users to specify how fields from the parquet
+    /// file are mapped to those of the table schema. The default schema adapter
+    /// uses arrow's cast library to map the parquet fields to the table schema.
+    pub fn with_schema_adapter_factory(
+        mut self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        self.schema_adapter_factory = Some(schema_adapter_factory);
+        self
+    }
+
+    /// Build a [`ParquetExec`]
+    pub fn build(self) -> ParquetExec {
+        let Self {
+            file_scan_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+            parquet_file_reader_factory,
+            schema_adapter_factory,
+        } = self;
+
+        let base_config = file_scan_config;
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
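The setters above compose. A sketch combining `new_with_options` with a metadata size hint; the 512 KiB value is an arbitrary illustration, and the `ParquetExecBuilder` re-export path is assumed:

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExecBuilder};
use datafusion_execution::object_store::ObjectStoreUrl;

fn main() {
    let file_schema = Arc::new(Schema::empty());
    let file_scan_config =
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema);

    let mut options = TableParquetOptions::default();
    // Evaluate pushed-down predicates while decoding, not only for pruning.
    options.global.pushdown_filters = true;

    let exec = ParquetExecBuilder::new_with_options(file_scan_config, options)
        // 512 KiB is arbitrary: large enough that the footer and metadata
        // usually arrive in the first read.
        .with_metadata_size_hint(512 * 1024)
        .build();
    let _ = exec;
}
```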
@@ -219,12 +327,12 @@
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
 
-        let cache = Self::compute_properties(
+        let cache = ParquetExec::compute_properties(
             projected_schema,
             &projected_output_ordering,
             &base_config,
         );
-        Self {
+        ParquetExec {
             base_config,
             projected_statistics,
             metrics,
@@ -232,12 +340,38 @@
             pruning_predicate,
             page_pruning_predicate,
             metadata_size_hint,
-            parquet_file_reader_factory: None,
+            parquet_file_reader_factory,
             cache,
             table_parquet_options,
-            schema_adapter_factory: None,
+            schema_adapter_factory,
         }
     }
+}
+
+impl ParquetExec {
+    /// Create a new Parquet reader execution plan for the provided file list
+    /// and schema.
+    // TODO deprecate
+    pub fn new(
+        base_config: FileScanConfig,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+        metadata_size_hint: Option<usize>,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(base_config, table_parquet_options);
+        if let Some(predicate) = predicate {
+            builder = builder.with_predicate(predicate);
+        }
+        if let Some(metadata_size_hint) = metadata_size_hint {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
+        builder.build()
+    }
+
+    /// Return a builder for `ParquetExec`. See example on [`ParquetExec`].
+    pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder {
+        ParquetExecBuilder::new(file_scan_config)
+    }
 
     /// [`FileScanConfig`] that controls this scan (such as which files to read)
     pub fn base_config(&self) -> &FileScanConfig {
@@ -261,11 +395,7 @@ impl ParquetExec {
     /// Optional user defined parquet file reader factory.
     ///
-    /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
-    /// implementation for data access operations.
-    ///
-    /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
-    /// to this factory instead of `ObjectStore`.
+    /// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`]
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
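For callers migrating off `ParquetExec::new` ahead of the planned deprecation, the two functions below should produce equivalent plans under this change (a sketch using only APIs introduced in this diff):

```rust
use std::sync::Arc;

use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{
    FileScanConfig, ParquetExec, ParquetExecBuilder,
};
use datafusion_physical_expr::PhysicalExpr;

// Positional arguments; slated for deprecation per the TODO above.
fn old_style(
    config: FileScanConfig,
    predicate: Arc<dyn PhysicalExpr>,
    options: TableParquetOptions,
) -> ParquetExec {
    ParquetExec::new(config, Some(predicate), None, options)
}

// The same plan, built through the new builder.
fn new_style(
    config: FileScanConfig,
    predicate: Arc<dyn PhysicalExpr>,
    options: TableParquetOptions,
) -> ParquetExec {
    ParquetExecBuilder::new_with_options(config, options)
        .with_predicate(predicate)
        .build()
}
```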
@@ -276,9 +406,7 @@ impl ParquetExec {
     /// Optional schema adapter factory.
     ///
-    /// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to
-    /// that of the table schema. The default schema adapter uses arrow's cast library to map
-    /// the parquet fields to the table schema.
+    /// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`]
     pub fn with_schema_adapter_factory(
         mut self,
         schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
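Finally, a sketch of wiring both extension points through the builder. `CachingReaderFactory` and `EvolvingSchemaAdapterFactory` are hypothetical user types implementing `ParquetFileReaderFactory` and `SchemaAdapterFactory`, and `file_scan_config` is built as in the examples above:

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

fn build_custom_exec(file_scan_config: FileScanConfig) -> ParquetExec {
    ParquetExec::builder(file_scan_config)
        // Hypothetical factory that serves bytes from an in-memory cache
        // instead of the ObjectStore.
        .with_parquet_file_reader_factory(Arc::new(CachingReaderFactory::new()))
        // Hypothetical factory that maps evolving file schemas to the table schema.
        .with_schema_adapter_factory(Arc::new(EvolvingSchemaAdapterFactory::new()))
        .build()
}
```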