diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index 0571d30a6a95..b6afd22b245b 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -17,7 +17,7 @@ //! Parquet data source -use std::any::Any; +use std::any::{Any, type_name}; use std::fs::File; use std::sync::Arc; @@ -35,7 +35,7 @@ use crate::datasource::{ create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema, PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider, }; -use crate::error::Result; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::{combine_filters, Expr}; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::parquet::ParquetExec; @@ -221,7 +221,12 @@ impl ParquetTableDescriptor { if let DataType::$DT = fields[i].data_type() { let stats = stats .as_any() - .downcast_ref::>()?; + .downcast_ref::>().ok_or_else(|| { + DataFusionError::Internal(format!( + "Failed to cast stats to {} stats", + type_name::<$PRIMITIVE_TYPE>() + )) + })?; if let Some(max_value) = &mut max_values[i] { if let Some(v) = stats.max_value { match max_value.update(&[ScalarValue::$DT(Some(v))]) { @@ -250,7 +255,9 @@ impl ParquetTableDescriptor { PhysicalType::Boolean => { if let DataType::Boolean = fields[i].data_type() { let stats = - stats.as_any().downcast_ref::()?; + stats.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned()) + })?; if let Some(max_value) = &mut max_values[i] { if let Some(v) = stats.max_value { match max_value.update(&[ScalarValue::Boolean(Some(v))]) { @@ -290,11 +297,13 @@ impl ParquetTableDescriptor { PhysicalType::ByteArray => { if let DataType::Utf8 = fields[i].data_type() { let stats = - stats.as_any().downcast_ref::()?; + stats.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal("Failed to cast stats to binary stats".to_owned()) + })?; 
if let Some(max_value) = &mut max_values[i] { if let Some(v) = stats.max_value { match max_value.update(&[ScalarValue::Utf8( - std::str::from_utf8(v).map(|s| s.to_string()).ok(), + std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), )]) { Ok(_) => {} Err(_) => { @@ -306,7 +315,7 @@ impl ParquetTableDescriptor { if let Some(min_value) = &mut min_values[i] { if let Some(v) = stats.min_value { match min_value.update(&[ScalarValue::Utf8( - std::str::from_utf8(v).map(|s| s.to_string()).ok(), + std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), )]) { Ok(_) => {} Err(_) => { @@ -341,7 +350,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { let (mut max_values, mut min_values) = create_max_min_accs(&schema); - for row_group_meta in meta_data.row_groups() { + for row_group_meta in meta_data.row_groups { num_rows += row_group_meta.num_rows(); total_byte_size += row_group_meta.total_byte_size(); @@ -386,7 +395,7 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { }; Ok(FileAndSchema { - file: PartitionedFile { path, statistics }, + file: PartitionedFile { path: path.to_owned(), statistics }, schema, }) } diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index a229198e9dae..b5676669df00 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -23,6 +23,7 @@ use std::io; use std::result; use arrow::error::ArrowError; +use parquet::error::ParquetError; use sqlparser::parser::ParserError; /// Result type for operations that could result in an [DataFusionError] @@ -34,6 +35,8 @@ pub type Result = result::Result; pub enum DataFusionError { /// Error returned by arrow. ArrowError(ArrowError), + /// Wraps an error from the Parquet crate + ParquetError(ParquetError), /// Error associated to I/O operations and associated traits. IoError(io::Error), /// Error returned when SQL is syntactically incorrect. 
@@ -74,6 +77,12 @@ impl From for DataFusionError { } } +impl From for DataFusionError { + fn from(e: ParquetError) -> Self { + DataFusionError::ParquetError(e) + } +} + impl From for DataFusionError { fn from(e: ParserError) -> Self { DataFusionError::SQL(e) @@ -84,6 +93,9 @@ impl Display for DataFusionError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match *self { DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc), + DataFusionError::ParquetError(ref desc) => { + write!(f, "Parquet error: {}", desc) + } DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc), DataFusionError::SQL(ref desc) => { write!(f, "SQL error: {:?}", desc) diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 282886587643..55a69ed0dc0d 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, Mutex}; -use arrow::io::print +use arrow::io::print; use arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; @@ -160,13 +160,13 @@ impl DataFrame for DataFrameImpl { /// Print results. async fn show(&self) -> Result<()> { let results = self.collect().await?; - Ok(print::print(&results)?) + Ok(print::print(&results)) } /// Print results and limit rows. async fn show_limit(&self, num: usize) -> Result<()> { let results = self.limit(num)?.collect().await?; - Ok(print::print(&results)?) 
+ Ok(print::print(&results)) } /// Convert the logical plan represented by this DataFrame into a physical plan and diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 77f37dbb74b7..cb81b8d852fb 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -31,18 +31,6 @@ use std::{ sync::Arc, }; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - -use crate::datasource::TableProvider; -use crate::sql::parser::FileType; - -use super::extension::UserDefinedLogicalNode; -use super::{ - display::{GraphvizVisitor, IndentVisitor}, - Column, -}; -use crate::logical_plan::dfschema::DFSchemaRef; - /// Join type #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JoinType { diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 8a2b164fea25..97b6603e08fd 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use arrow::compute::cast; -use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::DataType; use arrow::temporal_conversions::utf8_to_timestamp_ns_scalar; @@ -32,7 +31,6 @@ use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; use crate::physical_plan::functions::BuiltinScalarFunction; use crate::scalar::ScalarValue; -use arrow::compute::{kernels, DEFAULT_CAST_OPTIONS}; /// Optimizer that simplifies comparison expressions involving boolean literals. 
/// diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index d0125579ace2..7ebf0160b128 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -25,11 +25,12 @@ use crate::{ physical_plan::{display::DisplayableExecutionPlan, Partitioning}, physical_plan::{DisplayFormatType, ExecutionPlan}, }; -use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use futures::StreamExt; use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; use async_trait::async_trait; +use arrow::array::MutableUtf8Array; /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input, /// discards the results, and then prints out an annotated plan with metrics @@ -149,43 +150,41 @@ impl ExecutionPlan for AnalyzeExec { } let end = Instant::now(); - let mut type_builder = StringBuilder::new(1); - let mut plan_builder = StringBuilder::new(1); + let mut type_builder: MutableUtf8Array = MutableUtf8Array::new(); + let mut plan_builder: MutableUtf8Array = MutableUtf8Array::new(); // TODO use some sort of enum rather than strings? 
- type_builder.append_value("Plan with Metrics").unwrap(); + type_builder.push(Some("Plan with Metrics")); let annotated_plan = DisplayableExecutionPlan::with_metrics(captured_input.as_ref()) .indent() .to_string(); - plan_builder.append_value(annotated_plan).unwrap(); + plan_builder.push(Some(annotated_plan)); // Verbose output // TODO make this more sophisticated if verbose { - type_builder.append_value("Plan with Full Metrics").unwrap(); + type_builder.push(Some("Plan with Full Metrics")); let annotated_plan = DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref()) .indent() .to_string(); - plan_builder.append_value(annotated_plan).unwrap(); + plan_builder.push(Some(annotated_plan)); - type_builder.append_value("Output Rows").unwrap(); - plan_builder.append_value(total_rows.to_string()).unwrap(); + type_builder.push(Some("Output Rows")); + plan_builder.push(Some(total_rows.to_string())); - type_builder.append_value("Duration").unwrap(); - plan_builder - .append_value(format!("{:?}", end - start)) - .unwrap(); + type_builder.push(Some("Duration")); + plan_builder.push(Some(format!("{:?}", end - start))); } let maybe_batch = RecordBatch::try_new( captured_schema, vec![ - Arc::new(type_builder.finish()), - Arc::new(plan_builder.finish()), + type_builder.into_arc(), + plan_builder.into_arc(), ], ); // again ignore error diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index edc3b45a5c4d..298ce960fc7a 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -27,8 +27,7 @@ use arrow::{ array::*, compute::cast, datatypes::{ - ArrowPrimitiveType, DataType, TimeUnit, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, + DataType, TimeUnit, }, temporal_conversions::utf8_to_timestamp_ns_scalar, types::NativeType, diff --git 
a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 058b33aa5efd..59bafe134935 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -776,25 +776,6 @@ mod tests { Ok(()) } - #[test] - fn modulus_op() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - ])); - let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048])); - let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32])); - - apply_arithmetic::( - schema, - vec![a, b], - Operator::Modulo, - Int32Array::from(vec![0, 0, 2, 8, 0]), - )?; - - Ok(()) - } - fn apply_arithmetic( schema: Arc, data: Vec>, diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index 3dab8d03d171..f7a18be412e4 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -41,6 +41,17 @@ macro_rules! compare_op_scalar { }}; } +// TODO: primitive array currently doesn't have `values_iter()`, it may +// be worth adding one there, and this specialized case could be removed. +macro_rules! compare_primitive_op_scalar { + ($left: expr, $right:expr, $op:expr) => {{ + let validity = $left.validity(); + let values = + Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right))); + Ok(BooleanArray::from_data(DataType::Boolean, values, validity)) + }}; +} + /// InList #[derive(Debug)] pub struct InListExpr { @@ -162,18 +173,18 @@ macro_rules!
make_contains_primitive { // whether each value on the left (can be null) is contained in the non-null list fn in_list_primitive( array: &PrimitiveArray, - values: &[], + values: &[T], ) -> Result { - compare_op_scalar!(array, values, |x, v: &[]| v + compare_primitive_op_scalar!(array, values, |x, v| v .contains(&x)) } // whether each value on the left (can be null) is contained in the non-null list fn not_in_list_primitive( array: &PrimitiveArray, - values: &[], + values: &[T], ) -> Result { - compare_op_scalar!(array, values, |x, v: &[]| !v + compare_primitive_op_scalar!(array, values, |x, v| !v .contains(&x)) } diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 34849c76ee80..924882f7e6af 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -116,7 +116,7 @@ fn combine_hashes(l: u64, r: u64) -> u64 { } macro_rules! hash_array { - ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { + ($array_type:ty, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => { let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); if array.null_count() == 0 { if $multi_col { diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index b0f44e83692c..e571e97beb4f 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -22,7 +22,7 @@ use self::metrics::MetricsSet; use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; -use crate::expressions::{PhysicalSortExpr, SortColumn}; +use crate::physical_plan::expressions::{PhysicalSortExpr, SortColumn}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index a68532b3819a..c061c491bec5 100644 --- 
a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -41,7 +41,7 @@ use arrow::{ record_batch::RecordBatch, }; use log::debug; -use parquet::file::reader::{FileReader, SerializedFileReader}; + use parquet::statistics::{ BinaryStatistics as ParquetBinaryStatistics, BooleanStatistics as ParquetBooleanStatistics, @@ -579,7 +579,7 @@ fn read_partition( ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); let mut file = File::open(partitioned_file.path.as_str())?; let reader = read::RecordReader::try_new( - std::io::BufReader::new(file) + std::io::BufReader::new(file), Some(projection.to_vec()), limit, None, diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index e94f7872190e..345f99554337 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -1703,6 +1703,17 @@ mod tests { }}; } + macro_rules! make_temporal_test_case { + ($INPUT:expr, $ARRAY_TY:ident, $ARROW_TU:ident, $SCALAR_TY:ident) => {{ + TestCase { + array: Arc::new($ARRAY_TY::from($INPUT) + .to(DataType::Interval(IntervalUnit::$ARROW_TU)), + ), + scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), + } + }}; + } + macro_rules! make_str_test_case { ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident) => {{ TestCase { @@ -1731,10 +1742,10 @@ mod tests { /// create a test case for DictionaryArray<$INDEX_TY> macro_rules! 
make_str_dict_test_case { - ($INPUT:expr, $INDEX_TY:ident, $SCALAR_TY:ident) => {{ + ($INPUT:expr, $INDEX_TY:ty, $SCALAR_TY:ident) => {{ TestCase { array: Arc::new( - DictionaryArray<$INDEX_TY>::from($INPUT), + DictionaryArray::<$INDEX_TY>::from($INPUT), ), scalars: $INPUT .iter() @@ -1760,14 +1771,14 @@ mod tests { make_str_test_case!(str_vals, LargeStringArray, LargeUtf8), make_binary_test_case!(str_vals, SmallBinaryArray, Binary), make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), - make_date_test_case!(&i32_vals, Int32Array, Date32), - make_date_test_case!(&i64_vals, Int64Array, Date64), - make_ts_test_case!(&i64_vals, Int64Array, Second, TimestampSecond), - make_ts_test_case!(&i64_vals, Int64Array, Millisecond, TimestampMillisecond), - make_ts_test_case!(&i64_vals, Int64Array, Microsecond, TimestampMicrosecond), - make_ts_test_case!(&i64_vals, Int64Array, Nanosecond, TimestampNanosecond), - make_temporal_test_case!(i32_vals, Int32Array, IntervalYearMonth), - make_temporal_test_case!(days_ms_vals, DaysMsArray, IntervalDayTime), + make_date_test_case!(i32_vals, Int32Array, Date32), + make_date_test_case!(i64_vals, Int64Array, Date64), + make_ts_test_case!(i64_vals, Int64Array, Second, TimestampSecond), + make_ts_test_case!(i64_vals, Int64Array, Millisecond, TimestampMillisecond), + make_ts_test_case!(i64_vals, Int64Array, Microsecond, TimestampMicrosecond), + make_ts_test_case!(i64_vals, Int64Array, Nanosecond, TimestampNanosecond), + make_temporal_test_case!(i32_vals, Int32Array, YearMonth, IntervalYearMonth), + make_temporal_test_case!(days_ms_vals, DaysMsArray, DayTime, IntervalDayTime), make_str_dict_test_case!(str_vals, i8, Utf8), make_str_dict_test_case!(str_vals, i16, Utf8), make_str_dict_test_case!(str_vals, i32, Utf8),