Commit 710977b: revert type param on parquet

yjshen committed Aug 2, 2021
1 parent 77e1eb2

Showing 7 changed files with 61 additions and 52 deletions.
15 changes: 7 additions & 8 deletions datafusion/src/datasource/datasource2.rs
@@ -48,17 +48,17 @@ pub struct SourceDescriptor {
     pub schema: SchemaRef,
 }

-pub trait DataSource2<R: ChunkReader + 'static>: Send + Sync {
+pub trait DataSource2: Send + Sync {
     fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;

     fn schema(&self) -> Result<Arc<Schema>>;

-    fn get_read_for_file(&self, partitioned_file: PartitionedFile) -> Result<R>;
+    fn get_read_for_file(&self, partitioned_file: PartitionedFile) -> Result<dyn ChunkReader>;

     fn statistics(&self) -> &Statistics;
 }

-pub trait SourceDescBuilder<R: ChunkReader + 'static> {
+pub trait SourceDescBuilder {
     fn get_source_desc(root_path: &str) -> Result<SourceDescriptor> {
         let filenames = Self::get_all_files(root_path)?;
         if filenames.is_empty() {
@@ -71,13 +71,12 @@ pub trait SourceDescBuilder<R: ChunkReader + 'static> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitioned_files: Vec<PartitionedFile> = vec![];

         let partitioned_files = filenames
             .iter()
             .map(|file_path| {
                 let pf = Self::get_file_meta(file_path)?;
-                let schema = pf.schema;
+                let schema = pf.schema.clone();
                 if schemas.is_empty() {
                     schemas.push(schema);
                 } else if schema != schemas[0] {
@@ -103,15 +102,15 @@ pub trait SourceDescBuilder<R: ChunkReader + 'static> {

     fn get_file_meta(file_path: &str) -> Result<PartitionedFile>;

-    fn reader_for_file_meta(file_path: &str) -> Result<R>;
+    fn reader_for_file_meta(file_path: &str) -> Result<dyn ChunkReader>;
 }

 pub trait ParquetSourceDescBuilder: SourceDescBuilder {

-    fn get_file_meta(file_path: &str) {
+    fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
         let chunk_reader = Self::reader_for_file_meta(file_path)?;
         let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
-        let arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
         let file_path = file_path.to_string();
         let schema = arrow_reader.get_schema()?;
         let num_fields = schema.fields().len();
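The point of this revert is to drop the `R: ChunkReader` type parameter so that sources can be handled as plain trait objects (`dyn DataSource2`), which a generic trait cannot be. The sketch below illustrates that pattern in isolation; every name in it (`ChunkRead`, `GenericSource`, `ErasedSource`, `LocalChunk`, `LocalSource`) is an illustrative stand-in rather than a DataFusion or parquet API, and the boxed return type is an assumption about how an unsized `dyn` reader would eventually have to be returned.

```rust
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

// Illustrative stand-in for a chunk-reader abstraction; NOT the parquet
// crate's `ChunkReader`, which has extra requirements omitted here.
trait ChunkRead: Send + Sync {
    fn open(&self) -> std::io::Result<Box<dyn Read + Send>>;
}

struct LocalChunk(String);

impl ChunkRead for LocalChunk {
    fn open(&self) -> std::io::Result<Box<dyn Read + Send>> {
        Ok(Box::new(File::open(&self.0)?))
    }
}

// Generic form (what the commit reverts): the source is tied to one concrete
// reader type R, so it cannot be stored as a bare `dyn` trait object.
#[allow(dead_code)]
trait GenericSource<R: ChunkRead + 'static>: Send + Sync {
    fn reader_for(&self, path: &str) -> std::io::Result<R>;
}

// Type-erased form (the direction of the revert): the reader comes back
// boxed, so the source itself can be held as `Arc<dyn ErasedSource>`.
trait ErasedSource: Send + Sync {
    fn reader_for(&self, path: &str) -> std::io::Result<Box<dyn ChunkRead>>;
}

struct LocalSource;

impl ErasedSource for LocalSource {
    fn reader_for(&self, path: &str) -> std::io::Result<Box<dyn ChunkRead>> {
        Ok(Box::new(LocalChunk(path.to_string())))
    }
}

fn main() -> std::io::Result<()> {
    // Dynamic dispatch over the source, no type parameter required.
    let source: Arc<dyn ErasedSource> = Arc::new(LocalSource);
    let chunk = source.reader_for("/tmp/example.parquet")?;
    let _ = chunk.open(); // would be an Err if the file does not exist
    Ok(())
}
```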
8 changes: 1 addition & 7 deletions datafusion/src/datasource/local/mod.rs
@@ -23,14 +23,8 @@ use super::datasource2::DataSource2;
 use std::fs;
 use std::fs::metadata;

-struct LocalFSHander {}
-
-impl DataSource2 for LocalFSHander {
-
-}
-
 /// Recursively build a list of files in a directory with a given extension with an accumulator list
-fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
+pub fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
     let metadata = metadata(dir)?;
     if metadata.is_file() {
         if dir.ends_with(ext) {
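`list_all_files` becomes `pub` here so that callers outside `datasource::local` (such as the protocol registry below) can reuse it. A minimal usage sketch follows, assuming only the signature shown in this hunk; the caller function is hypothetical, and it propagates the `Result` with `?`, which the diff's own call sites currently do not.

```rust
use crate::datasource::local::list_all_files;
use crate::error::Result;

/// Hypothetical helper: collect every ".parquet" file under `root`,
/// recursing into subdirectories via `list_all_files`.
fn parquet_files_under(root: &str) -> Result<Vec<String>> {
    let mut filenames: Vec<String> = Vec::new();
    list_all_files(root, &mut filenames, ".parquet")?;
    Ok(filenames)
}
```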
16 changes: 10 additions & 6 deletions datafusion/src/datasource/local/parquet_source.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-use crate::datasource::datasource2::SourceDescBuilder;
+use crate::datasource::datasource2::{SourceDescBuilder, PartitionedFile};
 use crate::datasource::datasource2::SourceDescriptor;
 use crate::error::Result;
 use std::fs::File;
@@ -32,14 +32,18 @@ impl LocalParquetSource {
     }
 }

-impl SourceDescBuilder<File> for LocalParquetSource {
-    fn reader_for_file_meta(file_path: &str) -> Result<File> {
-        Ok(File::open(file_path)?)
-    }
-
+impl SourceDescBuilder for LocalParquetSource {
     fn get_all_files(root_path: &str) -> Result<Vec<String>> {
         let mut filenames: Vec<String> = Vec::new();
         super::list_all_files(root_path, &mut filenames, ".parquet");
         Ok(filenames)
     }
+
+    fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
+        todo!()
+    }
+
+    fn reader_for_file_meta(file_path: &str) -> Result<File> {
+        Ok(File::open(file_path)?)
+    }
 }
12 changes: 6 additions & 6 deletions datafusion/src/datasource/parquet.rs
@@ -34,20 +34,20 @@ use super::datasource::TableProviderFilterPushDown;
 use super::datasource2::DataSource2;

 /// Table-based representation of a `ParquetFile`.
-pub struct ParquetTable<R: ChunkReader + 'static> {
-    source: Arc<Box<dyn DataSource2<R>>>,
+pub struct ParquetTable {
+    source: Arc<Box<dyn DataSource2>>,
     max_concurrency: usize,
     enable_pruning: bool,
 }

-impl<R: ChunkReader + 'static> ParquetTable<R> {
+impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
     pub fn try_new(
-        source: Arc<dyn DataSource2<R>>,
+        source: Arc<dyn DataSource2>,
         max_concurrency: usize,
     ) -> Result<Self> {
         Ok(Self {
-            source: Box::new(source),
+            source,
             max_concurrency,
             enable_pruning: true,
         })
@@ -65,7 +65,7 @@ impl<R: ChunkReader + 'static> ParquetTable<R> {
     }
 }

-impl<R: ChunkReader + 'static> TableProvider for ParquetTable<R> {
+impl TableProvider for ParquetTable {
     fn as_any(&self) -> &dyn Any {
         self
     }
40 changes: 25 additions & 15 deletions datafusion/src/datasource/protocol_registry.rs
@@ -19,31 +19,41 @@ use std::collections::HashMap;
 use std::sync::{Arc, RwLock};

-use super::datasource2::DataSource2;
 use crate::parquet::file::reader::ChunkReader;
 use crate::error::{DataFusionError, Result};
 use std::any::Any;
+use std::fs::File;

-pub trait ProtocolHander<R: ChunkReader + 'static>: Sync + Send {
+pub trait ProtocolHandler: Sync + Send {
     /// Returns the protocol handler as [`Any`](std::any::Any)
     /// so that it can be downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;

-    fn source(&self,
-
-    ) -> Result<Arc<dyn DataSource2>>;
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;

-    fn list_all_files(&self, root_path: &str) -> Result<Vec<String>>;

-    fn get_reader(&self, file_path: &str) -> Result<R>;
+    fn get_reader(&self, file_path: &str) -> Result<dyn ChunkReader>;
 }

-struct LocalFSHander {
-
-}
+pub struct LocalFSHandler;

+impl ProtocolHander for LocalFSHander {
+    fn as_any(&self) -> &dyn Any {
+        return self;
+    }
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>> {
+        let mut filenames: Vec<String> = Vec::new();
+        crate::datasource::local::list_all_files(root_path, &mut filenames, ext);
+        Ok(filenames)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<R> {
+        Ok(File::open(file_path)?)
+    }
+}

 pub struct ProtocolRegistry {
-    pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHander>>>,
+    pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHandler>>>,
 }

 impl ProtocolRegistry {
@@ -60,12 +70,12 @@ impl ProtocolRegistry {
         prefix: &str,
         handler: Arc<dyn ProtocolHander>,
     ) -> Option<Arc<dyn ProtocolHander>> {
-        let mut handler = self.protocol_handlers.write().unwrap();
-        handler.insert(prefix.to_string(), handler)
+        let mut handlers = self.protocol_handlers.write().unwrap();
+        handlers.insert(prefix.to_string(), handler)
     }

     pub fn handler(&self, prefix: &str) -> Option<Arc<dyn ProtocolHander>> {
-        let handler = self.protocol_handlers.read().unwrap();
-        handler.get(prefix).cloned()
+        let handlers = self.protocol_handlers.read().unwrap();
+        handlers.get(prefix).cloned()
     }
 }
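Putting the registry pieces together, usage would look roughly like the sketch below. It assumes the `ProtocolRegistry::new()` constructor referenced in the `context.rs` hunk further down, `register_handler`/`handler` methods taking `&self` as the call sites suggest, and that `LocalFSHandler` actually implements the (renamed) `ProtocolHandler` trait, which this diff still spells two ways.

```rust
use std::sync::Arc;

// Sketch only: assumes ProtocolRegistry::new(), &self-based register_handler()
// and handler(), and a LocalFSHandler that implements the handler trait.
fn registry_example() {
    let registry = ProtocolRegistry::new();

    // Register the local-filesystem handler under the "file" prefix,
    // mirroring what ExecutionContext does in the context.rs hunk below.
    registry.register_handler("file", Arc::new(LocalFSHandler {}));

    // Later, resolve a handler by prefix and use it to enumerate files.
    if let Some(handler) = registry.handler("file") {
        let _parquet_files = handler.list_all_files("/data/tables", ".parquet");
    }
}
```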
7 changes: 4 additions & 3 deletions datafusion/src/execution/context.rs
@@ -50,6 +50,8 @@ use crate::catalog::{
 };
 use crate::datasource::csv::CsvFile;
 use crate::datasource::parquet::ParquetTable;
+use crate::datasource::protocol_registry::ProtocolRegistry;
+use crate::datasource::protocol_registry::LocalFSHandler;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::execution::dataframe_impl::DataFrameImpl;
@@ -156,8 +158,9 @@ impl ExecutionContext {
                 .register_catalog(config.default_catalog.clone(), default_catalog);
         }

+        // register local handler to enable read file from localFS
         let protocol_registry = ProtocolRegistry::new();
-        protocol_registry.register_handler("file", )
+        protocol_registry.register_handler("file", Arc::new(LocalFSHandler{}));

         Self {
             state: Arc::new(Mutex::new(ExecutionContextState {
@@ -367,8 +370,6 @@ impl ExecutionContext {
         prefix: &str,
         handler: Arc<dyn ProtocolHander>,
     ) -> Option<Arc<dyn ProtocolHander>> {
-        let prefix = prefix.to_string();
-
         self.state
             .lock()
             .unwrap()
15 changes: 8 additions & 7 deletions datafusion/src/physical_plan/parquet.rs
@@ -65,14 +65,15 @@ use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};

 use super::SQLMetric;
+use crate::datasource::datasource2::PartitionedFile;

 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
-pub struct ParquetExec<R: ChunkReader + 'static> {
+pub struct ParquetExec {
     /// Parquet partitions to read
     partitions: Vec<ParquetPartition>,
     /// Source used for get reader for partitions
-    source: Box<dyn DataSource2<R>>,
+    source: Box<dyn DataSource2>,
     /// Schema after projection is applied
     schema: SchemaRef,
     /// Projection for which columns to load
@@ -122,7 +123,7 @@ struct ParquetPartitionMetrics {
     pub row_groups_pruned: Arc<SQLMetric>,
 }

-impl <R: ChunkReader + 'static> ParquetExec<R> {
+impl ParquetExec {
     /// Create a new Parquet reader execution plan based on the specified Parquet filename or
     /// directory containing Parquet files
     pub fn try_from_path(
@@ -158,7 +159,7 @@ impl <R: ChunkReader + 'static> ParquetExec<R> {
     }

     pub fn try_new(
-        source: Box<dyn DataSource2<R>>,
+        source: Box<dyn DataSource2>,
         source_desc: SourceDescriptor,
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
@@ -251,7 +252,7 @@ impl <R: ChunkReader + 'static> ParquetExec<R> {
     /// Create a new Parquet reader execution plan with provided partitions and schema
     pub fn new(
         partitions: Vec<ParquetPartition>,
-        source: Box<dyn DataSource2<R>>,
+        source: Box<dyn DataSource2>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
         statistics: Statistics,
@@ -611,8 +612,8 @@ fn build_row_group_predicate(
     }
 }

-fn read_files<R: ChunkReader + 'static>(
-    source: Arc<dyn DataSource2<R>>,
+fn read_files(
+    source: Arc<dyn DataSource2>,
     partition: ParquetPartition,
     metrics: ParquetPartitionMetrics,
     projection: &[usize],
