diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 7bbab24966580..a1ed6e61e8bed 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -30,7 +30,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui._ @@ -49,30 +49,32 @@ object HiveThriftServer2 extends Logging { /** * :: DeveloperApi :: - * Starts a new thrift server with the given context. + * Starts a new thrift server with the given SparkSession. * - * @param sqlContext SQLContext to use for the server + * @param sparkSession SparkSession to use for the server * @param exitOnError Whether to exit the JVM if HiveThriftServer2 fails to initialize. When true, * the call logs the error and exits the JVM with exit code -1. When false, the * call throws an exception instead. */ @Since("4.0.0") @DeveloperApi - def startWithContext(sqlContext: SQLContext, exitOnError: Boolean): HiveThriftServer2 = { + def startWithSparkSession( + sparkSession: SparkSession, + exitOnError: Boolean): HiveThriftServer2 = { systemExitOnError.set(exitOnError) val executionHive = HiveUtils.newClientForExecution( - sqlContext.sparkContext.conf, - sqlContext.sessionState.newHadoopConf()) + sparkSession.sparkContext.conf, + sparkSession.sessionState.newHadoopConf()) // Cleanup the scratch dir before starting ServerUtils.cleanUpScratchDir(executionHive.conf) - val server = new HiveThriftServer2(sqlContext) + val server = new HiveThriftServer2(sparkSession) server.init(executionHive.conf) server.start() logInfo("HiveThriftServer2 started") - createListenerAndUI(server, sqlContext.sparkContext) + createListenerAndUI(server, sparkSession.sparkContext) server } @@ -82,10 +84,11 @@ object HiveThriftServer2 extends Logging { * * @param sqlContext SQLContext to use for the server */ + @deprecated("Use startWithSparkSession instead", since = "4.0.0") @Since("2.0.0") @DeveloperApi def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = { - startWithContext(sqlContext, exitOnError = true) + startWithSparkSession(sqlContext.sparkSession, exitOnError = true) } private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = { @@ -122,7 +125,7 @@ object HiveThriftServer2 extends Logging { } try { - startWithContext(SparkSQLEnv.sqlContext) + startWithContext(SparkSQLEnv.sparkSession.sqlContext) // If application was killed before HiveThriftServer2 start successfully then SparkSubmit // process can not exit, so check whether if SparkContext was stopped. 
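Illustrative aside, not part of the patch: the hunk above introduces startWithSparkSession and routes the deprecated SQLContext overload through it. A minimal sketch of how an embedding application might call the new entry point, assuming a Hive-enabled SparkSession; the object name and app name below are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object EmbeddedThriftServerSketch {
  def main(args: Array[String]): Unit = {
    // Assumed session setup; only the two start* calls come from the patch above.
    val spark = SparkSession.builder()
      .appName("embedded-thrift-server")
      .enableHiveSupport()
      .getOrCreate()

    // New entry point: pass the SparkSession directly. With exitOnError = false,
    // initialization failures surface as exceptions instead of terminating the JVM.
    HiveThriftServer2.startWithSparkSession(spark, exitOnError = false)

    // Deprecated since 4.0.0: forwards sqlContext.sparkSession to the new method.
    // HiveThriftServer2.startWithContext(spark.sqlContext)
  }
}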
if (SparkSQLEnv.sparkContext.stopped.get()) { @@ -142,7 +145,7 @@ object HiveThriftServer2 extends Logging { } } -private[hive] class HiveThriftServer2(sqlContext: SQLContext) +private[hive] class HiveThriftServer2(sparkSession: SparkSession) extends HiveServer2 with ReflectedCompositeService { // state is tracked internally so that the server only attempts to shut down if it successfully @@ -150,7 +153,7 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext) private val started = new AtomicBoolean(false) override def init(hiveConf: HiveConf): Unit = { - val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext) + val sparkSqlCliService = new SparkSQLCLIService(this, sparkSession) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 51a5e88aa633e..2cd67cdd03bd9 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -32,7 +32,7 @@ import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TP import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} @@ -40,7 +40,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkExecuteStatementOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], @@ -50,8 +50,6 @@ private[hive] class SparkExecuteStatementOperation( with SparkOperation with Logging { - val session = sqlContext.sparkSession - // If a timeout value `queryTimeout` is specified by users and it is smaller than // a global timeout value, we use the user-specified value. // This code follows the Hive timeout behaviour (See #29933 for details). @@ -90,10 +88,10 @@ private[hive] class SparkExecuteStatementOperation( def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet = withLocalProperties { try { - sqlContext.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel) + session.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel) getNextRowSetInternal(order, maxRowsL) } finally { - sqlContext.sparkContext.clearJobGroup() + session.sparkContext.clearJobGroup() } } @@ -224,7 +222,7 @@ private[hive] class SparkExecuteStatementOperation( } } // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) // Always set the session state classloader to `executionHiveClassLoader` even for sync mode @@ -232,12 +230,12 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader) } - sqlContext.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel) - result = sqlContext.sql(statement) + session.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel) + result = session.sql(statement) logDebug(result.queryExecution.toString()) HiveThriftServer2.eventManager.onStatementParsed(statementId, result.queryExecution.toString()) - iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { + iter = if (session.conf.get(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { new IterableFetchIterator[Row](new Iterable[Row] { override def iterator: Iterator[Row] = result.toLocalIterator().asScala }) @@ -254,7 +252,7 @@ private[hive] class SparkExecuteStatementOperation( // task interrupted, it may have start some spark job, so we need to cancel again to // make sure job was cancelled when background thread was interrupted if (statementId != null) { - sqlContext.sparkContext.cancelJobGroup(statementId) + session.sparkContext.cancelJobGroup(statementId) } val currentState = getStatus().getState() if (currentState.isTerminal) { @@ -271,7 +269,7 @@ private[hive] class SparkExecuteStatementOperation( e match { case _: HiveSQLException => throw e case _ => throw HiveThriftServerErrors.runningQueryError( - e, sqlContext.sparkSession.sessionState.conf.errorMessageFormat) + e, session.sessionState.conf.errorMessageFormat) } } } finally { @@ -281,7 +279,7 @@ private[hive] class SparkExecuteStatementOperation( HiveThriftServer2.eventManager.onStatementFinish(statementId) } } - sqlContext.sparkContext.clearJobGroup() + session.sparkContext.clearJobGroup() } } @@ -318,7 +316,7 @@ private[hive] class SparkExecuteStatementOperation( } // RDDs will be cleaned automatically upon garbage collection. 
if (statementId != null) { - sqlContext.sparkContext.cancelJobGroup(statementId) + session.sparkContext.cancelJobGroup(statementId) } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index fd99a5b246d9b..e4bb91d466ff4 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -24,16 +24,16 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession /** * Spark's own GetCatalogsOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager */ private[hive] class SparkGetCatalogsOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession) extends GetCatalogsOperation(parentSession) with SparkOperation @@ -44,7 +44,7 @@ private[hive] class SparkGetCatalogsOperation( logInfo(log"Listing catalogs with ${MDC(STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.eventManager.onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 507dfc2ec50eb..1004ca8cf2712 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -29,7 +29,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.types._ @@ -37,7 +37,7 @@ import org.apache.spark.sql.types._ /** * Spark's own SparkGetColumnsOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager * @param catalogName catalog name. NULL if not applicable. 
* @param schemaName database name, NULL or a concrete database name @@ -45,7 +45,7 @@ import org.apache.spark.sql.types._ * @param columnName column name */ private[hive] class SparkGetColumnsOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession, catalogName: String, schemaName: String, @@ -55,7 +55,7 @@ private[hive] class SparkGetColumnsOperation( with SparkOperation with Logging { - val catalog: SessionCatalog = sqlContext.sessionState.catalog + val catalog: SessionCatalog = session.sessionState.catalog override def runInternal(): Unit = { // Do not change cmdStr. It's used for Hive auditing and authorization. @@ -72,7 +72,7 @@ private[hive] class SparkGetColumnsOperation( setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.eventManager.onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index b060bf3d4ec8d..515e64f5f529c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -28,19 +28,19 @@ import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATA import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, LogKeys, MDC} -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession /** * Spark's own GetFunctionsOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager * @param catalogName catalog name. null if not applicable * @param schemaName database name, null or a concrete database name * @param functionName function name pattern */ private[hive] class SparkGetFunctionsOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession, catalogName: String, schemaName: String, @@ -58,10 +58,10 @@ private[hive] class SparkGetFunctionsOperation( logInfo(logMDC + log" with ${MDC(LogKeys.STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - val catalog = sqlContext.sessionState.catalog + val catalog = session.sessionState.catalog // get databases for schema pattern val schemaPattern = convertSchemaPattern(schemaName) val matchingDbs = catalog.listDatabases(schemaPattern) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index db1cf201b2e92..0e2c35b5ef550 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -27,18 +27,18 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession /** * Spark's own GetSchemasOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager * @param catalogName catalog name. null if not applicable. * @param schemaName database name, null or a concrete database name */ private[hive] class SparkGetSchemasOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession, catalogName: String, schemaName: String) @@ -59,7 +59,7 @@ private[hive] class SparkGetSchemasOperation( setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) if (isAuthV2Enabled) { @@ -75,11 +75,11 @@ private[hive] class SparkGetSchemasOperation( try { val schemaPattern = convertSchemaPattern(schemaName) - sqlContext.sessionState.catalog.listDatabases(schemaPattern).foreach { dbName => + session.sessionState.catalog.listDatabases(schemaPattern).foreach { dbName => rowSet.addRow(Array[AnyRef](dbName, DEFAULT_HIVE_CATALOG)) } - val globalTempViewDb = sqlContext.sessionState.catalog.globalTempDatabase + val globalTempViewDb = session.sessionState.catalog.globalTempDatabase val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName)) if (schemaName == null || schemaName.isEmpty || databasePattern.matcher(globalTempViewDb).matches()) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index a0c6cd1dcd92f..9709739a64a42 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -26,17 +26,17 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTableType /** * Spark's own GetTableTypesOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager */ private[hive] class SparkGetTableTypesOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession) extends GetTableTypesOperation(parentSession) with SparkOperation @@ -48,7 +48,7 @@ private[hive] class SparkGetTableTypesOperation( logInfo(log"Listing table types with ${MDC(STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) if (isAuthV2Enabled) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 9d90878050678..e1dd6e8dd95bc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -29,13 +29,13 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ /** * Spark's own GetTablesOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager * @param catalogName catalog name. null if not applicable * @param schemaName database name, null or a concrete database name @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ * @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW" */ private[hive] class SparkGetTablesOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession, catalogName: String, schemaName: String, @@ -68,10 +68,10 @@ private[hive] class SparkGetTablesOperation( log"with ${MDC(STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - val catalog = sqlContext.sessionState.catalog + val catalog = session.sessionState.catalog val schemaPattern = convertSchemaPattern(schemaName) val tablePattern = convertIdentifierPattern(tableName, true) val matchingDbs = catalog.listDatabases(schemaPattern) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala index 9ae62ed2fed74..456ec44678c5c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala @@ -28,16 +28,16 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession /** * Spark's own GetTypeInfoOperation * - * @param sqlContext SQLContext to use + * @param session SparkSession to use * @param parentSession a HiveSession from SessionManager */ private[hive] class SparkGetTypeInfoOperation( - val sqlContext: SQLContext, + val session: SparkSession, parentSession: HiveSession) extends GetTypeInfoOperation(parentSession) with SparkOperation @@ -49,7 +49,7 @@ private[hive] class SparkGetTypeInfoOperation( logInfo(log"Listing type info with ${MDC(STATEMENT_ID, statementId)}") setState(OperationState.RUNNING) // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + val executionHiveClassLoader = session.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) if (isAuthV2Enabled) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala index 11e4817fe2a4c..b56888f49c1bf 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala @@ -23,7 +23,7 @@ import org.apache.hive.service.cli.operation.Operation import org.apache.spark.SparkContext import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.{HIVE_OPERATION_TYPE, STATEMENT_ID} -import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW} @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils */ private[hive] trait SparkOperation extends Operation with Logging { - protected def sqlContext: SQLContext + protected def session: SparkSession protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString() @@ -62,17 +62,17 @@ private[hive] trait SparkOperation extends Operation with Logging { // - set appropriate SparkSession // - set scheduler pool for the operation def withLocalProperties[T](f: => T): T = { - val originalProps = Utils.cloneProperties(sqlContext.sparkContext.getLocalProperties) + val originalProps = Utils.cloneProperties(session.sparkContext.getLocalProperties) val originalSession = SparkSession.getActiveSession try { // Set active SparkSession - SparkSession.setActiveSession(sqlContext.sparkSession) + SparkSession.setActiveSession(session) // Set scheduler pool - sqlContext.sparkSession.conf.getOption(SQLConf.THRIFTSERVER_POOL.key) match { + session.conf.getOption(SQLConf.THRIFTSERVER_POOL.key) match { case Some(pool) => - sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) + session.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) case None => } CURRENT_USER.set(getParentSession.getUserName) @@ -81,7 +81,7 @@ private[hive] trait SparkOperation extends Operation with Logging { } finally { CURRENT_USER.remove() // reset local properties, will also reset SPARK_SCHEDULER_POOL - sqlContext.sparkContext.setLocalProperties(originalProps) + session.sparkContext.setLocalProperties(originalProps) originalSession match { case Some(session) => SparkSession.setActiveSession(session) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index e64e1c283e27c..faab14bb9e365 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -167,7 +167,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Spark's SessionResourceLoader to obtain these jars. 
val auxJars = HiveConf.getVar(conf, HiveConf.getConfVars("hive.aux.jars.path")) if (StringUtils.isNotBlank(auxJars)) { - val resourceLoader = SparkSQLEnv.sqlContext.sessionState.resourceLoader + val resourceLoader = SparkSQLEnv.sparkSession.sessionState.resourceLoader StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_)) } @@ -176,7 +176,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { // sharedState.jarClassLoader which contain jar path passed by --jars in main thread. // We set CliSessionState's conf class loader to sharedState.jarClassLoader. // Thus we can load all jars passed by --jars and AddJarsCommand. - sessionState.getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader) + sessionState.getConf.setClassLoader(SparkSQLEnv.sparkSession.sharedState.jarClassLoader) // TODO work around for set the log output to console, because the HiveContext // will set the output into an invalid buffer. @@ -193,11 +193,11 @@ private[hive] object SparkSQLCLIDriver extends Logging { // [[SharedState.loadHiveConfFile]] based on the user specified or default values of // spark.sql.warehouse.dir and hive.metastore.warehouse.dir. for ((k, v) <- newHiveConf if k != "hive.metastore.warehouse.dir") { - SparkSQLEnv.sqlContext.setConf(k, v) + SparkSQLEnv.sparkSession.conf.set(k, v) } if (sessionState.database != null) { - SparkSQLEnv.sqlContext.sql(s"USE ${sessionState.database}") + SparkSQLEnv.sparkSession.sql(s"USE ${sessionState.database}") } // Execute -i init files (always in silent mode) @@ -261,9 +261,9 @@ private[hive] object SparkSQLCLIDriver extends Logging { var prefix = "" def currentDB = { - if (!SparkSQLEnv.sqlContext.sparkSession.sessionState.conf + if (!SparkSQLEnv.sparkSession.sessionState.conf .getConf(LEGACY_EMPTY_CURRENT_DB_IN_CLI)) { - s" (${SparkSQLEnv.sqlContext.sparkSession.catalog.currentDatabase})" + s" (${SparkSQLEnv.sparkSession.catalog.currentDatabase})" } else { ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb", classOf[HiveConf] -> conf, classOf[CliSessionState] -> sessionState) @@ -409,7 +409,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { override def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit = { hiveVariables.asScala.foreach(kv => - SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.setConfString(kv._1, kv._2)) + SparkSQLEnv.sparkSession.sessionState.conf.setConfString(kv._1, kv._2)) } def printMasterAndAppId(): Unit = { @@ -464,7 +464,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { ret = rc.getResponseCode if (ret != 0) { - val format = SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.errorMessageFormat + val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat val e = rc.getException val msg = e match { case st: SparkThrowable with Throwable => SparkThrowableHelper.getMessage(st, format) @@ -483,7 +483,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { val res = new JArrayList[String]() if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER) || - SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.cliPrintHeader) { + SparkSQLEnv.sparkSession.sessionState.conf.cliPrintHeader) { // Print the column names. 
Option(driver.getSchema.getFieldSchemas).foreach { fields => out.println(fields.asScala.map(_.getName).mkString("\t")) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 46537f75f1a11..092b6b444fdb8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -35,19 +35,19 @@ import org.apache.hive.service.cli._ import org.apache.hive.service.server.HiveServer2 import org.apache.spark.internal.SparkLogger -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util.SQLKeywordUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext) +private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sparkSession: SparkSession) extends CLIService(hiveServer) with ReflectedCompositeService { override def init(hiveConf: HiveConf): Unit = { setSuperField(this, "hiveConf", hiveConf) - val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext) + val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sparkSession) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) var sparkServiceUGI: UserGroupInformation = null @@ -103,7 +103,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC getInfoType match { case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL") case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL") - case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version) + case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sparkSession.version) case GetInfoType.CLI_ODBC_KEYWORDS => new GetInfoValue(SQLKeywordUtils.keywords.mkString(",")) case _ => super.getInfo(sessionHandle, getInfoType) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index a9c5d3e250797..650a5df340215 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -29,14 +29,14 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.SparkThrowable import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.COMMAND -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.CommandResult import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.execution.HiveResult.hiveResultString import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} -private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext) +private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.sparkSession) extends Driver with Logging { @@ -62,11 +62,11 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont 
override def run(command: String): CommandProcessorResponse = { try { - val substitutorCommand = SQLConf.withExistingConf(context.sparkSession.sessionState.conf) { + val substitutorCommand = SQLConf.withExistingConf(sparkSession.sessionState.conf) { new VariableSubstitution().substitute(command) } - context.sparkContext.setJobDescription(substitutorCommand) - val execution = context.sessionState.executePlan(context.sql(command).logicalPlan) + sparkSession.sparkContext.setJobDescription(substitutorCommand) + val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan) // The SQL command has been executed above via `executePlan`, therefore we don't need to // wrap it again with a new execution ID when getting Hive result. execution.logical match { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 88a5c87eab5d9..f1a49745fa33c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils._ import org.apache.spark.sql.internal.SQLConf @@ -33,11 +33,11 @@ import org.apache.spark.util.Utils private[hive] object SparkSQLEnv extends Logging { logDebug("Initializing SparkSQLEnv") - var sqlContext: SQLContext = _ + var sparkSession: SparkSession = _ var sparkContext: SparkContext = _ def init(): Unit = { - if (sqlContext == null) { + if (sparkSession == null) { val sparkConf = new SparkConf(loadDefaults = true) // If user doesn't specify the appName, we want to get [SparkSQL::localHostName] instead of // the default appName [SparkSQLCLIDriver] in cli or beeline. 
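Illustrative aside, not part of the patch: with SparkSQLEnv now holding a SparkSession instead of a SQLContext, code inside the hive-thriftserver module reaches the shared session as sketched below. The helper name, conf key, and query are examples only.

// Sketch of module-internal usage after this change; SparkSQLEnv is private[hive],
// so this pattern only applies to callers inside the hive-thriftserver module.
private def exampleCliStartup(): Unit = {
  SparkSQLEnv.init()
  val session = SparkSQLEnv.sparkSession
  // Example conf key taken from the CLI driver above; any string key works here.
  session.sessionState.conf.setConfString("hive.cli.print.header", "true")
  session.sql("USE default")
}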
@@ -61,9 +61,8 @@ private[hive] object SparkSQLEnv extends Logging { if (!shouldUseInMemoryCatalog) { builder.enableHiveSupport() } - val sparkSession = builder.getOrCreate() + sparkSession = builder.getOrCreate() sparkContext = sparkSession.sparkContext - sqlContext = sparkSession.sqlContext // SPARK-29604: force initialization of the session state with the Spark class loader, // instead of having it happen during the initialization of the Hive client (which may use a @@ -87,7 +86,7 @@ private[hive] object SparkSQLEnv extends Logging { if (SparkSQLEnv.sparkContext != null) { sparkContext.stop(exitCode) sparkContext = null - sqlContext = null + sparkSession = null } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 7acc485b01e57..ce3ece75c0aeb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -26,13 +26,13 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 import org.apache.spark.internal.Logging -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager import org.apache.spark.sql.internal.SQLConf -private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) +private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sparkSession: SparkSession) extends SessionManager(hiveServer) with ReflectedCompositeService with Logging { @@ -55,22 +55,22 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation, delegationToken) try { - val session = super.getSession(sessionHandle) + val hiveSession = super.getSession(sessionHandle) HiveThriftServer2.eventManager.onSessionCreated( - session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val ctx = if (sqlContext.sparkSession.sessionState.conf.hiveThriftServerSingleSession) { - sqlContext + hiveSession.getIpAddress, sessionHandle.getSessionId.toString, hiveSession.getUsername) + val session = if (sparkSession.sessionState.conf.hiveThriftServerSingleSession) { + sparkSession } else { - sqlContext.newSession() + sparkSession.newSession() } - ctx.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true) - val hiveSessionState = session.getSessionState - setConfMap(ctx, hiveSessionState.getOverriddenConfigurations) - setConfMap(ctx, hiveSessionState.getHiveVariables) + session.sessionState.conf.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true) + val hiveSessionState = hiveSession.getSessionState + setConfMap(session, hiveSessionState.getOverriddenConfigurations) + setConfMap(session, hiveSessionState.getHiveVariables) if (sessionConf != null && sessionConf.containsKey("use:database")) { - ctx.sql(s"use ${sessionConf.get("use:database")}") + session.sql(s"use ${sessionConf.get("use:database")}") } - sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx) + sparkSqlOperationManager.sessionToContexts.put(sessionHandle, session) sessionHandle } catch { case NonFatal(e) => @@ -86,17 +86,18 @@ 
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: override def closeSession(sessionHandle: SessionHandle): Unit = { HiveThriftServer2.eventManager.onSessionClosed(sessionHandle.getSessionId.toString) - val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext) - ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable) + val session = sparkSqlOperationManager.sessionToContexts + .getOrDefault(sessionHandle, sparkSession) + session.sessionState.catalog.getTempViewNames().foreach(session.catalog.uncacheTable) super.closeSession(sessionHandle) sparkSqlOperationManager.sessionToContexts.remove(sessionHandle) } - def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit = { + def setConfMap(sparkSession: SparkSession, confMap: java.util.Map[String, String]): Unit = { val iterator = confMap.entrySet().iterator() while (iterator.hasNext) { val kv = iterator.next() - conf.setConf(kv.getKey, kv.getValue) + sparkSession.conf.set(kv.getKey, kv.getValue) } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index ba42eefed2a22..8e12165dd6f14 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -25,7 +25,7 @@ import org.apache.hive.service.cli.operation._ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver._ @@ -38,7 +38,7 @@ private[thriftserver] class SparkSQLOperationManager() val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() + val sessionToContexts = new ConcurrentHashMap[SessionHandle, SparkSession]() override def newExecuteStatementOperation( parentSession: HiveSession, @@ -46,13 +46,13 @@ private[thriftserver] class SparkSQLOperationManager() confOverlay: JMap[String, String], async: Boolean, queryTimeout: Long): ExecuteStatementOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialized or had already closed.") - val conf = sqlContext.sessionState.conf + val sparkSession = sessionToContexts.get(parentSession.getSessionHandle) + require(sparkSession != null, s"Session handle: ${parentSession.getSessionHandle} " + + s"has not been initialized or had already closed.") + val conf = sparkSession.sessionState.conf val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation( - sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout) + sparkSession, parentSession, statement, confOverlay, runInBackground, queryTimeout) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") @@ -61,10 +61,10 @@ 
private[thriftserver] class SparkSQLOperationManager() override def newGetCatalogsOperation( parentSession: HiveSession): GetCatalogsOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetCatalogsOperation(sqlContext, parentSession) + val operation = new SparkGetCatalogsOperation(session, parentSession) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetCatalogsOperation with session=$parentSession.") operation @@ -74,10 +74,10 @@ private[thriftserver] class SparkSQLOperationManager() parentSession: HiveSession, catalogName: String, schemaName: String): GetSchemasOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetSchemasOperation(sqlContext, parentSession, catalogName, schemaName) + val operation = new SparkGetSchemasOperation(session, parentSession, catalogName, schemaName) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetSchemasOperation with session=$parentSession.") operation @@ -89,10 +89,10 @@ private[thriftserver] class SparkSQLOperationManager() schemaName: String, tableName: String, tableTypes: JList[String]): MetadataOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTablesOperation(sqlContext, parentSession, + val operation = new SparkGetTablesOperation(session, parentSession, catalogName, schemaName, tableName, tableTypes) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTablesOperation with session=$parentSession.") @@ -105,10 +105,10 @@ private[thriftserver] class SparkSQLOperationManager() schemaName: String, tableName: String, columnName: String): GetColumnsOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetColumnsOperation(sqlContext, parentSession, + val operation = new SparkGetColumnsOperation(session, parentSession, catalogName, schemaName, tableName, columnName) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetColumnsOperation with session=$parentSession.") @@ -117,10 +117,10 @@ private[thriftserver] class SparkSQLOperationManager() override def newGetTableTypesOperation( 
parentSession: HiveSession): GetTableTypesOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTableTypesOperation(sqlContext, parentSession) + val operation = new SparkGetTableTypesOperation(session, parentSession) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTableTypesOperation with session=$parentSession.") operation @@ -131,10 +131,10 @@ private[thriftserver] class SparkSQLOperationManager() catalogName: String, schemaName: String, functionName: String): GetFunctionsOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetFunctionsOperation(sqlContext, parentSession, + val operation = new SparkGetFunctionsOperation(session, parentSession, catalogName, schemaName, functionName) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetFunctionsOperation with session=$parentSession.") @@ -143,10 +143,10 @@ private[thriftserver] class SparkSQLOperationManager() override def newGetTypeInfoOperation( parentSession: HiveSession): GetTypeInfoOperation = synchronized { - val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + val session = sessionToContexts.get(parentSession.getSessionHandle) + require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTypeInfoOperation(sqlContext, parentSession) + val operation = new SparkGetTypeInfoOperation(session, parentSession) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTypeInfoOperation with session=$parentSession.") operation diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index edd247e48f530..6f0fedcb85368 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -128,21 +128,22 @@ trait SharedThriftServer extends SharedSparkSession { private def startThriftServer(attempt: Int): Unit = { logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt") - val sqlContext = spark.newSession().sqlContext + val sparkSession = spark.newSession() // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could // randomly pick any free port to use. 
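Illustrative aside, not part of the patch: the test helper changed below now writes the Hive server ports to the new session's SQLConf and starts the server through the SparkSession entry point. A condensed sketch, assuming the SharedSparkSession-provided spark and using literal conf key names in place of the ConfVars constants.

// Condensed sketch of startThriftServer after this change; port "0" means
// "pick any free port". Mirrors the test code with literal Hive conf keys.
private def startThriftServerSketch(): HiveThriftServer2 = {
  val sparkSession = spark.newSession()
  val sessionConf = sparkSession.sessionState.conf
  sessionConf.setConfString("hive.server2.thrift.port", "0")
  sessionConf.setConfString("hive.server2.thrift.http.port", "0")
  sessionConf.setConfString("hive.server2.transport.mode", "binary") // or "http"
  HiveThriftServer2.startWithSparkSession(sparkSession, exitOnError = false)
}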
// It's much more robust than set a random port generated by ourselves ahead - sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") - sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") - sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) - sqlContext.setConf("hive.exec.scratchdir", tempScratchDir.getAbsolutePath) - sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true") + val sessionConf = sparkSession.sessionState.conf + sessionConf.setConfString(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") + sessionConf.setConfString(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") + sessionConf.setConfString(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) + sessionConf.setConfString("hive.exec.scratchdir", tempScratchDir.getAbsolutePath) + sessionConf.setConfString(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true") try { // Set exitOnError to false to avoid exiting the JVM process and tearing down the SparkContext // instance in case of any exceptions here. Otherwise, the following retries are doomed to // fail on a stopped context. - hiveServer2 = HiveThriftServer2.startWithContext(sqlContext, exitOnError = false) + hiveServer2 = HiveThriftServer2.startWithSparkSession(sparkSession, exitOnError = false) hiveServer2.getServices.asScala.foreach { case t: ThriftCLIService => serverPort = t.getPortNumber diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index d085f596397f7..83f1824c26d2b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -30,7 +30,7 @@ import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS} import org.mockito.invocation.InvocationOnMock import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2EventManager import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, NullType, StringType, StructField, StructType} @@ -78,7 +78,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark HiveThriftServer2.eventManager = mock(classOf[HiveThriftServer2EventManager]) - val spySqlContext = spy[SQLContext](sqlContext) + val spySparkSession = spy[SparkSession](spark) // When cancel() is called on the operation, cleanup causes an exception to be thrown inside // of execute(). This should not cause the state to become ERROR. 
The exception here will be @@ -90,9 +90,9 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark throw new RuntimeException("Operation was cancelled by test cleanup.") }) val statement = "stmt" - doReturn(dataFrame, Nil: _*).when(spySqlContext).sql(statement) + doReturn(dataFrame, Nil: _*).when(spySparkSession).sql(statement) - val executeStatementOperation = new MySparkExecuteStatementOperation(spySqlContext, + val executeStatementOperation = new MySparkExecuteStatementOperation(spySparkSession, hiveSession, statement, signal, finalState) val run = new Thread() { @@ -110,12 +110,12 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark } private class MySparkExecuteStatementOperation( - sqlContext: SQLContext, + session: SparkSession, hiveSession: HiveSession, statement: String, signal: Semaphore, finalState: OperationState) - extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement, + extends SparkExecuteStatementOperation(session, hiveSession, statement, new util.HashMap, false, 0) { override def cleanup(): Unit = {