[SPARK-28169][SQL] Convert scan predicate condition to CNF #28805

Closed

AngersZhuuuu
Contributor

What changes were proposed in this pull request?

Spark can't push down a scan predicate that contains an Or condition.
For example, suppose the table default.test has the partition column dt,
and we run the query:

select * from default.test 
where dt=20190625 or (dt = 20190626 and id in (1,2,3) )

In this case, Spark resolves the Or condition as a single expression. Because that expression references "id", it can't be pushed down.

Based on PR #28733, this PR handles SQL like
select * from default.test
where dt = 20190626 or (dt = 20190627 and xxx="a")

The condition dt = 20190626 or (dt = 20190627 and xxx="a") is converted to CNF:

(dt = 20190626 or dt = 20190627) and (dt = 20190626 or xxx = "a" )

Then the condition dt = 20190626 or dt = 20190627 is pushed down during partition pruning.
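
The conversion above is an application of the distributive law, a or (b and c) = (a or b) and (a or c). A minimal sketch of that step follows. The `Expr`, `Pred`, `And`, `Or` types and the `toCnf` helper are hypothetical stand-ins written for this illustration; they are not Spark's Catalyst classes, and the sketch ignores Not, grouping, and the node-count threshold.

```scala
// Toy expression tree; illustrative stand-ins, not Spark's Catalyst classes.
sealed trait Expr
case class Pred(text: String) extends Expr            // atomic predicate, e.g. dt = 20190626
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object CnfSketch {
  // Returns the conjuncts of the CNF form; each element is a disjunction of atoms.
  def toCnf(e: Expr): Seq[Expr] = e match {
    case And(l, r) => toCnf(l) ++ toCnf(r)
    case Or(l, r) =>
      // Distribute Or over And: pair every conjunct on the left with every one on the right.
      for (a <- toCnf(l); b <- toCnf(r)) yield Or(a, b)
    case p: Pred => Seq(p)
  }

  // Renders an expression in the same style as the PR description.
  def show(e: Expr): String = e match {
    case Pred(t) => t
    case And(l, r) => s"(${show(l)} and ${show(r)})"
    case Or(l, r) => s"(${show(l)} or ${show(r)})"
  }

  def main(args: Array[String]): Unit = {
    val cond = Or(Pred("dt = 20190626"), And(Pred("dt = 20190627"), Pred("xxx = \"a\"")))
    toCnf(cond).map(show).foreach(println)
  }
}
```

For the example condition, `toCnf` yields two conjuncts: one that mentions only dt and one that mentions both dt and xxx.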

Why are the changes needed?

Optimize partition pruning

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123882 has finished for PR 28805 at commit b253af3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123890 has finished for PR 28805 at commit 478a7a8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123915 has finished for PR 28805 at commit 69f1763.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123904 has finished for PR 28805 at commit 603660b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2020

Test build #123928 has finished for PR 28805 at commit 2f576fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

@gengliangwang
Member

> In this case, Spark will resolve Or condition as one expression, and since this expr has reference of "id", then it can't been push down.

Sorry, could you explain more here?
The CNF process should break down dt = 20190626 and id in (1,2,3) to Seq((dt = 20190626), (id in (1,2,3))), and then these two sub-predicates will be processed in groupExpressionsByQualifier. What is the problem here?

@AngersZhuuuu
Contributor Author

> The CNF process should break down dt = 20190626 and id in (1,2,3) to Seq((dt = 20190626), (id in (1,2,3))), and then these two sub-predicates will be processed in groupExpressionsByQualifier. What is the problem here?

In the current partition pruning, ScanOperation gets its predicates from splitConjunctivePredicates.
If the condition is (dt = 1 or (dt = 2 and id = 3)), it won't be separated. Since this expression references both id and dt, it won't be pushed down as a partition predicate, so Spark scans all data in the partitioned table.

object HiveTableScans extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ScanOperation(projectList, predicates, relation: HiveTableRelation) =>
      // Filter out all predicates that only deal with partition keys, these are given to the
      // hive table scan operator to be used for partition pruning.
      val partitionKeyIds = AttributeSet(relation.partitionCols)
      val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
        !predicate.references.isEmpty &&
        predicate.references.subsetOf(partitionKeyIds)
      }

      pruneFilterProject(
        projectList,
        otherPredicates,
        identity[Seq[Expression]],
        HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    case _ =>
      Nil
  }
}

With the conversion to CNF, (dt = 1 or (dt = 2 and id = 3)) becomes (dt = 1 or dt = 2) and (dt = 1 or id = 3). splitConjunctivePredicates can then split it into two expressions, (dt = 1 or dt = 2) and (dt = 1 or id = 3), and (dt = 1 or dt = 2) can be pushed down as a partition pruning predicate.
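
The effect on the `predicates.partition` step can be reproduced with a small self-contained sketch. Everything here (`Expr`, `Pred`, `splitConjuncts`, `toCnf`, `pruningPredicates`) is a hypothetical stand-in written for this illustration, with column names modeled as plain strings; it only mirrors the reference check shown in HiveTableScans and is not Spark's implementation.

```scala
// Toy stand-ins for Catalyst expressions; names and shapes are assumptions for illustration.
sealed trait Expr { def references: Set[String] }
case class Pred(column: String, text: String) extends Expr {
  def references: Set[String] = Set(column)
}
case class And(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class Or(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}

object PruningSketch {
  // Splits only on top-level And, so an Or is returned as a single predicate.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // Naive CNF: distribute Or over And and return the resulting conjuncts.
  def toCnf(e: Expr): Seq[Expr] = e match {
    case And(l, r) => toCnf(l) ++ toCnf(r)
    case Or(l, r) => for (a <- toCnf(l); b <- toCnf(r)) yield Or(a, b)
    case p => Seq(p)
  }

  // Same test as in HiveTableScans: keep predicates that touch only partition columns.
  def pruningPredicates(predicates: Seq[Expr], partitionCols: Set[String]): Seq[Expr] =
    predicates.filter(p => p.references.nonEmpty && p.references.subsetOf(partitionCols))

  def main(args: Array[String]): Unit = {
    val cond = Or(Pred("dt", "dt = 1"), And(Pred("dt", "dt = 2"), Pred("id", "id = 3")))
    val partitionCols = Set("dt")
    println(pruningPredicates(splitConjuncts(cond), partitionCols).size) // without CNF
    println(pruningPredicates(toCnf(cond), partitionCols).size)          // with CNF
  }
}
```

Without CNF the only predicate references {dt, id}, so nothing qualifies for pruning; with CNF the conjunct (dt = 1 or dt = 2) references only dt and qualifies.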

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124256 has finished for PR 28805 at commit e71c45c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TimeFormatters(date: DateFormatter, timestamp: TimestampFormatter)

@AngersZhuuuu
Contributor Author

retest this please

@AngersZhuuuu
Contributor Author

@dongjoon-hyun It seems Jenkins is wrong? I didn't add a class named TimeFormatters.

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124270 has finished for PR 28805 at commit e71c45c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TimeFormatters(date: DateFormatter, timestamp: TimestampFormatter)

@SparkQA

SparkQA commented Jun 30, 2020

Test build #124670 has finished for PR 28805 at commit 35b5813.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Convert an expression to conjunctive normal form when pushing predicates through Join,
* when expand predicates, we can group by the qualifier avoiding generate unnecessary
* expression to control the length of final result since there are multiple tables.
* @param condition condition need to be convert
Member

nit: convert -> converted

* expression to control the length of final result since there are multiple tables.
* @param condition condition need to be convert
* @return expression seq in conjunctive normal form of input expression, if length exceeds
* the threshold [[SQLConf.MAX_CNF_NODE_COUNT]] or length != 1, return empty Seq
Member

nit: This @return says the same thing as line 211 in a different way?

   * @return the CNF result as sequence of disjunctive expressions. If the number of expressions
   *         exceeds threshold on converting `Or`, `Seq.empty` is returned.

* [[splitConjunctivePredicates]] won't split [[Or]] expression.
* @param condition condition need to be convert
* @return expression seq in conjunctive normal form of input expression, if length exceeds
* the threshold [[SQLConf.MAX_CNF_NODE_COUNT]] or length != 1, return empty Seq
Member

ditto

def conjunctiveNormalFormAndGroupExpsByQualifier(condition: Expression): Seq[Expression] = {
conjunctiveNormalForm(condition,
(expressions: Seq[Expression]) =>
expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq)
Member

nit format:

    conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
        expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq)

*/
def conjunctiveNormalFormAndGroupExpsByReference(condition: Expression): Seq[Expression] = {
conjunctiveNormalForm(condition,
(expressions: Seq[Expression]) =>
Member

nit format:

    conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
        expressions.groupBy(e => AttributeSet(e.references)).map(_._2.reduceLeft(And)).toSeq)

/**
* Convert an expression to conjunctive normal form when pushing predicates for partition pruning,
* when expand predicates, we can group by the reference avoiding generate unnecessary expression
* to control the length of final result since here we just have one table. In partition pruning
Member

nit: How about rephrasing it like this?

   * Convert an expression to conjunctive normal form for predicate pushdown and partition pruning.
   * When expanding predicates, this method groups expressions by their references for reducing
   * the size of pushed down predicates and corresponding codegen. In partition pruning strategies,
   * ...


object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("PruneHiveTablePartitions", Once,
EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
}

- test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") {
+ test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
Member

nit: SPARK-15616 -> SPARK-15616: (This is not related to this pr though)

@maropu
Member

maropu commented Jul 1, 2020

LGTM except for the minor comments.

@AngersZhuuuu
Contributor Author

> LGTM except for the minor comments.

All minor comments are addressed.

Member

@maropu maropu left a comment

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124717 has finished for PR 28805 at commit 3df019a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

retest this please

* @return the CNF result as sequence of disjunctive expressions. If the number of expressions
* exceeds threshold on converting `Or`, `Seq.empty` is returned.
*/
def conjunctiveNormalFormAndGroupExpsByQualifier(condition: Expression): Seq[Expression] = {
Member

On second thought, the method name conjunctiveNormalFormAndGroupExpsByQualifier is too long and the And is weird.
How about changing to CNFWithGroupExpressionsByQualifier?

* @return the CNF result as sequence of disjunctive expressions. If the number of expressions
* exceeds threshold on converting `Or`, `Seq.empty` is returned.
*/
def conjunctiveNormalFormAndGroupExpsByReference(condition: Expression): Seq[Expression] = {
Member

How about changing to CNFWithGroupExpressionsByReference?

Member

@gengliangwang gengliangwang left a comment

LGTM except for one comment on method naming. Thanks for the work.

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124732 has finished for PR 28805 at commit 1b8466e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

> LGTM except for one comment on method naming. Thanks for the work.

Updated. It seems the latest Jenkins test failure is not related to my change?

@maropu
Member

maropu commented Jul 1, 2020

Yea, it looks like the failure is not related to this PR.

@AngersZhuuuu
Contributor Author

> Yea, it looks like the failure is not related to this PR.

OK, I'm a bit confused. Can I see how Spark's Jenkins configures CI/CD? I want our internal CI/CD pipeline to show unit test results the way this Jenkins does, and there are some parts I don't know how to configure.

@SparkQA

SparkQA commented Jul 1, 2020

Test build #124721 has finished for PR 28805 at commit 1b8466e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 15fb5d7 Jul 1, 2020
@SparkQA

SparkQA commented Jul 1, 2020

Test build #124761 has finished for PR 28805 at commit e2777c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan pushed a commit that referenced this pull request Jul 20, 2020
… Join/Partitions

### What changes were proposed in this pull request?

In #28733 and #28805, CNF conversion is used to push down disjunctive predicates through join and partition pruning.

It's a good improvement. However, converting all the predicates to CNF can lead to a very long result, even with grouping functions over expressions. For example, the following predicate
```
(p0 = '1' AND p1 = '1') OR (p0 = '2' AND p1 = '2') OR (p0 = '3' AND p1 = '3') OR (p0 = '4' AND p1 = '4') OR (p0 = '5' AND p1 = '5') OR (p0 = '6' AND p1 = '6') OR (p0 = '7' AND p1 = '7') OR (p0 = '8' AND p1 = '8') OR (p0 = '9' AND p1 = '9') OR (p0 = '10' AND p1 = '10') OR (p0 = '11' AND p1 = '11') OR (p0 = '12' AND p1 = '12') OR (p0 = '13' AND p1 = '13') OR (p0 = '14' AND p1 = '14') OR (p0 = '15' AND p1 = '15') OR (p0 = '16' AND p1 = '16') OR (p0 = '17' AND p1 = '17') OR (p0 = '18' AND p1 = '18') OR (p0 = '19' AND p1 = '19') OR (p0 = '20' AND p1 = '20')
```
will be converted into a long query (130K characters) in the Hive metastore, and there will be an error:
```
javax.jdo.JDOException: Exception thrown when executing query : SELECT DISTINCT 'org.apache.hadoop.hive.metastore.model.MPartition' AS NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.PART_NAME,A0.PART_ID,A0.PART_NAME AS NUCORDER0 FROM PARTITIONS A0 LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID WHERE B0.TBL_NAME = ? AND C0."NAME" = ? AND ((((((A0.PART_NAME LIKE '%/p1=1' ESCAPE '\' ) OR (A0.PART_NAME LIKE '%/p1=2' ESCAPE '\' )) OR (A0.PART_NAME LIKE '%/p1=3' ESCAPE '\' )) OR ((A0.PART_NAME LIKE '%/p1=4' ESCAPE '\' ) O ...
```
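
To get a feel for why the converted predicate grows so quickly, the sketch below counts how many conjuncts a naive distribution of Or over And would produce for predicates of the shape shown above. It is an assumption-laden illustration: `Expr`, `Pred`, `cnfClauseCount`, and `example` are hypothetical names, and the count ignores the grouping and the node-count threshold that the actual conversion applies, so it shows only the growth trend, not the size of the query Spark generated.

```scala
// Toy expression tree; illustrative stand-ins, not Spark's Catalyst classes.
sealed trait Expr
case class Pred(text: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object CnfSizeSketch {
  // Number of conjuncts a naive distribution of Or over And would produce,
  // computed without materializing them.
  def cnfClauseCount(e: Expr): BigInt = e match {
    case And(l, r) => cnfClauseCount(l) + cnfClauseCount(r)
    case Or(l, r) => cnfClauseCount(l) * cnfClauseCount(r)
    case _: Pred => BigInt(1)
  }

  // Builds (p0 = '1' AND p1 = '1') OR ... OR (p0 = 'n' AND p1 = 'n').
  def example(n: Int): Expr = {
    val disjuncts: Seq[Expr] =
      (1 to n).map(i => And(Pred(s"p0 = '$i'"), Pred(s"p1 = '$i'")))
    disjuncts.reduceLeft[Expr]((a, b) => Or(a, b))
  }

  def main(args: Array[String]): Unit = {
    Seq(2, 5, 10, 20).foreach { n =>
      println(s"n = $n: ${cnfClauseCount(example(n))} conjuncts before grouping")
    }
  }
}
```

Each disjunct contributes a factor of 2, so n disjuncts give 2^n conjuncts; for the 20-disjunct predicate above that is 1,048,576 before any grouping or deduplication.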

Essentially, we just need to traverse the predicate and extract the convertible sub-predicates, as we did in #24598. There is no need to maintain the CNF result set.
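
One way to picture the traversal described above is the following sketch. It is a simplified illustration under stated assumptions, not the code from this commit: `Expr`, `Pred`, and `extract` are hypothetical names, columns are plain strings, and Not is not handled. For an And, either side alone is implied by the whole, so one side may be dropped; for an Or, both sides must be convertible.

```scala
// Toy stand-ins for Catalyst expressions; names and shapes are assumptions for illustration.
sealed trait Expr { def references: Set[String] }
case class Pred(column: String, text: String) extends Expr {
  def references: Set[String] = Set(column)
}
case class And(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class Or(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}

object ExtractSketch {
  // Returns a predicate that uses only `cols` and is implied by `e`, if one exists.
  def extract(e: Expr, cols: Set[String]): Option[Expr] = e match {
    case And(l, r) =>
      (extract(l, cols), extract(r, cols)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (a, b) => a.orElse(b) // dropping a conjunct only weakens the filter
      }
    case Or(l, r) =>
      // Both branches must be convertible, otherwise rows could be wrongly pruned.
      for (a <- extract(l, cols); b <- extract(r, cols)) yield Or(a, b)
    case leaf =>
      if (leaf.references.subsetOf(cols)) Some(leaf) else None
  }

  def main(args: Array[String]): Unit = {
    val cond = Or(Pred("dt", "dt = 1"), And(Pred("dt", "dt = 2"), Pred("id", "id = 3")))
    println(extract(cond, Set("dt")))
  }
}
```

For (dt = 1 or (dt = 2 and id = 3)) with partition column dt, the sketch extracts (dt = 1 or dt = 2) in a single pass, and the result never contains more atoms than the input predicate.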

### Why are the changes needed?

A better implementation for pushing down disjunctive and complex predicates. The pushed-down predicate is always equal to or shorter than the CNF result.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #29101 from gengliangwang/pushJoin.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>