
[SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse #28885

Conversation

@peter-toth (Contributor) commented Jun 21, 2020

What changes were proposed in this pull request?

This PR:

  1. Fixes an issue in the ReuseExchange rule that can result in a ReusedExchange node pointing to an invalid exchange. This can happen because ReuseExchange performs 2 separate traversals, and the 2nd traversal can modify an exchange that has already been referenced (reused) during the 1st traversal.
    Consider the following query:

    WITH t AS (
      SELECT df1.id, df2.k
      FROM df1 JOIN df2 ON df1.k = df2.k
      WHERE df2.id < 2
    )
    SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
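
    (The description doesn't show how df1 and df2 were created; a plausible, hypothetical setup, consistent with the plans below where df1 is partitioned by k so that DPP applies, would be:)

    spark.range(100).selectExpr("id", "id % 10 AS k")
      .write.partitionBy("k").format("parquet").saveAsTable("df1")
    spark.range(10).selectExpr("id", "id AS k")
      .write.format("parquet").saveAsTable("df2")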
    

    Before this PR the plan of the query was (note the <== this reuse node points to a non-existing node marker):

    == Physical Plan ==
    *(7) SortMergeJoin [id#14L], [id#18L], Inner
    :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
    :     +- *(2) Project [id#14L, k#17L]
    :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
    :           :- *(2) Project [id#14L, k#15L]
    :           :  +- *(2) Filter isnotnull(id#14L)
    :           :     +- *(2) ColumnarToRow
    :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
    :           :                 +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
    :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
    :              +- *(1) Project [k#17L]
    :                 +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
    :                    +- *(1) ColumnarToRow
    :                       +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
    +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
    

    After this PR:

    == Physical Plan ==
    *(7) SortMergeJoin [id#14L], [id#18L], Inner
    :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
    :     +- *(2) Project [id#14L, k#17L]
    :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
    :           :- *(2) Project [id#14L, k#15L]
    :           :  +- *(2) Filter isnotnull(id#14L)
    :           :     +- *(2) ColumnarToRow
    :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
    :           :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
    :           :                    +- *(1) Project [k#17L]
    :           :                       +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
    :           :                          +- *(1) ColumnarToRow
    :           :                             +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
    :           +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
    +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
    
  2. Fixes an issue with the separate, consecutive ReuseExchange and ReuseSubquery rules that can result in a ReusedExchange node pointing to an invalid exchange. This can happen because the rules run separately, and the ReuseSubquery rule can modify an exchange that has already been referenced (reused) by the ReuseExchange rule.
    Consider the following query:

    WITH t AS (
      SELECT df1.id, df2.k
      FROM df1 JOIN df2 ON df1.k = df2.k
      WHERE df2.id < 2
    ),
    t2 AS (
      SELECT * FROM t
      UNION
      SELECT * FROM t
    )
    SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
    

    Before this PR the plan of the query was (note the <== this reuse node points to a non-existing node marker):

    == Physical Plan ==
    *(15) SortMergeJoin [id#46L], [id#58L], Inner
    :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
    :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
    :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
    :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
    :              +- Union
    :                 :- *(2) Project [id#46L, k#49L]
    :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
    :                 :     :- *(2) Project [id#46L, k#47L]
    :                 :     :  +- *(2) Filter isnotnull(id#46L)
    :                 :     :     +- *(2) ColumnarToRow
    :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
    :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
    :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
    :                 :        +- *(1) Project [k#49L]
    :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
    :                 :              +- *(1) ColumnarToRow
    :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
    :                 +- *(4) Project [id#46L, k#49L]
    :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
    :                       :- *(4) Project [id#46L, k#47L]
    :                       :  +- *(4) Filter isnotnull(id#46L)
    :                       :     +- *(4) ColumnarToRow
    :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
    :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
    +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
    

    After this PR:

    == Physical Plan ==
    *(15) SortMergeJoin [id#46L], [id#58L], Inner
    :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
    :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
    :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
    :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
    :              +- Union
    :                 :- *(2) Project [id#46L, k#49L]
    :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
    :                 :     :- *(2) Project [id#46L, k#47L]
    :                 :     :  +- *(2) Filter isnotnull(id#46L)
    :                 :     :     +- *(2) ColumnarToRow
    :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
    :                 :     :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
    :                 :     :                    +- *(1) Project [k#49L]
    :                 :     :                       +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
    :                 :     :                          +- *(1) ColumnarToRow
    :                 :     :                             +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
    :                 :     +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
    :                 +- *(4) Project [id#46L, k#49L]
    :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
    :                       :- *(4) Project [id#46L, k#47L]
    :                       :  +- *(4) Filter isnotnull(id#46L)
    :                       :     +- *(4) ColumnarToRow
    :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
    :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
    :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
    +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
    

    (This example contains issue 1 as well.)

  3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. The new combined rule exploits reuse opportunities between parent queries and subqueries by traversing the whole plan; the traversal is started from the top-level query only.

  4. Due to the order in which this PR traverses the plan while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from an execution perspective, but it does mean "forward references" in the explain formatted output, where parent queries come first. The changes to ExplainUtils handle these references properly.

This PR fixes the above issues by unifying the separate rules into a single ReuseExchangeAndSubquery rule that performs a one-pass, whole-plan, bottom-up traversal.
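
A minimal sketch of the shape of the new rule (simplified: the committed version uses pattern-pruned transforms, config checks and a ReuseMap helper, while this sketch uses plain transforms and maps keyed on the canonicalized plan):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    // Shared across the whole plan, including all levels of subqueries.
    val exchanges = mutable.Map.empty[SparkPlan, Exchange]
    val subqueries = mutable.Map.empty[SparkPlan, BaseSubqueryExec]

    def reuse(plan: SparkPlan): SparkPlan = plan.transformUp {
      case exchange: Exchange =>
        // Keep the first exchange with a given canonicalized form, reference later ones.
        val cached = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
        if (cached ne exchange) ReusedExchangeExec(exchange.output, cached) else exchange

      case other => other.transformExpressionsUp {
        case sub: ExecSubqueryExpression =>
          // Recurse into the subquery plan first (bottom-up), then try to reuse it.
          val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
          val cached = subqueries.getOrElseUpdate(subquery.canonicalized, subquery)
          sub.withNewPlan(if (cached ne subquery) ReusedSubqueryExec(cached) else subquery)
      }
    }

    reuse(plan)
  }
}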

Why are the changes needed?

Performance improvement.

How was this patch tested?

  • New UTs in ReuseExchangeAndSubquerySuite to cover 1. and 2.
  • New UTs in DynamicPartitionPruningSuite, SubquerySuite and ExchangeSuite to cover 3.
  • New ReuseMapSuite to test ReuseMap.
  • Checked new golden files of PlanStabilitySuites for invalid reuse references.
  • TPCDS benchmarks.

@peter-toth (Contributor Author) commented Jun 21, 2020

I measured considerable improvement (~30%) using TPCDSQueryBenchmark on scaleFactor=5 data with the TPCDS Q14a, Q14b, Q23a, Q23b, Q47 and Q57 queries.
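
(For reference, such a run can be reproduced roughly as follows; the data location is a placeholder and the flags are from the benchmark's argument parser at the time:)

build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /path/to/tpcds-sf5 --query-filter q14a,q14b,q23a,q23b,q47,q57"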

@@ -474,9 +475,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
         Inner,
         None,
         shuffle,
-        shuffle)
+        shuffle.copy())
@peter-toth (Contributor Author):

If this were the same as the other child of SortMergeJoinExec then no reuse would be required.

Contributor:

I don't follow this part, could you please let me know how copying here makes it different?

@peter-toth (Contributor Author):

Yes, so I think the whole point of exchange reuse is to find different exchange instances that produce the same data, keep only one instance, and reuse its output where we can (i.e. call execute/executeBroadcast multiple times on one exchange instance instead of calling execute/executeBroadcast once on each instance). In this example the 2 children point to the same exchange instance (shuffle), so there is no point in reuse here.
(We could wrap one of them in a ReusedExchangeExec node, but it wouldn't make any difference from a performance point of view.)
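
(Conceptually the wrapper only delegates to the reused instance anyway — a simplified sketch of the idea, leaving out the attribute re-mapping the real ReusedExchangeExec does:)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.exchange.Exchange

// Illustrative stand-in: executing the wrapper just executes the one kept exchange instance.
case class ReusedExchangeSketch(output: Seq[Attribute], child: Exchange) extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}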

Contributor:

Makes sense, although I think the reuse exchange logic that we have doesn't check if the instances are the same, and will replace it with a reuse exchange anyway, but good to have a different instance here.

@peter-toth peter-toth changed the title [WIP][SPARK-29375][SPARK-28940][SQL] Whole plan exchange and subquery reuse [SPARK-29375][SPARK-28940][SQL] Whole plan exchange and subquery reuse Jun 21, 2020
@peter-toth (Contributor Author):

cc @cloud-fan, @dongjoon-hyun

@peter-toth (Contributor Author) commented Jun 21, 2020

I've just realized that this PR partly overlaps with #28881. @prakharjain09 opened it 2 hours before I did, but my PR does a bit more than that and does the combined reuse in a slightly different way, so I wouldn't close mine yet.

@SparkQA commented Jun 21, 2020

Test build #124334 has finished for PR 28885 at commit dbd8606.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan]

@dongjoon-hyun (Member):

Thank you for pinging me, @peter-toth.

@HyukjinKwon (Member):

cc @maryannxue too FYI

@peter-toth peter-toth changed the title [SPARK-29375][SPARK-28940][SQL] Whole plan exchange and subquery reuse [SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse Jun 22, 2020
@SparkQA commented Jun 26, 2020

Test build #124538 has finished for PR 28885 at commit 8eca64a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dbaliafroozeh (Contributor) left a comment:

Thanks for doing this! I think the idea of whole-plan reuse is good and your approach is correct, but IMO some parts can be done differently; I left some comments.

@@ -326,7 +327,8 @@ object QueryExecution {
    */
   private[execution] def preparations(
       sparkSession: SparkSession,
-      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
+      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
+      subquery: Boolean): Seq[Rule[SparkPlan]] = {
@dbaliafroozeh (Contributor) commented Jun 27, 2020:

Why do we need this boolean parameter here? What will happen if we just always run the WholePlanReuse rule?

@peter-toth (Contributor Author):

Sorry, I don't get this, why would we run this rule multiple times? This new rule traverses the whole plan; does it make any sense to run it on subqueries and then run it on the main query, which also traverses the subqueries?

Contributor:

Yes, from the performance perspective it makes sense to exclude them. I sort of don't like having another parameter and selecting rules based on it, so I was thinking that if it's not a huge performance difference we shouldn't do it; but it can be expensive with canonicalization, etc. I guess we have no other way of detecting, locally inside the new rule, whether a physical plan is a subquery, so it's fine to do it like this. Maybe we need a more explicit name for QueryExecution.prepareExecutedPlan in the future, like PrepareSubqueryForExecution, to make it clearer that this method is only called for subqueries.


def apply(plan: SparkPlan): SparkPlan = {
  if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
    // To avoid costly canonicalization of an exchange or a subquery:
Contributor:

Do you have any measurements of how much we save when using Map[StructType, ... instead of a map keyed on the canonicalized form? I know in theory it's beneficial when there is no match, but I was wondering if it has a tangible effect.

@peter-toth (Contributor Author):

I think the Map[StructType, ... way of caching has been there for quite some time. A simple map of canonicalized plans naturally comes to my mind too, and I feel it would do the job without any performance degradation for most queries. But I'm afraid there can be edge cases where it could introduce degradation, so just to be on the safe side I wouldn't touch this preliminary schema matching when looking up in the cache.
On the other hand, I think the old ArrayBuffer[... can easily be replaced with a map of canonicalized plans to speed up lookups in the cache when the schema matches.
I saw your other comment: #28885 (comment) on this topic, and I think the Canonicalized[T] wrapper would be exactly the same as the old Map[StructType, ArrayBuffer[T]] cache map, just a bit more complicated.
What I did in my latest commit c49a0f9 is extract the cache code, and I think it became quite easy to follow. But I'm open to suggestions and will change the implementation if you think it is still too complicated.

case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {

  def apply(plan: SparkPlan): SparkPlan = {
    if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
Contributor:

Should we add a flag for this rule, like exchangeAndSubqueryReuseEnabled? I think this rule should only run if both flags are enabled. Do we have cases where we only want to reuse subqueries and not exchanges? It will also simplify the logic in your new rule. We can keep the old rules and fall back to them if only one of the flags is off or the new flag is off.

@peter-toth (Contributor Author):

Hmm, I'm not sure why anyone would disable subquery reuse or exchange reuse at all. But those flags do exist, and we shouldn't change their meaning: exchange or subquery reuse shouldn't happen when they are disabled.
IMHO if we introduce this "whole plan reuse" with this new rule, then we should still respect the old flags.
I also think this new rule can replace the old rules entirely with one whole-plan traversal, and I don't see any reason to keep the old rules. Actually, wouldn't it be confusing if we had reuse-related code in many places?

@dbaliafroozeh (Contributor) commented Jun 28, 2020:

Ok, makes sense, let's keep the old behavior and just change the implementation.

@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
     checkAnswer(df, df2)
     checkAnswer(df, Nil)
   }

test("Subquery reuse across the whole plan") {
val df = sql(
Contributor:

Can you put the physical plan in a comment? It'll help to see what gets reused.

@peter-toth (Contributor Author):

Sure, I added the plan in commit 2b3cde2.
Let me know if a sample plan would be beneficial in other test cases too.

Contributor:

I mentioned this to Peter offline -- I don't really agree with adding the plan in a comment. Other changes will continue happening in the rest of the planner, the plan will change further, and the comment will get out of date. If you actually want the plan to remain static, you should add asserts against it (though I doubt that is what you want). If somebody wants to see the plan, they can run the test themselves, right?

(I'm also unfamiliar with the norms around this part of the codebase, so it's fine if putting the entire plan in a comment is not unusual.)


// - we insert it into the map of canonicalized plans only when at least 2 have the same
//   schema
val exchanges = Map[StructType, (Exchange, Map[SparkPlan, Exchange])]()
val subqueries = Map[StructType, (BaseSubqueryExec, Map[SparkPlan, BaseSubqueryExec])]()
Contributor:

The nested map somehow makes the logic unnecessarily complicated. Can we define a class like Canonicalized[T], where T can be an Exchange or BaseSubqueryExec, and implement the equals and hashCode of this class to first check for schema equality? Then we can simply have a map like Map[Canonicalized[T], T], which will simplify the code quite a bit.
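
(A sketch of that idea — illustrative only, not part of the PR; equality short-circuits on the cheap schema check before comparing the costly canonicalized plans, which mirrors what sameResult does:)

import org.apache.spark.sql.catalyst.plans.QueryPlan

// Hypothetical wrapper following the suggestion above.
class Canonicalized[T <: QueryPlan[_]](val plan: T) {
  override def hashCode(): Int = plan.schema.hashCode()
  override def equals(other: Any): Boolean = other match {
    case o: Canonicalized[_] =>
      // Cheap schema comparison first, canonicalization only on a schema match.
      plan.schema == o.plan.schema && plan.canonicalized == o.plan.canonicalized
    case _ => false
  }
}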

@peter-toth (Contributor Author):

Please find the answer in #28885 (comment).

@SparkQA commented Jun 28, 2020

Test build #124572 has finished for PR 28885 at commit c49a0f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth (Contributor Author):

Thanks for doing this! I think the idea of whole-plan reuse is good and your approach is correct, but IMO some parts can be done differently; I left some comments.

Thanks for the review and comments @dbaliafroozeh!

@SparkQA commented Jun 28, 2020

Test build #124602 has finished for PR 28885 at commit 2b3cde2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case sub: ExecSubqueryExpression =>
  val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
  sub.withNewPlan(
    if (conf.subqueryReuseEnabled) {
@dbaliafroozeh (Contributor) commented Jun 28, 2020:

I like the new abstraction for hiding the loop. How about going one step further and also abstracting over the pattern where the found element is the same instance as the one in the cache, something like this:

def lookupOrElse(plan: T, f: T => T): T = {
  val res = ... // look up `plan` in the cache, adding it if absent
  if (res eq plan) {
    plan
  } else {
    f(res)
  }
}

and then the call site becomes: lookupOrElse(subquery, ReusedSubqueryExec(_))

@peter-toth (Contributor Author):

The name lookupOrElse sounds a bit weird to me: it suggests that f is applied if the lookup fails (the item isn't in the map yet). But in this case f should be applied if the lookup finds the item in the map. So if we want that abstraction, shouldn't we call the method putOrElse or addOrElse?

@peter-toth (Contributor Author):

I added addOrElse in the latest commits.

Contributor:

Yeah, I agree lookupOrElse is not the best name. I was thinking more about it, so we want a method with the following semantics:

Try to find an existing node with the same canonicalized form:

  • if no such node is found, add the node to the map and return the node itself
  • if the found node refers to the same instance as the key, do nothing and return the node itself
  • otherwise, call the given function on the node retrieved from the map and return the result

I don't think it's like any of the common methods on the Map interfaces that I know of. It somehow reminds me of Java 8's putIfAbsent method, but with the difference that it calls the passed function if the key is present.

On second thought, maybe we should rename the lookup method to getOrElseUpdate, because it doesn't just look up.
And call the second method applyIfPresent. Then again, maybe it doesn't make sense to have the second method at all, since it's very specific and it's hard to capture with a method name what's going on.

One last thing: what was happening before if we had two exchanges in the query plan that referred to the same instance? Were we leaving them intact or replacing one with a reuse node? Because now we just leave it as is. I'm not sure, though, whether it can actually happen that two exchanges in the query plan refer to the same instance; if not, maybe we can remove the check (and the second method) altogether. Also, maybe putting in a reuse node when the instances are the same is even better; it's just a wrapper and signals the presence of the same exchange node.

@peter-toth (Contributor Author):

I agree that the name lookup doesn't capture exactly what the method does, but getOrElseUpdate has a well-known meaning and parameter list on Scala maps and the first method doesn't match them, so I would use a different name like getOrElseAdd or lookupOrElseAdd.
Yes, it's even harder to find a good name for the second method. How about reuseOrElseAdd? It doesn't match any of the existing methods of a map and captures both the reuse (applying f) and the add functionality (a sketch of the pair is at the end of this comment).

The old behaviour was that on the second encounter of the same instance it was wrapped in a reuse node. This works differently in this PR: the instance doesn't get wrapped on the second encounter. I did it this way for 3 reasons:

  • From a performance perspective it doesn't make any difference.
  • An exchange or subquery instance can appear twice in a plan only if someone crafted the plan manually, which I don't think happens in any real-world use case.
  • I'm using getOrElseUpdate twice in lookup to simplify the code, but from the result of getOrElseUpdate we can't tell whether the key had already been in the map or was just added during the call. Besides the simpler code, I also like getOrElseUpdate because some concurrent map implementations support it as an atomic operation. I admit that doesn't matter in this PR, but I think it would be nice to incorporate the ReuseAdaptiveSubquery functionality into this rule in a separate PR in the future.
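
A sketch of how the pair could look on the cache class (illustrative, assuming the Map[StructType, ArrayBuffer[T]] layout discussed in this thread — not the exact committed code):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.StructType

class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
  private val map = mutable.Map.empty[StructType, mutable.ArrayBuffer[T]]

  // Return the first plan with the same result as `plan`, or add `plan` and return it.
  private def lookupOrElseAdd(plan: T): T = {
    val sameSchema = map.getOrElseUpdate(plan.schema, mutable.ArrayBuffer.empty)
    sameSchema.find(plan.sameResult).getOrElse {
      sameSchema += plan
      plan
    }
  }

  // Apply `f` (e.g. wrap into a reuse node) only when an earlier, different
  // instance with the same result was found.
  def reuseOrElseAdd[T3 >: T](plan: T, f: T => T3): T3 = {
    val found = lookupOrElseAdd(plan)
    if (found eq plan) plan else f(found)
  }
}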

Contributor:

reuseOrElseAdd is probably a better name, but this naming is getting hard :-) About the new behavior: sounds good to me, but if there is no performance benefit and it only happens in tests, I would have just wrapped the same instance in a reuse node, to avoid bothering with the second method and the equality check; I don't have a strong opinion on this, though.

 */
case class ReuseExchangeAndSubquery(conf: SQLConf) extends Rule[SparkPlan] {

  private class ReuseCache[T <: SparkPlan] {
Contributor:

I like this new abstraction, just two nitpicky comments:

  • This is not really a cache IMO, in the sense that we need to hold on to all the values; it's more of a memo table. How about renaming it to ReuseMap, or something along those lines?
  • Can we move this class somewhere else? I know it's only used here, but I think it would be great to use it anywhere we want to look something up by its canonicalized form. If you agree to move it to a util package, let's also make the type upper bound QueryPlan so that we can use it for other node types.

@peter-toth (Contributor Author):

Thanks, renamed it to ReuseMap and generalized it as you suggested.

@dbaliafroozeh (Contributor) left a comment:

Changes look good to me, added 3 minor comments.

@peter-toth (Contributor Author):

Changes look good to me, added 3 minor comments.

Thanks @dbaliafroozeh.

@cloud-fan, @maryannxue could you review this PR?

@SparkQA commented Jun 30, 2020

Test build #124663 has finished for PR 28885 at commit fe78a31.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 30, 2020

Test build #124679 has finished for PR 28885 at commit 8b50e81.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 1, 2020

Test build #124753 has finished for PR 28885 at commit fd522ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth (Contributor Author):

retest this please

@SparkQA commented Jul 2, 2020

Test build #124810 has finished for PR 28885 at commit fd522ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth (Contributor Author):

@cloud-fan, @maryannxue, @maropu, @viirya could you please review this PR?

 */
class ReuseMap[T <: QueryPlan[_]] {
  // scalastyle:off structural.type
  private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
@peter-toth (Contributor Author) commented Jul 10, 2020:

If the existential type T2, which stands for the canonicalized type of T, is hard to read, then we can move T2 to the class definition like this:

class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
  private val map = Map[StructType, (T, Map[T2, T])]()

and initialize the ReuseMaps as:

      val exchanges = new ReuseMap[Exchange, SparkPlan]()
      val subqueries = new ReuseMap[BaseSubqueryExec, SparkPlan]()

Contributor:

I consider the second version much easier to reason about. It's slightly more types you have to put into the code, but IMO those types help with readability.

@peter-toth (Contributor Author):

Ok, 5df4c53 adds the new T2 type param.

Comment on lines 129 to 140
  plan transformUp {
    case exchange: Exchange => reuse(exchange)
  } transformAllExpressions {
    // Lookup inside subqueries for duplicate exchanges
    case in: InSubqueryExec =>
      val newIn = in.plan.transformUp {
        case exchange: Exchange => reuse(exchange)
      }
      in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
  }
}
@peter-toth (Contributor Author) commented Jul 16, 2020:

This transformUp and then transformAllExpressions was actually a bit weird for 2 reasons:

  • A minor issue is that reuse is a partial function, and yet we use it as a normal function; I mean this code probably should have been written as
      plan transformUp reuse transformAllExpressions {
        // Lookup inside subqueries for duplicate exchanges
        case in: InSubqueryExec =>
          val newIn = in.plan.transformUp reuse
          in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
      }
    
    and the definition of reuse should have been def reuse: PartialFunction[SparkPlan, SparkPlan] = {
  • A bigger issue is that this part of the code traverses the plan 2 times: first with transformUp and then with transformAllExpressions, and both traversals can insert reuse references into the plan. Imagine that the first traversal (transformUp) inserts a reuse reference to an exchange, say a ReuseExchange node pointing to Exchange id=1, but then the second traversal (transformAllExpressions) finds an InSubqueryExec expression in a FileSourceScanExec under the node Exchange id=1. If there is a reuse opportunity in the subplan of that InSubqueryExec, then inserting another reuse node into the subplan propagates up and causes Exchange id=1 to be replaced by Exchange id=x in the parent plan. In that case the reuse node created in the first traversal, pointing to Exchange id=1, becomes invalid.
    (Please note that this issue is very similar to what I showed in the description of the PR, but in that case the 2 traversals were due to the separate ReuseSubquery and ReuseExchange rules.)
    I think the fix to this issue is to use a one-pass, whole-plan, bottom-up traversal, as I did in ReuseExchangeAndSubquery in this PR.

Contributor:

can you tell me the exact test case that demonstrates this issue?

I kinda see the issue you're talking about, but my thinking is that maybe this just requires fixing ReuseExchange, rather than combining both into one rule.

@peter-toth (Contributor Author) commented Jul 23, 2020:

This is the issue described in 1. in the PR description and tested with the case SPARK-32041: No reuse interference inside ReuseExchange in the new ReuseExchangeAndSubquerySuite: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28

Combining the 2 rules is required to fix 2., which is tested with the case SPARK-32041: No reuse interference between ReuseExchange and ReuseSubquery: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR67

@cloud-fan (Contributor):

What's still WIP? This idea LGTM and I'll take a look when the code is ready for review.

@peter-toth (Contributor Author):

What's still WIP? This idea LGTM and I'll take a look when the code is ready for review.

Thanks! Actually it's not WIP, I just rebased it on #32885 yesterday and waited for the tests. The results looked OK, but it seems I need to merge master and regenerate the golden files again.

@peter-toth peter-toth changed the title [WIP][SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse [SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse Jun 17, 2021
# Conflicts:
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64/explain.txt
@SparkQA commented Jun 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44474/

@SparkQA commented Jun 17, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44474/

@peter-toth (Contributor Author):

I updated the new rule to use pruned transforms and fixed a test suite because AQE is enabled by default now.
This PR is ready for review.

@SparkQA commented Jun 17, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44477/

@SparkQA commented Jun 17, 2021

Test build #139947 has finished for PR 28885 at commit c346387.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TempResolvedColumn(child: Expression, nameParts: Seq[String]) extends UnaryExpression
  • case class MakeDTInterval(
  • case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
  • case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
  • case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric

@SparkQA commented Jun 18, 2021

Test build #139950 has finished for PR 28885 at commit 5b4b719.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -41,22 +38,15 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
   *
   * @param plan Input query plan to process
   * @param append function used to append the explain output
   * @param startOperatorID The start value of operation id. The subsequent operations will
   *                        be assigned higher value.
   *
   * @return The last generated operation id for this input plan. This is to ensure we
Contributor:

We should remove this now.

@peter-toth (Contributor Author):

Fixed in 7187ebd.


/**
 * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
 * use the same exchange or subquery for all the references.
Contributor:

Maybe we should explain why we reuse exchange and subquery in this single rule:

Note that the Spark plan is a mutually recursive data structure:
  SparkPlan -> Expr -> Subquery -> SparkPlan -> Expr -> Subquery -> ...

Therefore, in this rule, we recursively rewrite the exchanges and subqueries in a bottom-up way, in one go.

@peter-toth (Contributor Author):

Ok, added the explanation in 7187ebd

 * @param plan the input plan
 * @return the matching plan or the input plan
 */
def lookupOrElseAdd(plan: T): T = {
Contributor:

This can be private

@peter-toth (Contributor Author):

Fixed in 7187ebd

@cloud-fan (Contributor):

I measured considerable improvement (~30%) using TPCDSQueryBenchmark on scaleFactor=5 data with the TPCDS Q14a, Q14b, Q23a, Q23b, Q47 and Q57 queries.

Did you see any regressions?

@peter-toth (Contributor Author):

I measured considerable improvement (~30%) using TPCDSQueryBenchmark on scaleFactor=5 data with the TPCDS Q14a, Q14b, Q23a, Q23b, Q47 and Q57 queries.

Did you see any regressions?

Let me rerun the benchmarks again and come back to you.

@SparkQA commented Jun 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44514/

@SparkQA commented Jun 18, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44514/

@SparkQA commented Jun 18, 2021

Test build #139987 has finished for PR 28885 at commit 7187ebd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth (Contributor Author) commented Jun 18, 2021

I measured considerable improvement (~30%) using TPCDSQueryBenchmark on scaleFactor=5 data with the TPCDS Q14a, Q14b, Q23a, Q23b, Q47 and Q57 queries.

Did you see any regressions?

Let me rerun the benchmarks again and come back to you.

The previous benchmark run was quite some time ago. As AQE has been enabled by default since then, and it now works with most of the affected TPCDS queries, the improvement is only visible if I disable AQE. But in that case it is still considerable: https://github.com/peter-toth/spark/blob/64c619e798c003e9b27ba534ae370db75b1cfa1c/sql/core/benchmarks/TPCDSQueryBenchmark-results.txt In some cases non-AQE became as good as AQE.

UPDATE: I've included AQE and non-AQE results as well, to show the improvement and catch regressions. I don't see any regression.

 * Therefore, in this rule, we recursively rewrite the exchanges and subqueries in a bottom-up way,
 * in one go.
 */
case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {
Contributor:

AQE has a totally different way to reuse exchanges/subqueries: it has a query-global map to store created exchanges/subqueries (see AdaptiveExecutionContext), and AQE executes leaf subqueries first. This is exactly the same as what this rule is doing for exchange/subquery reuse.

Contributor:

One followup we can do is to leverage the new ReuseMap in AdaptiveExecutionContext

Contributor:

Actually, the AQE way may be better: just use the canonicalized plan as the key, instead of calling sameResult.

@peter-toth (Contributor Author) commented Jun 21, 2021:

ReuseMap has changed since the first version of this PR.
Unfortunately, I rebased the PR already so only some discussion remained: #28885 (comment) (about reverting 2nd to 3rd).

The 1st version used a simple Map[<canonicalized plan>, <plan>] as AdaptiveExecutionContext does.
The 2nd version was Map[<schema>, (<first plan with this schema>, Map[<canonicalized plan>, <plan>])] with lazy initialization of the inner map to avoid canonicalization if there are no matching schemas but still provide quick lookup by canonicalized plans.
This 3rd version reverted to the original Map[<schema>, ArrayBuffer[<plan>]] idea that ReuseExchange and ReuseSubquery had used.

I can open a follow-up PR to improve ReuseMap to the 2nd version if required, but I'm not sure the improvement would be visible with TPCDS or real-life queries.

If we want to consolidate the reuse map logic, then I think we should also take into account that ReuseAdaptiveSubquery uses a concurrent, lock-free TrieMap implementation, which is not required by this non-AQE rule.

@cloud-fan (Contributor) commented Jun 21, 2021:

Yea non-AQE doesn't need thread safety, but I feel it's still better to unify the major idea:

  1. Map[<canonicalized plan>, <plan>]
  2. Map[<schema>, ArrayBuffer[<plan>]]

I agree it's not a big deal for perf, but code consistency is also important. Map[<canonicalized plan>, <plan>] looks better as it's simpler, and we can remove ReuseMap.

@peter-toth (Contributor Author) commented Jun 21, 2021:

Ok, I can file a follow-up PR today or tomorrow.

@peter-toth (Contributor Author):

I've opened the follow-up PR here: #33021

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 682e7f2 Jun 21, 2021
@peter-toth (Contributor Author):

thanks, merging to master!

Thanks for the review!

cloud-fan pushed a commit that referenced this pull request Jun 23, 2021
… rules

### What changes were proposed in this pull request?
This PR unifies reuse map data structures in non-AQE and AQE rules to a simple `Map[<canonicalized plan>, <plan>]` based on the discussion here: #28885 (comment)

### Why are the changes needed?
The proposed `Map[<canonicalized plan>, <plan>]` is simpler than the currently used `Map[<schema>, ArrayBuffer[<plan>]]` in `ReuseMap`/`ReuseExchangeAndSubquery` (non-AQE) and consistent with the `ReuseAdaptiveSubquery` (AQE) subquery reuse rule.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #33021 from peter-toth/SPARK-35855-unify-reuse-map-data-structures.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>