-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing #19218
Changes from 20 commits
677541b
4e70fff
3f022f9
6d77bf9
42aca3d
5cbe999
732266c
c7ff62c
384ee04
8c92074
dd5060a
d427df5
35cfa01
5387497
676d6a7
ae1da8f
fd73145
7615939
90cbcb3
dd6d635
4fe8170
aa31261
dfb36d9
c4801f6
105e129
dc12038
d779ee6
0cb7b7a
78e0403
7804f60
52cdd75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { | |
.get("mapreduce.output.fileoutputformat.compress.type")) | ||
} | ||
|
||
fileSinkConf.tableInfo.getOutputFileFormatClassName match { | ||
case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") => | ||
val compressionConf = "parquet.compression" | ||
val compressionCodec = getCompressionByPriority( | ||
fileSinkConf, | ||
compressionConf, | ||
default = sparkSession.sessionState.conf.parquetCompressionCodec) match { | ||
case "NONE" => "UNCOMPRESSED" | ||
case _@x => x | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same here. |
||
} | ||
hadoopConf.set(compressionConf, compressionCodec) | ||
case formatName if formatName.endsWith("OrcOutputFormat") => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
val compressionConf = "orc.compress" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. -> |
||
val compressionCodec = getCompressionByPriority( | ||
fileSinkConf, | ||
compressionConf, | ||
default = sparkSession.sessionState.conf.orcCompressionCodec) match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest to add a normalization logics for both ORC and Parquet. Check the |
||
case "UNCOMPRESSED" => "NONE" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are ORC and Parquet different? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, they are different: the style of the parameter names and parameter values is different in each, which stems from Parquet and ORC themselves. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why always make it upper case? This looks buggy. |
||
case _@x => x | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In fact, the following process will check the correctness of this value, and because `OrcOptions` is not accessible here, I have to add the "uncompressed" => "NONE" conversion. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the whole determination logics to |
||
hadoopConf.set(compressionConf, compressionCodec) | ||
case _ => | ||
} | ||
|
||
val committer = FileCommitProtocol.instantiate( | ||
sparkSession.sessionState.conf.fileCommitProtocolClass, | ||
jobId = java.util.UUID.randomUUID().toString, | ||
|
@@ -86,6 +110,19 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { | |
options = Map.empty) | ||
} | ||
|
||
// Because compression configurations can come in a variety of ways, | ||
// we choose the compression configuration in this order: | ||
// For parquet: `compression` > `parquet.compression` > `spark.sql.parquet.compression.codec` | ||
// For orc: `compression` > `orc.compress` > `spark.sql.orc.compression.codec` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it okay to leave this priority in the spark document or somewhere? https://spark.apache.org/docs/latest/sql-programming-guide.html#configuration |
||
private def getCompressionByPriority(fileSinkConf: FileSinkDesc, | ||
compressionConf: String, default: String): String = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private def getCompressionByPriority(
fileSinkConf: FileSinkDesc,
compressionConf: String,
default: String): String = { There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add the description to explain the priority sequences? |
||
// The variable `default` was set to spark sql conf. | ||
val props = fileSinkConf.tableInfo.getProperties | ||
val priorities = List("compression", compressionConf) | ||
priorities.find(props.getProperty(_, null) != null) | ||
.map(props.getProperty).getOrElse(default).toUpperCase(Locale.ROOT) | ||
} | ||
|
||
protected def getExternalTmpPath( | ||
sparkSession: SparkSession, | ||
hadoopConf: Configuration, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,18 @@ package org.apache.spark.sql.hive | |
|
||
import java.io.File | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.hadoop.fs.Path | ||
import org.scalatest.BeforeAndAfter | ||
|
||
import org.apache.spark.SparkException | ||
import org.apache.spark.sql.{QueryTest, _} | ||
import org.apache.spark.sql.catalyst.parser.ParseException | ||
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable | ||
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest | ||
import org.apache.spark.sql.hive.orc.OrcFileOperator | ||
import org.apache.spark.sql.hive.test.TestHiveSingleton | ||
import org.apache.spark.sql.test.SQLTestUtils | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.util.Utils | ||
|
||
|
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String) | |
case class ThreeCloumntable(key: Int, value: String, key1: String) | ||
|
||
class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter | ||
with SQLTestUtils { | ||
with ParquetTest { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the insert suite. We are unable to do this. Could you create a separate suite in the current package There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also check whether the compression takes an effect? Compare the size whether is smaller than the original size without compressions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I will do it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems compressed table does not always be smaller than uncompressed tables. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine to me. Thanks! |
||
import spark.implicits._ | ||
|
||
override lazy val testData = spark.sparkContext.parallelize( | ||
|
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter | |
assert(e.contains("mismatched input 'ROW'")) | ||
} | ||
} | ||
|
||
private def getConvertMetastoreConfName(format: String): String = format match { | ||
case "parquet" => "spark.sql.hive.convertMetastoreParquet" | ||
case "orc" => "spark.sql.hive.convertMetastoreOrc" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you use keys?
|
||
} | ||
|
||
private def getSparkCompressionConfName(format: String): String = format match { | ||
case "parquet" => "spark.sql.parquet.compression.codec" | ||
case "orc" => "spark.sql.orc.compression.codec" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here, too.
|
||
} | ||
|
||
private def getTableCompressPropName(format: String): String = { | ||
format.toLowerCase match { | ||
case "parquet" => "parquet.compression" | ||
case "orc" => "orc.compress" | ||
} | ||
} | ||
|
||
private def getTableCompressionCodec(path: String, format: String): String = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic means a compression codec from the files. The prefix There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I changed it to getHiveCompressPropName; is that appropriate? |
||
val hadoopConf = spark.sessionState.newHadoopConf() | ||
val codecs = format match { | ||
case "parquet" => for { | ||
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) | ||
block <- footer.getParquetMetadata.getBlocks.asScala | ||
column <- block.getColumns.asScala | ||
} yield column.getCodec.name() | ||
case "orc" => new File(path).listFiles().filter{ file => | ||
file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS" | ||
}.map { orcFile => | ||
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString | ||
}.toSeq | ||
} | ||
|
||
assert(codecs.distinct.length == 1) | ||
codecs.head | ||
} | ||
|
||
private def writeDataToTable( | ||
rootDir: File, | ||
tableName: String, | ||
isPartitioned: Boolean, | ||
format: String, | ||
compressionCodec: Option[String]) { | ||
val tblProperties = compressionCodec match { | ||
case Some(prop) => s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')" | ||
case _ => "" | ||
} | ||
val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else "" | ||
sql( | ||
s""" | ||
|CREATE TABLE $tableName(a int) | ||
|$partitionCreate | ||
|STORED AS $format | ||
|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' | ||
|$tblProperties | ||
""".stripMargin) | ||
|
||
val partitionInsert = if (isPartitioned) s"partition (p=10000)" else "" | ||
sql( | ||
s""" | ||
|INSERT OVERWRITE TABLE $tableName | ||
|$partitionInsert | ||
|SELECT * from table_source | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit. |
||
""".stripMargin) | ||
} | ||
|
||
private def checkCompressionCodecForTable( | ||
format: String, | ||
isPartitioned: Boolean, | ||
compressionCodec: Option[String])(assertion: String => Unit): Unit = { | ||
val tableName = s"tbl_$format${isPartitioned}" | ||
withTempDir { tmpDir => | ||
withTable(tableName) { | ||
writeDataToTable(tmpDir, tableName, isPartitioned, format, compressionCodec) | ||
val partition = if (isPartitioned) "p=10000" else "" | ||
val path = s"${tmpDir.getPath.stripSuffix("/")}/${tableName}/$partition" | ||
assertion(getTableCompressionCodec(path, format)) | ||
} | ||
} | ||
} | ||
|
||
private def checkTableCompressionCodecForCodecs( | ||
format: String, | ||
isPartitioned: Boolean, | ||
convertMetastore: Boolean, | ||
compressionCodecs: List[String], | ||
tableCompressionCodecs: List[String]) ( | ||
assertion: (Option[String], String, String) => Unit): Unit = { | ||
withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString) { | ||
tableCompressionCodecs.foreach { tableCompression => | ||
compressionCodecs.foreach { sessionCompressionCodec => | ||
withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) { | ||
val compression = if (tableCompression == null) None else Some(tableCompression) | ||
checkCompressionCodecForTable(format, isPartitioned, compression) { | ||
case realCompressionCodec => assertion(compression, | ||
sessionCompressionCodec, realCompressionCodec) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private def testCompressionCodec(testCondition: String)(f: => Unit): Unit = { | ||
test("[SPARK-21786] - Check the priority between table-level compression and " + | ||
s"session-level compression $testCondition") { | ||
withTempView("table_source") { | ||
(0 until 100000).toDF("a").createOrReplaceTempView("table_source") | ||
f | ||
} | ||
} | ||
} | ||
|
||
testCompressionCodec("when table-level and session-level compression are both configured and " + | ||
"convertMetastore is false") { | ||
def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = { | ||
// For tables with table-level compression property, when | ||
// 'spark.sql.hive.convertMetastore[Parquet|Orc]' was set to 'false', partitioned tables | ||
// and non-partitioned tables will always take the table-level compression | ||
// configuration first and ignore session compression configuration. | ||
// Check for partitioned table, when convertMetastore is false | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = true, | ||
convertMetastore = false, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = compressCodecs) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect table-level take effect | ||
assert(tableCompressionCodec.get == realCompressionCodec) | ||
} | ||
|
||
// Check for non-partitioned table, when convertMetastoreParquet is false | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = false, | ||
convertMetastore = false, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = compressCodecs) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect table-level take effect | ||
assert(tableCompressionCodec.get == realCompressionCodec) | ||
} | ||
} | ||
|
||
checkForTableWithCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) | ||
checkForTableWithCompressProp("orc", List("NONE", "SNAPPY", "ZLIB")) | ||
} | ||
|
||
testCompressionCodec("when there's no table-level compression and convertMetastore is false") { | ||
def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = { | ||
// For tables without table-level compression property, session-level compression | ||
// configuration will take effect. | ||
// Check for partitioned table, when convertMetastore is false | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = true, | ||
convertMetastore = false, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = List(null)) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect session-level take effect | ||
assert(sessionCompressionCodec == realCompressionCodec) | ||
} | ||
|
||
// Check for non-partitioned table, when convertMetastore is false | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = false, | ||
convertMetastore = false, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = List(null)) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect session-level take effect | ||
assert(sessionCompressionCodec == realCompressionCodec) | ||
} | ||
} | ||
|
||
checkForTableWithoutCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) | ||
checkForTableWithoutCompressProp("orc", List("NONE", "SNAPPY", "ZLIB")) | ||
} | ||
|
||
testCompressionCodec("when table-level and session-level compression are both configured and " + | ||
"convertMetastore is true") { | ||
def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = { | ||
// For tables with table-level compression property, when | ||
// 'spark.sql.hive.convertMetastore[Parquet|Orc]' was set to 'true', partitioned tables | ||
// will always take the table-level compression configuration first, but non-partitioned | ||
// tables will take the session-level compression configuration. | ||
// Check for partitioned table, when convertMetastore is true | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = true, | ||
convertMetastore = true, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = compressCodecs) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect table-level take effect | ||
assert(tableCompressionCodec.get == realCompressionCodec) | ||
} | ||
|
||
// Check for non-partitioned table, when convertMetastore is true | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = false, | ||
convertMetastore = true, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = compressCodecs) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect session-level take effect | ||
assert(sessionCompressionCodec == realCompressionCodec) | ||
} | ||
} | ||
|
||
checkForTableWithCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) | ||
checkForTableWithCompressProp("orc", List("NONE", "SNAPPY", "ZLIB")) | ||
} | ||
|
||
testCompressionCodec("when there's no table-level compression and convertMetastore is true") { | ||
def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = { | ||
// For tables without table-level compression property, session-level compression | ||
// configuration will take effect. | ||
// Check for partitioned table, when convertMetastore is true | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = true, | ||
convertMetastore = true, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = List(null)) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect session-level take effect | ||
assert(sessionCompressionCodec == realCompressionCodec) | ||
} | ||
|
||
// Check for non-partitioned table, when convertMetastore is true | ||
checkTableCompressionCodecForCodecs( | ||
format = format, | ||
isPartitioned = false, | ||
convertMetastore = true, | ||
compressionCodecs = compressCodecs, | ||
tableCompressionCodecs = List(null)) { | ||
case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec) => | ||
// expect session-level take effect | ||
assert(sessionCompressionCodec == realCompressionCodec) | ||
} | ||
} | ||
|
||
checkForTableWithoutCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) | ||
checkForTableWithoutCompressProp("orc", List("NONE", "SNAPPY", "ZLIB")) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"parquet.compression"
->ParquetOutputFormat.COMPRESSION