Skip to content

Commit

Permalink
address comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
mgaido91 committed Nov 17, 2018
1 parent a71b1c6 commit 86106fa
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {

private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = {
val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references))
val rightRefs = AttributeSet(sub.output)
val duplicates = leftRefs.intersect(rightRefs)
val duplicates = leftRefs.intersect(sub.outputSet)
if (duplicates.isEmpty) {
sub
} else {
Expand Down
14 changes: 8 additions & 6 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1284,12 +1284,14 @@ class SubquerySuite extends QueryTest with SharedSQLContext {

test("SPARK-26078: deduplicate fake self joins for IN subqueries") {
withTempView("a", "b") {
val a = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))),
StructType(Seq(StructField("id", StringType), StructField("num", IntegerType))))
val b = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))),
StructType(Seq(StructField("id", StringType), StructField("num", IntegerType))))
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
// Builds a fixed two-row DataFrame with columns (id: String, num: Int) and
// creates or replaces the temp view called `name` on top of it.
def genTestViewWithName(name: String): Unit = {
  val rows = spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1)))
  val schema = StructType(Seq(StructField("id", StringType), StructField("num", IntegerType)))
  spark.createDataFrame(rows, schema).createOrReplaceTempView(name)
}
genTestViewWithName("a")
genTestViewWithName("b")

val df1 = spark.sql(
"""
Expand Down

0 comments on commit 86106fa

Please sign in to comment.