From 72c05b4f0bc20eada40381dce54be1c880c2c0cb Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Mon, 17 Dec 2018 22:48:59 +0800 Subject: [PATCH] [SPARK-26352][SQL][FOLLOWUP-2.4] Fix missing sameOutput in branch-2.4 ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/23303 was merged to branch-2.3/2.4, the builds on those branches broke because they lack the `LogicalPlan.sameOutput` function, which was introduced by https://github.com/apache/spark/pull/22713 and is only available on master. This PR follows up on the broken 2.3/2.4 branches by copying the new `LogicalPlan.sameOutput` into `ReorderJoin` so that it is locally available. ## How was this patch tested? Fix the build of 2.3/2.4. Closes #23330 from rednaxelafx/clean-build-2.4. Authored-by: Kris Mok Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/joins.scala | 17 ++++++++++++++++- .../catalyst/optimizer/JoinReorderSuite.scala | 19 +++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index b65221c236bfe..3d2c1e2997781 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -103,7 +103,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { createOrderedJoin(input, conditions) } - if (p.sameOutput(reordered)) { + if (sameOutput(p, reordered)) { reordered } else { // Reordering the joins have changed the order of the columns. 
@@ -111,6 +111,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { Project(p.output, reordered) } } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index 36db2e2dd0ae2..133671ca363ac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -376,8 +376,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { val optimized = Optimize.execute(analyzed) val expected = ResolveHints.execute(groundTruthBestPlan.analyze) - assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect - assert(analyzed.sameOutput(optimized)) + assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect + assert(sameOutput(analyzed, optimized)) compareJoinOrder(optimized, expected) } @@ -385,4 +385,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = { plans.map(_.output).reduce(_ ++ _) } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. 
+ * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } }