[SPARK-25363][SQL] Fix schema pruning in where clause by ignoring unnecessary root fields #22357

Closed · wants to merge 5 commits

ParquetSchemaPruning.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     val projectionRootFields = projects.flatMap(getRootFields)
     val filterRootFields = filters.flatMap(getRootFields)

-    (projectionRootFields ++ filterRootFields).distinct
+    // Some kinds of expressions don't need to access any fields of a root field, e.g., `IsNotNull`.
+    // For them, if any nested fields are accessed elsewhere in the query, we don't need to add the
+    // root field accesses of those expressions.
+    // For example, for the query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
+    // we don't need to read nested fields of the `name` struct other than the `first` field.
@mallman (Contributor), Sep 11, 2018:

I'm having trouble accepting this statement, but perhaps I'm reading too much into it (or not enough). Let me illustrate with a couple of queries and their physical plans.

Assuming the data model in ParquetSchemaPruningSuite.scala, the physical plan for the query

select employer.id from contacts where employer is not null

is

== Physical Plan ==
*(1) Project [employer#36.id AS id#46]
+- *(1) Filter isnotnull(employer#36)
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet,
    PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)],
    ReadSchema: struct<employer:struct<id:int>>

The physical plan for the query

select employer.id from contacts where employer.id is not null

is

== Physical Plan ==
*(1) Project [employer#36.id AS id#47]
+- *(1) Filter (isnotnull(employer#36) && isnotnull(employer#36.id))
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet,
    PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)],
    ReadSchema: struct<employer:struct<id:int>>

The read schemata are the same, but the query filters are not. The file scan for the second query looks as I would expect, but the scan for the first query appears to only read employer.id even though it needs to check employer is not null. If it only reads employer.id, how does it check that employer.company is not null? Perhaps employer.id is null but employer.company is not null for some row...

I have run some tests to validate that this PR is returning the correct results for both queries, and it is. But I don't understand why.

Member:

For the first query, the constraint is `employer is not null`.

When `employer.id` is not null, `employer` will always be non-null; as a result, this PR works.

However, when `employer.id` is null, `employer` itself may or may not be null, so we still need to check whether `employer` is non-null before returning a null `employer.id`.

I checked ParquetFilters: `IsNotNull(employer)` will be ignored since it's not a valid Parquet filter, as Parquet doesn't support pushdown on structs; thus, with this PR, this query could return a wrong answer.

I think in this scenario, as @mallman suggested, we might need to read the full data.

Member Author (@viirya):

> I checked ParquetFilters: `IsNotNull(employer)` will be ignored since it's not a valid Parquet filter, as Parquet doesn't support pushdown on structs; thus, with this PR, this query could return a wrong answer.

We don't need to worry about wrong answers from a data source like Parquet under predicate pushdown. Since not all predicates are supported by pushdown, Spark always keeps a SQL `Filter` on top of the scan node to make sure we get the correct answer.
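For illustration, a hypothetical spark-shell check (not from the PR discussion; it assumes the suite's contacts data has been written to a `contacts` table, and the plan is the one quoted above):

// Even when Parquet ignores the pushed IsNotNull(employer) filter, Spark
// re-evaluates the predicate in its own Filter operator above the scan,
// so the final answer stays correct.
val q = spark.sql("select employer.id from contacts where employer is not null")
q.explain()
// *(1) Project [employer#36.id AS id#46]
// +- *(1) Filter isnotnull(employer#36)   <- enforced by Spark itself
//    +- *(1) FileScan parquet ... PushedFilters: [IsNotNull(employer)] ...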

Member Author (@viirya):

A complex column being null and its fields being null are different things. I don't think we need to read all the fields just to check whether the complex column is null. In other words, in the above case, when we only read `employer.id` and it is null, the predicate `employer is not null` can still be true, because `employer` may be a non-null struct that merely contains a null field.
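For illustration, a minimal standalone sketch of that distinction (hypothetical names, runnable in spark-shell, not part of this PR):

// A struct column can be non-null even when one of its fields is null.
import spark.implicits._

case class Employer(id: java.lang.Integer, company: String)
case class Contact(name: String, employer: Employer)

val df = Seq(
  Contact("Jane", Employer(null, "abc")), // employer non-null, employer.id null
  Contact("John", null)                   // employer itself is null
).toDF()

// Keeps only Jane's row: her `employer` is a non-null struct whose id is null.
df.where("employer is not null").select("name").show()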

Contributor:

@viirya, I see your point about the difference between a complex type being null and a subfield being null. So to answer the following query

select address from contacts where name is not null

do we need to read any of the fields in name? Or perhaps just read one arbitrary field of simple type, like name.first? That's surprising, but I'm starting to believe it's true.

Member Author (@viirya):

Currently under this PR, `name` will be fully read. This is not perfect. However, picking one arbitrary field from `name` sounds a little hacky to me. WDYT? cc @dbtsai @cloud-fan

Member Author (@viirya):

Btw, I think this isn't really a schema pruning case in the sense of the original PR, so maybe we can leave it as-is for now.

Contributor:

Yeah, I'm okay with leaving it as-is.

Member:

Instead of reading an arbitrary field of simple type (which may not exist if it's a deeply nested struct), I think we should implement pushdown for complex types in Parquet with similar logic, and let the Parquet reader handle it.

@viirya Can you create a followup JIRA for this?

Thanks.

+    val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
+      .distinct.partition(_.contentAccessed)
+
+    optRootFields.filter { opt =>
+      !rootFields.exists(_.field.name == opt.field.name)
+    } ++ rootFields
}
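An illustrative trace of the partition step above, with a simplified stand-in for the PR's private RootField (a sketch, not code from the PR):

import org.apache.spark.sql.types.{StringType, StructField}

// Simplified stand-in for the PR's private RootField.
case class RootField(field: StructField, derivedFromAtt: Boolean,
  contentAccessed: Boolean = true)

// `name.first` in the projection accesses content; `name IS NOT NULL` does not.
val fromProject = RootField(StructField("name", StringType), derivedFromAtt = false)
val fromIsNotNull = RootField(StructField("name", StringType),
  derivedFromAtt = false, contentAccessed = false)

val (rootFields, optRootFields) =
  Seq(fromProject, fromIsNotNull).distinct.partition(_.contentAccessed)

// `name` already has a content access, so the IsNotNull-only access is dropped
// and the scan can stay pruned to struct<name:struct<first:string>>.
val requested = optRootFields.filter { opt =>
  !rootFields.exists(_.field.name == opt.field.name)
} ++ rootFields
// requested == Seq(fromProject)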

/**
@@ -156,7 +166,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// in the resulting schema may differ from their ordering in the logical relation's
// original schema
val mergedSchema = requestedRootFields
-      .map { case RootField(field, _) => StructType(Array(field)) }
+      .map { case root: RootField => StructType(Array(root.field)) }
.reduceLeft(_ merge _)
val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
val mergedDataSchema =
@@ -199,6 +209,15 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
case att: Attribute =>
RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
+      // Root field accesses by `IsNotNull` and `IsNull` are special cases, as these expressions
+      // don't actually use any nested fields. Such root field accesses might be excluded later
+      // if there are any nested field accesses in the query plan.
+      case IsNotNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+      case IsNull(SelectedField(field)) =>
+        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
Member Author (@viirya):

@dbtsai The question you mentioned at https://github.com/apache/spark/pull/22357/files#r216204022 was addressed by this.

+      case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
+        expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
case _ =>
expr.children.flatMap(getRootFields)
}
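Likewise illustrative only: a self-contained sketch of the IsNotNull/IsNull handling above, again with a simplified RootField stand-in rather than the PR's private types:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, IsNotNull, IsNull}
import org.apache.spark.sql.types.StringType

// Simplified stand-in for the PR's private RootField.
case class RootField(name: String, contentAccessed: Boolean = true)

def getRootFieldsSketch(expr: Expression): Seq[RootField] = expr match {
  // The null check needs the column to exist in the schema, but not its
  // content, so contentAccessed is cleared on whatever the children yield.
  case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
    expr.children.flatMap(getRootFieldsSketch).map(_.copy(contentAccessed = false))
  case a: Attribute => RootField(a.name) :: Nil
  case _ => expr.children.flatMap(getRootFieldsSketch)
}

val employer = AttributeReference("employer", StringType)()
getRootFieldsSketch(IsNotNull(employer))
// Seq(RootField("employer", contentAccessed = false))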
@@ -250,8 +269,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
}

   /**
-   * A "root" schema field (aka top-level, no-parent) and whether it was derived from
-   * an attribute or had a proper child.
+   * This represents a "root" schema field (aka top-level, no-parent). `field` is the
+   * `StructField` carrying the field name and datatype. `derivedFromAtt` indicates whether
+   * it was derived from an attribute or had a proper child. `contentAccessed` indicates
+   * whether its content is actually accessed by the expressions that refer to it.
    */
-  private case class RootField(field: StructField, derivedFromAtt: Boolean)
+  private case class RootField(field: StructField, derivedFromAtt: Boolean,
+    contentAccessed: Boolean = true)
}
ParquetSchemaPruningSuite.scala
@@ -35,22 +35,29 @@ class ParquetSchemaPruningSuite
with SchemaPruningTest
with SharedSQLContext {
case class FullName(first: String, middle: String, last: String)
+  case class Company(name: String, address: String)
+  case class Employer(id: Int, company: Company)
case class Contact(
id: Int,
name: FullName,
address: String,
pets: Int,
friends: Array[FullName] = Array.empty,
-    relatives: Map[String, FullName] = Map.empty)
+    relatives: Map[String, FullName] = Map.empty,
+    employer: Employer = null)

val janeDoe = FullName("Jane", "X.", "Doe")
val johnDoe = FullName("John", "Y.", "Doe")
val susanSmith = FullName("Susan", "Z.", "Smith")

+  val employer = Employer(0, Company("abc", "123 Business Street"))
+  val employerWithNullCompany = Employer(1, null)

private val contacts =
Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
relatives = Map("brother" -> johnDoe)) ::
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
relatives = Map("brother" -> johnDoe), employer = employer) ::
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
employer = employerWithNullCompany) :: Nil

case class Name(first: String, last: String)
case class BriefContact(id: Int, name: Name, address: String)
@@ -66,13 +73,14 @@ class ParquetSchemaPruningSuite
pets: Int,
friends: Array[FullName] = Array(),
relatives: Map[String, FullName] = Map(),
+    employer: Employer = null,
p: Int)

case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)

private val contactsWithDataPartitionColumn =
-    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
-      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+    contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
private val briefContactsWithDataPartitionColumn =
briefContacts.map { case BriefContact(id, name, address) =>
BriefContactWithDataPartitionColumn(id, name, address, 2) }
@@ -155,6 +163,60 @@ class ParquetSchemaPruningSuite
Row(null) :: Row(null) :: Nil)
}

testSchemaPruning("select a single complex field and in where clause") {
val query1 = sql("select name.first from contacts where name.first = 'Jane'")
checkScan(query1, "struct<name:struct<first:string>>")
checkAnswer(query1, Row("Jane") :: Nil)

val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
checkScan(query2, "struct<name:struct<first:string,last:string>>")
checkAnswer(query2, Row("Jane", "Doe") :: Nil)

val query3 = sql("select name.first from contacts " +
"where employer.company.name = 'abc' and p = 1")
Member:

Let's say a user adds `where employer.company is not null`: can we still read the schema `struct<employer:struct<company:struct<name:string>>>`, given that we only mark `contentAccessed = false` when `IsNotNull` is on an attribute?

Member Author (@viirya):

Added one query test for this case. Thanks.

Member Author (@viirya):

When there is a nested field access in the query, like `employer.company.name`, we don't need any fields inside `employer.company` other than `name`.

But if there is no such access and the where clause has just `employer.company is not null`, the full schema of `employer.company` will be read.

+    checkScan(query3, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query3, Row("Jane") :: Nil)
+
+    val query4 = sql("select name.first, employer.company.name from contacts " +
+      "where employer.company is not null and p = 1")
+    checkScan(query4, "struct<name:struct<first:string>," +
+      "employer:struct<company:struct<name:string>>>")
+    checkAnswer(query4, Row("Jane", "abc") :: Nil)
+  }

testSchemaPruning("select nullable complex field and having is not null predicate") {
val query = sql("select employer.company from contacts " +
"where employer is not null and p = 1")
checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
}

testSchemaPruning("select a single complex field and is null expression in project") {
val query = sql("select name.first, address is not null from contacts")
checkScan(query, "struct<name:struct<first:string>,address:string>")
checkAnswer(query.orderBy("id"),
Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
}

testSchemaPruning("select a single complex field array and in clause") {
val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
checkScan(query,
"struct<friends:array<struct<first:string,middle:string>>>")
checkAnswer(query.orderBy("id"),
Row(Array("Z.")) :: Nil)
}

testSchemaPruning("select a single complex field from a map entry and in clause") {
val query =
sql("select relatives[\"brother\"].middle from contacts " +
"where relatives[\"brother\"].first = 'John'")
checkScan(query,
"struct<relatives:map<string,struct<first:string,middle:string>>>")
checkAnswer(query.orderBy("id"),
Row("Y.") :: Nil)
}

private def testSchemaPruning(testName: String)(testThunk: => Unit) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
test(s"Spark vectorized reader - without partition data column - $testName") {
@@ -238,10 +300,7 @@ class ParquetSchemaPruningSuite

testMixedCasePruning("filter with different-case column names") {
val query = sql("select id from mixedcase where Col2.b = 2")
-    // Pruning with filters is currently unsupported. As-is, the file reader will read the id column
-    // and the entire coL2 struct. Once pruning with filters has been implemented we can uncomment
-    // this line
-    // checkScan(query, "struct<id:int,coL2:struct<B:int>>")
+    checkScan(query, "struct<id:int,coL2:struct<B:int>>")
checkAnswer(query.orderBy("id"), Row(1) :: Nil)
}
