diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index b65221c236bfe..3d2c1e2997781 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -103,7 +103,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { createOrderedJoin(input, conditions) } - if (p.sameOutput(reordered)) { + if (sameOutput(p, reordered)) { reordered } else { // Reordering the joins have changed the order of the columns. @@ -111,6 +111,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { Project(p.output, reordered) } } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index 36db2e2dd0ae2..133671ca363ac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -376,8 +376,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { val optimized = Optimize.execute(analyzed) val expected = ResolveHints.execute(groundTruthBestPlan.analyze) - assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect - assert(analyzed.sameOutput(optimized)) + assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect + assert(sameOutput(analyzed, optimized)) compareJoinOrder(optimized, expected) } @@ -385,4 +385,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = { plans.map(_.output).reduce(_ ++ _) } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val output1 = plan1.output + val output2 = plan2.output + output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } }