[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configurations don't take effect on Hive table writing

What changes were proposed in this pull request?

Pass the 'spark.sql.parquet.compression.codec' value to 'parquet.compression'.
Pass the 'spark.sql.orc.compression.codec' value to 'orc.compress'.
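
For illustration, a minimal end-to-end sketch of the behavior this enables (the table names are made up for the example, and it assumes a SparkSession built with Hive support; this snippet is not part of the patch):

// Hedged example: with this change the session-level codec settings below
// are propagated to the Hive SerDe writers when the table itself does not
// declare a compression property.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
spark.sql("CREATE TABLE demo_parquet (id INT) STORED AS PARQUET")
spark.sql("INSERT INTO demo_parquet VALUES (1)")  // data files written with gzip

spark.conf.set("spark.sql.orc.compression.codec", "zlib")
spark.sql("CREATE TABLE demo_orc (id INT) STORED AS ORC")
spark.sql("INSERT INTO demo_orc VALUES (1)")      // data files written with zlib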

How was this patch tested?

Added tests.
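
A rough sketch of the kind of check such a test can make (hypothetical; it assumes a suite mixing in Spark's SQL test utilities such as withSQLConf/withTable, and the suite actually added by this patch may differ):

// Hedged test sketch: set the session codec, write a Hive Parquet table,
// then verify the codec actually used for the produced data files.
withSQLConf("spark.sql.parquet.compression.codec" -> "gzip") {
  withTable("codec_check") {
    sql("CREATE TABLE codec_check STORED AS PARQUET AS SELECT 1 AS id")
    // e.g. inspect the Parquet footers of the files under the table location
    // and assert that they report GZIP compression.
  }
}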

Note:
This is the same issue addressed in #19218. That branch was deleted by mistake, so a new PR was opened instead.

gatorsmile maropu dongjoon-hyun discipleforteen

Author: fjh100456 <fu.jinhua6@zte.com.cn>
Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>
Author: gatorsmile <gatorsmile@gmail.com>
Author: Yinan Li <liyinan926@gmail.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Juliusz Sompolski <julek@databricks.com>
Author: Felix Cheung <felixcheung_m@hotmail.com>
Author: jerryshao <sshao@hortonworks.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Gera Shegalov <gera@apache.org>
Author: chetkhatri <ckhatrimanjal@gmail.com>
Author: Joseph K. Bradley <joseph@databricks.com>
Author: Bago Amirbekian <bago@databricks.com>
Author: Xianjin YE <advancedxy@gmail.com>
Author: Bruce Robbins <bersprockets@gmail.com>
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Author: Kent Yao <yaooqinn@hotmail.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Adrian Ionescu <adrian@databricks.com>

Closes #20087 from fjh100456/HiveTableWriting.

(cherry picked from commit 00d1691)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
fjh100456 authored and gatorsmile committed Jan 20, 2018
1 parent e11d5ea commit b9c1367
Showing 5 changed files with 397 additions and 6 deletions.
@@ -67,4 +67,6 @@ object OrcOptions {
    "snappy" -> "SNAPPY",
    "zlib" -> "ZLIB",
    "lzo" -> "LZO")

  def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
}
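
For context, a minimal illustrative call of the new helper (not part of the diff); it simply exposes the short-name mapping shown above:

// Hedged example: maps a lowercase short codec name to the ORC codec identifier.
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
OrcOptions.getORCCompressionCodecName("zlib")  // "ZLIB"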
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
/**
 * Options for the Parquet data source.
 */
private[parquet] class ParquetOptions(
class ParquetOptions(
    @transient private val parameters: CaseInsensitiveMap[String],
    @transient private val sqlConf: SQLConf)
  extends Serializable {
@@ -82,4 +82,8 @@ object ParquetOptions {
    "snappy" -> CompressionCodecName.SNAPPY,
    "gzip" -> CompressionCodecName.GZIP,
    "lzo" -> CompressionCodecName.LZO)

  def getParquetCompressionCodecName(name: String): String = {
    shortParquetCompressionCodecNames(name).name()
  }
}
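
Likewise for Parquet, a minimal illustrative call (not part of the diff); the class itself is made public above so that the Hive write path can construct ParquetOptions directly:

// Hedged example: resolves a short codec name to the Parquet codec identifier.
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
ParquetOptions.getParquetCompressionCodecName("gzip")  // "GZIP"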
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution

import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.orc.OrcConf.COMPRESS
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf

/**
 * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
@@ -102,4 +111,17 @@ object HiveOptions {
    "collectionDelim" -> "colelction.delim",
    "mapkeyDelim" -> "mapkey.delim",
    "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
    val tableProps = tableInfo.getProperties.asScala.toMap
    tableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
      case formatName if formatName.endsWith("parquetoutputformat") =>
        val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
      case formatName if formatName.endsWith("orcoutputformat") =>
        val compressionCodec = new OrcOptions(tableProps, sqlConf).compressionCodec
        Option((COMPRESS.getAttribute, compressionCodec))
      case _ => None
    }
  }
}
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
      partitionAttributes: Seq[Attribute] = Nil): Set[String] = {

    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
    val isCompressed =
      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
        case formatName if formatName.endsWith("orcoutputformat") =>
          // For ORC,"mapreduce.output.fileoutputformat.compress",
          // "mapreduce.output.fileoutputformat.compress.codec", and
          // "mapreduce.output.fileoutputformat.compress.type"
          // have no impact because it uses table properties to store compression information.
          false
        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
      }

    if (isCompressed) {
      // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
      // "mapreduce.output.fileoutputformat.compress.codec", and
      // "mapreduce.output.fileoutputformat.compress.type"
      // have no impact on ORC because it uses table properties to store compression information.
      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
      fileSinkConf.setCompressed(true)
      fileSinkConf.setCompressCodec(hadoopConf
        .get("mapreduce.output.fileoutputformat.compress.codec"))
      fileSinkConf.setCompressType(hadoopConf
        .get("mapreduce.output.fileoutputformat.compress.type"))
    } else {
      // Set compression by priority
      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
    }

    val committer = FileCommitProtocol.instantiate(
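
To make the "Set compression by priority" step concrete, a hedged sketch of its effect (not additional code from the patch; it assumes a Parquet-backed Hive table with no table-level compression property and spark.sql.parquet.compression.codec set to gzip, and reuses fileSinkConf/hadoopConf/sparkSession from the surrounding method):

// Hedged sketch of what the else-branch above resolves for the assumed configuration.
val resolved = HiveOptions.getHiveWriteCompression(
  fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
// resolved == Some(("parquet.compression", "GZIP"))  // ParquetOutputFormat.COMPRESSION
resolved.foreach { case (key, codec) => hadoopConf.set(key, codec) }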
