[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing #19218
Conversation
…ation doesn't take effect on tables with partition field(s)
Could you add tests? Probably you could insert some data and then check whether the data is compressed by listing the files in a temp dir?
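A minimal sketch of such a test, assuming the suite mixes in SQLTestUtils (withSQLConf, withTempDir, withTable) and a Hive-enabled SparkSession; the table name and suffix check are illustrative, not this PR's final test:

import java.io.File

import org.apache.spark.sql.internal.SQLConf

test("compression codec config takes effect on a partitioned hive parquet table") {
  withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "gzip") {
    withTempDir { dir =>
      withTable("tab") {
        sql(s"CREATE TABLE tab(a INT) PARTITIONED BY (p INT) STORED AS PARQUET " +
          s"LOCATION '${dir.toURI}'")
        sql("INSERT OVERWRITE TABLE tab PARTITION (p=1) SELECT 1")
        // Gzip-compressed Parquet part files carry a '.gz.parquet' suffix.
        val names = new File(dir, "p=1").listFiles().map(_.getName)
        assert(names.exists(_.endsWith(".gz.parquet")))
      }
    }
  }
}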
…ation doesn't take effect on tables with partition field(s) Add test.
cc @maropu I have added the test. However, my local environment cannot run the test cases properly, so I'm not sure whether the new test case will pass; I'll keep watching it.
@gatorsmile Is it worth fixing this? If so, could you trigger tests?
ok to test
Test build #81813 has finished for PR 19218 at commit
@maropu @fjh100456 If the issue is real, we should fix it for sure. However, the PR description must be wrong: if this issue exists, it should apply to both partitioned and non-partitioned ORC/Parquet tables. cc @dongjoon-hyun I think you might be interested in the ORC side.
…ation doesn't take effect on tables with partition field(s) Fix scala style.
Test build #81815 has finished for PR 19218 at commit
…ation doesn't take effect on tables with partition field(s) Fix scala style.
Test build #81816 has finished for PR 19218 at commit
…ation doesn't take effect on tables with partition field(s) Fix test problem
Test build #81820 has finished for PR 19218 at commit
Thank you for pinging me, @gatorsmile.
@@ -101,6 +101,13 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

+    tableDesc.getOutputFileFormatClassName match {
+      case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
- Parquet: It seems that you need to consider another output format, parquet.hive.DeprecatedParquetOutputFormat, too.
- ORC: We have spark.sql.orc.compression.codec by SPARK-21839.
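A hedged sketch of what covering both formats could look like (the helper name and wiring are hypothetical, not this PR's final code): map each Hive output format class to the Hadoop property its writer reads.

import org.apache.hadoop.conf.Configuration

def setHiveWriteCompression(
    outputFormatClassName: String,
    parquetCodec: String,
    orcCodec: String,
    hadoopConf: Configuration): Unit = outputFormatClassName match {
  case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" |
       "parquet.hive.DeprecatedParquetOutputFormat" =>
    hadoopConf.set("parquet.compression", parquetCodec) // read by the Parquet writer
  case "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat" =>
    hadoopConf.set("orc.compress", orcCodec) // read by the ORC writer
  case _ => // leave other output formats untouched
}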
@dongjoon-hyun Thank you very much, I'll fix it now.
…ation doesn't take effect on tables with partition field(s) Fix the similar issue of orc compression
…ation doesn't take effect on tables with partition field(s) Fix test problem
Test build #81839 has finished for PR 19218 at commit
Test build #81840 has finished for PR 19218 at commit
…ation doesn't take effect on tables with partition field(s) Fix test problem
A problem has been encountered. There are two ways to specify the compression format:
If the table was already given a compression format when it was created, and another compression format is then specified by setting 'orc.compress', the latter takes effect. So either the Spark side should have no default value (we could use 'undefined' to distinguish the unset case), or we discard this change and explain in the documentation that 'spark.sql.parquet.compression.codec' does not take effect on partitioned tables and 'spark.sql.orc.compression.codec' is not valid for Hive tables. Or perhaps you have a better solution.
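A sketch of the 'undefined' idea (hypothetical helper, not code from this PR): treat the session config as an Option so that a table-level 'orc.compress' is not silently overridden by a session default.

// Table-level setting wins; the session config applies only when explicitly set;
// otherwise fall back to a built-in default.
def resolveOrcCodec(tableProps: Map[String, String], sessionCodec: Option[String]): String =
  tableProps.get("orc.compress")
    .orElse(sessionCodec)
    .getOrElse("SNAPPY")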
Test build #81841 has finished for PR 19218 at commit
I see. If you set … Please update your PR description and title.
@@ -101,6 +101,19 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

+    tableDesc.getOutputFileFormatClassName match {
Move the whole logic into saveAsHiveFile, which is shared by InsertIntoHiveDirCommand and InsertIntoHiveTable. Both need this logic.
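A rough sketch of the suggested placement (trait and signature are hypothetical): with the logic in a shared helper, both commands pick it up through the same code path.

import org.apache.hadoop.conf.Configuration

trait SaveAsHiveFile {
  // Resolve the (property, codec) pair for this output format and apply it to the
  // job configuration before the actual write runs.
  protected def configureCompression(
      outputFormatClassName: String,
      hadoopConf: Configuration,
      codecFor: String => Option[(String, String)]): Unit = {
    codecFor(outputFormatClassName).foreach { case (prop, codec) =>
      hadoopConf.set(prop, codec)
    }
  }
}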
@fjh100456 We have a priority order among three different inputs, and here you consider only one of the three. Please also add the extra checks, as in the sketch below. Hopefully @dongjoon-hyun can help answer your questions; he just finished the work in #19055.
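A hedged sketch of that three-input priority (names are illustrative, not this PR's code): an explicit write option beats a table property, which beats the session default.

def resolveCodec(
    writeOptions: Map[String, String],
    tableProps: Map[String, String],
    sessionDefault: String): String =
  writeOptions.get("compression")
    .orElse(tableProps.get("compression"))
    .getOrElse(sessionDefault)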
Sorry, guys. I've been away from keyboard since last Friday night. I'll be back next Tuesday (PST).
I will change the code following @gatorsmile's suggestion; it's a little busy these days. I will do it tomorrow.
…rk.sql.orc.compression.codec' configuration doesn't take effect on hive table writing Move the whole determination logics to HiveOptions
@gatorsmile @maropu Does it look better now? About the statistics issue, is there any suggestion? @SparkQA Please start the test, thanks.
ok to test
Test build #85154 has finished for PR 19218 at commit
…rk.sql.orc.compression.codec' configuration doesn't take effect on hive table writing Fix scala style
Test build #85173 has finished for PR 19218 at commit
@gatorsmile Could you help review it? Thanks very much!
        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
      case formatName if formatName.endsWith("orcoutputformat") =>
        val compressionCodec = new OrcOptions(tableInfo.getProperties.asScala.toMap,
          sqlConf).compressionCodec
Also update OrcOptions's compressionCodec to compressionCodecClassName.
The compressionCodec is used in several places; do you mean I should fix them all?
Yeah. Just to make it consistent.
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)

 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-  with SQLTestUtils {
+  with ParquetTest {
This is the insert suite; we are unable to do this here. Could you create a separate suite in the current package, org.apache.spark.sql.hive? The suite name can be CompressionCodecSuite.
Please also check whether the compression takes effect. Could you compare whether the size is smaller than the original size without compression?
Ok, I will do it.
It seems a compressed table is not always smaller than an uncompressed one. With SNAPPY, the compressed size may be bigger than the uncompressed size when the amount of data is small. So I'd like to check that the sizes are not equal when the compression codecs are different.
Fine to me. Thanks!
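A minimal sketch of the agreed check (the writeWith helper is an assumption, not this PR's code): write the same rows with two codecs and assert that the on-disk sizes differ, instead of assuming compressed output is always smaller.

import java.io.File

private def partFileSize(dir: File): Long =
  dir.listFiles().filter(_.getName.startsWith("part-")).map(_.length()).sum

// writeWith is assumed to write identical rows with the given codec and return the
// output directory.
def assertCodecChangesSize(writeWith: String => File): Unit =
  assert(partFileSize(writeWith("snappy")) != partFileSize(writeWith("uncompressed")))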
    tableInfo.getOutputFileFormatClassName.toLowerCase match {
      case formatName if formatName.endsWith("parquetoutputformat") =>
        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
          sqlConf).compressionCodecClassName
We normally do not split the code like this. We like the following way:
val tableProps = tableInfo.getProperties.asScala.toMap
tableInfo.getOutputFileFormatClassName.toLowerCase match {
case formatName if formatName.endsWith("parquetoutputformat") =>
val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
...
Yes, it looks better; I will change it.
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution

 import java.util.Locale

+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
Is FileSinkDesc still needed?
I will remove it.
  val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
  val codecName = parameters
    .get("compression")
    .orElse(parquetCompressionConf)
Is this new? Did we support parquet.compression before this PR?
Yes, it's new. I guess ParquetOptions was not used when writing Hive tables before, because it's invisible to Hive. I changed it to public.
Could we keep the old behavior and add this later? We do not want to mix multiple issues in the same PR.
If so, Parquet's table-level compression may be overwritten by this PR, and that may not be what we want. Shall I fix it first in another PR?
Yeah, we can submit a separate PR for that issue. The behavior change needs to be documented in the Spark SQL docs.
    HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
      .foreach{ case (compression, codec) =>
        hadoopConf.set(compression, codec)
      }
.foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
Could you also add another test scenario? For existing Hive tables (created by Hive), does Spark respect them? Do we use the existing compression configuration?
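A hedged sketch of the requested scenario (table name and expectations are illustrative, not this PR's final test): a table created with its own compression property should keep that codec on insert, regardless of the session-level setting.

test("respect table-level compression of pre-existing hive tables") {
  withTable("hive_tab") {
    sql(
      """CREATE TABLE hive_tab(a INT) STORED AS PARQUET
        |TBLPROPERTIES ('parquet.compression' = 'GZIP')""".stripMargin)
    withSQLConf("spark.sql.parquet.compression.codec" -> "snappy") {
      sql("INSERT OVERWRITE TABLE hive_tab SELECT 1")
      // Expectation: part files are gzip-compressed; the table property wins here.
    }
  }
}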
@gatorsmile
What are multiple compressions?
Test build #85334 has finished for PR 19218 at commit
Test build #85335 has finished for PR 19218 at commit
Test build #85340 has finished for PR 19218 at commit
@fjh100456 Yeah. Please also add test cases with a table containing mixed compression codecs. Thanks! I have some comments about your fix; see my commit. I will review your fix later.
class CompressionCodecSuite extends TestHiveSingleton with ParquetTest {
  import spark.implicits._

  private val maxRecordNum = 100000
Could you reduce it to a smaller number? The test cases are very slow to run.
I'd finished writing the test case with a table containing mixed compression codecs. But I may have made a mistake: the original branch was deleted accidentally, so I will close this PR and create another one. Sorry.
Please go to #20087
…rk.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing

What changes were proposed in this pull request?
Pass ‘spark.sql.parquet.compression.codec’ value to ‘parquet.compression’.
Pass ‘spark.sql.orc.compression.codec’ value to ‘orc.compress’.

How was this patch tested?
Add test.

Note: This is the same issue mentioned in #19218. That branch was deleted mistakenly, so make a new pr instead.

gatorsmile maropu dongjoon-hyun discipleforteen

Author: fjh100456 <fu.jinhua6@zte.com.cn>
Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>
Author: gatorsmile <gatorsmile@gmail.com>
Author: Yinan Li <liyinan926@gmail.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Juliusz Sompolski <julek@databricks.com>
Author: Felix Cheung <felixcheung_m@hotmail.com>
Author: jerryshao <sshao@hortonworks.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Gera Shegalov <gera@apache.org>
Author: chetkhatri <ckhatrimanjal@gmail.com>
Author: Joseph K. Bradley <joseph@databricks.com>
Author: Bago Amirbekian <bago@databricks.com>
Author: Xianjin YE <advancedxy@gmail.com>
Author: Bruce Robbins <bersprockets@gmail.com>
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Author: Kent Yao <yaooqinn@hotmail.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Adrian Ionescu <adrian@databricks.com>

Closes #20087 from fjh100456/HiveTableWriting.

(cherry picked from commit 00d1691)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing
What changes were proposed in this pull request?
Pass ‘spark.sql.parquet.compression.codec’ value to ‘parquet.compression’.
Pass ‘spark.sql.orc.compression.codec’ value to ‘orc.compress’.
How was this patch tested?
Add test.
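A condensed sketch of the behavior described above (simplified; per this PR the real logic lives in HiveOptions.getHiveWriteCompression and also honors table-level properties): pick the Hadoop property matching the table's output format, then set the session-configured codec on it before writing.

def hiveCompressionProperty(outputFormatClassName: String): Option[String] =
  outputFormatClassName.toLowerCase match {
    case f if f.endsWith("parquetoutputformat") => Some("parquet.compression")
    case f if f.endsWith("orcoutputformat") => Some("orc.compress")
    case _ => None
  }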