Skip to content

Commit

Permalink
implement rewrite for FilterNullJoinKeys
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Apr 21, 2024
1 parent 70db5ea commit 8b4b002
Showing 1 changed file with 40 additions and 28 deletions.
68 changes: 40 additions & 28 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Result};
use datafusion_expr::{
and, logical_plan::Filter, logical_plan::JoinType, Expr, ExprSchemable, LogicalPlan,
logical_plan::Filter, logical_plan::JoinType, Expr, ExprSchemable, LogicalPlan,
};
use datafusion_expr::{BinaryExpr, Operator};
use std::sync::Arc;

/// The FilterNullJoinKeys rule will identify inner joins with equi-join conditions
Expand All @@ -32,24 +34,34 @@ use std::sync::Arc;
// Optimizer rule type with no fields; `Default` is derived, so
// `FilterNullJoinKeys::default()` is enough to construct it.
#[derive(Default)]
pub struct FilterNullJoinKeys {}

impl FilterNullJoinKeys {
/// String identifier of this rule. In this diff listing, `name()` is shown
/// returning `Self::NAME` on one side and the same literal on the other.
pub const NAME: &'static str = "filter_null_join_keys";
}

// NOTE(review): this listing is a rendered diff. Removed and added lines appear
// back to back without +/- markers, so several statements below occur twice
// (the older form first, the newer form directly after it).
impl OptimizerRule for FilterNullJoinKeys {
/// Borrowing entry point. The only statement shown directly under this
/// signature is an `internal_err!` that points callers at `rewrite`.
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called FilterNullJoinKeys::rewrite")
}

/// Always returns `true`.
fn supports_rewrite(&self) -> bool {
true
}

/// Requests `ApplyOrder::BottomUp` for this rule.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

/// Owned-plan entry point.
///
/// Returns the plan untouched (`Transformed::no`) when the
/// `optimizer.filter_null_join_keys` option is off or when the plan is not an
/// inner join. For an inner join, each side that has a non-empty filter list is
/// wrapped in a `Filter` whose predicate comes from `create_not_null_predicate`.
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Rule is opt-in: bail out early when the config flag is not set.
if !config.options().optimizer.filter_null_join_keys {
return Ok(None);
return Ok(Transformed::no(plan));
}

match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
let mut join = join.clone();

LogicalPlan::Join(mut join) if join.join_type == JoinType::Inner => {
let left_schema = join.left.schema();
let right_schema = join.right.schema();

Expand All @@ -69,29 +81,22 @@ impl OptimizerRule for FilterNullJoinKeys {
// NOTE(review): the lines that build `left_filters` / `right_filters` are hidden
// by the collapsed hunk above; presumably they are derived from the join keys
// and the two schemas. Confirm against the full file.
if !left_filters.is_empty() {
let predicate = create_not_null_predicate(left_filters);
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(
predicate,
join.left.clone(),
predicate, join.left,
)?));
}
if !right_filters.is_empty() {
let predicate = create_not_null_predicate(right_filters);
join.right = Arc::new(LogicalPlan::Filter(Filter::try_new(
predicate,
join.right.clone(),
predicate, join.right,
)?));
}
// NOTE(review): as far as the visible lines show, this branch reports a change
// for every inner join, even when neither filter list was non-empty.
Ok(Some(LogicalPlan::Join(join)))
Ok(Transformed::yes(LogicalPlan::Join(join)))
}
// Any other plan node is passed through unchanged.
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

/// Name of this rule: `Self::NAME` in the older lines, the literal
/// `"filter_null_join_keys"` in the newer ones.
fn name(&self) -> &str {
Self::NAME
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
"filter_null_join_keys"
}
}

Expand All @@ -100,11 +105,18 @@ fn create_not_null_predicate(filters: Vec<Expr>) -> Expr {
.into_iter()
.map(|c| Expr::IsNotNull(Box::new(c)))
.collect();
// combine the IsNotNull expressions with AND

// directly unwrap since it should always have a value
not_null_exprs
.iter()
.skip(1)
.fold(not_null_exprs[0].clone(), |a, b| and(a, b.clone()))
.into_iter()
.reduce(|a, b| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(a),
op: Operator::And,
right: Box::new(b),
})
})
.unwrap()
}

#[cfg(test)]
Expand Down

0 comments on commit 8b4b002

Please sign in to comment.