Allow returning an EmptyHashedRelation when a broadcast result is empty [databricks] #4256

Merged Dec 9, 2021 (27 commits)

@abellina abellina commented Dec 1, 2021

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

Closes #4134.

This PR allows the broadcast exchange to produce an EmptyHashedRelation (or an empty array in the case of the identity broadcast) so that AQE's EliminateJoinToEmptyRelation rule can optimize the plan. This massively changes how we run q16, but it lets us match what the CPU does.
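A minimal sketch of the idea, using a toy plan model rather than Spark's or the plugin's classes. Every name here (`EmptyBroadcastSketch`, `materialize`, `eliminateJoin`, the `Plan` and `BroadcastResult` types) is hypothetical and only illustrates why reporting emptiness with a sentinel lets a rule drop the join. Only the inner-join case is modeled.

```scala
object EmptyBroadcastSketch {
  // Hypothetical stand-ins for what a broadcast exchange can produce.
  sealed trait BroadcastResult
  case object EmptyRelation extends BroadcastResult // plays the role of EmptyHashedRelation
  final case class Batch(rows: Vector[Int]) extends BroadcastResult

  // Toy plan nodes (assumption: not Spark's LogicalPlan).
  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class Broadcast(result: BroadcastResult) extends Plan
  final case class InnerJoin(stream: Plan, build: Plan) extends Plan
  case object EmptyPlan extends Plan

  // The exchange reports "no rows" with a sentinel instead of a zero-row batch.
  def materialize(rows: Vector[Int]): BroadcastResult =
    if (rows.isEmpty) EmptyRelation else Batch(rows)

  // An inner join with an empty side can never produce rows, so it is
  // replaced by an empty plan. Children are rewritten first (bottom-up).
  def eliminateJoin(plan: Plan): Plan = plan match {
    case InnerJoin(stream, build) =>
      (eliminateJoin(stream), eliminateJoin(build)) match {
        case (_, Broadcast(EmptyRelation)) | (EmptyPlan, _) | (_, EmptyPlan) => EmptyPlan
        case (s, b) => InnerJoin(s, b)
      }
    case other => other
  }
}
```

In this sketch the rule can only fire because `materialize` returns the sentinel. A zero-row `Batch` would look like any other build side to `eliminateJoin`.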

In terms of performance, I ran this at 3TB and Q16 now takes ~12 seconds, which is slightly faster than the CPU and 3.5x faster than what we had before on the GPU. I don't see regressions in other queries.

This PR enables isFoldableNonLitAllowed for UnaryExprMeta so that expressions like cast(null as bigint) can be handled. These casts show up in a projection produced by the AQE rule that removes the join. ConstantFolding does not re-execute as part of AQE, so the casts are left in the plan. Both tests I have added generate plans containing these casts under AQE.
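To illustrate what "foldable but not a literal" means, here is a small sketch with a toy expression type. The names (`FoldableSketch`, `isFoldableNonLit`, `canRunOnGpu`, the `Expr` classes) are assumptions made for the example and do not reproduce the plugin's actual meta classes or tagging logic.

```scala
object FoldableSketch {
  // Toy expression tree (assumption: not Spark's Expression hierarchy).
  sealed trait Expr { def foldable: Boolean }
  final case class Literal(value: Option[Long]) extends Expr {
    def foldable: Boolean = true
  }
  final case class Column(name: String) extends Expr {
    def foldable: Boolean = false
  }
  final case class Cast(child: Expr, to: String) extends Expr {
    // A cast can be evaluated at planning time whenever its input can.
    def foldable: Boolean = child.foldable
  }

  // True for expressions that constant folding would normally have
  // collapsed into a literal, such as cast(null as bigint).
  def isFoldableNonLit(e: Expr): Boolean =
    e.foldable && !e.isInstanceOf[Literal]

  // Hypothetical tagging check: reject foldable non-literals unless the
  // expression's meta opts in through a flag.
  def canRunOnGpu(e: Expr, foldableNonLitAllowed: Boolean): Boolean =
    foldableNonLitAllowed || !isFoldableNonLit(e)
}
```

With this model, `Cast(Literal(None), "bigint")` is rejected when the flag is off and accepted when it is on, while a cast over a column is accepted either way.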

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
@abellina abellina added the performance A performance related task/issue label Dec 1, 2021

abellina commented Dec 1, 2021

OK, EliminateJoinToEmptyRelation changed to EliminateUnnecessaryJoin in Spark 3.2, and it now seems to look at getRowCount from the Statistics object in the ShuffleExchangeExec. This caught me off guard. I need to spend time on Spark 3.2 to understand whether this is going to cause problems and how they are handling it now.
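As a rough sketch of the kind of check described above, assuming the rule decides emptiness from runtime statistics: the types below are simplified stand-ins (`Stage` and `isKnownEmpty` are hypothetical), not Spark's classes, although Spark's `Statistics` does carry an optional `rowCount`.

```scala
object RowCountSketch {
  // Simplified stand-in for runtime statistics with an optional row count.
  final case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt])

  // Hypothetical query stage: stats are only meaningful once materialized.
  final case class Stage(materialized: Boolean, stats: Option[Statistics])

  // A stage is treated as empty only when it has finished and its
  // statistics explicitly report zero rows. A missing row count is
  // "unknown", not "empty".
  def isKnownEmpty(stage: Stage): Boolean =
    stage.materialized && stage.stats.flatMap(_.rowCount).contains(BigInt(0))
}
```

If this reading is right, an exchange that never reports a row count would never be considered empty, which is the sort of problem worth checking for on Spark 3.2.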

revans2
revans2 previously approved these changes Dec 1, 2021
Comment on lines 37 to 38
* @param broadcastPlan - the SparkPlan to use to obtain the schema for the broadcast
* batch

Passing an entire plan just to get the schema is very heavyweight. This should simply take a schema parameter.

abellina (author) replied:

This should be fixed now.

@abellina abellina force-pushed the perf/empty_hash_relation branch 2 times, most recently from 860e35c to 3f31a67 Compare December 6, 2021 19:51
@abellina abellina changed the title Allow returning an EmptyHashedRelation when a broadcast result is empty Allow returning an EmptyHashedRelation when a broadcast result is empty [databricks] Dec 6, 2021

abellina commented Dec 6, 2021

build

abellina commented Dec 7, 2021

Note that f62167c adds isFoldableNonLitAllowed to UnaryExprMeta. What @revans2 explained (if I understand correctly) is that an AQE rule that removes a join is likely running after constant folding, so we are left with some unary expressions that didn't get folded.

That said I am not entirely sure yet of the order of things, so I am not 100% there yet.

abellina commented Dec 8, 2021

> That said I am not entirely sure yet of the order of things, so I am not 100% there yet.

I've been reading more about this and I think it makes sense now. ConstantFolding isn't in the path after AQE removes the join because it belongs to the basic logical plan optimizer, which runs early on, and not to the AQEOptimizer. As far as I understand, once the plan is wrapped in an AdaptiveSparkPlanExec via InsertAdaptiveSparkPlan (from org.apache.spark.sql.execution.QueryExecution.preparations), the optimizer that includes ConstantFolding is not executed again. Instead, the optimizers used are the AQE optimizer (which only handles propagating empty relations and dynamic join selection) and the canonicalizer (CleanExpressions). @andygrove does this make sense?

Given the optimization, we are looking at the logicalLink of the plan wrapped in the adaptive exec and then producing a new projection, which in my case included a cast(null as bigint) (that is, something that should have been folded to a literal). Support for this should be available on the GPU nowadays, but we don't have test cases for it, so it is disabled by default.
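The ordering argument can be sketched with a toy rule pipeline. This is a simplified model under stated assumptions: `RuleOrderSketch`, its plan and expression types, and the two rules are hypothetical stand-ins for ConstantFolding and the AQE empty-relation rewrite, not Spark's implementations. The rewrite modeled here is a left outer join whose right side turned out to be empty, which is replaced by a projection of null casts.

```scala
object RuleOrderSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class NullLit(dataType: String) extends Expr
  final case class Cast(child: Expr, dataType: String) extends Expr

  sealed trait Plan
  final case class Scan(cols: List[String]) extends Plan
  final case class EmptyScan(cols: List[String]) extends Plan // known empty at runtime
  final case class LeftOuterJoin(left: Plan, right: Plan) extends Plan
  final case class Project(exprs: List[Expr], child: Plan) extends Plan

  type Rule = Plan => Plan

  // Stand-in for ConstantFolding: collapses a cast of a null literal.
  def fold(e: Expr): Expr = e match {
    case Cast(NullLit(_), to) => NullLit(to)
    case other => other
  }
  val constantFolding: Rule = {
    case Project(exprs, child) => Project(exprs.map(fold), child)
    case other => other
  }

  // Stand-in for the AQE rewrite: the join becomes a projection that
  // pads the missing right-side columns with cast(null as bigint).
  val propagateEmpty: Rule = {
    case LeftOuterJoin(left @ Scan(cols), EmptyScan(rightCols)) =>
      val leftCols: List[Expr] = cols.map(Col(_))
      val nulls: List[Expr] = rightCols.map(_ => Cast(NullLit("null"), "bigint"))
      Project(leftCols ++ nulls, left)
    case other => other
  }

  // Applies each rule once, in the order given.
  def run(rules: List[Rule], plan: Plan): Plan =
    rules.foldLeft(plan)((p, r) => r(p))
}
```

Running `constantFolding` before `propagateEmpty` (the order described above) leaves the `Cast` in the projection, because folding had nothing to fold when it ran. Only the reverse order would produce a plain null literal.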

abellina commented Dec 8, 2021

build

abellina commented Dec 8, 2021

build

abellina commented Dec 8, 2021

build

abellina commented Dec 8, 2021

This is broken on Databricks:

  override def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any =
    mode.transform(rows, TaskContext.get.taskMemoryManager())

I was expecting to be able to transform the BroadcastMode as Spark does (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L148), but it seems we need the task context here.

I am looking into it further.

abellina commented Dec 9, 2021

build

@abellina abellina marked this pull request as ready for review December 9, 2021 04:57

abellina commented Dec 9, 2021

Thanks @jlowe, I believe I have addressed the comments.

abellina commented Dec 9, 2021

@revans2 this is ready for another look when you get a chance.

revans2
revans2 previously approved these changes Dec 9, 2021
@andygrove

> I've been reading more about this and I think it makes sense now. Yes ConstantFolding isn't in the path after AQE removes the join because the ConstantFolding optimization happens for the basic logical plan optimizer (which happens early on), and not the AQEOptimizer. As far as I understand, once the plan is wrapped in an AdaptiveSparkPlanExec via InsertAdaptiveSparkPlan (from org.apache.spark.sql.execution.QueryExecution.preparations), the optimizer that includes ConstantFolding is not executed again, instead the optimizers used are the AQE optimizer (which only worries about Propagate Empty Relations, Dynamic Join Selection), and the canonicalizer (CleanExpressions). @andygrove does this make sense?

Yes, that is correct. I ran some tests to confirm this.

abellina commented Dec 9, 2021

build

@abellina abellina merged commit 1588f6a into NVIDIA:branch-22.02 Dec 9, 2021
@abellina abellina deleted the perf/empty_hash_relation branch December 9, 2021 19:57
Successfully merging this pull request may close these issues.

[FEA] Allow EliminateJoinToEmptyRelation in GpuBroadcastExchangeExec