[SPARK-29375][SQL] Exchange reuse across all subquery levels #26044

Closed

@peter-toth peter-toth commented Oct 7, 2019

What changes were proposed in this pull request?

This PR:

  • enables exchange reuse across all subquery levels
  • adds a minor optimization by using a map of canonicalized plans to look up reusable exchanges
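
To illustrate the lookup idea, here is a minimal sketch that does not use Spark's classes. `Plan`, `Scan`, `Exchange`, `Reused`, `Join`, `canonicalize` and `reuse` are all hypothetical names made up for this example; the only thing taken from the description above is the idea of keying a single map by the canonicalized plan so that any later, equivalent exchange in the tree is replaced by a reference to the first one.

```scala
import scala.collection.mutable

object ExchangeReuseSketch {
  // Toy stand-ins for physical plan nodes; these are NOT Spark classes.
  sealed trait Plan
  final case class Scan(table: String, exprId: Int) extends Plan
  final case class Exchange(child: Plan) extends Plan
  final case class Reused(original: Exchange) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Canonicalization erases cosmetic differences (here: the expression id).
  def canonicalize(p: Plan): Plan = p match {
    case Scan(t, _)  => Scan(t, 0)
    case Exchange(c) => Exchange(canonicalize(c))
    case Reused(o)   => canonicalize(o)
    case Join(l, r)  => Join(canonicalize(l), canonicalize(r))
  }

  // Walk the tree once, keeping the first exchange seen per canonical form.
  def reuse(plan: Plan): Plan = {
    val seen = mutable.Map.empty[Plan, Exchange]
    def go(p: Plan): Plan = p match {
      case e: Exchange =>
        val key = canonicalize(e)
        seen.get(key) match {
          case Some(first) => Reused(first)
          case None =>
            val rewritten = Exchange(go(e.child))
            seen(key) = rewritten
            rewritten
        }
      case Join(l, r) => Join(go(l), go(r))
      case other      => other
    }
    go(plan)
  }
}
```

Because the map lives for the whole traversal, two exchanges that differ only in expression ids collapse to one entry no matter how deep in the tree they sit, which is the property the example plans below rely on.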

Example query:

SELECT
  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
  a.key
FROM testData AS a
JOIN testData AS b ON b.key = a.key

Plan before this PR:

*(5) Project [Subquery scalar-subquery#270, [id=#605] AS scalarsubquery()#277, key#13]
:  +- Subquery scalar-subquery#270, [id=#605]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#276])
:        +- Exchange SinglePartition, true, [id=#601]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#281])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#273], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#584]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#273 ASC NULLS FIRST], false, 0
:                       +- Exchange hashpartitioning(key#273, 5), true, [id=#592]
:                          +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#273]
:                             +- Scan[obj#12]
+- *(5) SortMergeJoin [key#13], [key#271], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#617]
   :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :        +- Scan[obj#12]
   +- *(4) Sort [key#271 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#271, 5), true, [id=#625]
         +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#271]
            +- Scan[obj#12]

Plan after this PR:

*(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#193]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#189]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]

Why are the changes needed?

Performance improvement.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a new UT.

SparkQA commented Oct 7, 2019

Test build #111847 has finished for PR 26044 at commit e6d4997.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -470,7 +470,7 @@ class PlannerSuite extends SharedSparkSession {
 Inner,
 None,
 shuffle,
-shuffle)
+shuffle.copy())
Contributor Author

If a plan contains the exact same instance of Exchange multiple times, then it makes no sense to replace one of the instances with a ReusedExchange.
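
The distinction between "the same instance" and "an equal instance" can be shown with a small sketch. `Exchange` here is a hypothetical one-field case class, not Spark's node; the point is only that `copy()` produces a structurally equal but distinct object, which is presumably why the test above switched to `shuffle.copy()`.

```scala
object SameInstanceSketch {
  // Toy stand-in for an exchange node; NOT Spark's Exchange.
  final case class Exchange(partitioning: String)

  // `eq` is reference identity; `==` on a case class is structural equality.
  def isSameInstance(a: Exchange, b: Exchange): Boolean = a eq b
  def isEqualPlan(a: Exchange, b: Exchange): Boolean = a == b
}
```

With `val shuffle = Exchange("hashpartitioning(key, 5)")`, `isSameInstance(shuffle, shuffle)` is true while `isSameInstance(shuffle, shuffle.copy())` is false, even though `isEqualPlan` is true for both pairs.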

SparkQA commented Oct 8, 2019

Test build #111882 has finished for PR 26044 at commit efd6045.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth changed the title [SPARK-29375][SQL][WIP] Exchange reuse across all subquery levels [SPARK-29375][SQL] Exchange reuse across all subquery levels Oct 10, 2019
@peter-toth
Contributor Author

cc @gatorsmile @maryannxue @mgaido91

// Keep the output of this exchange, the following plans require that to resolve
// attributes.
ReusedExchangeExec(exchange.output, samePlan.get)
val newExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
Contributor

I am a bit worried that this introduces a significant overhead with big plans

Contributor Author

I see your point. Let me change this PR a bit and come back to you.

Contributor Author

I've modified it a bit to avoid canonicalization if possible.
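
The thread does not show how canonicalization is avoided, so the following is only one possible approach, sketched under assumptions: bucket candidates by a cheap key first and compute the expensive canonical form only when a bucket already holds a candidate. `Node`, `ReuseMap`, `schema`, `canonical` and `lookupOrAdd` are hypothetical names for this sketch.

```scala
import scala.collection.mutable

object LazyCanonicalizationSketch {
  // Hypothetical node: `schema` is cheap to read, `canonical` is costly to build.
  final class Node(val schema: String, val body: String) {
    var canonicalizations = 0
    lazy val canonical: String = {
      canonicalizations += 1
      body.replaceAll("#\\d+", "#0") // erase expression ids
    }
  }

  final class ReuseMap {
    private val bySchema = mutable.Map.empty[String, mutable.ArrayBuffer[Node]]

    // Returns an earlier equivalent node if one exists, otherwise registers `n`.
    def lookupOrAdd(n: Node): Node = {
      val bucket = bySchema.getOrElseUpdate(n.schema, mutable.ArrayBuffer.empty[Node])
      // On an empty bucket the predicate never runs, so nothing is canonicalized.
      bucket.find(_.canonical == n.canonical) match {
        case Some(first) => first
        case None =>
          bucket += n
          n
      }
    }
  }
}
```

In this sketch a plan whose schema is unique never pays for canonicalization; the cost is incurred only once a second candidate with the same schema shows up.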

SparkQA commented Oct 15, 2019

Test build #112112 has finished for PR 26044 at commit 8ee1921.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch 2 times, most recently from 1475ade to fe7526e Compare October 16, 2019 15:12
SparkQA commented Oct 16, 2019

Test build #112167 has finished for PR 26044 at commit 1475ade.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 16, 2019

Test build #112178 has finished for PR 26044 at commit fe7526e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch from fe7526e to fa0ac3a Compare November 1, 2019 15:19
SparkQA commented Nov 1, 2019

Test build #113098 has finished for PR 26044 at commit fa0ac3a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor Author

@cloud-fan @dongjoon-hyun @gatorsmile @maropu @maryannxue @mgaido91 @viirya any feedback/comment is gladly welcome

@maropu
Member

maropu commented Nov 2, 2019

Could you show us actual performance numbers with/without this pr?

SparkQA commented Nov 10, 2019

Test build #113544 has finished for PR 26044 at commit 2661c3d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -52,6 +54,13 @@ abstract class Exchange extends UnaryExecNode {
case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
extends LeafExecNode {

override def equals(that: Any): Boolean = that match {
Contributor Author

@peter-toth peter-toth Nov 10, 2019

A ReusedExchangeExec is a reference to an Exchange, so two ReusedExchangeExecs should be equal only if they point to the same Exchange instance.
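
A minimal sketch of identity-based equality follows; it is not the PR's actual implementation. `Exchange` and `Reused` are toy stand-ins, and comparing `output` in addition to the child reference is an assumption of this sketch.

```scala
object ReusedEqualitySketch {
  // Toy stand-in for an exchange node; NOT Spark's Exchange.
  final case class Exchange(partitioning: String)

  // Toy stand-in for ReusedExchangeExec: equality follows the identity of `child`.
  final case class Reused(output: Seq[String], child: Exchange) {
    override def equals(that: Any): Boolean = that match {
      case r: Reused => (child eq r.child) && output == r.output
      case _         => false
    }
    // Consistent with equals: equal objects share the same child instance.
    override def hashCode(): Int = System.identityHashCode(child)
  }
}
```

Two `Reused` nodes wrapping structurally equal but distinct `Exchange` objects compare as different here, whereas the default case class equality would treat them as equal.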

@peter-toth
Contributor Author

peter-toth commented Nov 10, 2019

> Could you show us actual performance numbers with/without this pr?

I created a simple benchmark here: peter-toth@24cfee8 just to show that this PR (originalReuseExchangeVersion - false) makes sense in some cases.

OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Exchange reuse 1:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
originalReuseExchangeVersion - true               17409          17490         114          0.0  3481886067.0       1.0X
originalReuseExchangeVersion - false               6889           6907          26          0.0  1377804241.4       2.5X

OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Exchange reuse 2:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
originalReuseExchangeVersion - true               17279          17323          63          0.0  3455745161.4       1.0X
originalReuseExchangeVersion - false               7126           7177          72          0.0  1425193388.2       2.4X
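
As a quick sanity check of the tables above, the Relative column is consistent with dividing the baseline's best time by each case's best time and rounding to one decimal. That reading of the column is an assumption; `relative` is a hypothetical helper written only for this check.

```scala
object SpeedupSketch {
  // Ratio of baseline to candidate best time, rounded to one decimal place.
  def relative(baselineMs: Long, candidateMs: Long): Double =
    math.round(baselineMs.toDouble / candidateMs * 10) / 10.0
}
```

Applied to the Best Time(ms) values, `relative(17409, 6889)` and `relative(17279, 7126)` reproduce the 2.5X and 2.4X figures reported in the two tables.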

SparkQA commented Nov 10, 2019

Test build #113549 has finished for PR 26044 at commit 1634003.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor Author

retest this please

SparkQA commented Nov 11, 2019

Test build #113570 has finished for PR 26044 at commit 1634003.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch from a3978f0 to 04654f3 Compare November 11, 2019 08:02
SparkQA commented Nov 11, 2019

Test build #113573 has finished for PR 26044 at commit a3978f0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 11, 2019

Test build #113577 has finished for PR 26044 at commit 04654f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch from 04654f3 to 3f9e240 Compare November 18, 2019 12:03
@peter-toth
Contributor Author

peter-toth commented Nov 18, 2019

@maropu I also ran the TPCDS queries, and the following queries have modified plans after the PR:

  • q14a
  • q14b
  • q23a
  • q23b
  • q24a
  • q24b
  • q14-v2.7
  • q14a-v2.7
  • q24-v2.7

Here are the plans before and after the PR.
Benchmark results can be found here before and after the PR.
This comparison shows that q24a, q24b and q24-v2.7 become 1.4x faster due to this change.

SparkQA commented Nov 18, 2019

Test build #114016 has finished for PR 26044 at commit 3f9e240.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor Author

retest this please

SparkQA commented Nov 19, 2019

Test build #114104 has finished for PR 26044 at commit e0b4ca6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 19, 2019

Test build #114114 has finished for PR 26044 at commit e0b4ca6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 20, 2019

Test build #114164 has finished for PR 26044 at commit a3c92e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch from a3c92e7 to 337735a Compare November 20, 2019 16:02
SparkQA commented Nov 20, 2019

Test build #114171 has finished for PR 26044 at commit 337735a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case fallbackStartsAt =>
val splits = fallbackStartsAt.split(",").map(_.trim)
Some((splits.head.toInt, splits.last.toInt))
Option(sqlContext).flatMap {
Contributor Author

This change was required because of this UT error:

org.scalatest.exceptions.TestFailedException: udf/postgreSQL/udf-aggregates_part3.sql - Scala UDF Expected "struct<[col:bigint]>", but got "struct<[]>" Schema did not match for query #1 select udf((select udf(count(*))         from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c): QueryOutput(select udf((select udf(count(*))         from (values (1)) t0(inner_c))) as col from (values (2),(3)) t1(outer_c),struct<>,java.lang.NullPointerException null)

where the stacktrace of the executor is:

02:43:13.445 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 8.0 (TID 10)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
HashAggregate(keys=[], functions=[partial_count(1)], output=[count#397L])
+- Project
   +- LocalTableScan <empty>, [col1#385]

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:424)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:772)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:742)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:109)
	at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:108)
	at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:62)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$canonicalized$1(Expression.scala:229)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized$lzycompute(Expression.scala:229)
	at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:228)
	at org.apache.spark.sql.catalyst.expressions.Expression.semanticHash(Expression.scala:248)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.hashCode(EquivalentExpressions.scala:41)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
	at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
	at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
	at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.get(HashMap.scala:74)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:55)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:99)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1118)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1118)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1118)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1170)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:64)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
	at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:471)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:116)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedConstructorAccessor41.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 151 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:96)
	... 158 more

Because this PR adds lazy val canonicalized to ScalarSubquery, EvalPythonExec triggered canonicalization of HashAggregateExec on an executor, where SparkSession is not available.
Honestly, I'm not sure how many other SparkPlan nodes can't be canonicalized on an executor for similar reasons.
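
The failure mode and the two kinds of workaround used in this PR (wrapping a possibly-null reference in Option, and making a field lazy) can be sketched in isolation. `Context`, `EagerNode`, `GuardedNode` and `LazyNode` are hypothetical classes; only the `fallbackStartsAt` name and the `Option(...)` pattern are borrowed from the diff snippet above, and the real HashAggregateExec code is not reproduced here.

```scala
object ExecutorSafeInitSketch {
  // Hypothetical stand-in for a driver-only context; assume it is null on executors.
  final class Context(val fallbackStartsAt: String)

  // Eager field: dereferences `ctx` in the constructor, so construction throws on null.
  final class EagerNode(ctx: Context) {
    val fallback: String = ctx.fallbackStartsAt
  }

  // Guarded field: Option(null) is None, so construction succeeds without a context.
  final class GuardedNode(ctx: Context) {
    val fallback: Option[String] = Option(ctx).map(_.fallbackStartsAt)
  }

  // Lazy field: nothing is dereferenced until `fallback` is first read.
  final class LazyNode(ctx: Context) {
    lazy val fallback: String = ctx.fallbackStartsAt
  }
}
```

Copying a node during canonicalization re-runs its constructor, so in this sketch only `EagerNode` fails when built with a null context; the guarded and lazy variants can be constructed safely as long as the field is not needed there.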

Contributor Author

InSubqueryExec already defines lazy val canonicalized, so maybe this issue could come up even without this PR in some DPP (dynamic partition pruning) use cases.

@@ -59,7 +59,7 @@ case class ShuffleExchangeExec(

 override def nodeName: String = "Exchange"

-private val serializer: Serializer =
+private lazy val serializer: Serializer =
Contributor Author

Similar reasons for this change as above. This time the stacktrace is:

04:31:28.110 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 8.0 (TID 10)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
Exchange SinglePartition, true, [id=#180]
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#397L])
   +- *(1) Project
      +- *(1) LocalTableScan <empty>, [col1#385]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
        at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:424)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
        at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
        at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:263)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:277)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:772)
        at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:742)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:109)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:108)
        at org.apache.spark.sql.execution.ScalarSubquery.canonicalized(subquery.scala:62)
        at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$canonicalized$1(Expression.scala:229)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized$lzycompute(Expression.scala:229)
        at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:228)
        at org.apache.spark.sql.catalyst.expressions.Expression.semanticHash(Expression.scala:248)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.hashCode(EquivalentExpressions.scala:41)
        at scala.runtime.Statics.anyHash(Statics.java:122)
        at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
        at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
        at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
        at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
        at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
        at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.get(HashMap.scala:74)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:55)
        at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:99)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1118)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1118)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1118)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1170)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:64)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
        at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
        at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
        at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:471)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:116)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 133 more
Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:72)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.metrics$lzycompute(ShuffleExchangeExec.scala:57)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.metrics(ShuffleExchangeExec.scala:58)
        at org.apache.spark.sql.execution.SparkPlan.longMetric(SparkPlan.scala:149)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:63)
        ... 141 more

And if serializer is not lazy then it makes no sense for metrics to be lazy.
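
A minimal sketch of why this holds, using hypothetical stand-in classes rather than Spark's own (`Context`, `EagerSerializerNode`, `LazySerializerNode` and `LazyInitDemo` are made-up names for illustration). It assumes the situation in the trace above: the node is constructed where its context is null, and an eagerly initialized field touches `metrics` from the constructor.

```scala
import scala.util.Try

// Hypothetical stand-in for the context that is null on the executor side.
class Context {
  def newMetric(name: String): Long = 0L
}

// `metrics` is lazy, but the eager `serializer` forces it in the constructor,
// so construction with a null context fails anyway.
class EagerSerializerNode(ctx: Context) {
  lazy val metrics: Map[String, Long] = Map("dataSize" -> ctx.newMetric("dataSize"))
  val serializer: String = "serializer:" + metrics("dataSize")
}

// Both fields are lazy, so nothing touches the context during construction.
class LazySerializerNode(ctx: Context) {
  lazy val metrics: Map[String, Long] = Map("dataSize" -> ctx.newMetric("dataSize"))
  lazy val serializer: String = "serializer:" + metrics("dataSize")
}

object LazyInitDemo {
  // Constructing the eager variant with a null context throws a NullPointerException.
  def eagerFails: Boolean = Try(new EagerSerializerNode(null)).isFailure
  // Constructing the lazy variant succeeds because no field is evaluated yet.
  def lazySucceeds: Boolean = Try(new LazySerializerNode(null)).isSuccess
}
```

In this sketch the `lazy` on `metrics` only helps when every field that reads it during construction is lazy as well.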

@peter-toth
Contributor Author

@maropu

Could you show us actual performance numbers with/without this pr?

This PR gives a 1.4x speedup on the q24 queries. Here is a detailed comparison: #26044 (comment)
Do you think we can proceed?

@peter-toth peter-toth force-pushed the SPARK-29375-exchange-reuse branch from 337735a to ed26b33 Compare December 15, 2019 11:25
@SparkQA

SparkQA commented Dec 15, 2019

Test build #115358 has finished for PR 26044 at commit ed26b33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 25, 2020
@github-actions github-actions bot closed this Mar 26, 2020