fix AE job desc (apache#59)
windpiger authored and luzhonghao committed Dec 11, 2018
1 parent e20055a commit 4422458
Showing 2 changed files with 21 additions and 5 deletions.
SQLExecution.scala
@@ -99,4 +99,21 @@ object SQLExecution {
      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
    }
  }

  def withExecutionIdAndJobDesc[T](
      sc: SparkContext,
      executionId: String,
      jobDesc: String)(body: => T): T = {
    val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    val oldJobDesc = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)

    try {
      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, jobDesc)
      body
    } finally {
      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, oldJobDesc)
    }
  }
}
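
Note: the new helper follows the usual save-set-restore pattern for SparkContext local properties: remember the caller's current values, overwrite them while the body runs, and restore them in a finally block so the calling thread is left unchanged even if the body throws. The following minimal, self-contained sketch shows the same pattern outside of Spark SQL internals; the helper name, application name, and demo job are illustrative, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}

object LocalPropertyDemo {
  // Hypothetical stand-alone analogue of SQLExecution.withExecutionIdAndJobDesc:
  // set a thread-local SparkContext property around `body`, then restore the
  // previous value; setLocalProperty(key, null) clears a property that was unset.
  def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
    val oldValue = sc.getLocalProperty(key)
    try {
      sc.setLocalProperty(key, value)
      body
    } finally {
      sc.setLocalProperty(key, oldValue)
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("local-property-demo"))
    try {
      withLocalProperty(sc, "spark.job.description", "counting a demo RDD") {
        // Jobs submitted while the property is set show this description in the UI.
        sc.parallelize(1 to 100).count()
      }
    } finally {
      sc.stop()
    }
  }
}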
QueryStage.scala
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.adaptive

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.broadcast
import org.apache.spark.{MapOutputStatistics, SparkContext, broadcast}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -54,14 +52,15 @@ abstract class QueryStage extends UnaryExecNode {
   */
  def executeChildStages(): Unit = {
    val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    val jobDesc = sqlContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)

    // Handle broadcast stages
    val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
      case bqs: BroadcastQueryStageInput => bqs.childStage
    }
    val broadcastFutures = broadcastQueryStages.map { queryStage =>
      Future {
        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
        SQLExecution.withExecutionIdAndJobDesc(sqlContext.sparkContext, executionId, jobDesc) {
          queryStage.prepareBroadcast()
        }
      }(QueryStage.executionContext)
@@ -73,7 +72,7 @@
    }
    val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
      Future {
        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
        SQLExecution.withExecutionIdAndJobDesc(sqlContext.sparkContext, executionId, jobDesc) {
          queryStage.execute()
        }
      }(QueryStage.executionContext)
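
Note: the job description has to be passed through explicitly because broadcast and shuffle child stages are materialized in futures on QueryStage.executionContext, a separate thread pool. Local properties such as the execution ID and spark.job.description are per-thread, so jobs submitted from a reused pool thread would otherwise run without the caller's description (or with a stale one) in the web UI. A rough stand-alone sketch of the capture-and-reapply idea, using only public SparkContext API; the object name, pool, and demo job are hypothetical:

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object ChildStageDescriptionDemo {
  // A dedicated pool standing in for QueryStage.executionContext; its threads
  // do not reliably carry the thread-local properties of the submitting thread.
  private val stagePool =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("child-stage-desc-demo"))
    try {
      // Capture the description on the caller thread; setJobDescription writes
      // the "spark.job.description" local property shown in the web UI.
      sc.setJobDescription("adaptive query: child stage demo")
      val jobDesc = sc.getLocalProperty("spark.job.description")

      val childStage = Future {
        // Re-apply it on the pool thread before submitting jobs, which is what
        // withExecutionIdAndJobDesc does for broadcast and shuffle stages.
        sc.setLocalProperty("spark.job.description", jobDesc)
        sc.parallelize(1 to 1000).map(_ * 2).count()
      }(stagePool)

      Await.result(childStage, Duration.Inf)
    } finally {
      sc.stop()
      stagePool.shutdown()
    }
  }
}

Without the reapply step inside the future, the child-stage jobs would show up under whatever description happens to be set on that pool thread, which matches the behavior this commit addresses for adaptive execution.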
