[SPARK-38959][SQL][FOLLOWUP] Optimizer batch PartitionPruning should optimize subqueries #38557

Status: Closed · 2 commits (diff shown from 1 commit)
Optimizer.scala:

```diff
@@ -320,6 +320,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
     }
     def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
       _.containsPattern(PLAN_EXPRESSION), ruleId) {
+      // Do not optimize DPP subquery, as it was created from optimized plan and we should not
+      // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
+      case d: DynamicPruningSubquery => d
```
Comment on lines +323 to +325:

Member: This makes sense. Just wondering, is this particularly related to SPARK-38959?

Contributor (Author): Yes, because this PR adds OptimizeSubqueries to the batch PartitionPruning, and we should not break #33664.

```diff
       case s: SubqueryExpression =>
         val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
         // At this point we have an optimized subquery plan that we are going to attach
```
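For context on why the new case must sit above the generic one: DynamicPruningSubquery is itself a SubqueryExpression, and match cases are tried top-down, so without the early return a DPP subquery would fall into the generic branch and be re-optimized. A minimal sketch with hypothetical stand-in types (not Spark's actual classes):

```scala
object MatchOrderSketch {
  // Stand-in types, far simpler than Spark's real expression tree.
  sealed trait SubqueryExpr { def plan: String }
  final case class DynamicPruningSubq(plan: String) extends SubqueryExpr
  final case class OtherSubq(plan: String) extends SubqueryExpr

  def optimizeSubqueries(e: SubqueryExpr): SubqueryExpr = e match {
    // Tried first: a DPP subquery was built from an already-optimized plan,
    // so it is returned untouched, preserving broadcast/subquery reuse.
    case d: DynamicPruningSubq => d
    // Everything else still goes through the (stand-in) optimizer.
    case s => OtherSubq(s.plan + " [optimized]")
  }

  assert(optimizeSubqueries(DynamicPruningSubq("dpp")) == DynamicPruningSubq("dpp"))
  assert(optimizeSubqueries(OtherSubq("q")) == OtherSubq("q [optimized]"))
}
```
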
SparkOptimizer.scala:

```diff
@@ -51,7 +51,8 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
       PartitionPruning,
-      RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
+      RowLevelOperationRuntimeGroupFiltering,
+      OptimizeSubqueries) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
```
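Rules within a batch apply sequentially in the order listed, so placing OptimizeSubqueries at the end of the PartitionPruning batch means it now runs over the subqueries planted by both PartitionPruning and RowLevelOperationRuntimeGroupFiltering, instead of being invoked only from inside the latter. A rough sketch of that sequencing, using stand-in types rather than Spark's real Batch machinery:

```scala
object BatchSketch {
  type Plan = String
  type Rule = Plan => Plan

  // A Once batch applies each rule exactly once, in the order given.
  def runOnceBatch(rules: Seq[Rule])(plan: Plan): Plan =
    rules.foldLeft(plan)((p, r) => r(p))

  val partitionPruning: Rule = _ + " +dppSubquery"
  val groupFiltering: Rule = _ + " +groupFilterSubquery"
  val optimizeSubqueries: Rule = _ + " [subqueries optimized]"

  def main(args: Array[String]): Unit =
    // OptimizeSubqueries runs last, so it sees the subqueries added by both
    // preceding rules.
    println(runOnceBatch(Seq(partitionPruning, groupFiltering, optimizeSubqueries))("plan"))
}
```
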
RowLevelOperationRuntimeGroupFiltering.scala:

```diff
@@ -17,10 +17,10 @@

 package org.apache.spark.sql.execution.dynamicpruning

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
```
```diff
@@ -37,8 +37,7 @@
  *
  * Note this rule only applies to group-based row-level operations.
  */
-case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
-  extends Rule[LogicalPlan] with PredicateHelper {
+object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {

   import DataSourceV2Implicits._
```

```diff
@@ -66,7 +65,7 @@
       }

       // optimize subqueries to rewrite them as joins and trigger job planning
```
Member: This comment can be removed.
```diff
-      replaceData.copy(query = optimizeSubqueries(newQuery))
+      replaceData.copy(query = newQuery)
   }

   private def buildMatchingRowsPlan(
```
```diff
@@ -89,10 +88,8 @@
       buildKeys: Seq[Attribute],
       pruningKeys: Seq[Attribute]): Expression = {

-    val buildQuery = Project(buildKeys, matchingRowsPlan)
-    val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
-      DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
-    }
-    dynamicPruningSubqueries.reduce(And)
+    val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
```
aokolnychyi (Contributor), Nov 8, 2022: Are there any downsides to rewriting DynamicPruningSubquery into DynamicPruningExpression directly, instead of relying on PlanDynamicPruningFilters and PlanAdaptiveDynamicPruningFilters? I see some special branches for exchange reuse in those rules that would not apply now.

cloud-fan (Contributor, Author), Nov 9, 2022: I don't see any downside. We can only reuse a broadcast if the DPP filter is derived from a join, which doesn't apply here.

Contributor: Got it. I was originally worried we could miss some future optimizations, given that dynamic pruning for row-level operations would go through a different route than normal DPP.

One alternative could be to extend DynamicPruningSubquery with a flag for whether it should be optimized. Up to you, though.

Contributor (Author): My rationale is that what we really need here is a subquery; this is completely different from dynamic partition pruning. One limitation is that DS v2 runtime filter pushdown only applies to DynamicPruningExpression. We can probably fix that and accept normal non-correlated subqueries as well.

Contributor: Yeah, the DS v2 runtime filtering framework is fairly limited at this point.

```diff
+    DynamicPruningExpression(
+      InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
   }
 }
```
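
For reference, a sketch of the filter tree buildMatchingRowsPlan now returns, using simplified hypothetical stand-ins for Spark's plan and expression nodes (the `_partition` key is an invented example). Grouping and projecting on buildKeys makes the Aggregate a DISTINCT over the matching rows, and keeping the DynamicPruningExpression wrapper is what lets DS v2 runtime filter pushdown (SupportsRuntimeV2Filtering) still recognize the filter:

```scala
object FilterShapeSketch {
  // Simplified stand-ins for Spark's nodes (hypothetical, not the real API).
  final case class Attr(name: String)
  final case class Aggregate(grouping: Seq[Attr], output: Seq[Attr], child: String)
  sealed trait Expr
  final case class ListQuery(plan: Aggregate, childOutputs: Seq[Attr]) extends Expr
  final case class InSubquery(values: Seq[Attr], query: ListQuery) extends Expr
  final case class DynamicPruningExpression(child: Expr) extends Expr

  val buildKeys = Seq(Attr("_partition"))
  // Grouping on buildKeys while projecting buildKeys is a DISTINCT over the
  // matching-rows plan, replacing the old Project + per-key DPP subqueries.
  val buildQuery = Aggregate(buildKeys, buildKeys, "matchingRowsPlan")
  // Conceptually: _partition IN (SELECT DISTINCT _partition FROM matchingRows).
  val filter = DynamicPruningExpression(
    InSubquery(buildKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
}
```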