From 8e046b24fefc7f23a370e2b570c6b6bd687332b1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 10 Nov 2014 16:56:36 -0800 Subject: [PATCH] [SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API. Author: Cheng Lian Closes #3175 from liancheng/fix-op-state and squashes the following commits: 6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown --- .../spark/sql/hive/thriftserver/Shim13.scala | 91 +++++++++++++++++-- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index 17f1ad3e4690e..92b7f9a0e0ecb 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -24,7 +24,11 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.math._ +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation @@ -77,6 +81,32 @@ private[hive] class SparkExecuteStatementOperation( private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ + private def runInternal(cmd: String) = { + try { + result = hiveContext.sql(cmd) + logDebug(result.queryExecution.toString()) + val groupId = round(random * 1000000).toString + hiveContext.sparkContext.setJobGroup(groupId, statement) + iter = { + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { + result.toLocalIterator + } else { + result.collect().iterator + } + } + dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + } catch { + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + setState(OperationState.ERROR) + logError("Error executing query:",e) + throw new HiveSQLException(e.toString) + } + } + def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. logDebug("CLOSING") @@ -154,17 +184,62 @@ private[hive] class SparkExecuteStatementOperation( } } + private def getConfigForOperation: HiveConf = { + var sqlOperationConf: HiveConf = getParentSession.getHiveConf + if (!getConfOverlay.isEmpty || shouldRunAsync) { + sqlOperationConf = new HiveConf(sqlOperationConf) + import scala.collection.JavaConversions._ + for (confEntry <- getConfOverlay.entrySet) { + try { + sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue) + } + catch { case e: IllegalArgumentException => + throw new HiveSQLException("Error applying statement specific settings", e) + } + } + } + sqlOperationConf + } + def run(): Unit = { logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - try { - result = hiveContext.sql(statement) - logDebug(result.queryExecution.toString()) - result.queryExecution.logical match { - case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => - sessionToActivePool(parentSession) = value - logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") - case _ => + setHasResultSet(true) + + if (!shouldRunAsync) { + runInternal(statement) + setState(OperationState.FINISHED) + } else { + val parentSessionState = SessionState.get + val sessionHive: Hive = Hive.get + val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig) + + val backgroundOperation: Runnable = new Runnable { + def run() { + val doAsAction: PrivilegedExceptionAction[AnyRef] = + new PrivilegedExceptionAction[AnyRef] { + def run: AnyRef = { + Hive.set(sessionHive) + SessionState.setCurrentSessionState(parentSessionState) + try { + runInternal(statement) + } + catch { case e: HiveSQLException => + setOperationException(e) + logError("Error running hive query: ", e) + } + null + } + } + try { + ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction) + } + catch { case e: Exception => + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + currentUGI.getShortUserName, e) + } + setState(OperationState.FINISHED) + } } val groupId = round(random * 1000000).toString