Skip to content

Commit

Permalink
[SPARK-26352][SQL][FOLLOWUP-2.4] Fix missing sameOutput in branch-2.4
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

After #23303 was merged to branch-2.3/2.4, the builds on those branches were broken due to missing a `LogicalPlan.sameOutput` function which came from #22713 only available on master.

This PR is to follow-up with the broken 2.3/2.4 branches and make a copy of the new `LogicalPlan.sameOutput` into `ReorderJoin` to make it locally available.

## How was this patch tested?

Fix the build of 2.3/2.4.

Closes #23330 from rednaxelafx/clean-build-2.4.

Authored-by: Kris Mok <rednaxelafx@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
rednaxelafx authored and cloud-fan committed Dec 17, 2018
1 parent e743e84 commit 0a69787
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,29 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
createOrderedJoin(input, conditions)
}

if (p.sameOutput(reordered)) {
if (sameOutput(p, reordered)) {
reordered
} else {
// Reordering the joins have changed the order of the columns.
// Inject a projection to make sure we restore to the expected ordering.
Project(p.output, reordered)
}
}

/**
* Returns true iff output of both plans are semantically the same, ie.:
* - they contain the same number of `Attribute`s;
* - references are the same;
* - the order is equal too.
* NOTE: this is copied over from SPARK-25691 from master.
*/
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
val output1 = plan1.output
val output2 = plan2.output
output1.length == output2.length && output1.zip(output2).forall {
case (a1, a2) => a1.semanticEquals(a2)
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,28 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
val optimized = Optimize.execute(analyzed)
val expected = groundTruthBestPlan.analyze

assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect
assert(analyzed.sameOutput(optimized))
assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect
assert(sameOutput(analyzed, optimized))

compareJoinOrder(optimized, expected)
}

private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
plans.map(_.output).reduce(_ ++ _)
}

/**
* Returns true iff output of both plans are semantically the same, ie.:
* - they contain the same number of `Attribute`s;
* - references are the same;
* - the order is equal too.
* NOTE: this is copied over from SPARK-25691 from master.
*/
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
val output1 = plan1.output
val output2 = plan2.output
output1.length == output2.length && output1.zip(output2).forall {
case (a1, a2) => a1.semanticEquals(a2)
}
}
}

0 comments on commit 0a69787

Please sign in to comment.