Skip to content

Commit

Permalink
Protect InsertIntoHive.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Apr 2, 2015
1 parent fee7e9c commit 1e241af
Showing 1 changed file with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val db = Hive.get(sc.hiveconf)
@transient private lazy val catalog = sc.catalog

private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
Expand Down Expand Up @@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
}
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
catalog.synchronized {
catalog.client.validatePartitionNameCharacters(partVals)
}
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
)
catalog.synchronized {
catalog.client.loadDynamicPartitions(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
}
} else {
db.loadPartition(
catalog.synchronized {
catalog.client.loadPartition(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
}
}
} else {
catalog.synchronized {
catalog.client.loadTable(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
holdDDLTime)
}
} else {
db.loadTable(
outputPath,
qualifiedTableName,
overwrite,
holdDDLTime)
}

// Invalidate the cache.
Expand Down

0 comments on commit 1e241af

Please sign in to comment.