PhysicalExpr Orderings with Range Information #10504

Merged · 17 commits · May 17, 2024

28 changes: 12 additions & 16 deletions datafusion-examples/examples/advanced_udf.rs
@@ -15,26 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::{
arrow::{
array::{ArrayRef, Float32Array, Float64Array},
datatypes::DataType,
record_batch::RecordBatch,
},
logical_expr::Volatility,
};
use std::any::Any;
use std::sync::Arc;

use arrow::array::{new_null_array, Array, AsArray};
use arrow::array::{
new_null_array, Array, ArrayRef, AsArray, Float32Array, Float64Array,
};
use arrow::compute;
use arrow::datatypes::Float64Type;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
};
use std::sync::Arc;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -186,8 +181,9 @@ impl ScalarUDFImpl for PowUdf {
&self.aliases
}

fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok(Some(vec![Some(true)]))
fn monotonicity(&self, input: &[ExprProperties]) -> Result<SortProperties> {
// The POW function preserves the order of its argument.
Ok(input[0].sort_properties)
}
}

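For context on the hunk above: `ScalarUDFImpl::monotonicity` no longer returns an `Option<FuncMonotonicity>`; it now receives the `ExprProperties` of each argument and returns a single `SortProperties` value, so a UDF can describe how its output ordering relates to its inputs. Below is a minimal, hedged sketch (not part of this PR) of how a hypothetical order-reversing UDF such as `negate(x)` might implement the same hook; the helper name is invented, and it assumes the `Ordered`/`Unordered`/`Singleton` variants of `SortProperties` together with arrow's `SortOptions`.

```rust
use arrow_schema::SortOptions;
use datafusion_common::Result;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};

/// Hypothetical helper, callable from `ScalarUDFImpl::monotonicity`: a UDF
/// whose output decreases as its input increases flips the ordering of its
/// first argument instead of forwarding it unchanged (as `pow` does above).
fn reverse_order_of_first_arg(input: &[ExprProperties]) -> Result<SortProperties> {
    Ok(match input[0].sort_properties {
        // An ascending input produces a descending output, and vice versa.
        SortProperties::Ordered(options) => SortProperties::Ordered(SortOptions {
            descending: !options.descending,
            nulls_first: options.nulls_first,
        }),
        // Unordered stays unordered; a constant (`Singleton`) stays constant.
        other => other,
    })
}
```
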
13 changes: 7 additions & 6 deletions datafusion-examples/examples/function_factory.rs
@@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::result::Result as RResult;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{
FunctionFactory, RegisterFunction, SessionContext, SessionState,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_expr::simplify::ExprSimplifyResult;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{CreateFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature};
use std::result::Result as RResult;
use std::sync::Arc;

/// This example shows how to utilize [FunctionFactory] to implement simple
/// SQL-macro like functions using a `CREATE FUNCTION` statement. The same
@@ -156,8 +157,8 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
fn monotonicity(&self, _input: &[ExprProperties]) -> Result<SortProperties> {
Ok(SortProperties::Unordered)
}
}

(another changed file; name not shown in this capture)
@@ -3572,7 +3572,11 @@ pub(crate) mod tests {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let alias = vec![("a".to_string(), "a".to_string())];
let alias = vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "b".to_string()),
("c".to_string(), "c".to_string()),
];
@berkaysynnada (PR author) commented on May 15, 2024: "A test bug shows up, fixed here."

let plan = sort_preserving_merge_exec(
sort_key.clone(),
sort_exec(
@@ -3585,7 +3589,7 @@
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
// Since this projection is trivial, increasing parallelism is not beneficial
"ProjectionExec: expr=[a@0 as a]",
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan.clone(), true);
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -39,8 +39,8 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow_schema::Schema;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_expr::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::sort_properties::SortProperties;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
@@ -561,7 +561,7 @@ fn hash_join_convert_symmetric_subrule(
let name = schema.field(*index).name();
let col = Arc::new(Column::new(name, *index)) as _;
// Check if the column is ordered.
equivalence.get_expr_ordering(col).data
equivalence.get_expr_properties(col).sort_properties
!= SortProperties::Unordered
},
)
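The one-line change above swaps `get_expr_ordering(col).data` for `get_expr_properties(col).sort_properties` where the join-selection rule checks whether a column is ordered. A rough, self-contained sketch of that check outside the optimizer follows; the schema and column names are made up, and the expectation that a constant column reports a non-`Unordered` property is an assumption based on this PR rather than something it states.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::sort_properties::SortProperties;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::col;

fn main() -> Result<()> {
    // Illustrative two-column schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("e", DataType::Int32, true),
    ]));
    let col_a = col("a", &schema)?;
    let col_e = col("e", &schema)?;

    // Mark `e` as a constant, mirroring the fuzz-test setup later in this diff.
    let eq_properties =
        EquivalenceProperties::new(schema).add_constants([col_e.clone()]);

    // The same predicate used in `hash_join_convert_symmetric_subrule`:
    // a column counts as ordered when its sort properties are not `Unordered`.
    let a_ordered = eq_properties.get_expr_properties(col_a).sort_properties
        != SortProperties::Unordered;
    let e_ordered = eq_properties.get_expr_properties(col_e).sort_properties
        != SortProperties::Unordered;

    // With no orderings registered, `a` should be unordered; the constant `e`
    // is expected (per this PR) to be a singleton, so this should print `true`.
    assert!(!a_ordered);
    println!("e ordered: {e_ordered}");
    Ok(())
}
```
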
4 changes: 0 additions & 4 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1376,7 +1376,6 @@ mod tests {
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
@@ -1442,7 +1441,6 @@
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
@@ -1511,7 +1509,6 @@
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
@@ -1577,7 +1574,6 @@
)),
],
DataType::Int32,
None,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),

(another changed file; name not shown in this capture)
@@ -39,12 +39,12 @@ mod sp_repartition_fuzz_tests {
config::SessionConfig, memory_pool::MemoryConsumer, SendableRecordBatchStream,
};
use datafusion_physical_expr::{
equivalence::{EquivalenceClass, EquivalenceProperties},
expressions::{col, Column},
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
PhysicalExpr, PhysicalSortExpr,
};
use test_utils::add_empty_batches;

use datafusion_physical_expr::equivalence::EquivalenceClass;
use itertools::izip;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};

@@ -78,7 +78,7 @@

let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f);
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([col_e.clone()]);

(another changed file; name not shown in this capture)
@@ -15,26 +15,27 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::compute::kernels::numeric::add;
use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState};
use datafusion::prelude::*;
use datafusion::{execution::registry::FunctionRegistry, test_util};
use datafusion_common::cast::{as_float64_array, as_int32_array};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array,
cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue,
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, internal_err,
not_impl_err, plan_err, DataFusionError, ExprSchema, Result, ScalarValue,
};
use datafusion_common::{assert_contains, exec_err, internal_err, DataFusionError};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder,
ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// test that casting happens on udfs.
/// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
@@ -776,10 +777,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
fn aliases(&self) -> &[String] {
&[]
}

fn monotonicity(&self) -> Result<Option<datafusion_expr::FuncMonotonicity>> {
Ok(None)
}
}

impl ScalarFunctionWrapper {