
WIP: extend source to enable read from remote storage
yjshen committed Aug 2, 2021
1 parent 5416341 commit 79c3242
Showing 9 changed files with 455 additions and 189 deletions.
163 changes: 163 additions & 0 deletions datafusion/src/datasource/datasource2.rs
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::datasource::datasource::{ColumnStatistics, Statistics};
use crate::error::{DataFusionError, Result};
use crate::scalar::ScalarValue;
use arrow::datatypes::{Schema, SchemaRef};

use parquet::arrow::ArrowReader;
use parquet::arrow::ParquetFileArrowReader;
use parquet::file::reader::ChunkReader;
use parquet::file::serialized_reader::SerializedFileReader;
use std::io::Read;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct PartitionedFile {
    pub file_path: String,
    pub schema: Schema,
    pub statistics: Statistics,
    pub partition_value: Option<ScalarValue>,
    pub partition_schema: Option<Schema>,
    // We may include a row-group range here for more fine-grained parallel execution
}

#[derive(Debug, Clone)]
pub struct FilePartition {
    pub index: usize,
    pub files: Vec<PartitionedFile>,
}

#[derive(Debug, Clone)]
pub struct SourceDescriptor {
    pub partition_files: Vec<PartitionedFile>,
    pub schema: SchemaRef,
}

pub trait DataSource2: Send + Sync {
    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;

    fn schema(&self) -> Result<Arc<Schema>>;

    /// Returns a boxed sequential reader over the file contents. Boxing a
    /// `dyn Read` keeps this trait object-safe; a bare `dyn ChunkReader`
    /// cannot be returned by value, since it is unsized and carries an
    /// associated type.
    fn get_read_for_file(
        &self,
        partitioned_file: PartitionedFile,
    ) -> Result<Box<dyn Read + Send>>;

    fn statistics(&self) -> &Statistics;
}

pub trait SourceDescBuilder {
    fn get_source_desc(root_path: &str) -> Result<SourceDescriptor> {
        let filenames = Self::get_all_files(root_path)?;
        if filenames.is_empty() {
            return Err(DataFusionError::Plan(format!(
                "No Parquet files (with .parquet extension) found at path {}",
                root_path
            )));
        }

        // build a list of Parquet partitions with statistics and gather all unique schemas
        // used in this data set
        let mut schemas: Vec<Schema> = vec![];

        let partitioned_files = filenames
            .iter()
            .map(|file_path| {
                let pf = Self::get_file_meta(file_path)?;
                let schema = pf.schema.clone();
                if schemas.is_empty() {
                    schemas.push(schema);
                } else if schema != schemas[0] {
                    // we currently get the schema information from the first file rather than do
                    // schema merging and this is a limitation.
                    // See https://issues.apache.org/jira/browse/ARROW-11017
                    return Err(DataFusionError::Plan(format!(
                        "The file {} has a different schema from the first file and DataFusion does \
                         not yet support schema merging",
                        file_path
                    )));
                }
                Ok(pf)
            })
            .collect::<Result<Vec<PartitionedFile>>>();

        Ok(SourceDescriptor {
            partition_files: partitioned_files?,
            schema: Arc::new(schemas.pop().unwrap()),
        })
    }

    fn get_all_files(root_path: &str) -> Result<Vec<String>>;

    fn get_file_meta(file_path: &str) -> Result<PartitionedFile>;

    /// Concrete reader type used when inspecting file metadata. An associated
    /// type lets each implementation return its own `ChunkReader`; a bare
    /// `dyn ChunkReader` cannot be returned by value.
    type Reader: ChunkReader + 'static;

    fn reader_for_file_meta(file_path: &str) -> Result<Self::Reader>;
}

pub trait ParquetSourceDescBuilder: SourceDescBuilder {
    fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
        let chunk_reader = Self::reader_for_file_meta(file_path)?;
        let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
        let file_path = file_path.to_string();
        let schema = arrow_reader.get_schema()?;
        let num_fields = schema.fields().len();
        let meta_data = arrow_reader.get_metadata();

        let mut num_rows = 0;
        let mut total_byte_size = 0;
        let mut null_counts = vec![0; num_fields];

        for row_group_meta in meta_data.row_groups() {
            num_rows += row_group_meta.num_rows();
            total_byte_size += row_group_meta.total_byte_size();

            let columns_null_counts = row_group_meta
                .columns()
                .iter()
                .flat_map(|c| c.statistics().map(|stats| stats.null_count()));

            for (i, cnt) in columns_null_counts.enumerate() {
                null_counts[i] += cnt;
            }
        }

        let column_stats = null_counts
            .iter()
            .map(|null_count| ColumnStatistics {
                null_count: Some(*null_count as usize),
                max_value: None,
                min_value: None,
                distinct_count: None,
            })
            .collect();

        let statistics = Statistics {
            num_rows: Some(num_rows as usize),
            total_byte_size: Some(total_byte_size as usize),
            column_statistics: Some(column_stats),
        };

        Ok(PartitionedFile {
            file_path,
            schema,
            statistics,
            partition_value: None,
            partition_schema: None,
        })
    }
}
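
Keeping `SourceDescBuilder` separate from the Parquet-specific defaults is what opens the door to remote storage: a new source only has to say how to list paths and how to hand back a `ChunkReader` over the bytes. A minimal sketch of that idea, in the spirit of this WIP — `RemoteParquetSource`, `InMemoryReader`, and `fetch_object` are hypothetical names for illustration, not part of this commit:

use std::io::Cursor;
use parquet::errors::Result as ParquetResult;
use parquet::file::reader::Length;

/// Illustrative only: serves a whole object already fetched from a remote store.
struct InMemoryReader(Arc<Vec<u8>>);

impl Length for InMemoryReader {
    fn len(&self) -> u64 {
        self.0.len() as u64
    }
}

impl ChunkReader for InMemoryReader {
    type T = Cursor<Vec<u8>>;

    // serve an arbitrary byte range, as the Parquet footer/row-group reads require
    fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
        let start = start as usize;
        let end = (start + length).min(self.0.len());
        Ok(Cursor::new(self.0[start..end].to_vec()))
    }
}

/// Hypothetical remote GET; a real source would call its storage client here.
fn fetch_object(path: &str) -> Result<Vec<u8>> {
    unimplemented!("fetch {} from remote storage", path)
}

struct RemoteParquetSource;

impl SourceDescBuilder for RemoteParquetSource {
    type Reader = InMemoryReader;

    fn get_all_files(root_path: &str) -> Result<Vec<String>> {
        // a real implementation would issue a LIST request against the store
        Ok(vec![format!("{}/part-00000.parquet", root_path)])
    }

    fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
        // reuse the Parquet statistics logic from the default above
        <Self as ParquetSourceDescBuilder>::get_file_meta(file_path)
    }

    fn reader_for_file_meta(file_path: &str) -> Result<Self::Reader> {
        Ok(InMemoryReader(Arc::new(fetch_object(file_path)?)))
    }
}

impl ParquetSourceDescBuilder for RemoteParquetSource {}

Fetching whole objects is the simplest thing that satisfies `ChunkReader`; a production source would serve ranged reads rather than copying slices.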
49 changes: 49 additions & 0 deletions datafusion/src/datasource/local/mod.rs
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod parquet_source;

use crate::error::DataFusionError;
use crate::error::Result;
use std::fs;
use std::fs::metadata;

/// Recursively collect all files under `dir` with the given extension into `filenames`
pub fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
    let metadata = metadata(dir)?;
    if metadata.is_file() {
        if dir.ends_with(ext) {
            filenames.push(dir.to_string());
        }
    } else {
        for entry in fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if let Some(path_name) = path.to_str() {
                if path.is_dir() {
                    list_all_files(path_name, filenames, ext)?;
                } else if path_name.ends_with(ext) {
                    filenames.push(path_name.to_string());
                }
            } else {
                return Err(DataFusionError::Plan("Invalid path".to_string()));
            }
        }
    }
    Ok(())
}
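
A usage sketch — the table directory here is hypothetical:

let mut filenames: Vec<String> = Vec::new();
list_all_files("/data/my_table", &mut filenames, ".parquet")?;
// filenames now holds every *.parquet path found by the recursive walk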
49 changes: 49 additions & 0 deletions datafusion/src/datasource/local/parquet_source.rs
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::datasource::datasource2::SourceDescriptor;
use crate::datasource::datasource2::{PartitionedFile, SourceDescBuilder};
use crate::error::Result;
use std::fs::File;

struct LocalParquetSource {
    desc: SourceDescriptor,
}

impl LocalParquetSource {
    pub fn try_new(root_path: &str) -> Result<Self> {
        Ok(Self {
            desc: Self::get_source_desc(root_path)?,
        })
    }
}

impl SourceDescBuilder for LocalParquetSource {
    type Reader = File;

    fn get_all_files(root_path: &str) -> Result<Vec<String>> {
        let mut filenames: Vec<String> = Vec::new();
        super::list_all_files(root_path, &mut filenames, ".parquet")?;
        Ok(filenames)
    }

    fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
        todo!()
    }

    fn reader_for_file_meta(file_path: &str) -> Result<File> {
        Ok(File::open(file_path)?)
    }
}
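
As committed, `get_file_meta` is still a `todo!()`, so `try_new` panics on any real directory. A plausible completion — assuming the intent is to reuse the Parquet defaults from datasource2.rs, and with `ParquetSourceDescBuilder` added to the imports — is to opt in to that trait and delegate:

impl ParquetSourceDescBuilder for LocalParquetSource {}

// ...then, in the SourceDescBuilder impl, replace the todo!() body with:
fn get_file_meta(file_path: &str) -> Result<PartitionedFile> {
    <Self as ParquetSourceDescBuilder>::get_file_meta(file_path)
}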
3 changes: 3 additions & 0 deletions datafusion/src/datasource/mod.rs
@@ -19,10 +19,13 @@
pub mod csv;
pub mod datasource;
pub mod datasource2;
pub mod empty;
pub mod json;
pub mod local;
pub mod memory;
pub mod parquet;
pub mod protocol_registry;

pub use self::csv::{CsvFile, CsvReadOptions};
pub use self::datasource::{TableProvider, TableType};
25 changes: 7 additions & 18 deletions datafusion/src/datasource/parquet.rs
@@ -18,10 +18,10 @@
//! Parquet data source
use std::any::Any;
use std::string::String;
use std::sync::Arc;

use arrow::datatypes::*;
use parquet::file::reader::ChunkReader;

use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
@@ -31,36 +31,25 @@ use crate::physical_plan::parquet::ParquetExec;
use crate::physical_plan::ExecutionPlan;

use super::datasource::TableProviderFilterPushDown;
use super::datasource2::DataSource2;

/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
    path: String,
    schema: SchemaRef,
    statistics: Statistics,
    source: Arc<dyn DataSource2>,
    max_concurrency: usize,
    enable_pruning: bool,
}

impl ParquetTable {
    /// Attempt to initialize a new `ParquetTable` from a file path.
    pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
        let path = path.into();
        let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
        let schema = parquet_exec.schema();
    pub fn try_new(source: Arc<dyn DataSource2>, max_concurrency: usize) -> Result<Self> {
        Ok(Self {
            path,
            schema,
            statistics: parquet_exec.statistics().to_owned(),
            source,
            max_concurrency,
            enable_pruning: true,
        })
    }

    /// Get the path for the Parquet file(s) represented by this ParquetTable instance
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Get parquet pruning option
    pub fn get_enable_pruning(&self) -> bool {
        self.enable_pruning
@@ -80,7 +69,7 @@ impl TableProvider for ParquetTable {

    /// Get the schema for this parquet file.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
        self.source.schema().unwrap()
    }

    fn supports_filter_pushdown(
@@ -120,7 +109,7 @@
    }

    fn statistics(&self) -> Statistics {
        self.statistics.clone()
        self.source.statistics().clone()
    }

    fn has_exact_statistics(&self) -> bool {
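
Nothing in this commit implements `DataSource2` itself yet, so the new `try_new` has no caller; a hedged sketch of the eventual wiring, with `my_source` standing in for whatever concrete source lands later:

// hypothetical: any type implementing DataSource2 can now back a ParquetTable
let source: Arc<dyn DataSource2> = Arc::new(my_source);
let table = ParquetTable::try_new(source, 8)?;

// schema and statistics are answered through the source abstraction rather
// than by an eager local ParquetExec scan at construction time
let schema = table.schema();
let stats = table.statistics();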