Day 1 (#1)
* wip

* more
yjshen authored Sep 14, 2021
1 parent 84676d5 commit b9f12d0
Showing 13 changed files with 88 additions and 80 deletions.
27 changes: 18 additions & 9 deletions datafusion/src/datasource/parquet.rs
@@ -17,7 +17,7 @@

//! Parquet data source
use std::any::Any;
use std::any::{Any, type_name};
use std::fs::File;
use std::sync::Arc;

@@ -35,7 +35,7 @@ use crate::datasource::{
create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema,
PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider,
};
use crate::error::Result;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{combine_filters, Expr};
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::parquet::ParquetExec;
@@ -221,7 +221,12 @@ impl ParquetTableDescriptor {
if let DataType::$DT = fields[i].data_type() {
let stats = stats
.as_any()
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>()?;
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>().ok_or_else(|| {
DataFusionError::Internal(format!(
"Failed to cast stats to {} stats",
type_name::<$PRIMITIVE_TYPE>()
))
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::$DT(Some(v))]) {
@@ -250,7 +255,9 @@ impl ParquetTableDescriptor {
PhysicalType::Boolean => {
if let DataType::Boolean = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBooleanStatistics>()?;
stats.as_any().downcast_ref::<ParquetBooleanStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned())
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::Boolean(Some(v))]) {
@@ -290,11 +297,13 @@ impl ParquetTableDescriptor {
PhysicalType::ByteArray => {
if let DataType::Utf8 = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBinaryStatistics>()?;
stats.as_any().downcast_ref::<ParquetBinaryStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to binary stats".to_owned())
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
match max_value.update(&[ScalarValue::Utf8(
std::str::from_utf8(v).map(|s| s.to_string()).ok(),
std::str::from_utf8(&*v).map(|s| s.to_string()).ok(),
)]) {
Ok(_) => {}
Err(_) => {
@@ -306,7 +315,7 @@ impl ParquetTableDescriptor {
if let Some(min_value) = &mut min_values[i] {
if let Some(v) = stats.min_value {
match min_value.update(&[ScalarValue::Utf8(
std::str::from_utf8(v).map(|s| s.to_string()).ok(),
std::str::from_utf8(&*v).map(|s| s.to_string()).ok(),
)]) {
Ok(_) => {}
Err(_) => {
@@ -341,7 +350,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {

let (mut max_values, mut min_values) = create_max_min_accs(&schema);

for row_group_meta in meta_data.row_groups() {
for row_group_meta in meta_data.row_groups {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();

@@ -386,7 +395,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {
};

Ok(FileAndSchema {
file: PartitionedFile { path, statistics },
file: PartitionedFile { path: path.to_owned(), statistics },
schema,
})
}
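The recurring change in this file swaps `?` on `downcast_ref` (which yields an `Option`, not a `Result`) for an explicit `ok_or_else` that produces a `DataFusionError::Internal`. A minimal sketch of the pattern, with a stand-in `MyError` enum since the full DataFusion error type isn't needed to show the idea:

```rust
use std::any::{type_name, Any};

#[derive(Debug)]
enum MyError {
    Internal(String), // stand-in for DataFusionError::Internal
}

// Turn a failed downcast (None) into a typed error. Using `?` directly
// on the Option would require the enclosing function to return Option.
fn downcast_stats<T: 'static>(stats: &dyn Any) -> Result<&T, MyError> {
    stats.downcast_ref::<T>().ok_or_else(|| {
        MyError::Internal(format!(
            "Failed to cast stats to {} stats",
            type_name::<T>()
        ))
    })
}

fn main() {
    let value: i64 = 42;
    let any: &dyn Any = &value;
    assert!(downcast_stats::<i64>(any).is_ok());
    assert!(downcast_stats::<String>(any).is_err());
}
```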
12 changes: 12 additions & 0 deletions datafusion/src/error.rs
@@ -23,6 +23,7 @@ use std::io
use std::result;

use arrow::error::ArrowError;
use parquet::error::ParquetError;
use sqlparser::parser::ParserError;

/// Result type for operations that could result in an [DataFusionError]
@@ -34,6 +35,8 @@ pub type Result<T> = result::Result<T, DataFusionError>;
pub enum DataFusionError {
/// Error returned by arrow.
ArrowError(ArrowError),
/// Wraps an error from the Parquet crate
ParquetError(ParquetError),
/// Error associated to I/O operations and associated traits.
IoError(io::Error),
/// Error returned when SQL is syntactically incorrect.
@@ -74,6 +77,12 @@ impl From<ArrowError> for DataFusionError {
}
}

impl From<ParquetError> for DataFusionError {
fn from(e: ParquetError) -> Self {
DataFusionError::ParquetError(e)
}
}

impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
@@ -84,6 +93,9 @@ impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match *self {
DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {}", desc)
}
DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {:?}", desc)
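The new `From<ParquetError>` impl is what lets `?` convert parquet errors into `DataFusionError` automatically at call sites. A self-contained sketch with a stand-in `ParquetError` struct, since the real type lives in the parquet crate:

```rust
#[derive(Debug)]
struct ParquetError(String); // stand-in for parquet::error::ParquetError

#[derive(Debug)]
enum DataFusionError {
    ParquetError(ParquetError),
}

impl From<ParquetError> for DataFusionError {
    fn from(e: ParquetError) -> Self {
        DataFusionError::ParquetError(e)
    }
}

// With the From impl in scope, `?` performs the conversion implicitly,
// so parquet calls need no explicit map_err at each use site.
fn read_metadata() -> Result<(), DataFusionError> {
    let parquet_result: Result<(), ParquetError> =
        Err(ParquetError("bad magic number".to_owned()));
    parquet_result?; // ParquetError -> DataFusionError via From
    Ok(())
}

fn main() {
    assert!(matches!(
        read_metadata(),
        Err(DataFusionError::ParquetError(_))
    ));
}
```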
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
@@ -19,7 +19,7 @@
use std::sync::{Arc, Mutex};

use arrow::io::print
use arrow::io::print;
use arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
@@ -160,13 +160,13 @@ impl DataFrame for DataFrameImpl {
/// Print results.
async fn show(&self) -> Result<()> {
let results = self.collect().await?;
Ok(print::print(&results)?)
Ok(print::print(&results))
}

/// Print results and limit rows.
async fn show_limit(&self, num: usize) -> Result<()> {
let results = self.limit(num)?.collect().await?;
Ok(print::print(&results)?)
Ok(print::print(&results))
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
12 changes: 0 additions & 12 deletions datafusion/src/logical_plan/plan.rs
@@ -31,18 +31,6 @@ use std::{
sync::Arc,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use crate::datasource::TableProvider;
use crate::sql::parser::FileType;

use super::extension::UserDefinedLogicalNode;
use super::{
display::{GraphvizVisitor, IndentVisitor},
Column,
};
use crate::logical_plan::dfschema::DFSchemaRef;

/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
2 changes: 0 additions & 2 deletions datafusion/src/optimizer/constant_folding.rs
@@ -21,7 +21,6 @@
use std::sync::Arc;

use arrow::compute::cast;
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use arrow::temporal_conversions::utf8_to_timestamp_ns_scalar;

@@ -32,7 +31,6 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
use arrow::compute::{kernels, DEFAULT_CAST_OPTIONS};

/// Optimizer that simplifies comparison expressions involving boolean literals.
///
29 changes: 14 additions & 15 deletions datafusion/src/physical_plan/analyze.rs
@@ -25,11 +25,12 @@ use crate::{
physical_plan::{display::DisplayableExecutionPlan, Partitioning},
physical_plan::{DisplayFormatType, ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use async_trait::async_trait;
use arrow::array::MutableUtf8Array;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
@@ -149,43 +150,41 @@ impl ExecutionPlan for AnalyzeExec {
}
let end = Instant::now();

let mut type_builder = StringBuilder::new(1);
let mut plan_builder = StringBuilder::new(1);
let mut type_builder: MutableUtf8Array<i32> = MutableUtf8Array::new();
let mut plan_builder: MutableUtf8Array<i32> = MutableUtf8Array::new();

// TODO use some sort of enum rather than strings?
type_builder.append_value("Plan with Metrics").unwrap();
type_builder.push(Some("Plan with Metrics"));

let annotated_plan =
DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
.indent()
.to_string();
plan_builder.append_value(annotated_plan).unwrap();
plan_builder.push(Some(annotated_plan));

// Verbose output
// TODO make this more sophisticated
if verbose {
type_builder.append_value("Plan with Full Metrics").unwrap();
type_builder.push(Some("Plan with Full Metrics"));

let annotated_plan =
DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
.indent()
.to_string();
plan_builder.append_value(annotated_plan).unwrap();
plan_builder.push(Some(annotated_plan));

type_builder.append_value("Output Rows").unwrap();
plan_builder.append_value(total_rows.to_string()).unwrap();
type_builder.push(Some("Output Rows"));
plan_builder.push(Some(total_rows.to_string()));

type_builder.append_value("Duration").unwrap();
plan_builder
.append_value(format!("{:?}", end - start))
.unwrap();
type_builder.push(Some("Duration"));
plan_builder.push(Some(format!("{:?}", end - start)));
}

let maybe_batch = RecordBatch::try_new(
captured_schema,
vec![
Arc::new(type_builder.finish()),
Arc::new(plan_builder.finish()),
type_builder.into_arc(),
plan_builder.into_arc(),
],
);
// again ignore error
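This file swaps arrow-rs's `StringBuilder` for arrow2's `MutableUtf8Array`, whose `push` is infallible and takes an `Option` (so the `.unwrap()` calls disappear), and whose `into_arc` replaces the `finish()` plus `Arc::new` two-step. A minimal sketch, assuming the arrow2 API of this era; the standalone crate name `arrow2` is used below, while this repo aliases it to `arrow`:

```rust
use arrow2::array::{Array, MutableArray, MutableUtf8Array};

fn main() {
    // i32 offsets, matching the declarations in the diff above.
    let mut builder: MutableUtf8Array<i32> = MutableUtf8Array::new();

    // `push` takes Option<impl AsRef<str>>: Some appends a value,
    // None appends a null, and neither can fail.
    builder.push(Some("Plan with Metrics"));
    builder.push::<&str>(None);

    // `into_arc` freezes the mutable array into an Arc<dyn Array>.
    let array = builder.into_arc();
    assert_eq!(array.len(), 2);
    assert_eq!(array.null_count(), 1);
}
```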
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/datetime_expressions.rs
@@ -27,8 +27,7 @@ use arrow::{
array::*,
compute::cast,
datatypes::{
ArrowPrimitiveType, DataType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
DataType, TimeUnit,
},
temporal_conversions::utf8_to_timestamp_ns_scalar,
types::NativeType,
19 changes: 0 additions & 19 deletions datafusion/src/physical_plan/expressions/binary.rs
@@ -776,25 +776,6 @@ mod tests {
Ok(())
}

#[test]
fn modulus_op() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));

apply_arithmetic::<i32>(
schema,
vec![a, b],
Operator::Modulo,
Int32Array::from(vec![0, 0, 2, 8, 0]),
)?;

Ok(())
}

fn apply_arithmetic<T: NativeType>(
schema: Arc<Schema>,
data: Vec<Arc<dyn Array>>,
19 changes: 15 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
@@ -41,6 +41,17 @@ macro_rules! compare_op_scalar {
}};
}

// TODO: primitive array currently doesn't have `values_iter()`; it may
// be worth adding one there, so that this specialized case could be removed.
macro_rules! compare_primitive_op_scalar {
($left: expr, $right:expr, $op:expr) => {{
let validity = $left.validity();
let values =
Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right)));
Ok(BooleanArray::from_data(DataType::Boolean, values, validity))
}};
}

/// InList
#[derive(Debug)]
pub struct InListExpr {
@@ -162,18 +173,18 @@ macro_rules! make_contains_primitive {
// whether each value on the left (can be null) is contained in the non-null list
fn in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[<T as NativeType>],
values: &[T],
) -> Result<BooleanArray> {
compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| v
compare_primitive_op_scalar!(array, values, |x, v| v
.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
fn not_in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[<T as NativeType>],
values: &[T],
) -> Result<BooleanArray> {
compare_op_scalar!(array, values, |x, v: &[<T as NativeType>]| !v
compare_primitive_op_scalar!(array, values, |x, v| !v
.contains(&x))
}

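The new `compare_primitive_op_scalar` macro works around the missing `values_iter()` by mapping the predicate over the raw values buffer and reusing the input's validity bitmap, so null slots stay null in the output. Roughly what the expansion looks like for `i32`, written out as a plain function; this is a sketch against arrow2, and the exact return type of `validity()` has varied between arrow2 versions:

```rust
use arrow2::array::{Array, BooleanArray, PrimitiveArray};
use arrow2::bitmap::Bitmap;
use arrow2::datatypes::DataType;

// Map the predicate over every raw value (including slots that are
// null) and carry the input's validity bitmap through unchanged.
fn in_list_i32(array: &PrimitiveArray<i32>, values: &[i32]) -> BooleanArray {
    let validity = array.validity().cloned();
    let bits = Bitmap::from_trusted_len_iter(
        array.values().iter().map(|x| values.contains(x)),
    );
    BooleanArray::from_data(DataType::Boolean, bits, validity)
}

fn main() {
    let array = PrimitiveArray::from(vec![Some(1), None, Some(3)]);
    let result = in_list_i32(&array, &[1, 2]);
    assert!(result.value(0)); // 1 is in the list
    assert!(!result.is_valid(1)); // null stays null
    assert!(!result.value(2)); // 3 is not in the list
}
```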
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_utils.rs
@@ -116,7 +116,7 @@ fn combine_hashes(l: u64, r: u64) -> u64 {
}

macro_rules! hash_array {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
($array_type:ty, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
if array.null_count() == 0 {
if $multi_col {
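The matcher here widens from `$array_type:ident` to `$array_type:ty` because arrow2's generic array names such as `Utf8Array<i32>` are types, not single identifiers, and would not parse under the old matcher. A small standalone illustration of the difference, using `Vec<u8>` as the hypothetical generic type:

```rust
// With `$collection_type:ident` this macro would reject `Vec<u8>`;
// `:ty` accepts any well-formed type, which is what the generic
// arrow2 array types require.
macro_rules! collection_len {
    ($collection_type:ty, $values:expr) => {{
        let v: $collection_type = $values;
        v.len()
    }};
}

fn main() {
    // `Vec<u8>` is a type but not an identifier.
    let n = collection_len!(Vec<u8>, vec![1u8, 2, 3]);
    assert_eq!(n, 3);
}
```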
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/mod.rs
@@ -22,7 +22,7 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::expressions::{PhysicalSortExpr, SortColumn};
use crate::physical_plan::expressions::{PhysicalSortExpr, SortColumn};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/parquet.rs
@@ -41,7 +41,7 @@
record_batch::RecordBatch,
};
use log::debug;
use parquet::file::reader::{FileReader, SerializedFileReader};

use parquet::statistics::{
BinaryStatistics as ParquetBinaryStatistics,
BooleanStatistics as ParquetBooleanStatistics,
@@ -579,7 +579,7 @@ fn read_partition(
ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics);
let mut file = File::open(partitioned_file.path.as_str())?;
let reader = read::RecordReader::try_new(
std::io::BufReader::new(file)
std::io::BufReader::new(file),
Some(projection.to_vec()),
limit,
None,