
[SPARK-26078][SQL] Dedup self-join attributes on IN subqueries #23057

Closed
wants to merge 10 commits

Conversation

mgaido91
Contributor

What changes were proposed in this pull request?

When a self-join is produced as the result of an IN subquery, the join condition may be invalid, resulting in trivially true predicates and wrong results.

The PR deduplicates the subquery output in order to avoid the issue.
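As a rough illustration of the fix (a hypothetical, simplified model — Spark's actual implementation works on Catalyst's `Attribute`/`Alias` classes, not these stand-ins): subquery output attributes that collide with the outer plan's references are re-aliased under fresh expression ids, so the generated join condition no longer compares an attribute with itself.

```scala
// Simplified model of the dedup idea. `AttrRef` and `DedupSketch` are
// hypothetical stand-ins for Catalyst's Attribute/Alias machinery, where
// attribute identity is driven by the expression id.
case class AttrRef(name: String, exprId: Long)

object DedupSketch {
  private var nextId = 1000L
  private def freshId(): Long = { nextId += 1; nextId }

  // Re-alias every subquery output attribute that also appears among the
  // outer references, so the generated join condition stays meaningful.
  def dedup(outerRefs: Set[AttrRef], subOutput: Seq[AttrRef]): Seq[AttrRef] =
    subOutput.map { attr =>
      if (outerRefs.contains(attr)) attr.copy(exprId = freshId())
      else attr
    }
}
```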

How was this patch tested?

added UT

@mgaido91
Contributor Author

cc @cloud-fan @viirya

@SparkQA

SparkQA commented Nov 16, 2018

Test build #98913 has finished for PR 23057 at commit 2af656a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Nov 16, 2018

Thanks @mgaido91. I will review this tomorrow.

@SparkQA

SparkQA commented Nov 16, 2018

Test build #98914 has finished for PR 23057 at commit a71b1c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

retest this please

@SparkQA

SparkQA commented Nov 16, 2018

Test build #98920 has finished for PR 23057 at commit a71b1c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = {
  val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references))
  val rightRefs = AttributeSet(sub.output)
Member

This is just outputSet?

@@ -119,7 +139,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1
val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
// Deduplicate conflicting attributes if any.
-      dedupJoin(Join(outerPlan, sub, LeftAnti, Option(finalJoinCond)))
+      dedupJoin(Join(outerPlan, newSub, LeftAnti, Option(finalJoinCond)))
case (p, predicate) =>
val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
Project(p.output, Filter(newCond.get, inputPlan))
Member

In rewriteExistentialExpr, there is a similar logic for InSubquery. Should we also do dedupSubqueryOnSelfJoin for it?

Contributor Author

mmmh... rewriteExistentialExpr operates on the result of the foldLeft, so every InSubquery there was already transformed using dedupSubqueryOnSelfJoin, right? So I don't think it is needed.

Member

Can you try this test case?

val df1 = spark.sql(
        """
          |SELECT id,num,source FROM (
          |  SELECT id, num, 'a' as source FROM a
          |  UNION ALL
          |  SELECT id, num, 'b' as source FROM b
          |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2) OR
          |c.id IN (SELECT id FROM b WHERE num = 3)
        """.stripMargin)
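For context on why this query goes wrong: the outer plan `c` contains table `b`, so (before this fix) the rewritten join could presumably end up comparing `b`'s `id` attribute against itself, yielding a trivially true condition. A hypothetical, simplified model in plain Scala (`Attr` is a stand-in for Catalyst's attribute classes, where equality follows the expression id):

```scala
// Hypothetical simplified model of Catalyst attributes: equality is
// driven by the expression id, just as in Catalyst.
case class Attr(name: String, exprId: Long)

// Without dedup, the outer plan (which contains table b) and the
// rewritten subquery can expose b's `id` with the SAME expression id,
// so the equi-join condition degenerates to id#7 = id#7.
val outerId = Attr("id", 7L)
val subId   = Attr("id", 7L)
val triviallyTrue = outerId == subId

// After dedup, the subquery side gets a fresh expression id, so the
// condition no longer collapses to true by construction.
val dedupedSubId = subId.copy(exprId = 42L)
val stillTrivial = outerId == dedupedSubId
```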

Contributor Author

this fails indeed. I'll investigate it, thanks.

Contributor Author

thanks for your help here @viirya. I added the check also to rewriteExistentialExpr. I was missing the case when it is invoked not only on the result of foldLeft. Thanks.

val a = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))),
  StructType(Seq(StructField("id", StringType), StructField("num", IntegerType))))
val b = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("a", 2), Row("b", 1))),
  StructType(Seq(StructField("id", StringType), StructField("num", IntegerType))))
Member

The two schemas are the same. Can we define it just once?

@SparkQA

SparkQA commented Nov 17, 2018

Test build #98969 has finished for PR 23057 at commit 86106fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

retest this please

@SparkQA

SparkQA commented Nov 19, 2018

Test build #99008 has finished for PR 23057 at commit 3d010fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

any more comments @cloud-fan @viirya ?

Project(aliasedExpressions, plan)
}

private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = {
Member

Add a simple code comment for this method?

@viirya
Member

viirya commented Nov 21, 2018

The change looks fine to me. cc @cloud-fan

@mgaido91
Contributor Author

thanks @viirya , I added a comment

@SparkQA

SparkQA commented Nov 21, 2018

Test build #99111 has finished for PR 23057 at commit 65fca4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

any comments @cloud-fan ?

@mgaido91
Contributor Author

cc @gatorsmile too

@mccheah
Contributor

mccheah commented Dec 1, 2018

Is this ready to merge?

@mgaido91
Contributor Author

mgaido91 commented Dec 3, 2018

@mccheah this is waiting for reviews by committers

@mgaido91
Contributor Author

@cloud-fan @gatorsmile may you please take a look at this? Thanks.

StructType(Seq(StructField("id", StringType), StructField("num", IntegerType))))
df.createOrReplaceTempView(name)
}
genTestViewWithName("a")
Contributor

nit:

Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")

   val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
   // Deduplicate conflicting attributes if any.
-  dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
+  dedupJoin(Join(outerPlan, newSub, LeftSemi, joinCond))
Contributor

do we still need to dedup here?

Contributor Author

I think we don't, let me remove it.

@@ -92,18 +114,20 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
Contributor

Looks like we don't need dedupJoin, but always dedup the subquery before putting it in a join.

Contributor Author

I think it makes sense to dedup the subquery only when the join condition has not been created yet (so in the case of InSubquery). In the Exists case, instead, the condition is already there, so I think we still have to use dedupJoin.

Contributor

dedupJoin will eventually dedup the subquery, IIUC.

What I'd like to do is to unify dedupJoin and dedupSubqueryOnSelfJoin, so that the code will be consistent for all cases:

val newSub = dedup(sub, values)
// create join condition if any
Join(outerPlan, newSub, ...)

Contributor Author

the main problem is that in the other cases, i.e. when Exists is there, the condition is already created. So we would need to complicate the method quite a lot in order to handle the 2 cases, and I am not sure whether it is worth it. For instance, in the Exists case the values should be taken from the conditions, as the expressions referencing attributes from one side, and the join condition needs to be rewritten. So I don't think it is a good idea to have a common rewrite for both of them: it would be overcomplicated IMHO.

Contributor

ah thanks for the explanation! Makes sense to me.

@SparkQA

SparkQA commented Dec 11, 2018

Test build #99968 has finished for PR 23057 at commit 1beb40c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 11, 2018

Test build #99970 has finished for PR 23057 at commit 1beb40c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100018 has finished for PR 23057 at commit 0312558.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 12, 2018

Test build #100023 has finished for PR 23057 at commit 0312558.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

aliasMap.getOrElse(ref, ref)
}
-  val newRight = Project(aliasedExpressions, right)
+  val newRight = rewriteDedupPlan(right, aliasMap)
val newJoinCond = joinCond.map { condExpr =>
Contributor

not related to your PR, but is this correct? The duplicated attributes in the join condition may refer to the left or the right child; how can we blindly replace them with new attributes from the right side?

Contributor Author

yes, I actually think this is useless. Let me try and remove it.

condition: Option[Expression]): Join = {
// Deduplicate conflicting attributes if any.
val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan)
Join(outerPlan, dedupSubplan, joinType, condition)
Contributor

shall we add an assert to make sure the condition doesn't contain conflicting attributes?

Contributor Author

I am not sure about this: how do we check it? If the same attribute is present on both sides of a BinaryOperator? Is this always forbidden?

Contributor

we need to refactor the code a little bit

...
val duplicates = outerRefs.intersect(subplan.outputSet)
condition.foreach {
  case a: Attribute if duplicates.contains(a) => fail
  case _ =>
}
...
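A self-contained sketch of that check (plain Scala with a hypothetical `ARef` attribute model and an `assertNoConflicts` helper; the real code would use Spark's `AttributeSet` and `Expression` classes):

```scala
// Hypothetical simplified attribute model: as in Catalyst, identity is
// the expression id, which is what makes the intersection meaningful.
case class ARef(name: String, exprId: Long)

// Fail fast if the join condition references an attribute that is
// visible on both sides of the join (i.e. a conflicting attribute).
def assertNoConflicts(outerRefs: Set[ARef],
                      subOutput: Set[ARef],
                      conditionRefs: Set[ARef]): Unit = {
  val duplicates = outerRefs.intersect(subOutput)
  val conflicting = conditionRefs.intersect(duplicates)
  require(conflicting.isEmpty, s"Found conflicting attributes: $conflicting")
}
```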

Contributor Author

I see what you mean now. I'll do that, thanks.

@SparkQA

SparkQA commented Dec 13, 2018

Test build #100083 has finished for PR 23057 at commit 6528582.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 13, 2018

Test build #100112 has started for PR 23057 at commit ec710d7.

@mgaido91
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 14, 2018

Test build #100145 has finished for PR 23057 at commit ec710d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

condition.foreach { e =>
  val conflictingAttrs = e.references.intersect(duplicates)
  if (conflictingAttrs.nonEmpty) {
    throw new AnalysisException("Found conflicting attributes " +
Contributor

just for curiosity, when can this happen? or how we guarantee this will never happen?

Contributor Author

this can happen when the condition is built in advance (e.g. the correlated condition of Exists) and it contains some attribute which is not deduplicated. I am not sure whether this scenario can actually happen, though, or whether our dedup logic in the previous rules guarantees it never will.

@cloud-fan
Contributor

thanks, merging to master!

asfgit closed this in cd815ae on Dec 16, 2018
@cloud-fan
Contributor

Hi @mgaido91 , since we are going to have new releases for branch 2.3 and 2.4, do you know if this bug exists in 2.3/2.4 and shall we backport it? thanks!

@mgaido91
Contributor Author

mgaido91 commented Jan 3, 2019

@cloud-fan yes, this affects 2.3/2.4 too. Let me know if you want me to open the PRs for backporting it there.

I am just wondering what to do for 2.2, since there is a discussion about its last release. If we want this there too, we should backport also SPARK-21835. What do you think? cc @viirya too

@cloud-fan
Contributor

also cc @dongjoon-hyun @HyukjinKwon

IIRC there were some refactorings about subquery rewrite, not sure how hard it is to backport to 2.2.

@mgaido91
Contributor Author

mgaido91 commented Jan 3, 2019

backporting to 2.2 requires SPARK-21835, not sure if that is built on top of other changes...

@dongjoon-hyun
Member

Hi, @cloud-fan, @gatorsmile, @rxin. I know the correctness-issue policy on releases, so for most correctness issues I'm trying to review and cover them.

However, we already have one exceptional correctness issue, SPARK-25206, which was decided not to go into branch-2.2 due to technical difficulty and risk. For me, Spark 2.2.3 is a somewhat exceptional release, since it is a farewell release and branch-2.2 is already EOL and too far from the active master branch.

So, for these risky issues (this one, SPARK-26078, and SPARK-25206), I'd like to put them out of the scope of the farewell release and recommend that users move to the latest release.

What do you think about that?

@cloud-fan
Contributor

SGTM, seems 2.3 and 2.4 are good enough to backport. @mgaido91 do you mind sending a new PR to backport? thanks!

@dongjoon-hyun
Member

Thank you, @cloud-fan !

mgaido91 added a commit to mgaido91/spark that referenced this pull request Jan 4, 2019
…queries

When there is a self-join as result of a IN subquery, the join condition may be invalid, resulting in trivially true predicates and return wrong results.

The PR deduplicates the subquery output in order to avoid the issue.

added UT

Closes apache#23057 from mgaido91/SPARK-26078.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
mgaido91 added a commit to mgaido91/spark that referenced this pull request Jan 4, 2019
holdenk pushed a commit to holdenk/spark that referenced this pull request Jan 5, 2019
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019