diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index ba3e68e4011f..65fb634bfe7b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1361,9 +1361,15 @@ dependencies = [ name = "datafusion-physical-expr-common" version = "37.1.0" dependencies = [ + "ahash", "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", + "half", + "hashbrown 0.14.3", + "itertools", + "paste", ] [[package]] diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index d1202c83d526..a4a91c114529 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -36,6 +36,14 @@ name = "datafusion_physical_expr_common" path = "src/lib.rs" [dependencies] +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } arrow = { workspace = true } +arrow-schema = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +half = { workspace = true } +hashbrown = { version = "0.14", features = ["raw"] } +itertools = { workspace = true, features = ["use_std"] } +paste = "^1.0" diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr-common/src/expressions/binary.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/binary.rs rename to datafusion/physical-expr-common/src/expressions/binary.rs index 76154dca0338..48e108af4173 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr-common/src/expressions/binary.rs @@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc}; use crate::expressions::datum::{apply, apply_cmp}; use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::array::*; use 
arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; @@ -622,8 +622,13 @@ pub fn binary( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{col, lit, try_cast, Literal}; - use datafusion_common::plan_datafusion_err; + use crate::expressions::column::col; + use crate::expressions::literal::{lit, Literal}; + use crate::expressions::try_cast::try_cast; + use arrow::datatypes::{ + ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef, + }; + use datafusion_common::{plan_datafusion_err, Result}; use datafusion_expr::type_coercion::binary::get_input_types; /// Performs a binary operation, applying any type coercion necessary diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr-common/src/expressions/binary/kernels.rs similarity index 100% rename from datafusion/physical-expr/src/expressions/binary/kernels.rs rename to datafusion/physical-expr-common/src/expressions/binary/kernels.rs diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr-common/src/expressions/case.rs similarity index 97% rename from datafusion/physical-expr/src/expressions/case.rs rename to datafusion/physical-expr-common/src/expressions/case.rs index 7b10df9ac146..890956a2816e 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr-common/src/expressions/case.rs @@ -19,9 +19,10 @@ use std::borrow::Cow; use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::expressions::{try_cast, NoOp}; +use crate::expressions::no_op::NoOp; +use crate::expressions::try_cast::try_cast; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::array::*; use arrow::compute::kernels::cmp::eq; @@ -413,7 +414,11 @@ pub fn case( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{binary, cast, col, lit}; + + use crate::expressions::binary::binary; + use 
crate::expressions::cast::cast; + use crate::expressions::column::col; + use crate::expressions::literal::{lit, Literal}; use arrow::buffer::Buffer; use arrow::datatypes::DataType::Float64; @@ -957,16 +962,15 @@ mod tests { let expr2 = expr .clone() .transform(|e| { - let transformed = - match e.as_any().downcast_ref::() { - Some(lit_value) => match lit_value.value() { - ScalarValue::Utf8(Some(str_value)) => { - Some(lit(str_value.to_uppercase())) - } - _ => None, - }, + let transformed = match e.as_any().downcast_ref::() { + Some(lit_value) => match lit_value.value() { + ScalarValue::Utf8(Some(str_value)) => { + Some(lit(str_value.to_uppercase())) + } _ => None, - }; + }, + _ => None, + }; Ok(if let Some(transformed) = transformed { Transformed::yes(transformed) } else { @@ -979,16 +983,15 @@ mod tests { let expr3 = expr .clone() .transform_down(|e| { - let transformed = - match e.as_any().downcast_ref::() { - Some(lit_value) => match lit_value.value() { - ScalarValue::Utf8(Some(str_value)) => { - Some(lit(str_value.to_uppercase())) - } - _ => None, - }, + let transformed = match e.as_any().downcast_ref::() { + Some(lit_value) => match lit_value.value() { + ScalarValue::Utf8(Some(str_value)) => { + Some(lit(str_value.to_uppercase())) + } _ => None, - }; + }, + _ => None, + }; Ok(if let Some(transformed) = transformed { Transformed::yes(transformed) } else { diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr-common/src/expressions/cast.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/cast.rs rename to datafusion/physical-expr-common/src/expressions/cast.rs index a3b32461e581..4788c4bb95c7 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr-common/src/expressions/cast.rs @@ -16,8 +16,8 @@ // under the License. 
use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use std::any::Any; use std::fmt; use std::hash::{Hash, Hasher}; @@ -217,7 +217,7 @@ pub fn cast( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{ diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr-common/src/expressions/datum.rs similarity index 96% rename from datafusion/physical-expr/src/expressions/datum.rs rename to datafusion/physical-expr-common/src/expressions/datum.rs index 2bb79922cfec..201bf104f20e 100644 --- a/datafusion/physical-expr/src/expressions/datum.rs +++ b/datafusion/physical-expr-common/src/expressions/datum.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::BooleanArray; use arrow::array::{ArrayRef, Datum}; use arrow::error::ArrowError; -use arrow_array::BooleanArray; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; use std::sync::Arc; @@ -25,7 +25,7 @@ use std::sync::Arc; /// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs` /// /// This maps arrow-rs' [`Datum`] kernels to DataFusion's [`ColumnarValue`] abstraction -pub(crate) fn apply( +pub fn apply( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, @@ -49,7 +49,7 @@ pub(crate) fn apply( } /// Applies a binary [`Datum`] comparison kernel `f` to `lhs` and `rhs` -pub(crate) fn apply_cmp( +pub fn apply_cmp( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr-common/src/expressions/in_list.rs similarity index 96% rename from datafusion/physical-expr/src/expressions/in_list.rs rename to datafusion/physical-expr-common/src/expressions/in_list.rs index 
9ae4c2784ccf..e1f2e1d5ff68 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr-common/src/expressions/in_list.rs @@ -22,17 +22,21 @@ use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use crate::physical_expr::PhysicalExpr; use crate::physical_expr::{down_cast_any_ref, physical_exprs_bag_equal}; -use crate::PhysicalExpr; -use arrow::array::*; +use arrow::array::downcast_primitive_array; +use arrow::array::{ + as_largestring_array, downcast_array, downcast_dictionary_array, Array, + ArrayAccessor, ArrayData, ArrayIter, ArrayRef, BooleanArray, +}; use arrow::buffer::BooleanBuffer; use arrow::compute::kernels::boolean::{not, or_kleene}; use arrow::compute::kernels::cmp::eq; use arrow::compute::take; -use arrow::datatypes::*; +use arrow::datatypes::{i256, DataType, Schema}; +use arrow::record_batch::RecordBatch; use arrow::util::bit_iterator::BitIndexIterator; -use arrow::{downcast_dictionary_array, downcast_primitive_array}; use datafusion_common::cast::{ as_boolean_array, as_generic_binary_array, as_string_array, }; @@ -454,13 +458,29 @@ pub fn in_list( #[cfg(test)] mod tests { - use super::*; - use crate::expressions; - use crate::expressions::{col, lit, try_cast}; + + use crate::expressions::cast::cast; + use crate::expressions::column::col; + use crate::expressions::literal::lit; + use crate::expressions::try_cast::try_cast; + use arrow::array::BinaryArray; + use arrow::array::Date32Array; + use arrow::array::Date64Array; + use arrow::array::Decimal128Array; + use arrow::array::Float64Array; + use arrow::array::Int32Array; + use arrow::array::Int64Array; + use arrow::array::StringArray; + use arrow::array::TimestampMicrosecondArray; + use arrow::array::UInt16DictionaryArray; use datafusion_common::plan_err; + use datafusion_common::Result; use datafusion_expr::type_coercion::binary::comparison_coercion; + use arrow::datatypes::Field; + use arrow::datatypes::TimeUnit; + type InListCastResult = (Arc, 
Vec>); // Try to do the type coercion for list physical expr. @@ -1106,8 +1126,8 @@ mod tests { // list of phy expr let mut phy_exprs = vec![ lit(1i64), - expressions::cast(lit(2i32), &schema, DataType::Int64)?, - expressions::try_cast(lit(3.13f32), &schema, DataType::Int64)?, + cast(lit(2i32), &schema, DataType::Int64)?, + try_cast(lit(3.13f32), &schema, DataType::Int64)?, ]; let result = try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); @@ -1117,8 +1137,8 @@ mod tests { try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); // cast(cast(lit())), but the cast to the same data type, one case will be ignored - phy_exprs.push(expressions::cast( - expressions::cast(lit(2i32), &schema, DataType::Int64)?, + phy_exprs.push(cast( + cast(lit(2i32), &schema, DataType::Int64)?, &schema, DataType::Int64, )?); @@ -1127,15 +1147,15 @@ mod tests { phy_exprs.clear(); // case(cast(lit())), the cast to the diff data type - phy_exprs.push(expressions::cast( - expressions::cast(lit(2i32), &schema, DataType::Int64)?, + phy_exprs.push(cast( + cast(lit(2i32), &schema, DataType::Int64)?, &schema, DataType::Int32, )?); try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap(); // column - phy_exprs.push(expressions::col("a", &schema)?); + phy_exprs.push(col("a", &schema)?); assert!(try_cast_static_filter_to_set(&phy_exprs, &schema).is_err()); Ok(()) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr-common/src/expressions/is_not_null.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/is_not_null.rs rename to datafusion/physical-expr-common/src/expressions/is_not_null.rs index c5c673ec28ea..91c213f59c86 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr-common/src/expressions/is_not_null.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use 
crate::physical_expr::PhysicalExpr; use arrow::compute; use arrow::{ datatypes::{DataType, Schema}, @@ -115,7 +115,7 @@ pub fn is_not_null(arg: Arc) -> Result> #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{BooleanArray, StringArray}, datatypes::*, diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr-common/src/expressions/is_null.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/is_null.rs rename to datafusion/physical-expr-common/src/expressions/is_null.rs index b0f70b6f0d7a..191642fc6070 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr-common/src/expressions/is_null.rs @@ -27,7 +27,7 @@ use arrow::{ }; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -116,7 +116,7 @@ pub fn is_null(arg: Arc) -> Result> { #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{ array::{BooleanArray, StringArray}, datatypes::*, diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr-common/src/expressions/like.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/like.rs rename to datafusion/physical-expr-common/src/expressions/like.rs index 6e0beeb0beea..e76e0b9f01ef 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr-common/src/expressions/like.rs @@ -18,7 +18,7 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::{physical_expr::down_cast_any_ref, PhysicalExpr}; +use crate::physical_expr::{down_cast_any_ref, PhysicalExpr}; use crate::expressions::datum::apply_cmp; use arrow::record_batch::RecordBatch; @@ -174,7 +174,7 @@ pub fn like( 
#[cfg(test)] mod test { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::array::*; use arrow_schema::Field; use datafusion_common::cast::as_boolean_array; diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr-common/src/expressions/literal.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/literal.rs rename to datafusion/physical-expr-common/src/expressions/literal.rs index 35ea80ea574d..bc3312d25755 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr-common/src/expressions/literal.rs @@ -22,8 +22,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::{ datatypes::{DataType, Schema}, diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/physical-expr-common/src/expressions/mod.rs index d102422081dc..a6da353bf25f 100644 --- a/datafusion/physical-expr-common/src/expressions/mod.rs +++ b/datafusion/physical-expr-common/src/expressions/mod.rs @@ -15,4 +15,20 @@ // specific language governing permissions and limitations // under the License. +//! 
Defines physical expressions that can be evaluated at runtime during query execution + +#[macro_use] +pub mod binary; +pub mod case; +pub mod cast; pub mod column; +pub mod datum; +pub mod in_list; +pub mod is_not_null; +pub mod is_null; +pub mod like; +pub mod literal; +pub mod negative; +pub mod no_op; +pub mod not; +pub mod try_cast; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr-common/src/expressions/negative.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/negative.rs rename to datafusion/physical-expr-common/src/expressions/negative.rs index f6d4620c427f..7a59b621d8f0 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr-common/src/expressions/negative.rs @@ -22,8 +22,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::{ compute::kernels::numeric::neg_wrapping, @@ -173,7 +173,7 @@ pub fn negative( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{col, Column}; + use crate::expressions::column::{col, Column}; use arrow::array::*; use arrow::datatypes::*; diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr-common/src/expressions/no_op.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/no_op.rs rename to datafusion/physical-expr-common/src/expressions/no_op.rs index b558ccab154d..109e28d7c0f8 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr-common/src/expressions/no_op.rs @@ -27,7 +27,7 @@ use arrow::{ }; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; diff --git a/datafusion/physical-expr/src/expressions/not.rs
b/datafusion/physical-expr-common/src/expressions/not.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/not.rs rename to datafusion/physical-expr-common/src/expressions/not.rs index 1428be71cc21..b58ba5256e26 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr-common/src/expressions/not.rs @@ -23,7 +23,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::{cast::as_boolean_array, Result, ScalarValue}; @@ -123,7 +123,7 @@ pub fn not(arg: Arc) -> Result> { #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::{array::BooleanArray, datatypes::*}; #[test] diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr-common/src/expressions/try_cast.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/try_cast.rs rename to datafusion/physical-expr-common/src/expressions/try_cast.rs index d25a904f7d6a..a5b93f3a0f86 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr-common/src/expressions/try_cast.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::compute; use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType, Schema}; @@ -148,7 +148,7 @@ pub fn try_cast( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::array::{ Decimal128Array, Decimal128Builder, StringArray, Time64NanosecondArray, }; diff --git a/datafusion/physical-expr-common/src/functions.rs 
b/datafusion/physical-expr-common/src/functions.rs new file mode 100644 index 000000000000..46c0d466f7b1 --- /dev/null +++ b/datafusion/physical-expr-common/src/functions.rs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Neg; + +use datafusion_expr::FuncMonotonicity; + +use crate::sort_properties::SortProperties; + +/// Determines a [ScalarFunctionExpr]'s monotonicity for the given arguments +/// and the function's behavior depending on its arguments. 
+/// +/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr +pub fn out_ordering( + func: &FuncMonotonicity, + arg_orderings: &[SortProperties], +) -> SortProperties { + func.iter().zip(arg_orderings).fold( + SortProperties::Singleton, + |prev_sort, (item, arg)| { + let current_sort = func_order_in_one_dimension(item, arg); + + match (prev_sort, current_sort) { + (_, SortProperties::Unordered) => SortProperties::Unordered, + (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, + (SortProperties::Ordered(prev), SortProperties::Ordered(current)) + if prev.descending != current.descending => + { + SortProperties::Unordered + } + _ => prev_sort, + } + }, + ) +} + +/// This function decides the monotonicity property of a [ScalarFunctionExpr] for a single argument (i.e. across a single dimension), given that argument's sort properties. +/// +/// [ScalarFunctionExpr]: crate::scalar_function::ScalarFunctionExpr +fn func_order_in_one_dimension( + func_monotonicity: &Option, + arg: &SortProperties, +) -> SortProperties { + if *arg == SortProperties::Singleton { + SortProperties::Singleton + } else { + match func_monotonicity { + None => SortProperties::Unordered, + Some(false) => { + if let SortProperties::Ordered(_) = arg { + arg.neg() + } else { + SortProperties::Unordered + } + } + Some(true) => { + if let SortProperties::Ordered(_) = arg { + *arg + } else { + SortProperties::Unordered + } + } + } + } +} diff --git a/datafusion/physical-expr-common/src/intervals/cp_solver.rs b/datafusion/physical-expr-common/src/intervals/cp_solver.rs new file mode 100644 index 000000000000..28cf7c7b3974 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/cp_solver.rs @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Constraint propagator/solver for custom PhysicalExpr graphs. + +use arrow::datatypes::DataType; +use datafusion_common::{internal_err, Result}; +use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; +use datafusion_expr::Operator; + +use super::utils::{ + convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, +}; + +/// This function refines intervals `left_child` and `right_child` by applying +/// constraint propagation through `parent` via operation. The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. +/// +/// Assuming that x, y and p have ranges [xL, xU], [yL, yU], and [pL, pU], we +/// apply the following operations: +/// - For plus operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. +/// - For minus operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. +/// - For multiplication operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU].
+/// - For division operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. +pub fn propagate_arithmetic( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + let inverse_op = get_inverse_op(*op)?; + match (left_child.data_type(), right_child.data_type()) { + // If we have a child whose type is a time interval (i.e. DataType::Interval), + // we need special handling since timestamp differencing results in a + // Duration type. + (DataType::Timestamp(..), DataType::Interval(_)) => { + propagate_time_interval_at_right( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + (DataType::Interval(_), DataType::Timestamp(..)) => { + propagate_time_interval_at_left( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + _ => { + // First, propagate to the left: + match apply_operator(&inverse_op, parent, right_child)? + .intersect(left_child)? + { + // Left is feasible: + Some(value) => Ok( + // Propagate to the right using the new left. + propagate_right(&value, parent, right_child, op, &inverse_op)? + .map(|right| (value, right)), + ), + // If the left child is infeasible, short-circuit. + None => Ok(None), + } + } + } +} + +/// This function refines intervals `left_child` and `right_child` by applying +/// comparison propagation through `parent` via operation. The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. 
+/// Two intervals can be ordered in 6 ways for a Gt `>` operator: +/// ```text +/// (1): Infeasible, short-circuit +/// left: | ================ | +/// right: | ======================== | +/// +/// (2): Update both intervals +/// left: | ====================== | +/// right: | ====================== | +/// | +/// V +/// left: | ======= | +/// right: | ======= | +/// +/// (3): Update left interval +/// left: | ============================== | +/// right: | ========== | +/// | +/// V +/// left: | ===================== | +/// right: | ========== | +/// +/// (4): Update right interval +/// left: | ========== | +/// right: | =========================== | +/// | +/// V +/// left: | ========== | +/// right: | ================== | +/// +/// (5): No change +/// left: | ============================ | +/// right: | =================== | +/// +/// (6): No change +/// left: | ==================== | +/// right: | =============== | +/// +/// -inf --------------------------------------------------------------- +inf +/// ``` +pub fn propagate_comparison( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + if parent == &Interval::CERTAINLY_TRUE { + match op { + Operator::Eq => left_child.intersect(right_child).map(|result| { + result.map(|intersection| (intersection.clone(), intersection)) + }), + Operator::Gt => satisfy_greater(left_child, right_child, true), + Operator::GtEq => satisfy_greater(left_child, right_child, false), + Operator::Lt => satisfy_greater(right_child, left_child, true) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(right_child, left_child, false) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else if parent == &Interval::CERTAINLY_FALSE { + match op { + Operator::Eq => { + // TODO: Propagation is not possible until we support interval sets.
+ Ok(None) + } + Operator::Gt => satisfy_greater(right_child, left_child, false), + Operator::GtEq => satisfy_greater(right_child, left_child, true), + Operator::Lt => satisfy_greater(left_child, right_child, false) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(left_child, right_child, true) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else { + // Uncertainty cannot change any end-point of the intervals. + Ok(None) + } +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the right side of the operation. +fn propagate_time_interval_at_right( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. + let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { + match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { + Some(value) => { + propagate_right(left_child, parent, &duration, op, inverse_op)? + .and_then(|right| convert_duration_type_to_interval(&right)) + .map(|right| (value, right)) + } + None => None, + } + } else { + apply_operator(inverse_op, parent, right_child)? + .intersect(left_child)? 
+ .map(|value| (value, right_child.clone())) + }; + Ok(result) +} + +/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. +fn propagate_right( + left: &Interval, + parent: &Interval, + right: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + match op { + Operator::Minus => apply_operator(op, left, parent), + Operator::Plus => apply_operator(inverse_op, parent, left), + Operator::Divide => apply_operator(op, left, parent), + Operator::Multiply => apply_operator(inverse_op, parent, left), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + }? + .intersect(right) +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the left side of the operation. +fn propagate_time_interval_at_left( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. + let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { + match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? 
{ + Some(value) => { + let left = convert_duration_type_to_interval(&value); + let right = propagate_right(&value, parent, right_child, op, inverse_op)?; + match (left, right) { + (Some(left), Some(right)) => Some((left, right)), + _ => None, + } + } + None => None, + } + } else { + propagate_right(left_child, parent, right_child, op, inverse_op)? + .map(|right| (left_child.clone(), right)) + }; + Ok(result) +} + +fn reverse_tuple((first, second): (T, U)) -> (U, T) { + (second, first) +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::TimeUnit; + use datafusion_common::ScalarValue; + + use super::*; + + #[test] + fn test_propagate_comparison() -> Result<()> { + // In the examples below: + // `left` is unbounded: [?, ?], + // `right` is known to be [1000,1000] + // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] + let left = Interval::make_unbounded(&DataType::Int64)?; + let right = Interval::make(Some(1000_i64), Some(1000_i64))?; + assert_eq!( + (Some(( + Interval::make(None, Some(999_i64))?, + Interval::make(Some(1000_i64), Some(1000_i64))?, + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + let left = + Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + None + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), None), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? 
+ ); + + let left = Interval::make_unbounded(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + ))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + Ok(()) + } +} diff --git a/datafusion/physical-expr-common/src/intervals/mod.rs b/datafusion/physical-expr-common/src/intervals/mod.rs new file mode 100644 index 000000000000..7022bf2c42b9 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Interval arithmetic and constraint propagation library + +pub mod cp_solver; +pub mod utils; diff --git a/datafusion/physical-expr-common/src/intervals/utils.rs b/datafusion/physical-expr-common/src/intervals/utils.rs new file mode 100644 index 000000000000..7e8fad259a96 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/utils.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for the interval arithmetic library + +use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_expr::{interval_arithmetic::Interval, Operator}; + +const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; +const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; +const DT_MS_MASK: i64 = 0xFFFF_FFFF; + +// This function returns the inverse operator of the given operator. 
+pub fn get_inverse_op(op: Operator) -> Result { + match op { + Operator::Plus => Ok(Operator::Minus), + Operator::Minus => Ok(Operator::Plus), + Operator::Multiply => Ok(Operator::Divide), + Operator::Divide => Ok(Operator::Multiply), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + } +} + +/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. +pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_interval_bound_to_duration(interval.lower()), + convert_interval_bound_to_duration(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. +fn convert_interval_bound_to_duration( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) + .ok() + .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), + ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) + .ok() + .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), + _ => None, + } +} + +/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. +/// Otherwise, it returns an error. 
+fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { + let months = mdn >> 96; + let days = (mdn & MDN_DAY_MASK) >> 64; + let nanoseconds = mdn & MDN_NS_MASK; + + if months == 0 && days == 0 { + nanoseconds + .try_into() + .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) + } else { + internal_err!( + "The interval cannot have a non-zero month or day value for duration convertibility" + ) + } +} + +/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. +/// Otherwise, it returns an error. +fn interval_dt_to_duration_ms(dt: &i64) -> Result { + let days = dt >> 32; + let milliseconds = dt & DT_MS_MASK; + + if days == 0 { + Ok(milliseconds) + } else { + internal_err!( + "The interval cannot have a non-zero day value for duration convertibility" + ) + } +} + +/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. +pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_duration_bound_to_interval(interval.lower()), + convert_duration_bound_to_interval(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. 
+fn convert_duration_bound_to_interval( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::DurationNanosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration)) + } + ScalarValue::DurationMicrosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) + } + ScalarValue::DurationMillisecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32)) + } + ScalarValue::DurationSecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) + } + _ => None, + } +} diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 53e3134a1b05..983218eba3c7 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -17,8 +17,12 @@ pub mod aggregate; pub mod expressions; +pub mod functions; +pub mod intervals; pub mod physical_expr; +pub mod scalar_function; pub mod sort_expr; pub mod sort_properties; pub mod tree_node; +pub mod udf; pub mod utils; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index be6358e73c99..73d7de32183c 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -24,14 +24,38 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; + use datafusion_common::utils::DataPtr; -use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_common::{ + exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, +}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::expr::{Alias, InList, ScalarFunction}; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::ColumnarValue; +use 
datafusion_expr::var_provider::{is_system_variables, VarType}; +use datafusion_expr::{ + Between, BinaryExpr, Cast, ColumnarValue, Expr, GetFieldAccess, GetIndexedField, + Like, Operator, ScalarFunctionDefinition, TryCast, +}; +use crate::expressions::binary::binary; +use crate::expressions::case; +use crate::expressions::cast::cast; +use crate::expressions::column::Column; +use crate::expressions::in_list::in_list; +use crate::expressions::is_not_null::is_not_null; +use crate::expressions::is_null::is_null; +use crate::expressions::like::like; +use crate::expressions::literal::{lit, Literal}; +use crate::expressions::negative::negative; +use crate::expressions::not::not; +use crate::expressions::try_cast::try_cast; use crate::sort_properties::SortProperties; +use crate::udf; use crate::utils::scatter; +use itertools::izip; + /// See [create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html) /// for examples of creating `PhysicalExpr` from `Expr` pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { @@ -209,3 +233,413 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { any } } + +/// Checks whether the given physical expression slices are equal in the sense +/// of bags (multi-sets), disregarding their orderings. +pub fn physical_exprs_bag_equal( + lhs: &[Arc], + rhs: &[Arc], +) -> bool { + // TODO: Once we can use `HashMap`s with `Arc`, this + // function should use a `HashMap` to reduce computational complexity. + if lhs.len() == rhs.len() { + let mut rhs_vec = rhs.to_vec(); + for expr in lhs { + if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) { + rhs_vec.swap_remove(idx); + } else { + return false; + } + } + true + } else { + false + } +} + +/// Checks whether the given physical expression slices are equal. 
+pub fn physical_exprs_equal( + lhs: &[Arc], + rhs: &[Arc], +) -> bool { + lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs)) +} + +/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 +/// AS int)`. +/// +/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical +/// planning, and can be evaluated directly on a [RecordBatch]. They are +/// normally created from [Expr] by a [PhysicalPlanner] and can be created +/// directly using [create_physical_expr]. +/// +/// A Physical expression knows its type, nullability and how to evaluate itself. +/// +/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html +/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html +/// +/// # Example: Create `PhysicalExpr` from `Expr` +/// ``` +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::DFSchema; +/// # use datafusion_expr::{Expr, col, lit}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// // For a logical expression `a = 1`, we can create a physical expression +/// let expr = col("a").eq(lit(1)); +/// // To create a PhysicalExpr we need 1. a schema +/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// let df_schema = DFSchema::try_from(schema).unwrap(); +/// // 2. 
ExecutionProps +/// let props = ExecutionProps::new(); +/// // We can now create a PhysicalExpr: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// ``` +/// +/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] +/// ``` +/// # use std::sync::Arc; +/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::{assert_batches_eq, DFSchema}; +/// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// # let expr = col("a").eq(lit(1)); +/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); +/// # let props = ExecutionProps::new(); +/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// // Input of [1,2,3] +/// let input_batch = RecordBatch::try_from_iter(vec![ +/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) +/// ]).unwrap(); +/// // The result is a ColumnarValue (either an Array or a Scalar) +/// let result = physical_expr.evaluate(&input_batch).unwrap(); +/// // In this case, a BooleanArray with the result of the comparison +/// let ColumnarValue::Array(arr) = result else { +/// panic!("Expected an array") +/// }; +/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false])); +/// ``` +/// +/// [ColumnarValue]: datafusion_expr::ColumnarValue +/// +/// Create a physical expression from a logical expression ([Expr]). +/// +/// # Arguments +/// +/// * `e` - The logical expression +/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references +/// to qualified or unqualified fields by name. 
+pub fn create_physical_expr( + e: &Expr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result> { + let input_schema: &Schema = &input_dfschema.into(); + + match e { + Expr::Alias(Alias { expr, .. }) => { + Ok(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::Column(c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + Expr::ScalarVariable(_, variable_names) => { + if is_system_variables(variable_names) { + match execution_props.get_var_provider(VarType::System) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => plan_err!("No system variable provider found"), + } + } else { + match execution_props.get_var_provider(VarType::UserDefined) { + Some(provider) => { + let scalar_value = provider.get_value(variable_names.clone())?; + Ok(Arc::new(Literal::new(scalar_value))) + } + _ => plan_err!("No user defined variable provider found"), + } + } + } + Expr::IsTrue(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(true))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotTrue(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(true))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsFalse(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(false))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotFalse(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + 
Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(Some(false))), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsUnknown(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsNotDistinctFrom, + Expr::Literal(ScalarValue::Boolean(None)), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::IsNotUnknown(expr) => { + let binary_op = datafusion_expr::binary_expr( + expr.as_ref().clone(), + Operator::IsDistinctFrom, + Expr::Literal(ScalarValue::Boolean(None)), + ); + create_physical_expr(&binary_op, input_dfschema, execution_props) + } + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + // Create physical expressions for left and right operands + let lhs = create_physical_expr(left, input_dfschema, execution_props)?; + let rhs = create_physical_expr(right, input_dfschema, execution_props)?; + // Note that the logical planner is responsible + // for type coercion on the arguments (e.g. if one + // argument was originally Int32 and one was + // Int64 they will both be coerced to Int64). + // + // There should be no coercion during physical + // planning. + binary(lhs, *op, rhs, input_schema) + } + Expr::Like(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive, + }) => { + if escape_char.is_some() { + return exec_err!("LIKE does not support escape_char"); + } + let physical_expr = + create_physical_expr(expr, input_dfschema, execution_props)?; + let physical_pattern = + create_physical_expr(pattern, input_dfschema, execution_props)?; + like( + *negated, + *case_insensitive, + physical_expr, + physical_pattern, + input_schema, + ) + } + Expr::Case(case) => { + let expr: Option> = if let Some(e) = &case.expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + execution_props, + )?) 
+ } else { + None + }; + let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case + .when_then_expr + .iter() + .map(|(w, t)| (w.as_ref(), t.as_ref())) + .unzip(); + let when_expr = + create_physical_exprs(when_expr, input_dfschema, execution_props)?; + let then_expr = + create_physical_exprs(then_expr, input_dfschema, execution_props)?; + let when_then_expr: Vec<(Arc, Arc)> = + when_expr + .iter() + .zip(then_expr.iter()) + .map(|(w, t)| (w.clone(), t.clone())) + .collect(); + let else_expr: Option> = + if let Some(e) = &case.else_expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + execution_props, + )?) + } else { + None + }; + Ok(case::case(expr, when_then_expr, else_expr)?) + } + Expr::Cast(Cast { expr, data_type }) => cast( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::TryCast(TryCast { expr, data_type }) => try_cast( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + data_type.clone(), + ), + Expr::Not(expr) => { + not(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::Negative(expr) => negative( + create_physical_expr(expr, input_dfschema, execution_props)?, + input_schema, + ), + Expr::IsNull(expr) => { + is_null(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::IsNotNull(expr) => { + is_not_null(create_physical_expr(expr, input_dfschema, execution_props)?) 
+ } + Expr::GetIndexedField(GetIndexedField { expr: _, field }) => match field { + GetFieldAccess::NamedStructField { name: _ } => { + internal_err!( + "NamedStructField should be rewritten in OperatorToFunction" + ) + } + GetFieldAccess::ListIndex { key: _ } => { + internal_err!("ListIndex should be rewritten in OperatorToFunction") + } + GetFieldAccess::ListRange { + start: _, + stop: _, + stride: _, + } => { + internal_err!("ListRange should be rewritten in OperatorToFunction") + } + }, + Expr::ScalarFunction(ScalarFunction { func_def, args }) => { + let physical_args = + create_physical_exprs(args, input_dfschema, execution_props)?; + + match func_def { + ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( + fun.clone().as_ref(), + &physical_args, + input_schema, + args, + input_dfschema, + ), + ScalarFunctionDefinition::Name(_) => { + internal_err!("Function `Expr` with name should be resolved.") + } + } + } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => { + let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; + let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; + let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; + + // rewrite the between into the two binary operators + let binary_expr = binary( + binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, + Operator::And, + binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, + input_schema, + ); + + if *negated { + not(binary_expr?) 
+ } else { + binary_expr + } + } + Expr::InList(InList { + expr, + list, + negated, + }) => match expr.as_ref() { + Expr::Literal(ScalarValue::Utf8(None)) => Ok(lit(ScalarValue::Boolean(None))), + _ => { + let value_expr = + create_physical_expr(expr, input_dfschema, execution_props)?; + + let list_exprs = + create_physical_exprs(list, input_dfschema, execution_props)?; + in_list(value_expr, list_exprs, negated, input_schema) + } + }, + other => { + not_impl_err!("Physical plan does not support logical expression {other:?}") + } + } +} + +/// Create vector of Physical Expression from a vector of logical expression +pub fn create_physical_exprs<'a, I>( + exprs: I, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result>> +where + I: IntoIterator, +{ + exprs + .into_iter() + .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .collect::>>() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + + use datafusion_common::{DFSchema, Result}; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::{col, lit}; + + use super::create_physical_expr; + + #[test] + fn test_create_physical_expr_scalar_input_output() -> Result<()> { + let expr = col("letter").eq(lit("A")); + + let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]); + let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?; + let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?; + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(StringArray::from_iter_values(vec![ + "A", "B", "C", "D", + ]))], + )?; + let result = p.evaluate(&batch)?; + let result = result.into_array(4).expect("Failed to convert to array"); + + assert_eq!( + &result, + &(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef) + ); + + Ok(()) + } +} diff --git 
a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr-common/src/scalar_function.rs similarity index 99% rename from datafusion/physical-expr/src/scalar_function.rs rename to datafusion/physical-expr-common/src/scalar_function.rs index 9ae9f3dee3e7..c11c7717d6b5 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr-common/src/scalar_function.rs @@ -43,9 +43,9 @@ use datafusion_expr::{ }; use crate::functions::out_ordering; +use crate::physical_expr::PhysicalExpr; use crate::physical_expr::{down_cast_any_ref, physical_exprs_equal}; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr-common/src/udf.rs similarity index 91% rename from datafusion/physical-expr/src/udf.rs rename to datafusion/physical-expr-common/src/udf.rs index 368dfdf92f45..8ffcfa022026 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr-common/src/udf.rs @@ -26,7 +26,7 @@ use datafusion_expr::{ type_coercion::functions::data_types, Expr, ScalarFunctionDefinition, }; -use crate::{PhysicalExpr, ScalarFunctionExpr}; +use crate::{physical_expr::PhysicalExpr, scalar_function::ScalarFunctionExpr}; /// Create a physical expression of the UDF. 
/// @@ -68,8 +68,10 @@ mod tests { use datafusion_common::{DFSchema, Result}; use datafusion_expr::ScalarUDF; - use crate::utils::tests::TestScalarUDF; - use crate::ScalarFunctionExpr; + use crate::{ + expressions::literal::lit, scalar_function::ScalarFunctionExpr, + utils::test_utils::TestScalarUDF, + }; use super::create_physical_expr; @@ -78,7 +80,7 @@ mod tests { // create and register the udf let udf = ScalarUDF::from(TestScalarUDF::new()); - let e = crate::expressions::lit(1.1); + let e = lit(1.1); let p_expr = create_physical_expr(&udf, &[e], &Schema::empty(), &[], &DFSchema::empty())?; diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils/mod.rs similarity index 99% rename from datafusion/physical-expr-common/src/utils.rs rename to datafusion/physical-expr-common/src/utils/mod.rs index 459b5a4849cb..65ee8116acf3 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +pub mod test_utils; + use arrow::{ array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}, compute::{and_kleene, is_not_null, SlicesIterator}, diff --git a/datafusion/physical-expr-common/src/utils/test_utils.rs b/datafusion/physical-expr-common/src/utils/test_utils.rs new file mode 100644 index 000000000000..2189d8a3f02a --- /dev/null +++ b/datafusion/physical-expr-common/src/utils/test_utils.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use datafusion_common::{exec_err, DataFusionError, Result}; + +use arrow::{ + array::{ArrayRef, Float32Array, Float64Array}, + datatypes::DataType, +}; + +use datafusion_expr::{ + ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, +}; + +#[derive(Debug, Clone)] +pub struct TestScalarUDF { + pub signature: Signature, +} + +impl Default for TestScalarUDF { + fn default() -> Self { + Self::new() + } +} + +impl TestScalarUDF { + pub fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Float64, DataType::Float32], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for TestScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "test-scalar-udf" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let arg_type = &arg_types[0]; + + match arg_type { + DataType::Float32 => Ok(DataType::Float32), + _ => Ok(DataType::Float64), + } + } + + fn monotonicity(&self) -> Result> { + Ok(Some(vec![Some(true)])) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let args = ColumnarValue::values_to_arrays(args)?; + + let arr: ArrayRef = match args[0].data_type() { + DataType::Float64 => Arc::new({ + let arg = &args[0].as_any().downcast_ref::().ok_or_else( + || { + DataFusionError::Internal(format!( + "could not cast {} to {}", + self.name(), + std::any::type_name::() + )) + }, + )?; + + arg.iter() + .map(|a| a.map(f64::floor)) + .collect::() + }), + 
DataType::Float32 => Arc::new({ + let arg = &args[0].as_any().downcast_ref::().ok_or_else( + || { + DataFusionError::Internal(format!( + "could not cast {} to {}", + self.name(), + std::any::type_name::() + )) + }, + )?; + + arg.iter() + .map(|a| a.map(f32::floor)) + .collect::() + }), + other => { + return exec_err!( + "Unsupported data type {other:?} for function {}", + self.name() + ); + } + }; + Ok(ColumnarValue::Array(arr)) + } +} diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index ed4600f2d95e..1af750374fdf 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -225,6 +225,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; + use datafusion_physical_expr_common::udf; use itertools::Itertools; use datafusion_common::{DFSchema, Result}; @@ -285,21 +286,21 @@ mod tests { let col_e = &col("e", &test_schema)?; let col_f = &col("f", &test_schema)?; let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new()); - let floor_a = &crate::udf::create_physical_expr( + let floor_a = &udf::create_physical_expr( &test_fun, &[col("a", &test_schema)?], &test_schema, &[], &DFSchema::empty(), )?; - let floor_f = &crate::udf::create_physical_expr( + let floor_f = &udf::create_physical_expr( &test_fun, &[col("f", &test_schema)?], &test_schema, &[], &DFSchema::empty(), )?; - let exp_a = &crate::udf::create_physical_expr( + let exp_a = &udf::create_physical_expr( &test_fun, &[col("a", &test_schema)?], &test_schema, @@ -812,7 +813,7 @@ mod tests { generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?; let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new()); - let floor_a = crate::udf::create_physical_expr( + let floor_a = udf::create_physical_expr( &test_fun, &[col("a", &test_schema)?], &test_schema, diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs index 260610f23dc6..0d9d0975db89 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -129,9 +129,9 @@ mod tests { }; use crate::equivalence::EquivalenceProperties; use crate::expressions::{col, BinaryExpr}; - use crate::udf::create_physical_expr; use crate::utils::tests::TestScalarUDF; use crate::PhysicalSortExpr; + use datafusion_physical_expr_common::udf::create_physical_expr; use super::*; diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 555f0ad31786..02a93285ba32 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1297,10 +1297,11 @@ mod tests { use std::ops::Not; use arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::{Fields, TimeUnit}; + use arrow_schema::{Fields, SortOptions, TimeUnit}; use datafusion_common::DFSchema; use datafusion_expr::{Operator, ScalarUDF}; + use datafusion_physical_expr_common::udf; use crate::equivalence::add_offset_to_expr; use crate::equivalence::tests::{ @@ -1791,7 +1792,7 @@ mod tests { generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, N_DISTINCT)?; let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new()); - let floor_a = crate::udf::create_physical_expr( + let floor_a = udf::create_physical_expr( &test_fun, &[col("a", &test_schema)?], &test_schema, diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 688d5ce6eabf..93fbcdfff7d2 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -17,21 +17,7 @@ //! 
Defines physical expressions that can evaluated at runtime during query execution -#[macro_use] -mod binary; -mod case; -mod cast; mod column; -mod datum; -mod in_list; -mod is_not_null; -mod is_null; -mod like; -mod literal; -mod negative; -mod no_op; -mod not; -mod try_cast; /// Module with some convenient methods used in expression building pub mod helpers { @@ -79,21 +65,27 @@ pub use datafusion_functions_aggregate::first_last::{ FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue, }; -pub use binary::{binary, BinaryExpr}; -pub use case::{case, CaseExpr}; -pub use cast::{cast, cast_with_options, CastExpr}; pub use column::UnKnownColumn; pub use datafusion_expr::utils::format_state_name; +pub use datafusion_physical_expr_common::expressions::binary::{binary, BinaryExpr}; +pub use datafusion_physical_expr_common::expressions::case::{case, CaseExpr}; +pub use datafusion_physical_expr_common::expressions::cast::{ + cast, cast_with_options, CastExpr, +}; pub use datafusion_physical_expr_common::expressions::column::{col, Column}; -pub use in_list::{in_list, InListExpr}; -pub use is_not_null::{is_not_null, IsNotNullExpr}; -pub use is_null::{is_null, IsNullExpr}; -pub use like::{like, LikeExpr}; -pub use literal::{lit, Literal}; -pub use negative::{negative, NegativeExpr}; -pub use no_op::NoOp; -pub use not::{not, NotExpr}; -pub use try_cast::{try_cast, TryCastExpr}; +pub use datafusion_physical_expr_common::expressions::in_list::{in_list, InListExpr}; +pub use datafusion_physical_expr_common::expressions::is_not_null::{ + is_not_null, IsNotNullExpr, +}; +pub use datafusion_physical_expr_common::expressions::is_null::{is_null, IsNullExpr}; +pub use datafusion_physical_expr_common::expressions::like::{like, LikeExpr}; +pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal}; +pub use datafusion_physical_expr_common::expressions::negative::{ + negative, NegativeExpr, +}; +pub use 
datafusion_physical_expr_common::expressions::no_op::NoOp; +pub use datafusion_physical_expr_common::expressions::not::{not, NotExpr}; +pub use datafusion_physical_expr_common::expressions::try_cast::{try_cast, TryCastExpr}; #[cfg(test)] pub(crate) mod tests { diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index ac5b87e701af..7fb3335b694b 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -30,7 +30,6 @@ //! an argument i32 is passed to a function that supports f64, the //! argument is automatically is coerced to f64. -use std::ops::Neg; use std::sync::Arc; use arrow::{array::ArrayRef, datatypes::Schema}; @@ -43,9 +42,10 @@ use datafusion_expr::{ }; use datafusion_expr::{Expr, ScalarFunctionDefinition, ScalarUDF}; -use crate::sort_properties::SortProperties; use crate::{PhysicalExpr, ScalarFunctionExpr}; +/// TODO: Duplicated code from datafusion/physical-expr-common/src/udf.rs, remove this one. +/// /// Create a physical (function) expression. /// This function errors when `args`' can't be coerced to a valid argument type of the function. pub fn create_physical_expr( @@ -166,59 +166,6 @@ where }) } -/// Determines a [`ScalarFunctionExpr`]'s monotonicity for the given arguments -/// and the function's behavior depending on its arguments. 
-pub fn out_ordering( - func: &FuncMonotonicity, - arg_orderings: &[SortProperties], -) -> SortProperties { - func.iter().zip(arg_orderings).fold( - SortProperties::Singleton, - |prev_sort, (item, arg)| { - let current_sort = func_order_in_one_dimension(item, arg); - - match (prev_sort, current_sort) { - (_, SortProperties::Unordered) => SortProperties::Unordered, - (SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort, - (SortProperties::Ordered(prev), SortProperties::Ordered(current)) - if prev.descending != current.descending => - { - SortProperties::Unordered - } - _ => prev_sort, - } - }, - ) -} - -/// This function decides the monotonicity property of a [`ScalarFunctionExpr`] for a single argument (i.e. across a single dimension), given that argument's sort properties. -fn func_order_in_one_dimension( - func_monotonicity: &Option, - arg: &SortProperties, -) -> SortProperties { - if *arg == SortProperties::Singleton { - SortProperties::Singleton - } else { - match func_monotonicity { - None => SortProperties::Unordered, - Some(false) => { - if let SortProperties::Ordered(_) = arg { - arg.neg() - } else { - SortProperties::Unordered - } - } - Some(true) => { - if let SortProperties::Ordered(_) = arg { - *arg - } else { - SortProperties::Unordered - } - } - } - } -} - #[cfg(test)] mod tests { use arrow::{ diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 0c25e26d17aa..49981036d3ff 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -21,23 +21,23 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use super::utils::{ - convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, -}; use crate::expressions::Literal; use crate::utils::{build_dag, ExprTreeNode}; use crate::PhysicalExpr; use arrow_schema::{DataType, Schema}; -use 
datafusion_common::{internal_err, Result}; -use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; -use datafusion_expr::Operator; +use datafusion_common::Result; +use datafusion_expr::interval_arithmetic::Interval; use petgraph::graph::NodeIndex; use petgraph::stable_graph::{DefaultIx, StableGraph}; use petgraph::visit::{Bfs, Dfs, DfsPostOrder, EdgeRef}; use petgraph::Outgoing; +pub use datafusion_physical_expr_common::intervals::cp_solver::{ + propagate_arithmetic, propagate_comparison, +}; + // Interval arithmetic provides a way to perform mathematical operations on // intervals, which represent a range of possible values rather than a single // point value. This allows for the propagation of ranges through mathematical @@ -198,157 +198,6 @@ impl PartialEq for ExprIntervalGraphNode { } } -/// This function refines intervals `left_child` and `right_child` by applying -/// constraint propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. -/// -/// Assuming that x,y and p has ranges [xL, xU], [yL, yU], and [pL, pU], we -/// apply the following operations: -/// - For plus operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. -/// - For minus operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. -/// - For multiplication operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU]. -/// - For division operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. 
-pub fn propagate_arithmetic( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - let inverse_op = get_inverse_op(*op)?; - match (left_child.data_type(), right_child.data_type()) { - // If we have a child whose type is a time interval (i.e. DataType::Interval), - // we need special handling since timestamp differencing results in a - // Duration type. - (DataType::Timestamp(..), DataType::Interval(_)) => { - propagate_time_interval_at_right( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - (DataType::Interval(_), DataType::Timestamp(..)) => { - propagate_time_interval_at_left( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - _ => { - // First, propagate to the left: - match apply_operator(&inverse_op, parent, right_child)? - .intersect(left_child)? - { - // Left is feasible: - Some(value) => Ok( - // Propagate to the right using the new left. - propagate_right(&value, parent, right_child, op, &inverse_op)? - .map(|right| (value, right)), - ), - // If the left child is infeasible, short-circuit. - None => Ok(None), - } - } - } -} - -/// This function refines intervals `left_child` and `right_child` by applying -/// comparison propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. 
-/// Two intervals can be ordered in 6 ways for a Gt `>` operator: -/// ```text -/// (1): Infeasible, short-circuit -/// left: | ================ | -/// right: | ======================== | -/// -/// (2): Update both interval -/// left: | ====================== | -/// right: | ====================== | -/// | -/// V -/// left: | ======= | -/// right: | ======= | -/// -/// (3): Update left interval -/// left: | ============================== | -/// right: | ========== | -/// | -/// V -/// left: | ===================== | -/// right: | ========== | -/// -/// (4): Update right interval -/// left: | ========== | -/// right: | =========================== | -/// | -/// V -/// left: | ========== | -/// right | ================== | -/// -/// (5): No change -/// left: | ============================ | -/// right: | =================== | -/// -/// (6): No change -/// left: | ==================== | -/// right: | =============== | -/// -/// -inf --------------------------------------------------------------- +inf -/// ``` -pub fn propagate_comparison( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - if parent == &Interval::CERTAINLY_TRUE { - match op { - Operator::Eq => left_child.intersect(right_child).map(|result| { - result.map(|intersection| (intersection.clone(), intersection)) - }), - Operator::Gt => satisfy_greater(left_child, right_child, true), - Operator::GtEq => satisfy_greater(left_child, right_child, false), - Operator::Lt => satisfy_greater(right_child, left_child, true) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(right_child, left_child, false) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else if parent == &Interval::CERTAINLY_FALSE { - match op { - Operator::Eq => { - // TODO: Propagation is not possible until we support interval sets. 
- Ok(None) - } - Operator::Gt => satisfy_greater(right_child, left_child, false), - Operator::GtEq => satisfy_greater(right_child, left_child, true), - Operator::Lt => satisfy_greater(left_child, right_child, false) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(left_child, right_child, true) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else { - // Uncertainty cannot change any end-point of the intervals. - Ok(None) - } -} - impl ExprIntervalGraph { pub fn try_new(expr: Arc, schema: &Schema) -> Result { // Build the full graph: @@ -624,107 +473,15 @@ impl ExprIntervalGraph { } } -/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. -fn propagate_right( - left: &Interval, - parent: &Interval, - right: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - match op { - Operator::Minus => apply_operator(op, left, parent), - Operator::Plus => apply_operator(inverse_op, parent, left), - Operator::Divide => apply_operator(op, left, parent), - Operator::Multiply => apply_operator(inverse_op, parent, left), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - }? - .intersect(right) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the left side of the operation. 
-fn propagate_time_interval_at_left( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. - let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { - match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? { - Some(value) => { - let left = convert_duration_type_to_interval(&value); - let right = propagate_right(&value, parent, right_child, op, inverse_op)?; - match (left, right) { - (Some(left), Some(right)) => Some((left, right)), - _ => None, - } - } - None => None, - } - } else { - propagate_right(left_child, parent, right_child, op, inverse_op)? - .map(|right| (left_child.clone(), right)) - }; - Ok(result) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the right side of the operation. -fn propagate_time_interval_at_right( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. 
- let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { - match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { - Some(value) => { - propagate_right(left_child, parent, &duration, op, inverse_op)? - .and_then(|right| convert_duration_type_to_interval(&right)) - .map(|right| (value, right)) - } - None => None, - } - } else { - apply_operator(inverse_op, parent, right_child)? - .intersect(left_child)? - .map(|value| (value, right_child.clone())) - }; - Ok(result) -} - -fn reverse_tuple((first, second): (T, U)) -> (U, T) { - (second, first) -} - #[cfg(test)] mod tests { use super::*; use crate::expressions::{BinaryExpr, Column}; use crate::intervals::test_utils::gen_conjunctive_numerical_expr; - use arrow::datatypes::TimeUnit; - use arrow_schema::Field; + use arrow_schema::{DataType, Field}; use datafusion_common::ScalarValue; + use datafusion_expr::Operator; use itertools::Itertools; use rand::rngs::StdRng; @@ -1477,90 +1234,6 @@ mod tests { Ok(()) } - #[test] - fn test_propagate_comparison() -> Result<()> { - // In the examples below: - // `left` is unbounded: [?, ?], - // `right` is known to be [1000,1000] - // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] - let left = Interval::make_unbounded(&DataType::Int64)?; - let right = Interval::make(Some(1000_i64), Some(1000_i64))?; - assert_eq!( - (Some(( - Interval::make(None, Some(999_i64))?, - Interval::make(Some(1000_i64), Some(1000_i64))?, - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? 
- ); - - let left = - Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - None - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), None), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - let left = Interval::make_unbounded(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - ))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? 
- ); - - Ok(()) - } - #[test] fn test_propagate_or() -> Result<()> { let expr = Arc::new(BinaryExpr::new( diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index e188b2d56bae..ff7fd63126a6 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -25,14 +25,8 @@ use crate::{ }; use arrow_schema::{DataType, SchemaRef}; -use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; -use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; -const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; -const DT_MS_MASK: i64 = 0xFFFF_FFFF; - /// Indicates whether interval arithmetic is supported for the given expression. /// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. /// We do not support every type of [`Operator`]s either. Over time, this check @@ -65,17 +59,6 @@ pub fn check_support(expr: &Arc, schema: &SchemaRef) -> bool { } } -// This function returns the inverse operator of the given operator. -pub fn get_inverse_op(op: Operator) -> Result { - match op { - Operator::Plus => Ok(Operator::Minus), - Operator::Minus => Ok(Operator::Plus), - Operator::Multiply => Ok(Operator::Divide), - Operator::Divide => Ok(Operator::Multiply), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - } -} - /// Indicates whether interval arithmetic is supported for the given operator. pub fn is_operator_supported(op: &Operator) -> bool { matches!( @@ -109,96 +92,3 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool { | &DataType::Float32 ) } - -/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. 
-pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_interval_bound_to_duration(interval.lower()), - convert_interval_bound_to_duration(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. -fn convert_interval_bound_to_duration( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) - .ok() - .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), - ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) - .ok() - .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), - _ => None, - } -} - -/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. -pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_duration_bound_to_interval(interval.lower()), - convert_duration_bound_to_interval(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. 
-fn convert_duration_bound_to_interval( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::DurationNanosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration)) - } - ScalarValue::DurationMicrosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) - } - ScalarValue::DurationMillisecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32)) - } - ScalarValue::DurationSecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) - } - _ => None, - } -} - -/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. -/// Otherwise, it returns an error. -fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { - let months = mdn >> 96; - let days = (mdn & MDN_DAY_MASK) >> 64; - let nanoseconds = mdn & MDN_NS_MASK; - - if months == 0 && days == 0 { - nanoseconds - .try_into() - .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) - } else { - internal_err!( - "The interval cannot have a non-zero month or day value for duration convertibility" - ) - } -} - -/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. -/// Otherwise, it returns an error. 
-fn interval_dt_to_duration_ms(dt: &i64) -> Result { - let days = dt >> 32; - let milliseconds = dt & DT_MS_MASK; - - if days == 0 { - Ok(milliseconds) - } else { - internal_err!( - "The interval cannot have a non-zero day value for duration convertibility" - ) - } -} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e0f19ad133e5..b2ddc592f749 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -26,8 +26,6 @@ pub mod math_expressions; mod partitioning; mod physical_expr; pub mod planner; -mod scalar_function; -pub mod udf; pub mod utils; pub mod window; @@ -53,8 +51,8 @@ pub use datafusion_physical_expr_common::sort_expr::{ PhysicalSortRequirement, }; +pub use datafusion_physical_expr_common::scalar_function::ScalarFunctionExpr; pub use planner::{create_physical_expr, create_physical_exprs}; -pub use scalar_function::ScalarFunctionExpr; pub use datafusion_physical_expr_common::utils::reverse_order_bys; pub use utils::split_conjunction; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 127194f681a5..a78eae6fb1aa 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -18,9 +18,10 @@ use std::sync::Arc; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use itertools::izip; pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref; +pub use datafusion_physical_expr_common::physical_expr::physical_exprs_bag_equal; +pub use datafusion_physical_expr_common::physical_expr::physical_exprs_equal; /// Shared [`PhysicalExpr`]. pub type PhysicalExprRef = Arc; @@ -36,37 +37,6 @@ pub fn physical_exprs_contains( .any(|physical_expr| physical_expr.eq(expr)) } -/// Checks whether the given physical expression slices are equal. 
-pub fn physical_exprs_equal( - lhs: &[Arc], - rhs: &[Arc], -) -> bool { - lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs)) -} - -/// Checks whether the given physical expression slices are equal in the sense -/// of bags (multi-sets), disregarding their orderings. -pub fn physical_exprs_bag_equal( - lhs: &[Arc], - rhs: &[Arc], -) -> bool { - // TODO: Once we can use `HashMap`s with `Arc`, this - // function should use a `HashMap` to reduce computational complexity. - if lhs.len() == rhs.len() { - let mut rhs_vec = rhs.to_vec(); - for expr in lhs { - if let Some(idx) = rhs_vec.iter().position(|e| expr.eq(e)) { - rhs_vec.swap_remove(idx); - } else { - return false; - } - } - true - } else { - false - } -} - /// This utility function removes duplicates from the given `exprs` vector. /// Note that this function does not necessarily preserve its input ordering. pub fn deduplicate_physical_exprs(exprs: &mut Vec>) { diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index f46e5f6ec68f..ac25bf28a44a 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -15,405 +15,5 @@ // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; - -use arrow::datatypes::Schema; - -use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, -}; -use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction}; -use datafusion_expr::var_provider::is_system_variables; -use datafusion_expr::var_provider::VarType; -use datafusion_expr::{ - binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Like, - Operator, ScalarFunctionDefinition, TryCast, -}; - -use crate::{ - expressions::{self, binary, like, Column, Literal}, - udf, PhysicalExpr, -}; - -/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 -/// AS int)`. -/// -/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical -/// planning, and can be evaluated directly on a [RecordBatch]. They are -/// normally created from [Expr] by a [PhysicalPlanner] and can be created -/// directly using [create_physical_expr]. -/// -/// A Physical expression knows its type, nullability and how to evaluate itself. -/// -/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html -/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html -/// -/// # Example: Create `PhysicalExpr` from `Expr` -/// ``` -/// # use arrow::datatypes::{DataType, Field, Schema}; -/// # use datafusion_common::DFSchema; -/// # use datafusion_expr::{Expr, col, lit}; -/// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_expr::execution_props::ExecutionProps; -/// // For a logical expression `a = 1`, we can create a physical expression -/// let expr = col("a").eq(lit(1)); -/// // To create a PhysicalExpr we need 1. a schema -/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); -/// let df_schema = DFSchema::try_from(schema).unwrap(); -/// // 2. 
ExecutionProps -/// let props = ExecutionProps::new(); -/// // We can now create a PhysicalExpr: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); -/// ``` -/// -/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] -/// ``` -/// # use std::sync::Arc; -/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; -/// # use arrow::datatypes::{DataType, Field, Schema}; -/// # use datafusion_common::{assert_batches_eq, DFSchema}; -/// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; -/// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_expr::execution_props::ExecutionProps; -/// # let expr = col("a").eq(lit(1)); -/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); -/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); -/// # let props = ExecutionProps::new(); -/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); -/// // Input of [1,2,3] -/// let input_batch = RecordBatch::try_from_iter(vec![ -/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) -/// ]).unwrap(); -/// // The result is a ColumnarValue (either an Array or a Scalar) -/// let result = physical_expr.evaluate(&input_batch).unwrap(); -/// // In this case, a BooleanArray with the result of the comparison -/// let ColumnarValue::Array(arr) = result else { -/// panic!("Expected an array") -/// }; -/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false])); -/// ``` -/// -/// [ColumnarValue]: datafusion_expr::ColumnarValue -/// -/// Create a physical expression from a logical expression ([Expr]). -/// -/// # Arguments -/// -/// * `e` - The logical expression -/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references -/// to qualified or unqualified fields by name. 
-pub fn create_physical_expr( - e: &Expr, - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result> { - let input_schema: &Schema = &input_dfschema.into(); - - match e { - Expr::Alias(Alias { expr, .. }) => { - Ok(create_physical_expr(expr, input_dfschema, execution_props)?) - } - Expr::Column(c) => { - let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) - } - Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), - Expr::ScalarVariable(_, variable_names) => { - if is_system_variables(variable_names) { - match execution_props.get_var_provider(VarType::System) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => plan_err!("No system variable provider found"), - } - } else { - match execution_props.get_var_provider(VarType::UserDefined) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => plan_err!("No user defined variable provider found"), - } - } - } - Expr::IsTrue(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(true))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotTrue(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(true))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsFalse(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(false))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotFalse(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - 
Expr::Literal(ScalarValue::Boolean(Some(false))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsUnknown(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(None)), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotUnknown(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - Expr::Literal(ScalarValue::Boolean(None)), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - // Create physical expressions for left and right operands - let lhs = create_physical_expr(left, input_dfschema, execution_props)?; - let rhs = create_physical_expr(right, input_dfschema, execution_props)?; - // Note that the logical planner is responsible - // for type coercion on the arguments (e.g. if one - // argument was originally Int32 and one was - // Int64 they will both be coerced to Int64). - // - // There should be no coercion during physical - // planning. - binary(lhs, *op, rhs, input_schema) - } - Expr::Like(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive, - }) => { - if escape_char.is_some() { - return exec_err!("LIKE does not support escape_char"); - } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; - like( - *negated, - *case_insensitive, - physical_expr, - physical_pattern, - input_schema, - ) - } - Expr::Case(case) => { - let expr: Option> = if let Some(e) = &case.expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - execution_props, - )?) 
- } else { - None - }; - let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case - .when_then_expr - .iter() - .map(|(w, t)| (w.as_ref(), t.as_ref())) - .unzip(); - let when_expr = - create_physical_exprs(when_expr, input_dfschema, execution_props)?; - let then_expr = - create_physical_exprs(then_expr, input_dfschema, execution_props)?; - let when_then_expr: Vec<(Arc, Arc)> = - when_expr - .iter() - .zip(then_expr.iter()) - .map(|(w, t)| (w.clone(), t.clone())) - .collect(); - let else_expr: Option> = - if let Some(e) = &case.else_expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - execution_props, - )?) - } else { - None - }; - Ok(expressions::case(expr, when_then_expr, else_expr)?) - } - Expr::Cast(Cast { expr, data_type }) => expressions::cast( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::Not(expr) => { - expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?) 
- } - Expr::Negative(expr) => expressions::negative( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - ), - Expr::IsNull(expr) => expressions::is_null(create_physical_expr( - expr, - input_dfschema, - execution_props, - )?), - Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( - expr, - input_dfschema, - execution_props, - )?), - Expr::GetIndexedField(GetIndexedField { expr: _, field }) => match field { - GetFieldAccess::NamedStructField { name: _ } => { - internal_err!( - "NamedStructField should be rewritten in OperatorToFunction" - ) - } - GetFieldAccess::ListIndex { key: _ } => { - internal_err!("ListIndex should be rewritten in OperatorToFunction") - } - GetFieldAccess::ListRange { - start: _, - stop: _, - stride: _, - } => { - internal_err!("ListRange should be rewritten in OperatorToFunction") - } - }, - - Expr::ScalarFunction(ScalarFunction { func_def, args }) => { - let physical_args = - create_physical_exprs(args, input_dfschema, execution_props)?; - - match func_def { - ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - input_schema, - args, - input_dfschema, - ), - ScalarFunctionDefinition::Name(_) => { - internal_err!("Function `Expr` with name should be resolved.") - } - } - } - Expr::Between(Between { - expr, - negated, - low, - high, - }) => { - let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; - let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; - let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; - - // rewrite the between into the two binary operators - let binary_expr = binary( - binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, - Operator::And, - binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, - input_schema, - ); - - if *negated { - expressions::not(binary_expr?) 
- } else { - binary_expr - } - } - Expr::InList(InList { - expr, - list, - negated, - }) => match expr.as_ref() { - Expr::Literal(ScalarValue::Utf8(None)) => { - Ok(expressions::lit(ScalarValue::Boolean(None))) - } - _ => { - let value_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - - let list_exprs = - create_physical_exprs(list, input_dfschema, execution_props)?; - expressions::in_list(value_expr, list_exprs, negated, input_schema) - } - }, - other => { - not_impl_err!("Physical plan does not support logical expression {other:?}") - } - } -} - -/// Create vector of Physical Expression from a vector of logical expression -pub fn create_physical_exprs<'a, I>( - exprs: I, - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result>> -where - I: IntoIterator, -{ - exprs - .into_iter() - .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) - .collect::>>() -} - -#[cfg(test)] -mod tests { - use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; - use arrow_schema::{DataType, Field}; - - use datafusion_expr::{col, lit}; - - use super::*; - - #[test] - fn test_create_physical_expr_scalar_input_output() -> Result<()> { - let expr = col("letter").eq(lit("A")); - - let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]); - let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?; - let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?; - - let batch = RecordBatch::try_new( - Arc::new(schema), - vec![Arc::new(StringArray::from_iter_values(vec![ - "A", "B", "C", "D", - ]))], - )?; - let result = p.evaluate(&batch)?; - let result = result.into_array(4).expect("Failed to convert to array"); - - assert_eq!( - &result, - &(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef) - ); - - Ok(()) - } -} +pub use datafusion_physical_expr_common::physical_expr::create_physical_expr; +pub use 
datafusion_physical_expr_common::physical_expr::create_physical_exprs; diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 76cee3a1a786..60f00098f4d9 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -255,110 +255,17 @@ pub fn merge_vectors( #[cfg(test)] pub(crate) mod tests { - use arrow_array::{ArrayRef, Float32Array, Float64Array}; - use std::any::Any; use std::fmt::{Display, Formatter}; use super::*; use crate::expressions::{binary, cast, col, in_list, lit, Literal}; use arrow_schema::{DataType, Field, Schema}; - use datafusion_common::{exec_err, DataFusionError, ScalarValue}; + use datafusion_common::{Result, ScalarValue}; - use datafusion_expr::{ - ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility, - }; use petgraph::visit::Bfs; - #[derive(Debug, Clone)] - pub struct TestScalarUDF { - pub(crate) signature: Signature, - } - - impl TestScalarUDF { - pub fn new() -> Self { - use DataType::*; - Self { - signature: Signature::uniform( - 1, - vec![Float64, Float32], - Volatility::Immutable, - ), - } - } - } - - impl ScalarUDFImpl for TestScalarUDF { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "test-scalar-udf" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - let arg_type = &arg_types[0]; - - match arg_type { - DataType::Float32 => Ok(DataType::Float32), - _ => Ok(DataType::Float64), - } - } - - fn monotonicity(&self) -> Result> { - Ok(Some(vec![Some(true)])) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let args = ColumnarValue::values_to_arrays(args)?; - - let arr: ArrayRef = match args[0].data_type() { - DataType::Float64 => Arc::new({ - let arg = &args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "could not cast {} to {}", - self.name(), - std::any::type_name::() 
- )) - })?; - - arg.iter() - .map(|a| a.map(f64::floor)) - .collect::() - }), - DataType::Float32 => Arc::new({ - let arg = &args[0] - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "could not cast {} to {}", - self.name(), - std::any::type_name::() - )) - })?; - - arg.iter() - .map(|a| a.map(f32::floor)) - .collect::() - }), - other => { - return exec_err!( - "Unsupported data type {other:?} for function {}", - self.name() - ); - } - }; - Ok(ColumnarValue::Array(arr)) - } - } + pub use datafusion_physical_expr_common::utils::test_utils::TestScalarUDF; #[derive(Clone)] struct DummyProperty { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index e1c8489655bf..f557d5372c42 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -84,8 +84,9 @@ pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, + expressions, functions, AggregateExpr, Distribution, Partitioning, PhysicalExpr, }; +pub use datafusion_physical_expr_common::udf; // Backwards compatibility pub use crate::stream::EmptyRecordBatchStream;