[SPARK-23355][SQL] convertMetastore should not ignore table properties #20522

Closed

@@ -186,15 +186,28 @@ case class RelationConversions(
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Consider table and storage properties. For properties existing in both sides, storage
+    // properties will supersede table properties.
     if (serde.contains("parquet")) {
-      val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
+        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
-      val options = relation.tableMeta.storage.properties
+      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
+        relation.tableMeta.storage.properties
       if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         sessionCatalog.metastoreCatalog.convertToLogicalRelation(
           relation,
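Reviewer aid, not part of the patch: a minimal, self-contained sketch of the precedence the new option merging relies on. Format-specific table properties are filtered in first and the storage properties are concatenated afterwards with `++`, so a key declared on both sides resolves to the storage-level value. All names and values below are made up for illustration.

// Illustrative sketch of the precedence of `++` as used in convert() above.
object PropertyMergeSketch {
  private def isOrcProperty(key: String) =
    key.startsWith("orc.") || key.contains(".orc.")

  def main(args: Array[String]): Unit = {
    // Stand-ins for relation.tableMeta.properties and relation.tableMeta.storage.properties.
    val tableProperties = Map(
      "orc.compress" -> "ZLIB",           // ORC option declared in TBLPROPERTIES
      "comment" -> "not an ORC option")   // dropped by the isOrcProperty filter
    val storageProperties = Map("orc.compress" -> "SNAPPY")

    // `.toMap` is only here so the sketch also compiles on Scala 2.13; the patch
    // calls filterKeys directly on the Scala version Spark builds with.
    val options = tableProperties.filterKeys(isOrcProperty).toMap ++ storageProperties

    // Right-hand (storage) properties win on duplicate keys, matching the
    // "storage properties will supersede table properties" comment in the patch.
    assert(options("orc.compress") == "SNAPPY")
    assert(!options.contains("comment"))
  }
}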
@@ -268,12 +268,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         compressionCodecs = compressCodecs,
         tableCompressionCodecs = compressCodecs) {
         case (tableCodec, sessionCodec, realCodec, tableSize) =>
-          // For non-partitioned table and when convertMetastore is true, Expect session-level
-          // take effect, and in other cases expect table-level take effect
-          // TODO: It should always be table-level taking effect when the bug(SPARK-22926)
-          // is fixed
-          val expectCodec =
-            if (convertMetastore && !isPartitioned) sessionCodec else tableCodec.get
+          val expectCodec = tableCodec.get
           assert(expectCodec == realCodec)
           assert(checkTableSize(
             format, expectCodec, isPartitioned, convertMetastore, usingCTAS, tableSize))
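To make the updated expectation concrete: with convertMetastore enabled, a table that declares its own compression codec should no longer fall back to the session-level codec. Below is a hedged, end-to-end illustration of that scenario; the table name and codec choices are illustrative and not taken from the suite.

// Illustrative only; assumes an existing Hive-enabled SparkSession named `spark`.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")  // session-level codec

spark.sql(
  """CREATE TABLE codec_demo(id INT) STORED AS PARQUET
    |TBLPROPERTIES ('parquet.compression'='GZIP')""".stripMargin)
spark.sql("INSERT INTO codec_demo SELECT 1")
// With SPARK-23355 fixed, the part files under codec_demo are GZIP-compressed:
// the table-level property wins over the session-level codec set above.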
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -2144,6 +2145,86 @@ class HiveDDLSuite
     }
   }
 
+  private def getReader(path: String): org.apache.orc.Reader = {
+    val conf = spark.sessionState.newHadoopConf()
+    val files = org.apache.spark.sql.execution.datasources.orc.OrcUtils.listOrcFiles(path, conf)
+    assert(files.length == 1)
+    val file = files.head
+    val fs = file.getFileSystem(conf)
+    val readerOptions = org.apache.orc.OrcFile.readerOptions(conf).filesystem(fs)
+    org.apache.orc.OrcFile.createReader(file, readerOptions)
+  }
+
+  test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
+    Seq("native", "hive").foreach { orcImpl =>
+      withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) STORED AS ORC
+                |TBLPROPERTIES (
+                |  orc.compress 'ZLIB',
+                |  orc.compress.size '1001',
+                |  orc.row.index.stride '2002',
+                |  hive.exec.orc.default.block.size '3003',
+                |  hive.exec.orc.compression.strategy 'COMPRESSION')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains("orc"))
+            val properties = table.properties
+            assert(properties.get("orc.compress") == Some("ZLIB"))
+            assert(properties.get("orc.compress.size") == Some("1001"))
+            assert(properties.get("orc.row.index.stride") == Some("2002"))
+            assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
+            assert(properties.get("hive.exec.orc.compression.strategy") == Some("COMPRESSION"))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+            val reader = getReader(maybeFile.head.getCanonicalPath)
+            assert(reader.getCompressionKind.name === "ZLIB")
+            assert(reader.getCompressionSize == 1001)
+            assert(reader.getRowIndexStride == 2002)
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+      withTable("t") {
+        withTempPath { path =>
+          sql(
+            s"""
+              |CREATE TABLE t(id int) STORED AS PARQUET
+              |TBLPROPERTIES (
+              |  parquet.compression 'GZIP'
+              |)
+              |LOCATION '${path.toURI}'
+            """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(DDLUtils.isHiveTable(table))
+          assert(table.storage.serde.get.contains("parquet"))
+          val properties = table.properties
+          assert(properties.get("parquet.compression") == Some("GZIP"))
+          assert(spark.table("t").collect().isEmpty)
+
+          sql("INSERT INTO t SELECT 1")
+          checkAnswer(spark.table("t"), Row(1))
+          val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+          assertCompression(maybeFile, "parquet", "GZIP")
+        }
+      }
+    }
+  }
+
   test("load command for non local invalid path validation") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(i INT, j STRING)")
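One closing note on verification: the ORC test above inspects the written file directly through org.apache.orc.Reader, while the Parquet test delegates to the suite's existing assertCompression helper. For readers who want an equivalent direct check, here is a rough sketch using the parquet-hadoop API bundled with Spark; the suite's actual helper may be implemented differently.

// Rough, REPL-style sketch: read a Parquet part file's footer and return its codec.
// `file` is assumed to be a java.io.File pointing at a single Parquet part file.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

def parquetCompressionOf(file: java.io.File): String = {
  val footer = ParquetFileReader.readFooter(new Configuration(), new Path(file.getCanonicalPath))
  // Each column chunk in a row group records the codec used at write time.
  footer.getBlocks.get(0).getColumns.get(0).getCodec.name()
}

// For the table created in the Parquet test above, this should return "GZIP".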