[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configurations don't take effect on Hive table writing #19218
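Before this change, these session-level codec settings were silently ignored when writing to Hive serde tables, so the output files fell back to the file format's default compression. A minimal reproduction sketch (assuming a Hive-enabled session; the table name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SPARK-21786-repro")
  .enableHiveSupport()
  .getOrCreate()

// Session-level codec that should also apply to Hive table writes.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

spark.sql("CREATE TABLE tab (id INT) STORED AS PARQUET")
// Before this patch the inserted files were written with the format's
// default codec; with it, the session config takes effect unless the
// table's own properties specify a codec.
spark.sql("INSERT OVERWRITE TABLE tab SELECT 1")
```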

Status: Closed. Wants to merge 31 commits.
Commits:
677541b  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 13, 2017)
4e70fff  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 14, 2017)
3f022f9  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 15, 2017)
6d77bf9  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 15, 2017)
42aca3d  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 15, 2017)
5cbe999  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 16, 2017)
732266c  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 16, 2017)
c7ff62c  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configur… (fjh100456, Sep 16, 2017)
384ee04  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Sep 20, 2017)
8c92074  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Sep 20, 2017)
dd5060a  Merge branch 'master' into master (fjh100456, Sep 20, 2017)
d427df5  Update InsertSuite.scala (fjh100456, Sep 20, 2017)
35cfa01  Update InsertSuite.scala (fjh100456, Sep 20, 2017)
5387497  Fix test problems (fjh100456, Sep 20, 2017)
676d6a7  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Sep 27, 2017)
ae1da8f  Fix scala style issue (fjh100456, Sep 27, 2017)
fd73145  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Sep 28, 2017)
7615939  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Sep 28, 2017)
90cbcb3  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 10, 2017)
dd6d635  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 10, 2017)
4fe8170  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 12, 2017)
aa31261  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 16, 2017)
dfb36d9  Merge branch 'master' into master (fjh100456, Oct 16, 2017)
c4801f6  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 16, 2017)
105e129  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Oct 16, 2017)
dc12038  Merge pull request #1 from apache/master (fjh100456, Dec 18, 2017)
d779ee6  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Dec 19, 2017)
0cb7b7a  [SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spa… (fjh100456, Dec 20, 2017)
78e0403  Resume the changing, and change it in another pr later. (fjh100456, Dec 23, 2017)
7804f60  Change to public (fjh100456, Dec 23, 2017)
52cdd75  Fix the code with gatorsmile's suggestion. (fjh100456, Dec 23, 2017)
@@ -94,7 +94,7 @@ class OrcFileFormat

     conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString)

-    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
+    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName)

     conf.asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
@@ -41,7 +41,7 @@ class OrcOptions(
    * Compression codec to use.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
-  val compressionCodec: String = {
+  val compressionCodecClassName: String = {
     // `compression`, `orc.compress` (i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec`
     // are in order of precedence from highest to lowest.
     val orcCompressionConf = parameters.get(COMPRESS.getAttribute)
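The renamed `compressionCodecClassName` keeps the precedence documented in the comment above. A standalone sketch of that resolution order (assumed helper, not the actual OrcOptions internals):

```scala
import java.util.Locale

// Sketch: resolve the ORC codec from writer options and the session conf.
// "snappy" is the documented default of spark.sql.orc.compression.codec.
def resolveOrcCodec(
    options: Map[String, String],
    sessionConf: Map[String, String]): String = {
  val name = options.get("compression")            // highest priority
    .orElse(options.get("orc.compress"))           // OrcConf.COMPRESS
    .getOrElse(sessionConf.getOrElse("spark.sql.orc.compression.codec", "snappy"))
  // "uncompressed" is normalized to ORC's "NONE", as the tests below show.
  if (name.equalsIgnoreCase("uncompressed")) "NONE"
  else name.toUpperCase(Locale.ROOT)
}
```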
@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Options for the Parquet data source.
  */
-private[parquet] class ParquetOptions(
+class ParquetOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
   extends Serializable {
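Dropping `private[parquet]` lets the Hive write path construct `ParquetOptions` directly; the new `HiveOptions.getHiveWriteCompression` further down relies on this. A sketch mirroring that call site (the properties map here is a stand-in for real table properties):

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf

val sqlConf = new SQLConf
val tableProps = Map("compression" -> "snappy") // hypothetical table properties
val codec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
```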
@@ -134,29 +134,30 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = spark.sessionState.conf
     val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
-    assert(option.compressionCodec == "NONE")
+    assert(option.compressionCodecClassName == "NONE")
   }

   test("SPARK-21839: Add SQL config for ORC compression") {
     val conf = spark.sessionState.conf
     // Test if the default of spark.sql.orc.compression.codec is snappy
-    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY")

     // OrcOptions's parameters have a higher priority than SQL configuration.
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
-      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE")
       val map1 = Map(COMPRESS.getAttribute -> "zlib")
       val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
-      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
-      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+      assert(new OrcOptions(map1, conf).compressionCodecClassName == "ZLIB")
+      assert(new OrcOptions(map2, conf).compressionCodecClassName == "LZO")
     }

     // Test all the valid options of spark.sql.orc.compression.codec
     Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
         val expected = if (c == "UNCOMPRESSED") "NONE" else c
-        assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+        assert(
+          new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == expected)
       }
     }
   }
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution

 import java.util.Locale

+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive

@@ -102,4 +111,17 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    val tableProps = tableInfo.getProperties.asScala.toMap
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
+        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+      case formatName if formatName.endsWith("orcoutputformat") =>
+        val compressionCodec = new OrcOptions(tableProps, sqlConf).compressionCodecClassName
+        Option((COMPRESS.getAttribute, compressionCodec))
+      case _ => None
+    }
+  }
 }
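The helper returns the Hadoop property each writer actually reads, paired with the resolved codec name. An illustrative sketch of the expected results; the `TableDesc` values are hypothetical, and the key literals are assumptions based on what `ParquetOutputFormat.COMPRESSION` and `OrcConf.COMPRESS.getAttribute` resolve to:

```scala
// Illustrative only; parquetTableDesc, orcTableDesc, textTableDesc are
// hypothetical TableDesc instances for the respective serde tables.
HiveOptions.getHiveWriteCompression(parquetTableDesc, sqlConf)
// => Some(("parquet.compression", "GZIP")) with spark.sql.parquet.compression.codec = gzip
HiveOptions.getHiveWriteCompression(orcTableDesc, sqlConf)
// => Some(("orc.compress", "ZLIB"))        with spark.sql.orc.compression.codec = zlib
HiveOptions.getHiveWriteCompression(textTableDesc, sqlConf)
// => None: other formats keep Hive's existing compression handling
```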
@@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    // Set compression by priority
+    HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+      .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
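With this hook in place, table-level properties still take precedence over the session conf, because they arrive via `tableInfo.getProperties` and `OrcOptions`/`ParquetOptions` rank explicit parameters above `SQLConf`. An end-to-end sketch for ORC (table name hypothetical):

```scala
spark.conf.set("spark.sql.orc.compression.codec", "zlib")
spark.sql(
  """CREATE TABLE tab_snappy (id INT) STORED AS ORC
    |TBLPROPERTIES ('orc.compress' = 'SNAPPY')""".stripMargin)
spark.sql("INSERT OVERWRITE TABLE tab_snappy SELECT 1")
// Expected: SNAPPY-compressed ORC files; the table property wins over
// the session-level zlib setting.
```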
@@ -74,7 +74,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

     val configuration = job.getConfiguration

-    configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
+    configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName)
     configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])