address review comments
zhzhan committed Oct 5, 2014
1 parent 6bc9204 commit 10c3565
Showing 6 changed files with 21 additions and 18 deletions.
@@ -216,7 +216,7 @@ private[hive] object HiveQl {
*/
def getAst(sql: String): ASTNode = {
/*
- * Context has to be passed in in hive0.13.1.
+ * Context has to be passed in hive0.13.1.
* Otherwise, there will be Null pointer exception,
* when retrieving properties from HiveConf.
*/
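To make the comment concrete: in Hive 0.13.1 the parser wants a Context built from the active HiveConf, otherwise property lookups inside the parser hit a NullPointerException. The sketch below only illustrates the idea; it is not the exact Spark code, and `hiveConf` is an assumed value in scope.

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{ASTNode, ParseDriver, ParseUtils}

// Hedged sketch of parsing SQL to an ASTNode against the Hive 0.13.1 API.
def parseToAst(sql: String, hiveConf: HiveConf): ASTNode = {
  val context = new Context(hiveConf)               // carries the HiveConf the parser reads
  val tree = (new ParseDriver).parse(sql, context)  // the overload that accepts a Context
  ParseUtils.findRootNonNullToken(tree)             // strip the wrapping nil token
}
```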
@@ -61,6 +61,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
// without restarting the JVM.
System.clearProperty("spark.hostPort")
+ CommandProcessorFactory.clean(hiveconf)

lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath
@@ -79,7 +80,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// For some hive test case which contain ${system:test.tmp.dir}
System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)

- CommandProcessorFactory.clean(hiveconf)
configure() // Must be called before initializing the catalog below.

/** The location of the compiled hive distribution */
@@ -371,6 +371,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
* tests.
*/
protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames

+ // Database default may not exist in 0.13.1; create it if it does not exist
+ HiveShim.createDefaultDBIfNeeded(this)

/**
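The new `HiveShim.createDefaultDBIfNeeded(this)` call delegates to the version-specific shim, whose body is not part of this diff. A hedged sketch of what the two shims could do (`runSqlHive` is Spark's package-private helper for running a statement through Hive, assumed accessible here):

```scala
package org.apache.spark.sql.hive

// Hedged sketch only, not the actual shim sources.
object Shim13Sketch {
  // Hive 0.13.1: a fresh test metastore may come up without a "default" database.
  def createDefaultDBIfNeeded(context: HiveContext): Unit = {
    context.runSqlHive("CREATE DATABASE IF NOT EXISTS default")
    context.runSqlHive("USE default")
  }
}

object Shim12Sketch {
  // Hive 0.12.0 bootstraps the "default" database itself, so nothing to do.
  def createDefaultDBIfNeeded(context: HiveContext): Unit = {}
}
```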
@@ -81,7 +81,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()

- // TODO: How it works? needs to add it back for other hive version.
+ // TODO: How does it work? Needs to be added back for other Hive versions.
if (HiveShim.version =="0.12.0") {
assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
}
@@ -557,7 +557,8 @@ class HiveQuerySuite extends HiveComparisonTest {
|WITH serdeproperties('s1'='9')
""".stripMargin)
}
- // now only verify 0.12.0, and ignore other versions due to binary compatability
+ // Now only verify 0.12.0, and ignore other versions due to binary compatibility;
+ // the current TestSerDe.jar is from 0.12.0.
if (HiveShim.version == "0.12.0") {
sql(s"ADD JAR $testJar")
sql(
@@ -69,7 +69,7 @@ private[hive] object HiveShim {
ColumnProjectionUtils.appendReadColumnNames(conf, names)
}

- def getExternalTmpPath(context: Context, uri: URI): String = {
+ def getExternalTmpPath(context: Context, uri: URI) = {
context.getExternalTmpFileURI(uri)
}

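For contrast with the 0.13.1 shim later in this commit (which takes a Path), here is a hedged side-by-side sketch of the two helpers; the 0.12.0 body is taken from the hunk above, while the 0.13.1 body is an assumption and is not shown in this diff:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.Context

// Hive 0.12.0 shim (as in the hunk above): the old API returns the temp location as a String URI.
def getExternalTmpPath12(context: Context, uri: URI): String =
  context.getExternalTmpFileURI(uri)

// Hive 0.13.1 shim (assumed body): the newer API works in terms of Path.
def getExternalTmpPath13(context: Context, path: Path): Path =
  context.getExternalTmpPath(path.toUri)
```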
@@ -40,10 +40,10 @@ import scala.language.implicitConversions
private[hive] object HiveShim {
val version = "0.13.1"
/*
- * TODO: hive-0.13 support DECIMAL(precision, scale), DECIMAL in hive-0.12 is actually DECIMAL(10,0)
+ * TODO: hive-0.13 supports DECIMAL(precision, scale); DECIMAL in hive-0.12 is actually DECIMAL(38,unbounded)
* Full support of the new decimal feature needs to be fixed in a separate PR.
*/
- val metastoreDecimal = "decimal(10,0)"
+ val metastoreDecimal = "decimal\\((\\d+),(\\d+)\\)".r

def getTableDesc(
serdeClass: Class[_ <: Deserializer],
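The switch from a literal `"decimal(10,0)"` to a compiled Regex matters because a regex with two capture groups works as a pattern-match extractor when translating metastore type strings. A minimal, self-contained illustration (the real matching code lives elsewhere in Spark and is not shown in this diff):

```scala
// Same regex as the new metastoreDecimal value.
val metastoreDecimal = "decimal\\((\\d+),(\\d+)\\)".r

// Illustrative only: map a metastore type string to a description of precision and scale.
def describeType(metastoreType: String): String = metastoreType match {
  case metastoreDecimal(precision, scale) => s"decimal with precision=$precision, scale=$scale"
  case other                              => s"unhandled type: $other"
}

// describeType("decimal(10,2)") == "decimal with precision=10, scale=2"
```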
@@ -82,8 +82,7 @@ private[hive] object HiveShim {
for (col <- cols) {
if (first) {
first = false
- }
- else {
+ } else {
result.append(',')
}
result.append(col)
@@ -98,7 +97,9 @@
if (ids != null && ids.size > 0) {
ColumnProjectionUtils.appendReadColumns(conf, ids)
}
- appendReadColumnNames(conf, names)
+ if (names != null && names.size > 0) {
+   appendReadColumnNames(conf, names)
+ }
}

def getExternalTmpPath(context: Context, path: Path) = {
@@ -110,17 +111,16 @@
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsOf(tbl)

/*
- * Bug introdiced in hive-0.13. FileSinkDesc is serilizable, but its member path is not.
+ * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
* Fix it through wrapper.
* */
implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
- var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
- f.setCompressed(w.compressed)
- f.setCompressCodec(w.compressCodec)
- f.setCompressType(w.compressType)
- f.setTableInfo(w.tableInfo)
- f.setDestTableId(w.destTableId)
- f
+ var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
+ f.setCompressCodec(w.compressCodec)
+ f.setCompressType(w.compressType)
+ f.setTableInfo(w.tableInfo)
+ f.setDestTableId(w.destTableId)
+ f
}
}

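The `ShimFileSinkDesc` wrapper mentioned in the comment above is not part of this diff. Based only on the fields the conversion reads, a hedged sketch of the idea; the field names and types are assumptions, not the actual Spark class:

```scala
import org.apache.hadoop.hive.ql.plan.TableDesc

// Keep the output location as a plain String (Path is the non-serializable member of
// FileSinkDesc); wrapperToFileSinkDesc rebuilds the real FileSinkDesc when Hive needs it.
class ShimFileSinkDescSketch(
    var dir: String,
    var tableInfo: TableDesc,
    var compressed: Boolean) extends Serializable {
  var compressCodec: String = _
  var compressType: String = _
  var destTableId: Int = _
}
```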
