Skip to content

Commit

Permalink
Add new listener event for thread dumps.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Oct 27, 2014
1 parent 8c10216 commit 87b8b65
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
threadDump: Array[ThreadStackTrace],
threadStackTraces: Array[ThreadStackTrace],
taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics, threadDump))
listenerBus.post(SparkListenerExecutorThreadDump(execId, threadStackTraces))
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)

Await.result(
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,21 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerExecutorThreadDump(
execId: String,
threadStackTraces: Array[ThreadStackTrace])
extends SparkListenerEvent

/**
* Periodic updates from executors.
* @param execId executor id
* @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
* @param threadDump stack traces from all threads
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
taskMetrics: Seq[(Long, Int, Int, TaskMetrics)],
threadDump: Array[ThreadStackTrace])
taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
extends SparkListenerEvent

@DeveloperApi
Expand Down Expand Up @@ -174,6 +178,11 @@ trait SparkListener {
*/
def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }

/**
* Called when the driver receives thread dumps from an executor in a heartbeat.
*/
def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {}

/**
* Called when the driver receives task metrics from an executor in a heartbeat.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.executorRunTime += timeDelta
}

override def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {
val timeAndThreadDump = (System.currentTimeMillis(), executorThreadDump.threadStackTraces)
executorIdToLastThreadDump.put(executorThreadDump.execId, timeAndThreadDump)
}

override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
val timeAndThreadDump = (System.currentTimeMillis(), executorMetricsUpdate.threadDump)
executorIdToLastThreadDump.put(executorMetricsUpdate.execId, timeAndThreadDump)
for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private[spark] object JsonProtocol {

// These aren't used, but keeps compiler happy
case SparkListenerShutdown => JNothing
case SparkListenerExecutorMetricsUpdate(_, _, _) => JNothing
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
case SparkListenerExecutorThreadDump(_, _) => JNothing
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
(1234L, 0, 0, makeTaskMetrics(0)),
(1235L, 0, 0, makeTaskMetrics(100)),
(1236L, 1, 0, makeTaskMetrics(200))), Array.empty))
(1236L, 1, 0, makeTaskMetrics(200)))))

var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
Expand Down

0 comments on commit 87b8b65

Please sign in to comment.