[SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
Since `range(1, 1)` produces no rows, the INSERT creates partition `p1 = 5` with no data, so the result is supposed to be `null`. However, with the optimization enabled the result is `5`.
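
To see why the metadata-only plan goes wrong, note that the aggregate is answered from the catalog's partition values rather than from the rows on disk. A minimal, self-contained sketch of that failure mode (plain Scala, illustrative only, not the rule's actual code):

```scala
// Hypothetical illustration: the catalog lists partition p1 = 5 even though
// the partition's data files contain no rows.
val partitionValuesFromCatalog: Seq[Int] = Seq(5)   // what the metadata-only plan sees
val rowsActuallyInPartition: Seq[Int] = Seq.empty    // what a real scan sees

// MAX(p1) computed from metadata alone: Some(5) -- the wrong answer.
println(partitionValuesFromCatalog.reduceOption(_ max _))
// MAX(p1) computed from the actual rows: None, i.e. SQL NULL -- the right answer.
println(rowsActuallyInPartition.reduceOption(_ max _))
```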

The rule was originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule was later disabled by default (https://issues.apache.org/jira/browse/HIVE-15397) due to the same problem.

It is hard to avoid the correctness issue completely, because data sources like Parquet can be metadata-only: Spark cannot tell whether a table is empty without actually reading its files. This PR disables the optimization by default.
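
Users who still want the metadata-only shortcut and have verified that none of the scanned partitions are empty can re-enable it per session. A sketch (assumes a running `spark` session and the table `t` from the example above):

```scala
// Sketch: opt back in for this session only, at your own risk --
// results are only correct if no scanned partition is empty.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.sql("SELECT MAX(p1) FROM t").show()

// Equivalent via SQL:
spark.sql("SET spark.sql.optimizer.metadataOnly=true")
```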

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2 people authored and kai-chi committed Jul 23, 2019
1 parent 79c4b90 commit c00c811
Showing 5 changed files with 64 additions and 14 deletions.
12 changes: 0 additions & 12 deletions docs/sql-data-sources-parquet.md
@@ -295,18 +295,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.optimizer.metadataOnly</code></td>
<td>true</td>
<td>
<p>
When true, enable the metadata-only query optimization that use the table's metadata to
produce the partition columns instead of table scans. It applies when all the columns scanned
are partition columns and the query has an aggregate operator that satisfies distinct
semantics.
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.writeLegacyFormat</code></td>
<td>false</td>
6 changes: 4 additions & 2 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -561,12 +561,14 @@ object SQLConf {
    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)

  val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
    .internal()
    .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
      "to produce the partition columns instead of table scans. It applies when all the columns " +
      "scanned are partition columns and the query has an aggregate operator that satisfies " +
      "distinct semantics.")
      "distinct semantics. By default the optimization is disabled, since it may return " +
      "incorrect results when the files are empty.")
    .booleanConf
    .createWithDefault(true)
    .createWithDefault(false)

  val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
    .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -72,6 +72,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
          })
        }
        if (isAllDistinctAgg) {
          logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
            "Spark will scan partition-level metadata without scanning data files. " +
            "This could result in wrong results when the partition metadata exists but the " +
            "inclusive data files are empty."
          )
          a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
        } else {
          a
37 changes: 37 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2966,6 +2966,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
withTable("t") {
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
if (enableOptimizeMetadataOnlyQuery) {
// The result is wrong if we enable the configuration.
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
} else {
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
}
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
}

withTempPath { path =>
val tabLocation = path.getCanonicalPath
val partLocation1 = tabLocation + "/p=3"
val partLocation2 = tabLocation + "/p=1"
// SPARK-23271 empty RDD when saved should write a metadata only file
val df = spark.emptyDataFrame.select(lit(1).as("col"))
df.write.parquet(partLocation1)
val df2 = spark.range(10).toDF("col")
df2.write.parquet(partLocation2)
val readDF = spark.read.parquet(tabLocation)
if (enableOptimizeMetadataOnlyQuery) {
// The result is wrong if we enable the configuration.
checkAnswer(readDF.selectExpr("max(p)"), Row(3))
} else {
checkAnswer(readDF.selectExpr("max(p)"), Row(1))
}
checkAnswer(readDF.selectExpr("max(col)"), Row(9))
}
}
}
}
}

case class Foo(bar: Option[String])
18 changes: 18 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2290,4 +2290,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
withTable("t") {
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
if (enableOptimizeMetadataOnlyQuery) {
// The result is wrong if we enable the configuration.
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
} else {
checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
}
checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
}
}
}
}

}
