diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index ba3e68e4011f..ea3644183355 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1364,6 +1364,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr", + "paste", ] [[package]] diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index d1202c83d526..8fce83a79268 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -39,3 +39,4 @@ path = "src/lib.rs" arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +paste = "^1.0" diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr-common/src/expressions/binary.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/binary.rs rename to datafusion/physical-expr-common/src/expressions/binary.rs index 76154dca0338..48e108af4173 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr-common/src/expressions/binary.rs @@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc}; use crate::expressions::datum::{apply, apply_cmp}; use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison}; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::array::*; use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene}; @@ -622,8 +622,13 @@ pub fn binary( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{col, lit, try_cast, Literal}; - use datafusion_common::plan_datafusion_err; + use crate::expressions::column::col; + use crate::expressions::literal::{lit, Literal}; + use crate::expressions::try_cast::try_cast; + use arrow::datatypes::{ + ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef, + }; + use datafusion_common::{plan_datafusion_err, Result}; use datafusion_expr::type_coercion::binary::get_input_types; /// Performs a binary operation, applying any type coercion necessary diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr-common/src/expressions/binary/kernels.rs similarity index 100% rename from datafusion/physical-expr/src/expressions/binary/kernels.rs rename to datafusion/physical-expr-common/src/expressions/binary/kernels.rs diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr-common/src/expressions/datum.rs similarity index 96% rename from datafusion/physical-expr/src/expressions/datum.rs rename to datafusion/physical-expr-common/src/expressions/datum.rs index 2bb79922cfec..201bf104f20e 100644 --- a/datafusion/physical-expr/src/expressions/datum.rs +++ b/datafusion/physical-expr-common/src/expressions/datum.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::BooleanArray; use arrow::array::{ArrayRef, Datum}; use arrow::error::ArrowError; -use arrow_array::BooleanArray; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; use std::sync::Arc; @@ -25,7 +25,7 @@ use std::sync::Arc; /// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs` /// /// This maps arrow-rs' [`Datum`] kernels to DataFusion's [`ColumnarValue`] abstraction -pub(crate) fn apply( +pub fn apply( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, @@ -49,7 +49,7 @@ pub(crate) fn apply( } /// Applies a binary [`Datum`] comparison kernel `f` to `lhs` and `rhs` -pub(crate) fn apply_cmp( +pub fn apply_cmp( lhs: &ColumnarValue, rhs: &ColumnarValue, f: impl Fn(&dyn Datum, &dyn Datum) -> Result, diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr-common/src/expressions/literal.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/literal.rs rename to datafusion/physical-expr-common/src/expressions/literal.rs index 35ea80ea574d..bc3312d25755 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr-common/src/expressions/literal.rs @@ -22,8 +22,8 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; +use crate::physical_expr::PhysicalExpr; use crate::sort_properties::SortProperties; -use crate::PhysicalExpr; use arrow::{ datatypes::{DataType, Schema}, diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/physical-expr-common/src/expressions/mod.rs index d102422081dc..74e36670fb3d 100644 --- a/datafusion/physical-expr-common/src/expressions/mod.rs +++ b/datafusion/physical-expr-common/src/expressions/mod.rs @@ -15,4 +15,11 @@ // specific language governing permissions and limitations // under the License. +//! Defines physical expressions that can evaluated at runtime during query execution + +#[macro_use] +pub mod binary; pub mod column; +pub mod datum; +pub mod literal; +pub mod try_cast; diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr-common/src/expressions/try_cast.rs similarity index 99% rename from datafusion/physical-expr/src/expressions/try_cast.rs rename to datafusion/physical-expr-common/src/expressions/try_cast.rs index d25a904f7d6a..a5b93f3a0f86 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr-common/src/expressions/try_cast.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::compute; use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType, Schema}; @@ -148,7 +148,7 @@ pub fn try_cast( #[cfg(test)] mod tests { use super::*; - use crate::expressions::col; + use crate::expressions::column::col; use arrow::array::{ Decimal128Array, Decimal128Builder, StringArray, Time64NanosecondArray, }; diff --git a/datafusion/physical-expr-common/src/intervals/cp_solver.rs b/datafusion/physical-expr-common/src/intervals/cp_solver.rs new file mode 100644 index 000000000000..28cf7c7b3974 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/cp_solver.rs @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Constraint propagator/solver for custom PhysicalExpr graphs. + +use arrow::datatypes::DataType; +use datafusion_common::{internal_err, Result}; +use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; +use datafusion_expr::Operator; + +use super::utils::{ + convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, +}; + +/// This function refines intervals `left_child` and `right_child` by applying +/// constraint propagation through `parent` via operation. The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. +/// +/// Assuming that x,y and p has ranges [xL, xU], [yL, yU], and [pL, pU], we +/// apply the following operations: +/// - For plus operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. +/// - For minus operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. +/// - For multiplication operation, specifically, we would first do +/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU]. +/// - For division operation, specifically, we would first do +/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then +/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. +pub fn propagate_arithmetic( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + let inverse_op = get_inverse_op(*op)?; + match (left_child.data_type(), right_child.data_type()) { + // If we have a child whose type is a time interval (i.e. DataType::Interval), + // we need special handling since timestamp differencing results in a + // Duration type. + (DataType::Timestamp(..), DataType::Interval(_)) => { + propagate_time_interval_at_right( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + (DataType::Interval(_), DataType::Timestamp(..)) => { + propagate_time_interval_at_left( + left_child, + right_child, + parent, + op, + &inverse_op, + ) + } + _ => { + // First, propagate to the left: + match apply_operator(&inverse_op, parent, right_child)? + .intersect(left_child)? + { + // Left is feasible: + Some(value) => Ok( + // Propagate to the right using the new left. + propagate_right(&value, parent, right_child, op, &inverse_op)? + .map(|right| (value, right)), + ), + // If the left child is infeasible, short-circuit. + None => Ok(None), + } + } + } +} + +/// This function refines intervals `left_child` and `right_child` by applying +/// comparison propagation through `parent` via operation. The main idea is +/// that we can shrink ranges of variables x and y using parent interval p. +/// Two intervals can be ordered in 6 ways for a Gt `>` operator: +/// ```text +/// (1): Infeasible, short-circuit +/// left: | ================ | +/// right: | ======================== | +/// +/// (2): Update both interval +/// left: | ====================== | +/// right: | ====================== | +/// | +/// V +/// left: | ======= | +/// right: | ======= | +/// +/// (3): Update left interval +/// left: | ============================== | +/// right: | ========== | +/// | +/// V +/// left: | ===================== | +/// right: | ========== | +/// +/// (4): Update right interval +/// left: | ========== | +/// right: | =========================== | +/// | +/// V +/// left: | ========== | +/// right | ================== | +/// +/// (5): No change +/// left: | ============================ | +/// right: | =================== | +/// +/// (6): No change +/// left: | ==================== | +/// right: | =============== | +/// +/// -inf --------------------------------------------------------------- +inf +/// ``` +pub fn propagate_comparison( + op: &Operator, + parent: &Interval, + left_child: &Interval, + right_child: &Interval, +) -> Result> { + if parent == &Interval::CERTAINLY_TRUE { + match op { + Operator::Eq => left_child.intersect(right_child).map(|result| { + result.map(|intersection| (intersection.clone(), intersection)) + }), + Operator::Gt => satisfy_greater(left_child, right_child, true), + Operator::GtEq => satisfy_greater(left_child, right_child, false), + Operator::Lt => satisfy_greater(right_child, left_child, true) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(right_child, left_child, false) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else if parent == &Interval::CERTAINLY_FALSE { + match op { + Operator::Eq => { + // TODO: Propagation is not possible until we support interval sets. + Ok(None) + } + Operator::Gt => satisfy_greater(right_child, left_child, false), + Operator::GtEq => satisfy_greater(right_child, left_child, true), + Operator::Lt => satisfy_greater(left_child, right_child, false) + .map(|t| t.map(reverse_tuple)), + Operator::LtEq => satisfy_greater(left_child, right_child, true) + .map(|t| t.map(reverse_tuple)), + _ => internal_err!( + "The operator must be a comparison operator to propagate intervals" + ), + } + } else { + // Uncertainty cannot change any end-point of the intervals. + Ok(None) + } +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the right side of the operation. +fn propagate_time_interval_at_right( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. + let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { + match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { + Some(value) => { + propagate_right(left_child, parent, &duration, op, inverse_op)? + .and_then(|right| convert_duration_type_to_interval(&right)) + .map(|right| (value, right)) + } + None => None, + } + } else { + apply_operator(inverse_op, parent, right_child)? + .intersect(left_child)? + .map(|value| (value, right_child.clone())) + }; + Ok(result) +} + +/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. +fn propagate_right( + left: &Interval, + parent: &Interval, + right: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + match op { + Operator::Minus => apply_operator(op, left, parent), + Operator::Plus => apply_operator(inverse_op, parent, left), + Operator::Divide => apply_operator(op, left, parent), + Operator::Multiply => apply_operator(inverse_op, parent, left), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + }? + .intersect(right) +} + +/// During the propagation of [`Interval`] values on an ExprIntervalGraph, +/// if there exists a `timestamp - timestamp` operation, the result would be +/// of type `Duration`. However, we may encounter a situation where a time interval +/// is involved in an arithmetic operation with a `Duration` type. This function +/// offers special handling for such cases, where the time interval resides on +/// the left side of the operation. +fn propagate_time_interval_at_left( + left_child: &Interval, + right_child: &Interval, + parent: &Interval, + op: &Operator, + inverse_op: &Operator, +) -> Result> { + // We check if the child's time interval(s) has a non-zero month or day field(s). + // If so, we return it as is without propagating. Otherwise, we first convert + // the time intervals to the `Duration` type, then propagate, and then convert + // the bounds to time intervals again. + let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { + match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? { + Some(value) => { + let left = convert_duration_type_to_interval(&value); + let right = propagate_right(&value, parent, right_child, op, inverse_op)?; + match (left, right) { + (Some(left), Some(right)) => Some((left, right)), + _ => None, + } + } + None => None, + } + } else { + propagate_right(left_child, parent, right_child, op, inverse_op)? + .map(|right| (left_child.clone(), right)) + }; + Ok(result) +} + +fn reverse_tuple((first, second): (T, U)) -> (U, T) { + (second, first) +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::TimeUnit; + use datafusion_common::ScalarValue; + + use super::*; + + #[test] + fn test_propagate_comparison() -> Result<()> { + // In the examples below: + // `left` is unbounded: [?, ?], + // `right` is known to be [1000,1000] + // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] + let left = Interval::make_unbounded(&DataType::Int64)?; + let right = Interval::make(Some(1000_i64), Some(1000_i64))?; + assert_eq!( + (Some(( + Interval::make(None, Some(999_i64))?, + Interval::make(Some(1000_i64), Some(1000_i64))?, + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + let left = + Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + None + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), None), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), None), + ScalarValue::TimestampNanosecond(Some(1000), None), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + let left = Interval::make_unbounded(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + ))?; + let right = Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )?; + assert_eq!( + (Some(( + Interval::try_new( + ScalarValue::try_from(&DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), + )?, + Interval::try_new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + )? + ))), + propagate_comparison( + &Operator::Lt, + &Interval::CERTAINLY_TRUE, + &left, + &right + )? + ); + + Ok(()) + } +} diff --git a/datafusion/physical-expr-common/src/intervals/mod.rs b/datafusion/physical-expr-common/src/intervals/mod.rs new file mode 100644 index 000000000000..7022bf2c42b9 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Interval arithmetic and constraint propagation library + +pub mod cp_solver; +pub mod utils; diff --git a/datafusion/physical-expr-common/src/intervals/utils.rs b/datafusion/physical-expr-common/src/intervals/utils.rs new file mode 100644 index 000000000000..7e8fad259a96 --- /dev/null +++ b/datafusion/physical-expr-common/src/intervals/utils.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for the interval arithmetic library + +use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_expr::{interval_arithmetic::Interval, Operator}; + +const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; +const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; +const DT_MS_MASK: i64 = 0xFFFF_FFFF; + +// This function returns the inverse operator of the given operator. +pub fn get_inverse_op(op: Operator) -> Result { + match op { + Operator::Plus => Ok(Operator::Minus), + Operator::Minus => Ok(Operator::Plus), + Operator::Multiply => Ok(Operator::Divide), + Operator::Divide => Ok(Operator::Multiply), + _ => internal_err!("Interval arithmetic does not support the operator {}", op), + } +} + +/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. +pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_interval_bound_to_duration(interval.lower()), + convert_interval_bound_to_duration(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. +fn convert_interval_bound_to_duration( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) + .ok() + .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), + ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) + .ok() + .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), + _ => None, + } +} + +/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. +/// Otherwise, it returns an error. +fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { + let months = mdn >> 96; + let days = (mdn & MDN_DAY_MASK) >> 64; + let nanoseconds = mdn & MDN_NS_MASK; + + if months == 0 && days == 0 { + nanoseconds + .try_into() + .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) + } else { + internal_err!( + "The interval cannot have a non-zero month or day value for duration convertibility" + ) + } +} + +/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. +/// Otherwise, it returns an error. +fn interval_dt_to_duration_ms(dt: &i64) -> Result { + let days = dt >> 32; + let milliseconds = dt & DT_MS_MASK; + + if days == 0 { + Ok(milliseconds) + } else { + internal_err!( + "The interval cannot have a non-zero day value for duration convertibility" + ) + } +} + +/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. +pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { + if let (Some(lower), Some(upper)) = ( + convert_duration_bound_to_interval(interval.lower()), + convert_duration_bound_to_interval(interval.upper()), + ) { + Interval::try_new(lower, upper).ok() + } else { + None + } +} + +/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. +fn convert_duration_bound_to_interval( + interval_bound: &ScalarValue, +) -> Option { + match interval_bound { + ScalarValue::DurationNanosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration)) + } + ScalarValue::DurationMicrosecond(Some(duration)) => { + Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) + } + ScalarValue::DurationMillisecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32)) + } + ScalarValue::DurationSecond(Some(duration)) => { + Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) + } + _ => None, + } +} diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 53e3134a1b05..de3b61c035ad 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -17,6 +17,7 @@ pub mod aggregate; pub mod expressions; +pub mod intervals; pub mod physical_expr; pub mod sort_expr; pub mod sort_properties; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index be6358e73c99..184f78c29a0b 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -25,10 +25,15 @@ use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::utils::DataPtr; -use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_common::{internal_err, not_impl_err, DFSchema, Result}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::expr::Alias; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::ColumnarValue; +use datafusion_expr::{BinaryExpr, ColumnarValue, Expr}; +use crate::expressions::binary::binary; +use crate::expressions::column::Column; +use crate::expressions::literal::Literal; use crate::sort_properties::SortProperties; use crate::utils::scatter; @@ -209,3 +214,359 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { any } } + +/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 +/// AS int)`. +/// +/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical +/// planning, and can be evaluated directly on a [RecordBatch]. They are +/// normally created from [Expr] by a [PhysicalPlanner] and can be created +/// directly using [create_physical_expr]. +/// +/// A Physical expression knows its type, nullability and how to evaluate itself. +/// +/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html +/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html +/// +/// # Example: Create `PhysicalExpr` from `Expr` +/// ``` +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::DFSchema; +/// # use datafusion_expr::{Expr, col, lit}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// // For a logical expression `a = 1`, we can create a physical expression +/// let expr = col("a").eq(lit(1)); +/// // To create a PhysicalExpr we need 1. a schema +/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// let df_schema = DFSchema::try_from(schema).unwrap(); +/// // 2. ExecutionProps +/// let props = ExecutionProps::new(); +/// // We can now create a PhysicalExpr: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// ``` +/// +/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] +/// ``` +/// # use std::sync::Arc; +/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_common::{assert_batches_eq, DFSchema}; +/// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; +/// # use datafusion_physical_expr_common::physical_expr::create_physical_expr; +/// # use datafusion_expr::execution_props::ExecutionProps; +/// # let expr = col("a").eq(lit(1)); +/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); +/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); +/// # let props = ExecutionProps::new(); +/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: +/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); +/// // Input of [1,2,3] +/// let input_batch = RecordBatch::try_from_iter(vec![ +/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) +/// ]).unwrap(); +/// // The result is a ColumnarValue (either an Array or a Scalar) +/// let result = physical_expr.evaluate(&input_batch).unwrap(); +/// // In this case, a BooleanArray with the result of the comparison +/// let ColumnarValue::Array(arr) = result else { +/// panic!("Expected an array") +/// }; +/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false])); +/// ``` +/// +/// [ColumnarValue]: datafusion_expr::ColumnarValue +/// +/// Create a physical expression from a logical expression ([Expr]). +/// +/// # Arguments +/// +/// * `e` - The logical expression +/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references +/// to qualified or unqualified fields by name. +#[allow(clippy::only_used_in_recursion)] +pub fn create_physical_expr( + e: &Expr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result> { + let input_schema: &Schema = &input_dfschema.into(); + + match e { + Expr::Alias(Alias { expr, .. }) => { + Ok(create_physical_expr(expr, input_dfschema, execution_props)?) + } + Expr::Column(c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + // Expr::ScalarVariable(_, variable_names) => { + // if is_system_variables(variable_names) { + // match execution_props.get_var_provider(VarType::System) { + // Some(provider) => { + // let scalar_value = provider.get_value(variable_names.clone())?; + // Ok(Arc::new(Literal::new(scalar_value))) + // } + // _ => plan_err!("No system variable provider found"), + // } + // } else { + // match execution_props.get_var_provider(VarType::UserDefined) { + // Some(provider) => { + // let scalar_value = provider.get_value(variable_names.clone())?; + // Ok(Arc::new(Literal::new(scalar_value))) + // } + // _ => plan_err!("No user defined variable provider found"), + // } + // } + // } + // Expr::IsTrue(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsNotDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(Some(true))), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + // Expr::IsNotTrue(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(Some(true))), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + // Expr::IsFalse(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsNotDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(Some(false))), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + // Expr::IsNotFalse(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(Some(false))), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + // Expr::IsUnknown(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsNotDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(None)), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + // Expr::IsNotUnknown(expr) => { + // let binary_op = binary_expr( + // expr.as_ref().clone(), + // Operator::IsDistinctFrom, + // Expr::Literal(ScalarValue::Boolean(None)), + // ); + // create_physical_expr(&binary_op, input_dfschema, execution_props) + // } + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + // Create physical expressions for left and right operands + let lhs = create_physical_expr(left, input_dfschema, execution_props)?; + let rhs = create_physical_expr(right, input_dfschema, execution_props)?; + // Note that the logical planner is responsible + // for type coercion on the arguments (e.g. if one + // argument was originally Int32 and one was + // Int64 they will both be coerced to Int64). + // + // There should be no coercion during physical + // planning. + binary(lhs, *op, rhs, input_schema) + } + // Expr::Like(Like { + // negated, + // expr, + // pattern, + // escape_char, + // case_insensitive, + // }) => { + // if escape_char.is_some() { + // return exec_err!("LIKE does not support escape_char"); + // } + // let physical_expr = + // create_physical_expr(expr, input_dfschema, execution_props)?; + // let physical_pattern = + // create_physical_expr(pattern, input_dfschema, execution_props)?; + // like( + // *negated, + // *case_insensitive, + // physical_expr, + // physical_pattern, + // input_schema, + // ) + // } + // Expr::Case(case) => { + // let expr: Option> = if let Some(e) = &case.expr { + // Some(create_physical_expr( + // e.as_ref(), + // input_dfschema, + // execution_props, + // )?) + // } else { + // None + // }; + // let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case + // .when_then_expr + // .iter() + // .map(|(w, t)| (w.as_ref(), t.as_ref())) + // .unzip(); + // let when_expr = + // create_physical_exprs(when_expr, input_dfschema, execution_props)?; + // let then_expr = + // create_physical_exprs(then_expr, input_dfschema, execution_props)?; + // let when_then_expr: Vec<(Arc, Arc)> = + // when_expr + // .iter() + // .zip(then_expr.iter()) + // .map(|(w, t)| (w.clone(), t.clone())) + // .collect(); + // let else_expr: Option> = + // if let Some(e) = &case.else_expr { + // Some(create_physical_expr( + // e.as_ref(), + // input_dfschema, + // execution_props, + // )?) + // } else { + // None + // }; + // Ok(expressions::case(expr, when_then_expr, else_expr)?) + // } + // Expr::Cast(Cast { expr, data_type }) => expressions::cast( + // create_physical_expr(expr, input_dfschema, execution_props)?, + // input_schema, + // data_type.clone(), + // ), + // Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast( + // create_physical_expr(expr, input_dfschema, execution_props)?, + // input_schema, + // data_type.clone(), + // ), + // Expr::Not(expr) => { + // expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?) + // } + // Expr::Negative(expr) => expressions::negative( + // create_physical_expr(expr, input_dfschema, execution_props)?, + // input_schema, + // ), + // Expr::IsNull(expr) => expressions::is_null(create_physical_expr( + // expr, + // input_dfschema, + // execution_props, + // )?), + // Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( + // expr, + // input_dfschema, + // execution_props, + // )?), + // Expr::GetIndexedField(GetIndexedField { expr: _, field }) => match field { + // GetFieldAccess::NamedStructField { name: _ } => { + // internal_err!( + // "NamedStructField should be rewritten in OperatorToFunction" + // ) + // } + // GetFieldAccess::ListIndex { key: _ } => { + // internal_err!("ListIndex should be rewritten in OperatorToFunction") + // } + // GetFieldAccess::ListRange { + // start: _, + // stop: _, + // stride: _, + // } => { + // internal_err!("ListRange should be rewritten in OperatorToFunction") + // } + // }, + + // Expr::ScalarFunction(ScalarFunction { func_def, args }) => { + // let physical_args = + // create_physical_exprs(args, input_dfschema, execution_props)?; + + // match func_def { + // ScalarFunctionDefinition::BuiltIn(fun) => { + // functions::create_builtin_physical_expr( + // fun, + // &physical_args, + // input_schema, + // execution_props, + // ) + // } + // ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( + // fun.clone().as_ref(), + // &physical_args, + // input_schema, + // args, + // input_dfschema, + // ), + // ScalarFunctionDefinition::Name(_) => { + // internal_err!("Function `Expr` with name should be resolved.") + // } + // } + // } + // Expr::Between(Between { + // expr, + // negated, + // low, + // high, + // }) => { + // let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; + // let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; + // let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; + + // // rewrite the between into the two binary operators + // let binary_expr = binary( + // binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, + // Operator::And, + // binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, + // input_schema, + // ); + + // if *negated { + // expressions::not(binary_expr?) + // } else { + // binary_expr + // } + // } + // Expr::InList(InList { + // expr, + // list, + // negated, + // }) => match expr.as_ref() { + // Expr::Literal(ScalarValue::Utf8(None)) => { + // Ok(expressions::lit(ScalarValue::Boolean(None))) + // } + // _ => { + // let value_expr = + // create_physical_expr(expr, input_dfschema, execution_props)?; + + // let list_exprs = + // create_physical_exprs(list, input_dfschema, execution_props)?; + // expressions::in_list(value_expr, list_exprs, negated, input_schema) + // } + // }, + other => { + not_impl_err!("Physical plan does not support logical expression {other:?}") + } + } +} + +/// Create vector of Physical Expression from a vector of logical expression +pub fn create_physical_exprs<'a, I>( + exprs: I, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result>> +where + I: IntoIterator, +{ + exprs + .into_iter() + .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .collect::>>() +} diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils/mod.rs similarity index 100% rename from datafusion/physical-expr-common/src/utils.rs rename to datafusion/physical-expr-common/src/utils/mod.rs diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index 6e0beeb0beea..c219dea83617 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -20,11 +20,11 @@ use std::{any::Any, sync::Arc}; use crate::{physical_expr::down_cast_any_ref, PhysicalExpr}; -use crate::expressions::datum::apply_cmp; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Schema}; use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::expressions::datum::apply_cmp; // Like expression #[derive(Debug, Hash)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 688d5ce6eabf..1d26c9bd806a 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -17,21 +17,16 @@ //! Defines physical expressions that can evaluated at runtime during query execution -#[macro_use] -mod binary; mod case; mod cast; mod column; -mod datum; mod in_list; mod is_not_null; mod is_null; mod like; -mod literal; mod negative; mod no_op; mod not; -mod try_cast; /// Module with some convenient methods used in expression building pub mod helpers { @@ -79,21 +74,21 @@ pub use datafusion_functions_aggregate::first_last::{ FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue, }; -pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; pub use cast::{cast, cast_with_options, CastExpr}; pub use column::UnKnownColumn; pub use datafusion_expr::utils::format_state_name; +pub use datafusion_physical_expr_common::expressions::binary::{binary, BinaryExpr}; pub use datafusion_physical_expr_common::expressions::column::{col, Column}; +pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal}; +pub use datafusion_physical_expr_common::expressions::try_cast::{try_cast, TryCastExpr}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; pub use like::{like, LikeExpr}; -pub use literal::{lit, Literal}; pub use negative::{negative, NegativeExpr}; pub use no_op::NoOp; pub use not::{not, NotExpr}; -pub use try_cast::{try_cast, TryCastExpr}; #[cfg(test)] pub(crate) mod tests { diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 0c25e26d17aa..980bd5b235e5 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -21,23 +21,23 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use super::utils::{ - convert_duration_type_to_interval, convert_interval_type_to_duration, get_inverse_op, -}; use crate::expressions::Literal; use crate::utils::{build_dag, ExprTreeNode}; use crate::PhysicalExpr; use arrow_schema::{DataType, Schema}; -use datafusion_common::{internal_err, Result}; -use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; -use datafusion_expr::Operator; +use datafusion_common::Result; +use datafusion_expr::interval_arithmetic::Interval; use petgraph::graph::NodeIndex; use petgraph::stable_graph::{DefaultIx, StableGraph}; use petgraph::visit::{Bfs, Dfs, DfsPostOrder, EdgeRef}; use petgraph::Outgoing; +pub use datafusion_physical_expr_common::intervals::cp_solver::{ + propagate_arithmetic, propagate_comparison, +}; + // Interval arithmetic provides a way to perform mathematical operations on // intervals, which represent a range of possible values rather than a single // point value. This allows for the propagation of ranges through mathematical @@ -198,157 +198,6 @@ impl PartialEq for ExprIntervalGraphNode { } } -/// This function refines intervals `left_child` and `right_child` by applying -/// constraint propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. -/// -/// Assuming that x,y and p has ranges [xL, xU], [yL, yU], and [pL, pU], we -/// apply the following operations: -/// - For plus operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] - [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] - [xL, xU]) ∩ [yL, yU]. -/// - For minus operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] + [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] - [pL, pU]) ∩ [yL, yU]. -/// - For multiplication operation, specifically, we would first do -/// - [xL, xU] <- ([pL, pU] / [yL, yU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([pL, pU] / [xL, xU]) ∩ [yL, yU]. -/// - For division operation, specifically, we would first do -/// - [xL, xU] <- ([yL, yU] * [pL, pU]) ∩ [xL, xU], and then -/// - [yL, yU] <- ([xL, xU] / [pL, pU]) ∩ [yL, yU]. -pub fn propagate_arithmetic( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - let inverse_op = get_inverse_op(*op)?; - match (left_child.data_type(), right_child.data_type()) { - // If we have a child whose type is a time interval (i.e. DataType::Interval), - // we need special handling since timestamp differencing results in a - // Duration type. - (DataType::Timestamp(..), DataType::Interval(_)) => { - propagate_time_interval_at_right( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - (DataType::Interval(_), DataType::Timestamp(..)) => { - propagate_time_interval_at_left( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - _ => { - // First, propagate to the left: - match apply_operator(&inverse_op, parent, right_child)? - .intersect(left_child)? - { - // Left is feasible: - Some(value) => Ok( - // Propagate to the right using the new left. - propagate_right(&value, parent, right_child, op, &inverse_op)? - .map(|right| (value, right)), - ), - // If the left child is infeasible, short-circuit. - None => Ok(None), - } - } - } -} - -/// This function refines intervals `left_child` and `right_child` by applying -/// comparison propagation through `parent` via operation. The main idea is -/// that we can shrink ranges of variables x and y using parent interval p. -/// Two intervals can be ordered in 6 ways for a Gt `>` operator: -/// ```text -/// (1): Infeasible, short-circuit -/// left: | ================ | -/// right: | ======================== | -/// -/// (2): Update both interval -/// left: | ====================== | -/// right: | ====================== | -/// | -/// V -/// left: | ======= | -/// right: | ======= | -/// -/// (3): Update left interval -/// left: | ============================== | -/// right: | ========== | -/// | -/// V -/// left: | ===================== | -/// right: | ========== | -/// -/// (4): Update right interval -/// left: | ========== | -/// right: | =========================== | -/// | -/// V -/// left: | ========== | -/// right | ================== | -/// -/// (5): No change -/// left: | ============================ | -/// right: | =================== | -/// -/// (6): No change -/// left: | ==================== | -/// right: | =============== | -/// -/// -inf --------------------------------------------------------------- +inf -/// ``` -pub fn propagate_comparison( - op: &Operator, - parent: &Interval, - left_child: &Interval, - right_child: &Interval, -) -> Result> { - if parent == &Interval::CERTAINLY_TRUE { - match op { - Operator::Eq => left_child.intersect(right_child).map(|result| { - result.map(|intersection| (intersection.clone(), intersection)) - }), - Operator::Gt => satisfy_greater(left_child, right_child, true), - Operator::GtEq => satisfy_greater(left_child, right_child, false), - Operator::Lt => satisfy_greater(right_child, left_child, true) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(right_child, left_child, false) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else if parent == &Interval::CERTAINLY_FALSE { - match op { - Operator::Eq => { - // TODO: Propagation is not possible until we support interval sets. - Ok(None) - } - Operator::Gt => satisfy_greater(right_child, left_child, false), - Operator::GtEq => satisfy_greater(right_child, left_child, true), - Operator::Lt => satisfy_greater(left_child, right_child, false) - .map(|t| t.map(reverse_tuple)), - Operator::LtEq => satisfy_greater(left_child, right_child, true) - .map(|t| t.map(reverse_tuple)), - _ => internal_err!( - "The operator must be a comparison operator to propagate intervals" - ), - } - } else { - // Uncertainty cannot change any end-point of the intervals. - Ok(None) - } -} - impl ExprIntervalGraph { pub fn try_new(expr: Arc, schema: &Schema) -> Result { // Build the full graph: @@ -624,107 +473,15 @@ impl ExprIntervalGraph { } } -/// This is a subfunction of the `propagate_arithmetic` function that propagates to the right child. -fn propagate_right( - left: &Interval, - parent: &Interval, - right: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - match op { - Operator::Minus => apply_operator(op, left, parent), - Operator::Plus => apply_operator(inverse_op, parent, left), - Operator::Divide => apply_operator(op, left, parent), - Operator::Multiply => apply_operator(inverse_op, parent, left), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - }? - .intersect(right) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the left side of the operation. -fn propagate_time_interval_at_left( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. - let result = if let Some(duration) = convert_interval_type_to_duration(left_child) { - match apply_operator(inverse_op, parent, right_child)?.intersect(duration)? { - Some(value) => { - let left = convert_duration_type_to_interval(&value); - let right = propagate_right(&value, parent, right_child, op, inverse_op)?; - match (left, right) { - (Some(left), Some(right)) => Some((left, right)), - _ => None, - } - } - None => None, - } - } else { - propagate_right(left_child, parent, right_child, op, inverse_op)? - .map(|right| (left_child.clone(), right)) - }; - Ok(result) -} - -/// During the propagation of [`Interval`] values on an [`ExprIntervalGraph`], -/// if there exists a `timestamp - timestamp` operation, the result would be -/// of type `Duration`. However, we may encounter a situation where a time interval -/// is involved in an arithmetic operation with a `Duration` type. This function -/// offers special handling for such cases, where the time interval resides on -/// the right side of the operation. -fn propagate_time_interval_at_right( - left_child: &Interval, - right_child: &Interval, - parent: &Interval, - op: &Operator, - inverse_op: &Operator, -) -> Result> { - // We check if the child's time interval(s) has a non-zero month or day field(s). - // If so, we return it as is without propagating. Otherwise, we first convert - // the time intervals to the `Duration` type, then propagate, and then convert - // the bounds to time intervals again. - let result = if let Some(duration) = convert_interval_type_to_duration(right_child) { - match apply_operator(inverse_op, parent, &duration)?.intersect(left_child)? { - Some(value) => { - propagate_right(left_child, parent, &duration, op, inverse_op)? - .and_then(|right| convert_duration_type_to_interval(&right)) - .map(|right| (value, right)) - } - None => None, - } - } else { - apply_operator(inverse_op, parent, right_child)? - .intersect(left_child)? - .map(|value| (value, right_child.clone())) - }; - Ok(result) -} - -fn reverse_tuple((first, second): (T, U)) -> (U, T) { - (second, first) -} - #[cfg(test)] mod tests { use super::*; use crate::expressions::{BinaryExpr, Column}; use crate::intervals::test_utils::gen_conjunctive_numerical_expr; - use arrow::datatypes::TimeUnit; use arrow_schema::Field; use datafusion_common::ScalarValue; + use datafusion_expr::Operator; use itertools::Itertools; use rand::rngs::StdRng; @@ -1477,90 +1234,6 @@ mod tests { Ok(()) } - #[test] - fn test_propagate_comparison() -> Result<()> { - // In the examples below: - // `left` is unbounded: [?, ?], - // `right` is known to be [1000,1000] - // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000:` [?, 999] - let left = Interval::make_unbounded(&DataType::Int64)?; - let right = Interval::make(Some(1000_i64), Some(1000_i64))?; - assert_eq!( - (Some(( - Interval::make(None, Some(999_i64))?, - Interval::make(Some(1000_i64), Some(1000_i64))?, - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - let left = - Interval::make_unbounded(&DataType::Timestamp(TimeUnit::Nanosecond, None))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - None - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), None), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), None), - ScalarValue::TimestampNanosecond(Some(1000), None), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - let left = Interval::make_unbounded(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - ))?; - let right = Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )?; - assert_eq!( - (Some(( - Interval::try_new( - ScalarValue::try_from(&DataType::Timestamp( - TimeUnit::Nanosecond, - Some("+05:00".into()), - )) - .unwrap(), - ScalarValue::TimestampNanosecond(Some(999), Some("+05:00".into())), - )?, - Interval::try_new( - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), - )? - ))), - propagate_comparison( - &Operator::Lt, - &Interval::CERTAINLY_TRUE, - &left, - &right - )? - ); - - Ok(()) - } - #[test] fn test_propagate_or() -> Result<()> { let expr = Arc::new(BinaryExpr::new( diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index e188b2d56bae..ff7fd63126a6 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -25,14 +25,8 @@ use crate::{ }; use arrow_schema::{DataType, SchemaRef}; -use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; -use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; -const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; -const DT_MS_MASK: i64 = 0xFFFF_FFFF; - /// Indicates whether interval arithmetic is supported for the given expression. /// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. /// We do not support every type of [`Operator`]s either. Over time, this check @@ -65,17 +59,6 @@ pub fn check_support(expr: &Arc, schema: &SchemaRef) -> bool { } } -// This function returns the inverse operator of the given operator. -pub fn get_inverse_op(op: Operator) -> Result { - match op { - Operator::Plus => Ok(Operator::Minus), - Operator::Minus => Ok(Operator::Plus), - Operator::Multiply => Ok(Operator::Divide), - Operator::Divide => Ok(Operator::Multiply), - _ => internal_err!("Interval arithmetic does not support the operator {}", op), - } -} - /// Indicates whether interval arithmetic is supported for the given operator. pub fn is_operator_supported(op: &Operator) -> bool { matches!( @@ -109,96 +92,3 @@ pub fn is_datatype_supported(data_type: &DataType) -> bool { | &DataType::Float32 ) } - -/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. -pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_interval_bound_to_duration(interval.lower()), - convert_interval_bound_to_duration(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`]. -fn convert_interval_bound_to_duration( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn) - .ok() - .map(|duration| ScalarValue::DurationNanosecond(Some(duration))), - ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt) - .ok() - .map(|duration| ScalarValue::DurationMillisecond(Some(duration))), - _ => None, - } -} - -/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`]. -pub fn convert_duration_type_to_interval(interval: &Interval) -> Option { - if let (Some(lower), Some(upper)) = ( - convert_duration_bound_to_interval(interval.lower()), - convert_duration_bound_to_interval(interval.upper()), - ) { - Interval::try_new(lower, upper).ok() - } else { - None - } -} - -/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`]. -fn convert_duration_bound_to_interval( - interval_bound: &ScalarValue, -) -> Option { - match interval_bound { - ScalarValue::DurationNanosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration)) - } - ScalarValue::DurationMicrosecond(Some(duration)) => { - Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000)) - } - ScalarValue::DurationMillisecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32)) - } - ScalarValue::DurationSecond(Some(duration)) => { - Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000)) - } - _ => None, - } -} - -/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. -/// Otherwise, it returns an error. -fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { - let months = mdn >> 96; - let days = (mdn & MDN_DAY_MASK) >> 64; - let nanoseconds = mdn & MDN_NS_MASK; - - if months == 0 && days == 0 { - nanoseconds - .try_into() - .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) - } else { - internal_err!( - "The interval cannot have a non-zero month or day value for duration convertibility" - ) - } -} - -/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. -/// Otherwise, it returns an error. -fn interval_dt_to_duration_ms(dt: &i64) -> Result { - let days = dt >> 32; - let milliseconds = dt & DT_MS_MASK; - - if days == 0 { - Ok(milliseconds) - } else { - internal_err!( - "The interval cannot have a non-zero day value for duration convertibility" - ) - } -} diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index f46e5f6ec68f..75f5e040f801 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -109,6 +109,14 @@ pub fn create_physical_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result> { + use datafusion_physical_expr_common::physical_expr::create_physical_expr as create_physical_expr_common; + + // PR #10074: Temporary solution, after all the logic is moved to common, we can remove this function + let res = create_physical_expr_common(e, input_dfschema, execution_props); + if res.is_ok() { + return res; + } + let input_schema: &Schema = &input_dfschema.into(); match e {