Skip to content

Commit

Permalink
[SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is…
Browse files Browse the repository at this point in the history
… thrown

In `HiveThriftServer2`, when an exception is thrown during SQL execution, the operation state should be set to `ERROR`, but it currently remains `RUNNING`. This causes the `GetOperationStatus` Thrift API to report an incorrect status.

Author: Cheng Lian <lian@databricks.com>

Closes apache#3175 from liancheng/fix-op-state and squashes the following commits:

6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown
  • Loading branch information
liancheng authored and tianyi committed Dec 4, 2014
1 parent c0e631a commit 8e046b2
Showing 1 changed file with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
Expand Down Expand Up @@ -77,6 +81,32 @@ private[hive] class SparkExecuteStatementOperation(
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _

/**
 * Executes `cmd` through `hiveContext` and stores the outcome in this operation's
 * `result`, `iter` and `dataTypes` fields.
 *
 * If anything is thrown while doing so, the operation state is set to
 * `OperationState.ERROR`, the failure is logged, and it is rethrown wrapped in a
 * `HiveSQLException` that keeps the original throwable as its cause.
 *
 * @param cmd the SQL text to execute
 * @throws HiveSQLException if executing `cmd` or collecting its result fails
 */
private def runInternal(cmd: String): Unit = {
  try {
    result = hiveContext.sql(cmd)
    logDebug(result.queryExecution.toString())

    // Label the jobs with the command actually being run by this call.
    val groupId = round(random * 1000000).toString
    hiveContext.sparkContext.setJobGroup(groupId, cmd)

    val useIncrementalCollect =
      hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
    iter =
      if (useIncrementalCollect) {
        result.toLocalIterator
      } else {
        result.collect().iterator
      }

    dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
  } catch {
    // Actually do need to catch Throwable as some failures don't inherit from Exception and
    // HiveServer will silently swallow them.
    case e: Throwable =>
      setState(OperationState.ERROR)
      logError("Error executing query:", e)
      // Keep the message callers already see, but chain the cause so the original
      // stack trace is not lost.
      throw new HiveSQLException(e.toString, e)
  }
}

def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
Expand Down Expand Up @@ -154,17 +184,62 @@ private[hive] class SparkExecuteStatementOperation(
}
}

/**
 * Returns the `HiveConf` to use for this operation.
 *
 * When there is no configuration overlay and the operation is not asynchronous, the
 * parent session's `HiveConf` is returned as is. Otherwise a new `HiveConf` is built
 * from the session's one and every overlay entry is applied to it with `verifyAndSet`.
 *
 * @throws HiveSQLException if `verifyAndSet` rejects an overlay entry with an
 *                          `IllegalArgumentException`
 */
private def getConfigForOperation: HiveConf = {
  val sessionConf: HiveConf = getParentSession.getHiveConf
  val needsOwnConf = !getConfOverlay.isEmpty || shouldRunAsync
  if (!needsOwnConf) {
    sessionConf
  } else {
    val operationConf = new HiveConf(sessionConf)
    import scala.collection.JavaConversions._
    getConfOverlay.entrySet.foreach { overlayEntry =>
      try {
        operationConf.verifyAndSet(overlayEntry.getKey, overlayEntry.getValue)
      } catch {
        case e: IllegalArgumentException =>
          throw new HiveSQLException("Error applying statement specific settings", e)
      }
    }
    operationConf
  }
}

def run(): Unit = {
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
setHasResultSet(true)

if (!shouldRunAsync) {
runInternal(statement)
setState(OperationState.FINISHED)
} else {
val parentSessionState = SessionState.get
val sessionHive: Hive = Hive.get
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)

val backgroundOperation: Runnable = new Runnable {
def run() {
val doAsAction: PrivilegedExceptionAction[AnyRef] =
new PrivilegedExceptionAction[AnyRef] {
def run: AnyRef = {
Hive.set(sessionHive)
SessionState.setCurrentSessionState(parentSessionState)
try {
runInternal(statement)
}
catch { case e: HiveSQLException =>
setOperationException(e)
logError("Error running hive query: ", e)
}
null
}
}
try {
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
}
catch { case e: Exception =>
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
}
setState(OperationState.FINISHED)
}
}

val groupId = round(random * 1000000).toString
Expand Down

0 comments on commit 8e046b2

Please sign in to comment.