[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing #20087
```diff
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC, "mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
 
     if (isCompressed) {
-      // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
-      // "mapreduce.output.fileoutputformat.compress.codec", and
-      // "mapreduce.output.fileoutputformat.compress.type"
-      // have no impact on ORC because it uses table properties to store compression information.
       hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
       fileSinkConf.setCompressed(true)
       fileSinkConf.setCompressCodec(hadoopConf
         .get("mapreduce.output.fileoutputformat.compress.codec"))
       fileSinkConf.setCompressType(hadoopConf
         .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
     }
 
     val committer = FileCommitProtocol.instantiate(
```

Review discussion on this hunk:

> **Reviewer:** Although this is the existing behavior, could you investigate how Hive behaves when […]?
>
> **Author:** Sure, I'll look into it in the coming days.
>
> **Author:** For parquet, using a hive client, […] After this PR, the priority has not changed: if table-level compression was set, other compression settings would not take effect, even though […] For orc, […] My Hive version for this test is 1.1.0. It's a little difficult for me to get a runnable higher-version Hive client.
>
> **Reviewer:** The comment might not be correct now. We need to follow how the latest Hive works, if possible. The best way to try out Hive (and other RDBMSs) is Docker. Maybe you can try Docker?
>
> **Author:** OK, I'll try it.
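The `else` branch above delegates to `HiveOptions.getHiveWriteCompression`, which resolves a codec by priority; per the review thread, a table-level compression property wins over the session configuration. A rough, hypothetical sketch of that priority logic (the names `CompressionPriority`, `resolveCompression`, `tableProps`, and `sessionConf`, as well as the `"snappy"` fallback, are illustrative assumptions, not Spark's actual implementation):

```scala
import java.util.Locale

// Hypothetical sketch of "set compression by priority": a table-level
// property wins over the session configuration, which wins over a default.
// The "snappy" default here is an assumption for illustration only.
object CompressionPriority {
  def resolveCompression(
      tableProps: Map[String, String],
      sessionConf: Map[String, String],
      fileFormat: String): String = {
    val (tableKey, sessionKey) = fileFormat.toLowerCase(Locale.ROOT) match {
      case "parquet" => ("parquet.compression", "spark.sql.parquet.compression.codec")
      case "orc"     => ("orc.compress", "spark.sql.orc.compression.codec")
      case other     => throw new IllegalArgumentException(s"Unsupported format: $other")
    }
    tableProps.get(tableKey)                // 1. table-level property wins
      .orElse(sessionConf.get(sessionKey))  // 2. then the session configuration
      .getOrElse("snappy")                  // 3. assumed fallback for this sketch
  }
}
```

For example, with `parquet.compression` set to `gzip` on the table, the session value of `spark.sql.parquet.compression.codec` is ignored, matching the behavior the author describes in the thread.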
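The match on the output-format class name in the hunk can be exercised in isolation. A minimal sketch of that check (`isOrcOutputFormat` is an illustrative helper name, not part of the patch):

```scala
import java.util.Locale

// Mirrors the check in the diff: ORC is detected by the suffix of the
// output-format class name. Locale.ROOT keeps the lowercase conversion
// stable regardless of the JVM's default locale.
def isOrcOutputFormat(outputFormatClassName: String): Boolean =
  outputFormatClassName.toLowerCase(Locale.ROOT).endsWith("orcoutputformat")
```

For an ORC table the class name ends with `OrcOutputFormat`, so the check matches and `hive.exec.compress.output` is ignored in favor of table properties; any other output format falls through to the existing `hive.exec.compress.output` behavior.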