Skip to content

Commit

Permalink
[SPARK-26551][SQL] Fix schema pruning error when selecting one comple…
Browse files Browse the repository at this point in the history
…x field and having is not null predicate on another one

## What changes were proposed in this pull request?

Schema pruning has errors when selecting one complex field and having is not null predicate on another one:

```scala
val query = sql("select * from contacts")
  .where("name.middle is not null")
  .select(
    "id",
    "name.first",
    "name.middle",
    "name.last"
  )
  .where("last = 'Jones'")
  .select(count("id"))
```

```
java.lang.IllegalArgumentException: middle does not exist. Available: last
[info]   at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:303)
[info]   at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:119)
[info]   at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
[info]   at org.apache.spark.sql.execution.ProjectionOverSchema.$anonfun$getProjection$6(ProjectionOverSchema.scala:58)
[info]   at scala.Option.map(Option.scala:163)
[info]   at org.apache.spark.sql.execution.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:56)
[info]   at org.apache.spark.sql.execution.ProjectionOverSchema.unapply(ProjectionOverSchema.scala:32)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning$$anonfun$$nestedInanonfun$buildNewProjection$1$1.applyOrElse(Parque
tSchemaPruning.scala:153)
```

## How was this patch tested?

Added tests.

Closes apache#23474 from viirya/SPARK-26551.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
(cherry picked from commit 50ebf3a)
Signed-off-by: DB Tsai <d_tsai@apple.com>
  • Loading branch information
viirya authored and kai-chi committed Aug 1, 2019
1 parent ddad358 commit 9b4d3cd
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,28 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
// we don't need to read nested fields of `name` struct other than `first` field.
val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
.distinct.partition(_.contentAccessed)
.distinct.partition(!_.prunedIfAnyChildAccessed)

optRootFields.filter { opt =>
!rootFields.exists(_.field.name == opt.field.name)
!rootFields.exists { root =>
root.field.name == opt.field.name && {
// Checking if current optional root field can be pruned.
// For each required root field, we merge it with the optional root field:
// 1. If this optional root field has nested fields and any nested field of it is used
// in the query, the merged field type must equal to the optional root field type.
// We can prune this optional root field. For example, for optional root field
// `struct<name:struct<middle:string,last:string>>`, if its field
// `struct<name:struct<last:string>>` is used, we don't need to add this optional
// root field.
// 2. If this optional root field has no nested fields, the merged field type equals
// to the optional root field only if they are the same. If they are, we can prune
// this optional root field too.
val rootFieldType = StructType(Array(root.field))
val optFieldType = StructType(Array(opt.field))
val merged = optFieldType.merge(rootFieldType)
merged.sameType(optFieldType)
}
}
} ++ rootFields
}

Expand Down Expand Up @@ -213,11 +231,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// don't actually use any nested fields. These root field accesses might be excluded later
// if there are any nested fields accesses in the query plan.
case IsNotNull(SelectedField(field)) =>
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
case IsNull(SelectedField(field)) =>
RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
case _ =>
expr.children.flatMap(getRootFields)
}
Expand Down Expand Up @@ -271,9 +289,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
/**
* This represents a "root" schema field (aka top-level, no-parent). `field` is the
* `StructField` for field name and datatype. `derivedFromAtt` indicates whether it
* was derived from an attribute or had a proper child. `contentAccessed` means whether
* it was accessed with its content by the expressions refer it.
* was derived from an attribute or had a proper child. `prunedIfAnyChildAccessed` means
* whether this root field can be pruned if any of child field is used in the query.
*/
private case class RootField(field: StructField, derivedFromAtt: Boolean,
contentAccessed: Boolean = true)
prunedIfAnyChildAccessed: Boolean = false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.SchemaPruningTest
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -217,6 +218,41 @@ class ParquetSchemaPruningSuite
Row("Y.") :: Nil)
}

testSchemaPruning("select one complex field and having is null predicate on another " +
"complex field") {
val query = sql("select * from contacts")
.where("name.middle is not null")
.select(
"id",
"name.first",
"name.middle",
"name.last"
)
.where("last = 'Jones'")
.select(count("id")).toDF()
checkScan(query,
"struct<id:int,name:struct<middle:string,last:string>>")
checkAnswer(query, Row(0) :: Nil)
}

testSchemaPruning("select one deep nested complex field and having is null predicate on " +
"another deep nested complex field") {
val query = sql("select * from contacts")
.where("employer.company.address is not null")
.selectExpr(
"id",
"name.first",
"name.middle",
"name.last",
"employer.id as employer_id"
)
.where("employer_id = 0")
.select(count("id")).toDF()
checkScan(query,
"struct<id:int,employer:struct<id:int,company:struct<address:string>>>")
checkAnswer(query, Row(1) :: Nil)
}

private def testSchemaPruning(testName: String)(testThunk: => Unit) {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
Expand Down

0 comments on commit 9b4d3cd

Please sign in to comment.