[SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields #22357
Conversation
cc @dbtsai
```scala
testSchemaPruning("select a single complex field and in where clause") {
  val query = sql("select name.first from contacts where name.first = 'Jane'")
  checkScan(query, "struct<name:struct<first:string>>")
  checkAnswer(query, Row("Jane") :: Nil)
```
Can you add another test that selects `name.first` and `name.last`, and applies the where clause on `name.first`? We should only read `name.first` and `name.last`, without `name.middle`.
Yes. Added test case for it.
```diff
@@ -196,6 +196,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    */
   private def getRootFields(expr: Expression): Seq[RootField] = {
     expr match {
+      case IsNotNull(_: Attribute) | IsNull(_: Attribute) => Seq.empty
```
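As an illustration of why this works, here is a self-contained toy sketch (plain Scala with hypothetical, simplified types, not Spark's actual `Expression` or `RootField` classes): ignoring a null check on a bare attribute means the only root fields left are the ones the projection actually touches.

```scala
// Toy model of root-field collection, assuming simplified expression types.
sealed trait Expr
case class Attr(name: String) extends Expr                 // a top-level column
case class Field(attr: Attr, field: String) extends Expr   // e.g. name.first
case class IsNotNull(child: Expr) extends Expr

// Collect (column, Some(nestedField)); None would mean "whole column needed".
def rootFields(e: Expr): Seq[(String, Option[String])] = e match {
  case IsNotNull(_: Attr) => Seq.empty  // the fix: a null check needs no nested fields
  case Attr(n)            => Seq((n, None))
  case Field(Attr(n), f)  => Seq((n, Some(f)))
  case IsNotNull(c)       => rootFields(c)
}

val project = Field(Attr("name"), "first")  // SELECT name.first
val filter  = IsNotNull(Attr("name"))       // isnotnull(name), implied by the WHERE clause
val fields  = (rootFields(project) ++ rootFields(filter)).distinct
println(fields)  // List((name,Some(first))): name.middle / name.last are never requested
```

Without the `IsNotNull(_: Attr)` case, the filter would contribute `(name, None)` and force the full `name` struct into the read schema.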
Hmmmm .. shouldn't we exclude this only for filters?
If this is in projects, I think we also don't need to include all nested fields?
But the case mentioned here looks specific to the pushed filter itself. Can we add a simple test for project case as well?
I mean, for instance, for the case `select address is not null, name.last from contacts`, it wouldn't work. I thought this was a quick bandaid fix to resolve a basic case.
Thanks. This was a case I didn't test. Fixed it and added test case.
Test build #95780 has finished for PR 22357 at commit

retest this please

Test build #95794 has finished for PR 22357 at commit

Test build #95787 has finished for PR 22357 at commit

retest this please.

Test build #95799 has finished for PR 22357 at commit
Hi @viirya, thanks for this PR! I have an alternative implementation which I'd like to submit for comparison. My implementation was something I removed from my original patch. I hope to have my PR submitted sometime today. I have another PR to submit, too. I'll be sure to refer to your PR in mine. Cheers.
I have reconstructed my original patch for this issue, but I've discovered it will require more work to complete. However, as part of that reconstruction I've discovered a couple of cases where our patches create different physical plans. The query results are the same, but I'm not sure which—if either—plan is correct. I want to go into detail on that, but it's complicated and I have to call it quits tonight. I have a flight in the morning, and I'll be on break next week. In the meantime, I'll just copy and paste two queries—based on the data in

First query:
This PR (as of d68f808) produces:
My WIP patch produces:
Second query:
This PR produces:
My WIP patch produces:
I wanted to give my thoughts on the differences of these in detail, but I have to wrap up my work for the night. I'll be visiting family next week. I don't know how responsive I'll be in that time, but I'll at least try to check back. Cheers.
Thanks, @mallman! For the first query, I think the query plan produced by your WIP patch is not correct. We don't need to read the

For the second, your WIP patch doesn't push down

That is the important difference I noticed for now.

@mallman It would be great to have this fix in the 2.4 release, as it can dramatically reduce the data being read in many applications, which is the purpose of the original work. As we're going to cut the 2.4 RC soon, and you will not be available next week, can you post your implementation so we can work together to make this happen? We can now track co-authorship in Spark and GitHub with contribution stats, so it will be cool to see multiple people collaborate on one PR :) Thanks.
```diff
@@ -196,6 +201,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    */
   private def getRootFields(expr: Expression): Seq[RootField] = {
     expr match {
+      // Those expressions don't really use the nested fields of a root field.
+      case i @ (IsNotNull(_: Attribute) | IsNull(_: Attribute)) =>
+        getRootFields(i.children(0)).map(_.copy(contentAccessed = false))
       case att: Attribute =>
         RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
       case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
```
How about

```scala
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
  expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
case _ =>
  expr.children.flatMap(getRootFields)
```
In general, the approach looks right to me except for a couple of minor points. Thanks.
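A runnable sketch of this suggested shape, using hypothetical simplified types rather than Spark's Catalyst classes: the null-check case reuses the children's root fields and just flips `contentAccessed` off.

```scala
// Toy expression tree, assuming simplified types (not Catalyst's Expression).
sealed trait Expr { def children: Seq[Expr] }
case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
case class IsNull(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }

case class RootField(name: String, contentAccessed: Boolean = true)

def getRootFields(expr: Expr): Seq[RootField] = expr match {
  case IsNull(_: Attr) =>
    // same traversal as the default case, but mark content as not accessed
    expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
  case a: Attr => Seq(RootField(a.name))
  case _       => expr.children.flatMap(getRootFields)
}

println(getRootFields(IsNull(Attr("name"))))  // List(RootField(name,false))
```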
```diff
    */
-  private case class RootField(field: StructField, derivedFromAtt: Boolean)
+  private case class RootField(field: StructField, derivedFromAtt: Boolean,
+    contentAccessed: Boolean = true)
```
Fix the formatting, and please elaborate on the comment.
```diff
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, IsNotNull, IsNull, NamedExpression}
```
line too long.
This can be a wildcard import when there are more than 6 entities, per https://github.com/databricks/scala-style-guide#imports
cc @beettlle
```diff
@@ -110,7 +110,12 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       val projectionRootFields = projects.flatMap(getRootFields)
       val filterRootFields = filters.flatMap(getRootFields)

-      (projectionRootFields ++ filterRootFields).distinct
+      val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
+        .distinct.partition(_.contentAccessed)
```
Some comments here please.
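For illustration, the intent of the partition can be sketched like this (assumed semantics in plain Scala, not the actual rule's code): fields that appear only under null checks are kept only when no content-accessed field already covers their column.

```scala
// Hypothetical simplified RootField; Spark's real one wraps a StructField.
case class RootField(column: String, nested: Option[String], contentAccessed: Boolean)

val all = Seq(
  RootField("name", Some("first"), contentAccessed = true),  // SELECT name.first
  RootField("name", None, contentAccessed = false)           // WHERE name IS NOT NULL
)
// Split content-accessed fields from "optional" ones that only feed null checks.
val (rootFields, optRootFields) = all.distinct.partition(_.contentAccessed)
// Keep an optional field only when its column isn't already being read.
val kept = rootFields ++ optRootFields.filterNot(o => rootFields.exists(_.column == o.column))
println(kept)  // List(RootField(name,Some(first),true)): the full name struct is dropped
```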
```scala
checkAnswer(query2, Row("Jane", "Doe") :: Nil)

val query3 = sql("select name.first from contacts " +
  "where employer.company.name = 'abc' and p = 1")
```
Let's say a user adds `where employer.company is not null`. Can we still read the schema as `employer:struct<company:struct<name:string>>`, given that we only mark `contentAccessed = false` when `IsNotNull` is on an attribute?
Added one query test for this case. Thanks.
When there is a nested field access in the query like `employer.company.name`, we don't need other fields inside `employer.company` other than `name`. But if there is no such access, just `employer.company is not null` in the where clause, it will read the full schema of `employer.company`.
```diff
@@ -196,6 +201,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    */
   private def getRootFields(expr: Expression): Seq[RootField] = {
     expr match {
+      // Those expressions don't really use the nested fields of a root field.
+      case i@(IsNotNull(_: Attribute) | IsNull(_: Attribute)) =>
```
nit: -> i @ (IsNotNull(_: ...
```diff
@@ -156,7 +161,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       // in the resulting schema may differ from their ordering in the logical relation's
       // original schema
       val mergedSchema = requestedRootFields
-        .map { case RootField(field, _) => StructType(Array(field)) }
+        .map { case RootField(field, _, _) => StructType(Array(field)) }
```
Not a big deal, but `.map { root: RootField => StructType(Array(root.field)) }` per https://github.com/databricks/scala-style-guide#pattern-matching
Thanks @dbtsai and @HyukjinKwon. Your comments are addressed.
```scala
case IsNotNull(SelectedField(field)) =>
  RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
case IsNull(SelectedField(field)) =>
  RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
```
@dbtsai The question you mentioned at https://github.com/apache/spark/pull/22357/files#r216204022 was addressed by this.

Btw, this PR isn't intended to address filter pushdown for schema pruning. I think that should be a separate topic.
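A small sketch of what the `IsNotNull(SelectedField(...))` cases buy (toy types, an assumed simplification of the real `SelectedField` extractor): a null check on a nested field yields that pruned field, not the whole root struct.

```scala
// Toy expression types; the real code pattern-matches Catalyst expressions.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Nested(parent: Expr, field: String) extends Expr  // e.g. employer.company
case class IsNotNull(child: Expr) extends Expr

case class RootField(path: String, contentAccessed: Boolean = true)

def path(e: Expr): String = e match {
  case Attr(n)      => n
  case Nested(p, f) => path(p) + "." + f
  case IsNotNull(c) => path(c)
}

def getRootFields(e: Expr): Seq[RootField] = e match {
  // null check on a (possibly nested) selected field: keep the pruned path only
  case IsNotNull(child) => Seq(RootField(path(child), contentAccessed = false))
  case other            => Seq(RootField(path(other)))
}

println(getRootFields(IsNotNull(Nested(Attr("employer"), "company"))))
// List(RootField(employer.company,false))
```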
Test build #95884 has finished for PR 22357 at commit

Test build #95871 has finished for PR 22357 at commit
FYI, after further checking the code and discussing predicate pushdown with @dbtsai, we know that predicate pushdown only works for primitive types in the Parquet datasource. So both
If I recall correctly, the Parquet reader can have filter pushdown? It's only not so in the Spark Parquet data source?
I would expect
```scala
// For them, if there are any nested fields accessed in the query, we don't need to add root
// field access of above expressions.
// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
// we don't need to read nested fields of `name` struct other than `first` field.
```
I'm having trouble accepting this statement, but perhaps I'm reading too much into it (or not enough). Let me illustrate with a couple of queries and their physical plans.

Assuming the data model in `ParquetSchemaPruningSuite.scala`, the physical plan for the query

```sql
select employer.id from contacts where employer is not null
```

is

```
== Physical Plan ==
*(1) Project [employer#36.id AS id#46]
+- *(1) Filter isnotnull(employer#36)
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet,
      PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)],
      ReadSchema: struct<employer:struct<id:int>>
```

The physical plan for the query

```sql
select employer.id from contacts where employer.id is not null
```

is

```
== Physical Plan ==
*(1) Project [employer#36.id AS id#47]
+- *(1) Filter (isnotnull(employer#36) && isnotnull(employer#36.id))
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet,
      PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)],
      ReadSchema: struct<employer:struct<id:int>>
```

The read schemata are the same, but the query filters are not. The file scan for the second query looks as I would expect, but the scan for the first query appears to only read `employer.id` even though it needs to check `employer is not null`. If it only reads `employer.id`, how does it check that `employer.company` is not null? Perhaps `employer.id` is null but `employer.company` is not null for some row...

I have run some tests to validate that this PR is returning the correct results for both queries, and it is. But I don't understand why.
For the first query, the constraint is `employer is not null`. When `employer.id` is not null, `employer` will always be not null; as a result, this PR will work. However, when `employer.id` is null, `employer` can be null or something, so we need to check whether `employer` is something to return a null for `employer.id`.

I checked in the `ParquetFilter`: `IsNotNull(employer)` will be ignored since it's not a valid Parquet filter, as Parquet doesn't support pushdown on a struct; thus, with this PR, this query will return a wrong answer.

I think in this scenario, as @mallman suggested, we might need to read the full data.
> I checked in the ParquetFilter, IsNotNull(employer) will be ignored since it's not a valid parquet filter as parquet doesn't support pushdown on the struct; thus, with this PR, this query will return wrong answer.

We may not need to worry about wrong answers from a datasource like Parquet in predicate pushdown. As not all predicates are supported by pushdown, we always have a Spark SQL `Filter` on top of the scan node to make sure we receive the correct answer.
A complex column being null and its fields being null are different things. I think we don't need to read all the fields to check whether the complex column is null. In other words, in the above case, when we only read `employer.id` and it is null, the predicate `employer is not null` will still be true because it is a complex column containing a null field.
@viirya, I see your point about the difference between a complex type being null and a subfield being null. So to answer the following query `select address from contacts where name is not null`, do we need to read any of the fields in `name`? Or perhaps just read one arbitrary field of simple type, like `name.first`? That's surprising, but I'm starting to believe it's true.
Currently under this PR, `name` will be fully read. This is not perfect. However, picking one arbitrary field from `name` sounds a little bit hacky to me. WDYT? cc @dbtsai @cloud-fan
Btw, I think this is not really a schema pruning case in the sense of the original PR, so maybe we can leave it as is for now.
Yeah, I'm okay with leaving it as-is.
Instead of reading an arbitrary field of simple type (which may not exist if it's a deeply nested struct), I think we should implement pushdown with complex types in Parquet with similar logic, and let the Parquet reader handle it. @viirya Can you create a follow-up JIRA for this? Thanks.
FYI, @mallman I'm working on having
Can anyone point out any unaddressed comments or problems here? Looks pretty good to me. I think this is rather a bandaid: a small and safe fix to get into branch-2.4.
Test build #95931 has finished for PR 22357 at commit
```scala
  checkAnswer(query4, Row("Jane", "abc") :: Nil)
}

testSchemaPruning("select nullable complex field and having is null predicate") {
```
Do you mean `having is not null predicate`?
Oops, yes, thanks.
@viirya Please amend Lines 303 to 306 in d684a0f
I have some bad news. The methods

Based on what I'm seeing, I think it's fair to say that schema pruning is broken under certain circumstances when using a table schema that includes column names with upper-case characters (note that the test schema for contacts in

Fortunately, schema pruning is disabled by default, and I think it's still considered "experimental" technology. I think that fixing

In any case, I will create an issue in Jira and submit a PR.
Test build #95945 has finished for PR 22357 at commit

retest this please.
FYI, the PR I previously mentioned about fixing the use of

This LGTM. I'm not going to submit a PR for my approach to this problem. Thanks @viirya!

And FYI this is the Jira issue I promised in

That would be pretty cool.
LGTM. Thank you all for participating in the discussion. @cloud-fan and @gatorsmile, do you have any further comments? If not, I would like to merge it tomorrow into both master and the RC branch, as it's an important performance fix for schema pruning. Thanks.
Test build #95950 has finished for PR 22357 at commit

LGTM from me too.
…ecessary root fields ## What changes were proposed in this pull request? Schema pruning doesn't work if nested column is used in where clause. For example, ``` sql("select name.first from contacts where name.first = 'David'") == Physical Plan == *(1) Project [name#19.first AS first#40] +- *(1) Filter (isnotnull(name#19) && (name#19.first = David)) +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>> ``` In above query plan, the scan node reads the entire schema of `name` column. This issue is reported by: #21320 (comment) The cause is that we infer a root field from expression `IsNotNull(name)`. However, for such expression, we don't really use the nested fields of this root field, so we can ignore the unnecessary nested fields. ## How was this patch tested? Unit tests. Closes #22357 from viirya/SPARK-25363. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: DB Tsai <d_tsai@apple.com> (cherry picked from commit 3030b82) Signed-off-by: DB Tsai <d_tsai@apple.com>
Thanks all again. Merged into the 2.4 branch and master.
What changes were proposed in this pull request?

Schema pruning doesn't work if a nested column is used in the where clause. For example,

```
sql("select name.first from contacts where name.first = 'David'")

== Physical Plan ==
*(1) Project [name#19.first AS first#40]
+- *(1) Filter (isnotnull(name#19) && (name#19.first = David))
   +- *(1) FileScan parquet [name#19] Batched: false, Format: Parquet, PartitionFilters: [],
      PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:struct<first:string,middle:string,last:string>>
```

In the above query plan, the scan node reads the entire schema of the `name` column.

This issue is reported by: #21320 (comment)

The cause is that we infer a root field from the expression `IsNotNull(name)`. However, such an expression doesn't really use the nested fields of this root field, so we can ignore the unnecessary nested fields.

How was this patch tested?

Unit tests.