[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
Since Hive 1.1, Hive allows users to set the parquet compression codec via the table-level property `parquet.compression`. See the JIRA: https://issues.apache.org/jira/browse/HIVE-7858 . We already support `orc.compression` for ORC, so for external users it is more straightforward to support both. See the Stack Overflow question: https://stackoverflow.com/questions/36941122/spark-sql-ignores-parquet-compression-propertie-specified-in-tblproperties
On the Spark side, our table-level compression conf `compression` was added by apache#11464 and has been available since Spark 2.0.
We need to support both table-level confs. Users might also use the session-level conf `spark.sql.parquet.compression.codec`. The priority rule is as follows:
If a compression codec configuration is found through Hive or Parquet, the precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
After this change, the rule for Parquet is consistent with that for ORC.
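
For illustration, a minimal sketch of the intended precedence (the table name and codec values are made up for this example; only the option and conf names come from this change):

```scala
// Session-level conf: lowest precedence.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

// Table-level options: `compression` beats `parquet.compression`, which in turn
// beats the session conf, so the table's files should be written with gzip.
spark.sql(
  """
    |CREATE TABLE tbl USING parquet
    |OPTIONS('compression'='gzip', 'parquet.compression'='lzo')
    |AS SELECT 1 AS col1
  """.stripMargin)
```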

Changes:
1. `compressionCodecClassName` is now also resolved from `parquet.compression`, and the precedence order is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.

2. `spark.sql.parquet.compression.codec` now supports "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the conf previously could not be set to "none" (see the sketch after this list).

3. Rename `compressionCode` to `compressionCodecClassName`.
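
A small illustration of change 2 (the output path is made up for this example): the session conf can now be set to "none", which behaves like "uncompressed":

```scala
// "none" is now an accepted value for spark.sql.parquet.compression.codec and
// is resolved by ParquetOptions as equivalent to "uncompressed".
spark.conf.set("spark.sql.parquet.compression.codec", "none")
spark.range(10).write.mode("overwrite").parquet("/tmp/parquet_none_example")
```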

## How was this patch tested?
Add test.

Author: fjh100456 <fu.jinhua6@zte.com.cn>

Closes apache#20076 from fjh100456/ParquetOptionIssue.
fjh100456 authored and gatorsmile committed Jan 6, 2018
1 parent be9a804 commit 7b78041
Showing 4 changed files with 145 additions and 9 deletions.
6 changes: 4 additions & 2 deletions docs/sql-programming-guide.md
@@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>snappy</td>
<td>
Sets the compression codec use when writing Parquet files. Acceptable values include:
uncompressed, snappy, gzip, lzo.
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
none, uncompressed, snappy, gzip, lzo.
</td>
</tr>
<tr>
14 changes: 9 additions & 5 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -325,11 +325,13 @@ object SQLConf {
.createWithDefault(false)

val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
.doc("Sets the compression codec used when writing Parquet files. If either `compression` or" +
"`parquet.compression` is specified in the table-specific options/properties, the precedence" +
"would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -366,8 +368,10 @@ object SQLConf {
.createWithDefault(true)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
"none, uncompressed, snappy, zlib, lzo.")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or" +
"`orc.compress` is specified in the table-specific options/properties, the precedence" +
"would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
12 changes: 10 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.Locale

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodecClassName: String = {
val codecName = parameters.getOrElse("compression",
sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
// `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and
// `spark.sql.parquet.compression.codec`
// are in order of precedence from highest to lowest.
val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
val codecName = parameters
.get("compression")
.orElse(parquetCompressionConf)
.getOrElse(sqlConf.parquetCompressionCodec)
.toLowerCase(Locale.ROOT)
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs =
shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
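
A short usage sketch of the resolution above (the DataFrame and output paths are made up for this example): through the DataFrame writer, `parquet.compression` now determines the codec when `compression` is not set, and `compression` still wins when both are given.

```scala
val df = spark.range(100).toDF("col1")

// Only `parquet.compression` is set: with this change it determines the codec.
df.write.option("parquet.compression", "gzip").parquet("/tmp/out_gzip")

// Both keys are set: `compression` takes precedence, so snappy is used.
df.write
  .option("compression", "snappy")
  .option("parquet.compression", "gzip")
  .parquet("/tmp/out_snappy")
```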
122 changes: 122 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
}
}

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "GZIP")
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "SNAPPY")
}
}

private def getTableCompressionCodec(path: String): Seq[String] = {
val hadoopConf = spark.sessionState.newHadoopConf()
val codecs = for {
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
codecs.distinct
}

private def createTableWithCompression(
tableName: String,
isPartitioned: Boolean,
compressionCodec: String,
rootDir: File): Unit = {
val options =
s"""
|OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
|'parquet.compression'='$compressionCodec')
""".stripMargin
val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
sql(
s"""
|CREATE TABLE $tableName USING Parquet $options $partitionCreate
|AS SELECT 1 AS col1, 2 AS p
""".stripMargin)
}

private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
withTempDir { tmpDir =>
val tempTableName = "TempParquetTable"
withTable(tempTableName) {
createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
val partitionPath = if (isPartitioned) "p=2" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
val realCompressionCodecs = getTableCompressionCodec(path)
assert(realCompressionCodecs.forall(_ == compressionCodec))
}
}
}

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
}
}

test("Create table with unknown compression") {
Seq(true, false).foreach { isPartitioned =>
val exception = intercept[IllegalArgumentException] {
checkCompressionCodec("aa", isPartitioned)
}
assert(exception.getMessage.contains("Codec [aa] is not available"))
}
}
}
