Improve error messages for unsupported features in Hive (delta-io#9)
This PR makes some minor improvements in the error messages for unsupported features. It also adds the table property `spark.sql.sources.provider` so that a Delta table created by Hive can be read by Spark 3.0.0+ when they share the same metastore.
zsxwing committed Nov 28, 2019
1 parent e932a92 commit 5a2198b
Showing 3 changed files with 89 additions and 39 deletions.
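
A rough sketch of the interop described above, for orientation: the table is exposed from Hive through the Delta storage handler, and a Spark 3.0.0+ session pointed at the same metastore reads it back. The table name events, its columns, and the path /tmp/delta/events are illustrative only (not part of this commit); the DDL follows the pattern used in the HiveConnectorSuite changes below, and the Spark step assumes Delta Lake is on the Spark classpath.

// Step 1 (Hive CLI / Beeline, with the Hive connector jar on Hive's classpath):
// expose an existing Delta table to the shared metastore.
//
//   CREATE EXTERNAL TABLE events(id INT, payload STRING)
//   STORED BY 'io.delta.hive.DeltaStorageHandler'
//   LOCATION '/tmp/delta/events';
//
// Step 2 (Spark 3.0.0+): because preCreateTable now records
// spark.sql.sources.provider=DELTA in the table properties, Spark should be able to
// resolve the metastore entry as a Delta data source table and read it directly.
import org.apache.spark.sql.SparkSession

object ReadHiveCreatedDeltaTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-hive-created-delta-table")
      .enableHiveSupport() // share the metastore that the Hive DDL above registered into
      .getOrCreate()

    spark.table("default.events").show()
  }
}
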
26 changes: 26 additions & 0 deletions hive/src/main/scala/io/delta/hive/DeltaOutputFormat.scala
@@ -0,0 +1,26 @@
package io.delta.hive

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{ArrayWritable, NullWritable}
import org.apache.hadoop.mapred.{JobConf, OutputFormat, RecordWriter}
import org.apache.hadoop.util.Progressable

/**
 * This class is not a real implementation. We use it to prevent from writing to a Delta table in
 * Hive before we support it.
 */
class DeltaOutputFormat extends OutputFormat[NullWritable, ArrayWritable] {

  private def writingNotSupported[T](): T = {
    throw new UnsupportedOperationException(
      "Writing to a Delta table in Hive is not supported. Please use Spark to write.")
  }

  override def getRecordWriter(
      ignored: FileSystem,
      job: JobConf,
      name: String,
      progress: Progressable): RecordWriter[NullWritable, ArrayWritable] = writingNotSupported()

  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = writingNotSupported()
}
32 changes: 20 additions & 12 deletions hive/src/main/scala/io/delta/hive/DeltaStorageHandler.scala
@@ -1,10 +1,7 @@
package io.delta.hive

import com.google.common.base.Joiner
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.HiveMetaHook
import org.apache.hadoop.hive.metastore.MetaStoreUtils
@@ -22,9 +19,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.AbstractSerDe
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
import org.apache.spark.sql.delta.DeltaHelper
import org.apache.spark.sql.delta.DeltaPushFilter
import org.slf4j.LoggerFactory
@@ -35,9 +30,15 @@ class DeltaStorageHandler extends DefaultStorageHandler with HiveMetaHook with H

  private val LOG = LoggerFactory.getLogger(classOf[DeltaStorageHandler])


  override def getInputFormatClass: Class[_ <: InputFormat[_, _]] = classOf[DeltaInputFormat]

  /**
   * Returns a special [[OutputFormat]] to prevent from writing to a Delta table in Hive before we
   * support it. We have to give Hive some class when creating a table, hence we have to implement
   * an [[OutputFormat]] which throws an exception when Hive is using it.
   */
  override def getOutputFormatClass: Class[_ <: OutputFormat[_, _]] = classOf[DeltaOutputFormat]

  override def getSerDeClass(): Class[_ <: AbstractSerDe] = classOf[ParquetHiveSerDe]

  override def configureInputJobProperties(tableDesc: TableDesc, jobProperties: java.util.Map[String, String]): Unit = {
@@ -127,13 +128,19 @@ class DeltaStorageHandler extends DefaultStorageHandler with HiveMetaHook with H
  override def getMetaHook: HiveMetaHook = this

  override def preCreateTable(tbl: Table): Unit = {
    val isExternal = MetaStoreUtils.isExternalTable(tbl)
    if (!isExternal) {
      throw new MetaException("HiveOnDelta should be an external table.")
    } else if (tbl.getPartitionKeysSize() > 0) {
      throw new MetaException("HiveOnDelta does not support to create a partition hive table")
    if (!MetaStoreUtils.isExternalTable(tbl)) {
      throw new UnsupportedOperationException(
        s"The type of table ${tbl.getDbName}:${tbl.getTableName} is ${tbl.getTableType}." +
          "Only external Delta tables can be read in Hive right now")
    }

    if (tbl.getPartitionKeysSize > 0) {
      throw new MetaException(
        s"Found partition columns " +
          s"(${tbl.getPartitionKeys.asScala.map(_.getName).mkString(",")}) in table " +
          s"${tbl.getDbName}:${tbl.getTableName}. The partition columns in a Delta table " +
          s"will be read from its own metadata and should not be set manually.")
    }

    val deltaRootString = tbl.getSd().getLocation()
    if (deltaRootString == null || deltaRootString.trim().length() == 0) {
      throw new MetaException("table location should be set when creating table")
@@ -146,6 +153,7 @@ class DeltaStorageHandler extends DefaultStorageHandler with HiveMetaHook with H
    val partitionProps = DeltaHelper.checkHiveColsInDelta(deltaPath, tbl.getSd().getCols())
    tbl.getSd().getSerdeInfo().getParameters().putAll(partitionProps.asJava)
    tbl.getSd().getSerdeInfo().getParameters().put(DELTA_TABLE_PATH, deltaRootString)
    tbl.getParameters.put("spark.sql.sources.provider", "DELTA")
    LOG.info("write partition cols/types to table properties " +
      partitionProps.map(kv => s"${kv._1}=${kv._2}").mkString(", "))
  }
70 changes: 43 additions & 27 deletions hive/src/test/scala/io/delta/hive/HiveConnectorSuite.scala
@@ -19,23 +19,18 @@ class HiveConnectorSuite extends HiveTest with BeforeAndAfterEach {
DeltaLog.clearCache()
}

test("DDL: HiveOnDelta should be a external table ") {
withTable("deltaTbl") {
withTempDir { dir =>
val e = intercept[Exception] {
runQuery(
s"""
|create table deltaTbl(a string, b int)
|stored by 'io.delta.hive.DeltaStorageHandler' location '${dir.getCanonicalPath}'
""".stripMargin
)
}.getMessage
assert(e.contains("HiveOnDelta should be an external table"))
}
test("should not allow to create a non external Delta table") {
val e = intercept[Exception] {
runQuery(
s"""
|create table deltaTbl(a string, b int)
|stored by 'io.delta.hive.DeltaStorageHandler'""".stripMargin
)
}
assert(e.getMessage != null && e.getMessage.contains("Only external Delta tables"))
}

test("DDL: location should be set when creating table") {
test("location should be set when creating table") {
withTable("deltaTbl") {
val e = intercept[Exception] {
runQuery(
@@ -49,24 +44,45 @@ class HiveConnectorSuite extends HiveTest with BeforeAndAfterEach {
}
}

test("DDL: HiveOnDelta should not be a partitioned hive table") {
test("should not allow to specify partition columns") {
withTempDir { dir =>
val e = intercept[Exception] {
runQuery(
s"""
|CREATE EXTERNAL TABLE deltaTbl(a STRING, b INT)
|PARTITIONED BY(c STRING)
|STORED BY 'io.delta.hive.DeltaStorageHandler'
|LOCATION '${dir.getCanonicalPath}' """.stripMargin)
}
assert(e.getMessage != null && e.getMessage.matches(
"(?s).*partition columns.*should not be set manually.*"))
}
}

test("should not allow to write to a Delta table") {
withTable("deltaTbl") {
withTempDir { dir =>
withSparkSession { spark =>
import spark.implicits._
val testData = (0 until 10).map(x => (x, s"foo${x % 2}"))
testData.toDS.toDF("a", "b").write.format("delta").save(dir.getCanonicalPath)
}

runQuery(
s"""
|CREATE EXTERNAL TABLE deltaTbl(a INT, b STRING)
|STORED BY 'io.delta.hive.DeltaStorageHandler'
|LOCATION '${dir.getCanonicalPath}'""".stripMargin)
val e = intercept[Exception] {
runQuery(
s"""
|create external table deltaTbl(a string, b int)
|partitioned by(c string)
|stored by 'io.delta.hive.DeltaStorageHandler' location '${dir.getCanonicalPath}'
""".stripMargin
)
}.getMessage
assert(e.contains("HiveOnDelta does not support to create a partition hive table"))
runQuery("INSERT INTO deltaTbl(a, b) VALUES(123, 'foo')")
}
assert(e.getMessage != null && e.getMessage.contains(
"Writing to a Delta table in Hive is not supported"))
}
}
}

test("DDL: the delta root path should be existed when create hive table") {
test("the delta root path should be existed when create hive table") {
withTable("deltaTbl") {
withTempDir { dir =>
JavaUtils.deleteRecursively(dir)
@@ -84,7 +100,7 @@ class HiveConnectorSuite extends HiveTest with BeforeAndAfterEach {
}
}

test("DDL: when creating hive table on a partitioned delta, " +
test("when creating hive table on a partitioned delta, " +
"the partition columns should be after data columns") {
withTable("deltaTbl") {
withTempDir { dir =>
@@ -110,7 +126,7 @@ class HiveConnectorSuite extends HiveTest with BeforeAndAfterEach {
}

// check column number & column name
test("DDL: Hive schema should match delta's schema") {
test("Hive schema should match delta's schema") {
withTable("deltaTbl") {
withTempDir { dir =>
val testData = (0 until 10).map(x => (x, s"foo${x % 2}", s"test${x % 3}"))
