From 0f198acb65c43d6a98c674f049bd96ea7816fdab Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 25 Oct 2014 19:57:28 -0700 Subject: [PATCH 01/13] [SPARK-611] Display executor thread dumps in web UI This patch allows executor thread dumps to be viewed in the Spark web UI. Thread dumps obtained from Thread.getAllStackTraces() are piggybacked on the periodic executor -> driver heartbeats. JobProgressListener stores these thread dumps for display in the UI. One current limitation is that driver thread dumps are only viewable when running in local mode. --- .../org/apache/spark/HeartbeatReceiver.scala | 9 ++++---- .../org/apache/spark/executor/Executor.scala | 10 +++++--- .../spark/executor/ThreadStackTrace.scala | 23 +++++++++++++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 5 ++-- .../spark/scheduler/SparkListener.scala | 6 +++-- .../spark/scheduler/TaskScheduler.scala | 10 +++++--- .../spark/scheduler/TaskSchedulerImpl.scala | 6 ++--- .../spark/scheduler/local/LocalBackend.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 6 ++++- .../apache/spark/ui/exec/ExecutorsTab.scala | 4 +++- .../spark/ui/jobs/JobProgressListener.scala | 7 +++++- .../org/apache/spark/util/JsonProtocol.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- 13 files changed, 69 insertions(+), 23 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 83ae57b7f1516..58197f9d84dfc 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -18,7 +18,7 @@ package org.apache.spark import akka.actor.Actor -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.TaskScheduler import org.apache.spark.util.ActorLogReceive @@ -29,21 +29,22 @@ import org.apache.spark.util.ActorLogReceive */ private[spark] case class Heartbeat( executorId: String, + threadDump: Array[ThreadStackTrace], taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId) private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** - * Lives in the driver to receive heartbeats from executors.. + * Lives in the driver to receive heartbeats from executors. */ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { override def receiveWithLogging = { - case Heartbeat(executorId, taskMetrics, blockManagerId) => + case Heartbeat(executorId, threadDump, taskMetrics, blockManagerId) => val response = HeartbeatResponse( - !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + ! scheduler.executorHeartbeatReceived(executorId, threadDump, taskMetrics, blockManagerId)) sender ! 
response } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2889f59e33e84..d36b08572590a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -360,7 +360,6 @@ private[spark] class Executor( override def run() { // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) - while (!isStopped) { val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() for (taskRunner <- runningTasks.values()) { @@ -380,8 +379,13 @@ private[spark] class Executor( } } } - - val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) + val threadDump = Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map { + case (thread, stackElements) => + val stackTrace = stackElements.map(_.toString).mkString("\n") + ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace) + } + val message = Heartbeat(executorId, threadDump, tasksMetrics.toArray, + env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout) diff --git a/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala new file mode 100644 index 0000000000000..610c11c699971 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +/** + * Used for shipping per-thread stacktraces from the executors to driver. 
+ */ +private[spark] case class ThreadStackTrace(id: Long, name: String, state: String, trace: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f81fa6d8089fc..5420624759d67 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,7 +35,7 @@ import akka.util.Timeout import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -164,9 +164,10 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, + threadDump: Array[ThreadStackTrace], taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) blockManagerId: BlockManagerId): Boolean = { - listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) + listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics, threadDump)) implicit val timeout = Timeout(600 seconds) Await.result( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 86afe3bd5265f..f6bf408ae6967 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{Distribution, Utils} @@ -81,11 +81,13 @@ case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent * Periodic updates from executors. * @param execId executor id * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics) + * @param threadDump stack traces from all threads */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) + taskMetrics: Seq[(Long, Int, Int, TaskMetrics)], + threadDump: Array[ThreadStackTrace]) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index a129a434c9a1a..e652b941fa759 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.storage.BlockManagerId /** @@ -64,8 +64,12 @@ private[spark] trait TaskScheduler { * alive. Return true if the driver knows about the given block manager. Otherwise, return false, * indicating that the block manager should re-register. 
*/ - def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], - blockManagerId: BlockManagerId): Boolean + def executorHeartbeatReceived( + execId: String, + threadDump: Array[ThreadStackTrace], + taskMetrics: Array[(Long, TaskMetrics)], + blockManagerId: BlockManagerId + ): Boolean /** * Get an application ID associated with the job. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2b39c7fc872da..789717ee42f6a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -32,9 +32,8 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.storage.BlockManagerId -import akka.actor.Props /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. @@ -329,6 +328,7 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, + threadDump: Array[ThreadStackTrace], taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId): Boolean = { @@ -339,7 +339,7 @@ private[spark] class TaskSchedulerImpl( .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } - dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) + dagScheduler.executorHeartbeatReceived(execId, threadDump, metricsWithStageIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 58b78f041cd85..c69af69187b3e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -47,7 +47,7 @@ private[spark] class LocalActor( private var freeCores = totalCores - private val localExecutorId = "localhost" + private val localExecutorId = "" private val localExecutorHostname = "localhost" val executor = new Executor( diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index b0e3bb3b552fd..5ca12f70b1854 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -42,7 +42,7 @@ private case class ExecutorSummaryInfo( maxMemory: Long) private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { - private val listener = parent.listener + private val listener = parent.executorsListener def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList @@ -75,6 +75,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { Shuffle Write + Thread Dump {execInfoSorted.map(execRow)} @@ -133,6 +134,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { {Utils.bytesToString(info.totalShuffleWrite)} + + Thread Dump + } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 689cf02b25b70..18e9f5d419001 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -26,9 +26,11 @@ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val listener = parent.executorsListener + val executorsListener = parent.executorsListener + val jobProgressListener = parent.jobProgressListener attachPage(new ExecutorsPage(this)) + attachPage(new ThreadDumpPage(this)) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b5207360510dd..23e3acc458d6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId @@ -65,6 +65,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() + /** Map entries are (last update timestamp, thread dump) pairs */ + val executorIdToLastThreadDump = HashMap[String, (Long, Array[ThreadStackTrace])]() + var schedulingMode: Option[SchedulingMode] = None def blockManagerIds = executorIdToBlockManagerId.values.toSeq @@ -266,6 +269,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { + val timeAndThreadDump = (System.currentTimeMillis(), executorMetricsUpdate.threadDump) + executorIdToLastThreadDump.put(executorMetricsUpdate.execId, timeAndThreadDump) for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { logWarning("Metrics update for task in unknown stage " + sid) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5b2e7d3a7edb9..df00b1d8c3b07 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -73,7 +73,7 @@ private[spark] object JsonProtocol { // These aren't used, but keeps compiler happy case SparkListenerShutdown => JNothing - case SparkListenerExecutorMetricsUpdate(_, _) => JNothing + case SparkListenerExecutorMetricsUpdate(_, _, _) => JNothing } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 3370dd4156c3f..3f1491596e961 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -177,7 +177,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc 
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( (1234L, 0, 0, makeTaskMetrics(0)), (1235L, 0, 0, makeTaskMetrics(100)), - (1236L, 1, 0, makeTaskMetrics(200))))) + (1236L, 1, 0, makeTaskMetrics(200))), Array.empty)) var stage0Data = listener.stageIdToData.get((0, 0)).get var stage1Data = listener.stageIdToData.get((1, 0)).get From 8c1021636efcc34af925ec6c43f4757b8034ce56 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 25 Oct 2014 20:10:33 -0700 Subject: [PATCH 02/13] Add missing file. --- .../apache/spark/ui/exec/ThreadDumpPage.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala new file mode 100644 index 0000000000000..64e7f1d2a4a8c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.exec + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{Text, Node} + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { + + def render(request: HttpServletRequest): Seq[Node] = { + val executorId = Option(request.getParameter("executorId")).getOrElse { + return Text(s"Missing executorId parameter") + } + val maybeThreadDump = parent.jobProgressListener synchronized { + parent.jobProgressListener.executorIdToLastThreadDump.get(executorId) + } + val content = maybeThreadDump.map { case (time, threadDump) => + val dumpRows = threadDump.map { thread => +
+        <div class="accordion-group">
+          <div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
+            <a class="accordion-toggle">
+              Thread {thread.id}: {thread.name} ({thread.state})
+            </a>
+          </div>
+          <div class="accordion-body hidden">
+            <div class="accordion-inner">
+              <pre>{thread.trace}</pre>
+            </div>
+          </div>
+        </div>
+      }
+
+      <div class="row-fluid">
+        <p>Updated at {UIUtils.formatDate(time)}</p>
+        {
+          // scalastyle:off
+          <p><a class="expandbutton"
+                onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
+            Expand All
+          </a></p>
+          // scalastyle:on
+        }
+        <div class="accordion">{dumpRows}</div>
+      </div>
+ }.getOrElse(Text("No thread dump to display")) + UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent) + } +} From 87b8b6559e9ffc217cc9977cca01fee50de133a4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Oct 2014 11:53:14 -0700 Subject: [PATCH 03/13] Add new listener event for thread dumps. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- .../apache/spark/scheduler/SparkListener.scala | 15 ++++++++++++--- .../spark/ui/jobs/JobProgressListener.scala | 7 +++++-- .../org/apache/spark/util/JsonProtocol.scala | 3 ++- .../spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5420624759d67..5f2d3195500d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -164,10 +164,11 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, - threadDump: Array[ThreadStackTrace], + threadStackTraces: Array[ThreadStackTrace], taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) blockManagerId: BlockManagerId): Boolean = { - listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics, threadDump)) + listenerBus.post(SparkListenerExecutorThreadDump(execId, threadStackTraces)) + listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) implicit val timeout = Timeout(600 seconds) Await.result( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index f6bf408ae6967..e2b7e1f34c02b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -77,17 +77,21 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerExecutorThreadDump( + execId: String, + threadStackTraces: Array[ThreadStackTrace]) + extends SparkListenerEvent + /** * Periodic updates from executors. * @param execId executor id * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics) - * @param threadDump stack traces from all threads */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - taskMetrics: Seq[(Long, Int, Int, TaskMetrics)], - threadDump: Array[ThreadStackTrace]) + taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) extends SparkListenerEvent @DeveloperApi @@ -174,6 +178,11 @@ trait SparkListener { */ def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { } + /** + * Called when the driver receives thread dumps from an executor in a heartbeat. + */ + def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {} + /** * Called when the driver receives task metrics from an executor in a heartbeat. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 23e3acc458d6b..b089cfaf2670d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -268,9 +268,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.executorRunTime += timeDelta } + override def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) { + val timeAndThreadDump = (System.currentTimeMillis(), executorThreadDump.threadStackTraces) + executorIdToLastThreadDump.put(executorThreadDump.execId, timeAndThreadDump) + } + override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { - val timeAndThreadDump = (System.currentTimeMillis(), executorMetricsUpdate.threadDump) - executorIdToLastThreadDump.put(executorMetricsUpdate.execId, timeAndThreadDump) for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { logWarning("Metrics update for task in unknown stage " + sid) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index df00b1d8c3b07..8bd8e9412cd67 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -73,7 +73,8 @@ private[spark] object JsonProtocol { // These aren't used, but keeps compiler happy case SparkListenerShutdown => JNothing - case SparkListenerExecutorMetricsUpdate(_, _, _) => JNothing + case SparkListenerExecutorMetricsUpdate(_, _) => JNothing + case SparkListenerExecutorThreadDump(_, _) => JNothing } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 3f1491596e961..3370dd4156c3f 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -177,7 +177,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( (1234L, 0, 0, makeTaskMetrics(0)), (1235L, 0, 0, makeTaskMetrics(100)), - (1236L, 1, 0, makeTaskMetrics(200))), Array.empty)) + (1236L, 1, 0, makeTaskMetrics(200))))) var stage0Data = listener.stageIdToData.get((0, 0)).get var stage1Data = listener.stageIdToData.get((1, 0)).get From cc3e6b3b61b1c4561bf2d0726da4ca1b2f753f16 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 27 Oct 2014 12:00:55 -0700 Subject: [PATCH 04/13] Fix test code in DAGSchedulerSuite. 
--- .../org/apache/spark/scheduler/TaskScheduler.scala | 2 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index e652b941fa759..fcdf621bf8b66 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -66,7 +66,7 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived( execId: String, - threadDump: Array[ThreadStackTrace], + threadStackTraces: Array[ThreadStackTrace], taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId ): Boolean diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a2e4f712db55b..a3e9318e7e0a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.CallSite -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} class BuggyDAGEventProcessActor extends Actor { val state = 0 @@ -81,7 +81,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} - override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], + override def executorHeartbeatReceived( + execId: String, + threadStackTraces: Array[ThreadStackTrace], + taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager @@ -371,7 +374,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], + override def executorHeartbeatReceived( + execId: String, + threadStackTraces: Array[ThreadStackTrace], + taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true } val noKillScheduler = new DAGScheduler( From 2b8bdf384f83e89529a1458fa2f7f3adbe460885 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Oct 2014 14:52:44 -0700 Subject: [PATCH 05/13] Enable thread dumps from the driver when running in non-local mode. 
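Note: the driver-side capture below reuses the executor heartbeat interval and runs on a daemon java.util.Timer, so it cannot keep the JVM alive if SparkContext.stop() is never reached. A minimal standalone sketch of the scheduling pattern (the interval value and println are illustrative):

    import java.util.{Timer, TimerTask}

    // Daemon timer: does not prevent JVM shutdown if cancel() is never called.
    val timer = new Timer("Driver thread dump timer", true)
    val intervalMs = 10000  // mirrors the spark.executor.heartbeatInterval default
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        // The real patch posts a SparkListenerExecutorThreadDump for the
        // driver's pseudo-executor id to the listener bus here.
        println(s"Captured stack traces for ${Thread.getAllStackTraces.size} threads")
      }
    }, intervalMs, intervalMs)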
--- .../org/apache/spark/HeartbeatReceiver.scala | 4 ++-- .../scala/org/apache/spark/SparkContext.scala | 16 ++++++++++++++-- .../org/apache/spark/executor/Executor.scala | 7 +------ .../apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../apache/spark/scheduler/SparkListener.scala | 4 ++-- .../spark/scheduler/SparkListenerBus.scala | 2 ++ .../apache/spark/scheduler/TaskScheduler.scala | 3 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- .../spark/ui/jobs/JobProgressListener.scala | 3 ++- .../{executor => util}/ThreadStackTrace.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 9 +++++++++ .../spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 12 files changed, 41 insertions(+), 21 deletions(-) rename core/src/main/scala/org/apache/spark/{executor => util}/ThreadStackTrace.scala (96%) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 58197f9d84dfc..e6a4aaf8ecec9 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -18,10 +18,10 @@ package org.apache.spark import akka.actor.Actor -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.TaskScheduler -import org.apache.spark.util.ActorLogReceive +import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace} /** * A heartbeat from executors to the driver. This is a shared message used by several internal diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e8fdfff04390d..7e7dca5a6811a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,9 +21,8 @@ import scala.language.implicitConversions import java.io._ import java.net.URI -import java.util.Arrays +import java.util.{Arrays, Properties, Timer, TimerTask, UUID} import java.util.concurrent.atomic.AtomicInteger -import java.util.{Properties, UUID} import java.util.UUID.randomUUID import scala.collection.{Map, Set} import scala.collection.generic.Growable @@ -242,6 +241,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // the bound port to the cluster manager properly ui.foreach(_.bind()) + // If we are not running in local mode, then start a new timer thread for capturing driver thread + // dumps for display in the web UI (in local mode, this is handled by the local Executor): + private val threadDumpTimer = new Timer("Driver thread dump timer", true) + if (!isLocal) { + val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000) + threadDumpTimer.scheduleAtFixedRate(new TimerTask { + override def run(): Unit = { + listenerBus.post(SparkListenerExecutorThreadDump("", Utils.getThreadDump())) + } + }, threadDumpInterval, threadDumpInterval) + } + /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) @@ -960,6 +971,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { def stop() { postApplicationEnd() ui.foreach(_.stop()) + threadDumpTimer.cancel() // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. 
val dagSchedulerCopy = dagScheduler diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d36b08572590a..91f036595d048 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -379,12 +379,7 @@ private[spark] class Executor( } } } - val threadDump = Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map { - case (thread, stackElements) => - val stackTrace = stackElements.map(_.toString).mkString("\n") - ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace) - } - val message = Heartbeat(executorId, threadDump, tasksMetrics.toArray, + val message = Heartbeat(executorId, Utils.getThreadDump(), tasksMetrics.toArray, env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5f2d3195500d4..14f26daa3665b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,11 +35,11 @@ import akka.util.Timeout import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ -import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils} +import org.apache.spark.util._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index e2b7e1f34c02b..71e72fe2b97f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -24,9 +24,9 @@ import scala.collection.mutable import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace} @DeveloperApi sealed trait SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e79ffd7a3587d..11b4f4e44ae51 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -70,6 +70,8 @@ private[spark] trait SparkListenerBus extends Logging { foreachListener(_.onApplicationEnd(applicationEnd)) case metricsUpdate: SparkListenerExecutorMetricsUpdate => foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) + case threadDump: SparkListenerExecutorThreadDump => + foreachListener(_.onExecutorThreadDump(threadDump)) case SparkListenerShutdown => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
index fcdf621bf8b66..54a11d3ae35f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -18,8 +18,9 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.ThreadStackTrace /** * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 789717ee42f6a..41a663b31d37c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -31,8 +31,8 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.util.Utils -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.util.{ThreadStackTrace, Utils} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b089cfaf2670d..a1a0b1c0d7fee 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,11 +21,12 @@ import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.ui.jobs.UIData._ +import org.apache.spark.util.ThreadStackTrace /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala similarity index 96% rename from core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala rename to core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala index 610c11c699971..f9514097f2b89 100644 --- a/core/src/main/scala/org/apache/spark/executor/ThreadStackTrace.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.executor +package org.apache.spark.util /** * Used for shipping per-thread stacktraces from the executors to driver. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e1dc49238733c..7d21c6fd24925 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1568,6 +1568,15 @@ private[spark] object Utils extends Logging { s"$className: $desc\n$st" } + /** Return a thread dump of all threads' stacktraces. 
Used to capture dumps for the web UI */ + def getThreadDump(): Array[ThreadStackTrace] = { + Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map { + case (thread, stackElements) => + val stackTrace = stackElements.map(_.toString).mkString("\n") + ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace) + } + } + /** * Convert all spark properties set in the given SparkConf to a sequence of java options. */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a3e9318e7e0a9..14124fe998ebe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -31,8 +31,8 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.CallSite -import org.apache.spark.executor.{TaskMetrics, ThreadStackTrace} +import org.apache.spark.util.{CallSite, ThreadStackTrace} +import org.apache.spark.executor.TaskMetrics class BuggyDAGEventProcessActor extends Actor { val state = 0 From 4c87d7ff9fe381bb0bf71fddf7fd426b65ef277f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Oct 2014 17:18:12 -0700 Subject: [PATCH 06/13] Use separate RPC for sending thread dumps. --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 12 +++++++++--- .../scala/org/apache/spark/executor/Executor.scala | 5 +++-- .../org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++++-- .../org/apache/spark/scheduler/TaskScheduler.scala | 13 +++++++------ .../apache/spark/scheduler/TaskSchedulerImpl.scala | 7 +++++-- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 13 +++++-------- 6 files changed, 36 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index e6a4aaf8ecec9..7d83909075806 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -29,12 +29,16 @@ import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace} */ private[spark] case class Heartbeat( executorId: String, - threadDump: Array[ThreadStackTrace], taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId) private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) +/** + * A thread dump sent from executors to the driver. + */ +private[spark] case class ThreadDump(executorId: String, threadStackTraces: Array[ThreadStackTrace]) + /** * Lives in the driver to receive heartbeats from executors. */ @@ -42,9 +46,11 @@ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { override def receiveWithLogging = { - case Heartbeat(executorId, threadDump, taskMetrics, blockManagerId) => + case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( - ! scheduler.executorHeartbeatReceived(executorId, threadDump, taskMetrics, blockManagerId)) + ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) sender ! 
response + case ThreadDump(executorId, stackTraces: Array[ThreadStackTrace]) => + scheduler.executorThreadDumpReceived(executorId, stackTraces) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 91f036595d048..6fa82f68be039 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -361,6 +361,8 @@ private[spark] class Executor( // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) while (!isStopped) { + // Send the thread-dump as a fire-and-forget, best-effort message: + heartbeatReceiverRef ! ThreadDump(executorId, Utils.getThreadDump()) val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { @@ -379,8 +381,7 @@ private[spark] class Executor( } } } - val message = Heartbeat(executorId, Utils.getThreadDump(), tasksMetrics.toArray, - env.blockManager.blockManagerId) + val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 14f26daa3665b..73a7f6a2ba977 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -164,10 +164,8 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, - threadStackTraces: Array[ThreadStackTrace], taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) blockManagerId: BlockManagerId): Boolean = { - listenerBus.post(SparkListenerExecutorThreadDump(execId, threadStackTraces)) listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) implicit val timeout = Timeout(600 seconds) @@ -176,6 +174,13 @@ class DAGScheduler( timeout.duration).asInstanceOf[Boolean] } + /** + * Called by the TaskScheduler when a thread dump is received from an executor. + */ + def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { + listenerBus.post(SparkListenerExecutorThreadDump(execId, stackTraces)) + } + // Called by TaskScheduler when an executor fails. def executorLost(execId: String) { eventProcessActor ! ExecutorLost(execId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 54a11d3ae35f3..58057c77f07b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -65,12 +65,13 @@ private[spark] trait TaskScheduler { * alive. Return true if the driver knows about the given block manager. Otherwise, return false, * indicating that the block manager should re-register. 
*/ - def executorHeartbeatReceived( - execId: String, - threadStackTraces: Array[ThreadStackTrace], - taskMetrics: Array[(Long, TaskMetrics)], - blockManagerId: BlockManagerId - ): Boolean + def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], + blockManagerId: BlockManagerId): Boolean + + /** + * Called when a thread dump has been received from an executor. + */ + def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) /** * Get an application ID associated with the job. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 41a663b31d37c..7c66a23af616d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -328,7 +328,6 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, - threadDump: Array[ThreadStackTrace], taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId): Boolean = { @@ -339,7 +338,11 @@ private[spark] class TaskSchedulerImpl( .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } - dagScheduler.executorHeartbeatReceived(execId, threadDump, metricsWithStageIds, blockManagerId) + dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) + } + + override def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { + dagScheduler.executorThreadDumpReceived(execId, stackTraces) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 14124fe998ebe..bde6e9b87f4f8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -81,11 +81,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} - override def executorHeartbeatReceived( - execId: String, - threadStackTraces: Array[ThreadStackTrace], - taskMetrics: Array[(Long, TaskMetrics)], + override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true + override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) {} override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) @@ -374,11 +372,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def executorHeartbeatReceived( - execId: String, - threadStackTraces: Array[ThreadStackTrace], - taskMetrics: Array[(Long, TaskMetrics)], + override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true + override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) { + } } val noKillScheduler = new DAGScheduler( sc, From dfec08befeaefac8ce33027af79d8d8ee0ed76da Mon Sep 17 00:00:00 
2001 From: Josh Rosen Date: Tue, 28 Oct 2014 18:25:08 -0700 Subject: [PATCH 07/13] Add option to disable thread dumps in UI. --- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 7 +++++-- .../apache/spark/ui/exec/ExecutorsPage.scala | 19 ++++++++++++++----- .../apache/spark/ui/exec/ExecutorsTab.scala | 7 +++++-- docs/configuration.md | 7 +++++++ 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7e7dca5a6811a..0d6ba4176ec5f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // If we are not running in local mode, then start a new timer thread for capturing driver thread // dumps for display in the web UI (in local mode, this is handled by the local Executor): private val threadDumpTimer = new Timer("Driver thread dump timer", true) - if (!isLocal) { + if (!isLocal && conf.getBoolean("spark.executor.sendThreadDumps", true)) { val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000) threadDumpTimer.scheduleAtFixedRate(new TimerTask { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6fa82f68be039..a1682832711c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -355,14 +355,17 @@ private[spark] class Executor( val retryAttempts = AkkaUtils.numRetries(conf) val retryIntervalMs = AkkaUtils.retryWaitMs(conf) val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem) + val threadDumpsEnabled = conf.getBoolean("spark.executor.sendThreadDumps", true) val t = new Thread() { override def run() { // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) while (!isStopped) { - // Send the thread-dump as a fire-and-forget, best-effort message: - heartbeatReceiverRef ! ThreadDump(executorId, Utils.getThreadDump()) + if (threadDumpsEnabled) { + // Send the thread-dump as a fire-and-forget, best-effort message: + heartbeatReceiverRef ! 
ThreadDump(executorId, Utils.getThreadDump()) + } val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 5ca12f70b1854..39dac30f7f5e0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo( totalShuffleWrite: Long, maxMemory: Long) -private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { +private[ui] class ExecutorsPage( + parent: ExecutorsTab, + threadDumpEnabled: Boolean) + extends WebUIPage("") { private val listener = parent.executorsListener def render(request: HttpServletRequest): Seq[Node] = { @@ -75,7 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { Shuffle Write - Thread Dump + {if (threadDumpEnabled) Thread Dump else Seq.empty} {execInfoSorted.map(execRow)} @@ -134,9 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { {Utils.bytesToString(info.totalShuffleWrite)} - - Thread Dump - + { + if (threadDumpEnabled) { + + Thread Dump + + } else { + Seq.empty + } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 18e9f5d419001..6cd16e67824b6 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -28,9 +28,12 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { val executorsListener = parent.executorsListener val jobProgressListener = parent.jobProgressListener + val threadDumpEnabled = parent.conf.getBoolean("spark.executor.sendThreadDumps", true) - attachPage(new ExecutorsPage(this)) - attachPage(new ThreadDumpPage(this)) + attachPage(new ExecutorsPage(this, threadDumpEnabled)) + if (threadDumpEnabled) { + attachPage(new ThreadDumpPage(this)) + } } /** diff --git a/docs/configuration.md b/docs/configuration.md index 3007706a2586e..d4c80bcc0fc91 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -655,6 +655,13 @@ Apart from these, the following properties are also available, and may be useful the driver know that the executor is still alive and update it with metrics for in-progress tasks. + + spark.executor.sendThreadDumps + true + If set to true, executors will periodically send thread dumps to the driver for display + in the web UI. The frequency of these dumps is controlled by + spark.executor.heartbeatInterval./td> + #### Networking From f4ac1c1099019cc43525508545452e22eb677a70 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Oct 2014 22:48:26 -0700 Subject: [PATCH 08/13] Switch to on-demand collection of thread dumps Uses a (hack) driver -> executor RPC path that I built. 
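Note: the new path is a standard Akka ask from the driver to an actor registered on each executor's actor system. A self-contained sketch of the pattern (the wiring below is simplified, the reply payload is reduced to thread names, and only the TriggerThreadDump and ExecutorActor names come from this patch):

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.collection.JavaConversions._
    import scala.concurrent.Await
    import scala.concurrent.duration._

    case class TriggerThreadDump()

    // Executor side: reply to dump requests with a snapshot of this JVM's threads.
    class ExecutorActor(executorId: String) extends Actor {
      override def receive = {
        case TriggerThreadDump() =>
          sender ! Thread.getAllStackTraces.keySet.map(_.getName).toArray
      }
    }

    // Driver side: resolve the actor, ask, and block for the reply.
    val system = ActorSystem("sketch")
    val executorActor: ActorRef = system.actorOf(Props(new ExecutorActor("0")), "ExecutorActor")
    implicit val timeout = Timeout(30.seconds)
    val threadNames = Await.result(
      (executorActor ? TriggerThreadDump()).mapTo[Array[String]], timeout.duration)
    threadNames.foreach(println)
    system.shutdown()

Compared with the earlier fire-and-forget ThreadDump message, the ask pattern lets the UI fetch a dump only when a user actually requests one, instead of shipping dumps on every heartbeat.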
--- .../org/apache/spark/HeartbeatReceiver.scala | 13 ++------ .../scala/org/apache/spark/SparkContext.scala | 30 +++++++++---------- .../CoarseGrainedExecutorBackend.scala | 3 +- .../org/apache/spark/executor/Executor.scala | 14 +++++---- .../apache/spark/scheduler/DAGScheduler.scala | 9 +----- .../spark/scheduler/SparkListener.scala | 13 +------- .../spark/scheduler/SparkListenerBus.scala | 2 -- .../spark/scheduler/TaskScheduler.scala | 8 +---- .../spark/scheduler/TaskSchedulerImpl.scala | 7 ++--- .../spark/scheduler/local/LocalBackend.scala | 2 +- .../spark/storage/BlockManagerMaster.scala | 4 +++ .../storage/BlockManagerMasterActor.scala | 14 +++++++++ .../spark/storage/BlockManagerMessages.scala | 2 ++ .../apache/spark/ui/exec/ExecutorsTab.scala | 3 +- .../apache/spark/ui/exec/ThreadDumpPage.scala | 13 ++++---- .../spark/ui/jobs/JobProgressListener.scala | 9 ------ .../org/apache/spark/util/AkkaUtils.scala | 14 +++++++++ .../org/apache/spark/util/JsonProtocol.scala | 1 - .../spark/scheduler/DAGSchedulerSuite.scala | 5 +--- docs/configuration.md | 14 ++++----- 20 files changed, 86 insertions(+), 94 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 7d83909075806..83ae57b7f1516 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -21,7 +21,7 @@ import akka.actor.Actor import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.TaskScheduler -import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace} +import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -35,12 +35,7 @@ private[spark] case class Heartbeat( private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** - * A thread dump sent from executors to the driver. - */ -private[spark] case class ThreadDump(executorId: String, threadStackTraces: Array[ThreadStackTrace]) - -/** - * Lives in the driver to receive heartbeats from executors. + * Lives in the driver to receive heartbeats from executors.. */ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { @@ -48,9 +43,7 @@ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( - ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) sender ! 
response - case ThreadDump(executorId, stackTraces: Array[ThreadStackTrace]) => - scheduler.executorThreadDumpReceived(executorId, stackTraces) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0d6ba4176ec5f..19c0c9cc5ad72 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,7 +21,7 @@ import scala.language.implicitConversions import java.io._ import java.net.URI -import java.util.{Arrays, Properties, Timer, TimerTask, UUID} +import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger import java.util.UUID.randomUUID import scala.collection.{Map, Set} @@ -40,6 +40,7 @@ import akka.actor.Props import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.executor.TriggerThreadDump import org.apache.spark.input.WholeTextFileInputFormat import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ @@ -50,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} +import org.apache.spark.util._ /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -241,18 +242,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // the bound port to the cluster manager properly ui.foreach(_.bind()) - // If we are not running in local mode, then start a new timer thread for capturing driver thread - // dumps for display in the web UI (in local mode, this is handled by the local Executor): - private val threadDumpTimer = new Timer("Driver thread dump timer", true) - if (!isLocal && conf.getBoolean("spark.executor.sendThreadDumps", true)) { - val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000) - threadDumpTimer.scheduleAtFixedRate(new TimerTask { - override def run(): Unit = { - listenerBus.post(SparkListenerExecutorThreadDump("", Utils.getThreadDump())) - } - }, threadDumpInterval, threadDumpInterval) - } - /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. 
*/ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) @@ -362,6 +351,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { override protected def childValue(parent: Properties): Properties = new Properties(parent) } + /** Called by the web UI to obtain executor thread dumps */ + private[spark] def getExecutorThreadDump(executorId: String): Array[ThreadStackTrace] = { + if (executorId == "") { + Utils.getThreadDump() + } else { + val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get + val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem) + AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump(), actorRef, + AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)) + } + } + private[spark] def getLocalProperties: Properties = localProperties.get() private[spark] def setLocalProperties(props: Properties) { @@ -971,7 +972,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { def stop() { postApplicationEnd() ui.foreach(_.stop()) - threadDumpTimer.cancel() // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 697154d762d41..3711824a40cfc 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Create a new ActorSystem using driver's Spark properties to run the backend. 
val driverConf = new SparkConf().setAll(props) val (actorSystem, boundPort) = AkkaUtils.createActorSystem( - "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf)) + SparkEnv.executorActorSystemName, + hostname, port, driverConf, new SecurityManager(driverConf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a1682832711c4..a306e54c8f555 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal -import akka.actor.ActorSystem +import akka.actor.{Props, ActorSystem} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -92,6 +92,10 @@ private[spark] class Executor( } } + // Create an actor for receiving RPCs from the driver + private val executorActor = env.actorSystem.actorOf( + Props(new ExecutorActor(executorId)), "ExecutorActor") + // Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager private val urlClassLoader = createClassLoader() @@ -128,6 +132,7 @@ private[spark] class Executor( def stop() { env.metricsSystem.report() + env.actorSystem.stop(executorActor) isStopped = true threadPool.shutdown() if (!isLocal) { @@ -355,17 +360,13 @@ private[spark] class Executor( val retryAttempts = AkkaUtils.numRetries(conf) val retryIntervalMs = AkkaUtils.retryWaitMs(conf) val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem) - val threadDumpsEnabled = conf.getBoolean("spark.executor.sendThreadDumps", true) val t = new Thread() { override def run() { // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) + while (!isStopped) { - if (threadDumpsEnabled) { - // Send the thread-dump as a fire-and-forget, best-effort message: - heartbeatReceiverRef ! ThreadDump(executorId, Utils.getThreadDump()) - } val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { @@ -384,6 +385,7 @@ private[spark] class Executor( } } } + val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 73a7f6a2ba977..f81fa6d8089fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -39,7 +39,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ -import org.apache.spark.util._ +import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils} import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat /** @@ -174,13 +174,6 @@ class DAGScheduler( timeout.duration).asInstanceOf[Boolean] } - /** - * Called by the TaskScheduler when a thread dump is received from an executor. 
- */ - def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { - listenerBus.post(SparkListenerExecutorThreadDump(execId, stackTraces)) - } - // Called by TaskScheduler when an executor fails. def executorLost(execId: String) { eventProcessActor ! ExecutorLost(execId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 71e72fe2b97f3..86afe3bd5265f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace} +import org.apache.spark.util.{Distribution, Utils} @DeveloperApi sealed trait SparkListenerEvent @@ -77,12 +77,6 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent -@DeveloperApi -case class SparkListenerExecutorThreadDump( - execId: String, - threadStackTraces: Array[ThreadStackTrace]) - extends SparkListenerEvent - /** * Periodic updates from executors. * @param execId executor id @@ -178,11 +172,6 @@ trait SparkListener { */ def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { } - /** - * Called when the driver receives thread dumps from an executor in a heartbeat. - */ - def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {} - /** * Called when the driver receives task metrics from an executor in a heartbeat. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 11b4f4e44ae51..e79ffd7a3587d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -70,8 +70,6 @@ private[spark] trait SparkListenerBus extends Logging { foreachListener(_.onApplicationEnd(applicationEnd)) case metricsUpdate: SparkListenerExecutorMetricsUpdate => foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) - case threadDump: SparkListenerExecutorThreadDump => - foreachListener(_.onExecutorThreadDump(threadDump)) case SparkListenerShutdown => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 58057c77f07b7..a129a434c9a1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.ThreadStackTrace /** * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. @@ -66,12 +65,7 @@ private[spark] trait TaskScheduler { * indicating that the block manager should re-register. */ def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], - blockManagerId: BlockManagerId): Boolean - - /** - * Called when a thread dump has been received from an executor. 
- */ - def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) + blockManagerId: BlockManagerId): Boolean /** * Get an application ID associated with the job. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7c66a23af616d..2b39c7fc872da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -31,9 +31,10 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.util.{ThreadStackTrace, Utils} +import org.apache.spark.util.Utils import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId +import akka.actor.Props /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. @@ -341,10 +342,6 @@ private[spark] class TaskSchedulerImpl( dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) } - override def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { - dagScheduler.executorThreadDumpReceived(execId, stackTraces) - } - def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { taskSetManager.handleTaskGettingResult(tid) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index c69af69187b3e..58b78f041cd85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -47,7 +47,7 @@ private[spark] class LocalActor( private var freeCores = totalCores - private val localExecutorId = "" + private val localExecutorId = "localhost" private val localExecutorHostname = "localhost" val executor = new Executor( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index d08e1419e3e41..b63c7f191155c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -88,6 +88,10 @@ class BlockManagerMaster( askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId)) } + def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = { + askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId)) + } + /** * Remove a block from the slaves that have it. This can only be used to remove * blocks that the driver knows about. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 5e375a2553979..f8f1c8ae1dc83 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case GetPeers(blockManagerId) => sender ! getPeers(blockManagerId) + case GetActorSystemHostPortForExecutor(executorId) => + sender ! getActorSystemHostPortForExecutor(executorId) + case GetMemoryStatus => sender ! 
memoryStatus @@ -412,6 +415,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus Seq.empty } } + + private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = { + for ( + blockManagerId <- blockManagerIdByExecutor.get(executorId); + info <- blockManagerInfo.get(blockManagerId); + host <- info.slaveActor.path.address.host; + port <- info.slaveActor.path.address.port + ) yield { + (host, port) + } + } } @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 291ddfcc113ac..3f32099d08cc9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages { case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster + case class RemoveExecutor(execId: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 6cd16e67824b6..edd6d1fef623b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -28,7 +28,8 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { val executorsListener = parent.executorsListener val jobProgressListener = parent.jobProgressListener - val threadDumpEnabled = parent.conf.getBoolean("spark.executor.sendThreadDumps", true) + val threadDumpEnabled = parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) + val sc = parent.sc attachPage(new ExecutorsPage(this, threadDumpEnabled)) if (threadDumpEnabled) { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala index 64e7f1d2a4a8c..006d0c51532cd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala @@ -19,20 +19,23 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest +import scala.util.Try import scala.xml.{Text, Node} import org.apache.spark.ui.{UIUtils, WebUIPage} class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { + private val sc = parent.sc + def render(request: HttpServletRequest): Seq[Node] = { val executorId = Option(request.getParameter("executorId")).getOrElse { return Text(s"Missing executorId parameter") } - val maybeThreadDump = parent.jobProgressListener synchronized { - parent.jobProgressListener.executorIdToLastThreadDump.get(executorId) - } - val content = maybeThreadDump.map { case (time, threadDump) => + val time = System.currentTimeMillis() + val maybeThreadDump = Try(sc.get.getExecutorThreadDump(executorId)) + + val content = maybeThreadDump.map { threadDump => val dumpRows = threadDump.map { thread =>
@@ -62,7 +65,7 @@ class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
         }
         {dumpRows}
- }.getOrElse(Text("No thread dump to display")) + }.getOrElse(Text("Error fetching thread dump")) UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index a1a0b1c0d7fee..b5207360510dd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -26,7 +26,6 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.ui.jobs.UIData._ -import org.apache.spark.util.ThreadStackTrace /** * :: DeveloperApi :: @@ -66,9 +65,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() - /** Map entries are (last update timestamp, thread dump) pairs */ - val executorIdToLastThreadDump = HashMap[String, (Long, Array[ThreadStackTrace])]() - var schedulingMode: Option[SchedulingMode] = None def blockManagerIds = executorIdToBlockManagerId.values.toSeq @@ -269,11 +265,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.executorRunTime += timeDelta } - override def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) { - val timeAndThreadDump = (System.currentTimeMillis(), executorThreadDump.threadStackTraces) - executorIdToLastThreadDump.put(executorThreadDump.execId, timeAndThreadDump) - } - override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index f41c8d0315cb3..625f490192c4d 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -201,4 +201,18 @@ private[spark] object AkkaUtils extends Logging { logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } + + def makeExecutorRef( + name: String, + conf: SparkConf, + host: String, + port: Int, + actorSystem: ActorSystem): ActorRef = { + val executorActorSystemName = SparkEnv.executorActorSystemName + Utils.checkHost(host, "Expected hostname") + val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name" + val timeout = AkkaUtils.lookupTimeout(conf) + logInfo(s"Connecting to $name: $url") + Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) + } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8bd8e9412cd67..5b2e7d3a7edb9 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -74,7 +74,6 @@ private[spark] object JsonProtocol { // These aren't used, but keeps compiler happy case SparkListenerShutdown => JNothing case SparkListenerExecutorMetricsUpdate(_, _) => JNothing - case SparkListenerExecutorThreadDump(_, _) => JNothing } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index bde6e9b87f4f8..a2e4f712db55b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{CallSite, ThreadStackTrace} +import org.apache.spark.util.CallSite import org.apache.spark.executor.TaskMetrics class BuggyDAGEventProcessActor extends Actor { @@ -83,7 +83,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def stop() = {} override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) {} override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) @@ -374,8 +373,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) { - } } val noKillScheduler = new DAGScheduler( sc, diff --git a/docs/configuration.md b/docs/configuration.md index d4c80bcc0fc91..4e8e3769b600f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -394,6 +394,13 @@ Apart from these, the following properties are also available, and may be useful Allows stages and corresponding jobs to be killed from the web ui. + + spark.ui.threadDumpsEnabled + true + + Allows executor and driver thread dumps to be collected and viewed from the web ui. + + spark.eventLog.enabled false @@ -655,13 +662,6 @@ Apart from these, the following properties are also available, and may be useful the driver know that the executor is still alive and update it with metrics for in-progress tasks. - - spark.executor.sendThreadDumps - true - If set to true, executors will periodically send thread dumps to the driver for display - in the web UI. The frequency of these dumps is controlled by - spark.executor.heartbeatInterval./td> - #### Networking From bc1e675f0204e6f111cf7ec49cdb318150ecef54 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Oct 2014 23:06:15 -0700 Subject: [PATCH 09/13] Undo some leftover changes from the earlier approach. 
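Context for reviewers: after the previous patch, the web UI pulls thread dumps on demand through
SparkContext.getExecutorThreadDump instead of consuming listener events, so the listener wiring
removed below was dead code. A minimal sketch of the pull path follows, under assumptions: the
method is private[spark], so this is driver-internal usage only, and executor id "1" is invented
for illustration.

    // Hypothetical driver-internal usage of the pull-based API; at this
    // point in the series the call returns a plain Array and may throw if
    // the executor is unreachable (a later patch wraps it in Option).
    val traces: Array[ThreadStackTrace] = sc.getExecutorThreadDump("1")
    traces.foreach { t =>
      // Field names as they exist before the later rename (id/name/state/trace).
      println(s"Thread ${t.id} ${t.name} [${t.state}]\n${t.trace}")
    }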
--- .../main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 2 +- .../src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 39dac30f7f5e0..048fee3ce1ff4 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -45,7 +45,7 @@ private[ui] class ExecutorsPage( parent: ExecutorsTab, threadDumpEnabled: Boolean) extends WebUIPage("") { - private val listener = parent.executorsListener + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index edd6d1fef623b..902c56ebe1bb6 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -26,8 +26,7 @@ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val executorsListener = parent.executorsListener - val jobProgressListener = parent.jobProgressListener + val listener = parent.executorsListener val threadDumpEnabled = parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) val sc = parent.sc From 3dfc2d46ec018a140d86d5f0931d4fc3936f0963 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 29 Oct 2014 14:56:15 -0700 Subject: [PATCH 10/13] Add missing file. --- .../apache/spark/executor/ExecutorActor.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala new file mode 100644 index 0000000000000..6508c278e663c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import akka.actor.Actor +import org.apache.spark.Logging + +import org.apache.spark.util.{Utils, ActorLogReceive} + +/** + * Driver -> Executor message to trigger a thread dump. + */ +private[spark] case class TriggerThreadDump() + +/** + * Actor that runs inside of executors to enable driver -> executor RPC. 
+ */ +private[spark] +class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging { + + override def receiveWithLogging = { + case TriggerThreadDump() => + sender ! Utils.getThreadDump() + } + +} From 127a1306f733fa00af7e2c320a9a30d60d480ff7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 31 Oct 2014 10:41:25 -0700 Subject: [PATCH 11/13] Update to use SparkContext.DRIVER_IDENTIFIER --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5460122a28400..8105baff2352d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -363,7 +363,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { /** Called by the web UI to obtain executor thread dumps */ private[spark] def getExecutorThreadDump(executorId: String): Array[ThreadStackTrace] = { - if (executorId == "") { + if (executorId == SparkContext.DRIVER_IDENTIFIER) { Utils.getThreadDump() } else { val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get From 19707b04d58a32d8951c08e7cae753c381dfaa57 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 31 Oct 2014 10:44:26 -0700 Subject: [PATCH 12/13] Add one comment. --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index f8f1c8ae1dc83..685b2e11440fb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -416,6 +416,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } + /** + * Returns the hostname and port of an executor's actor system, based on the Akka address of its + * BlockManagerSlaveActor. + */ private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = { for ( blockManagerId <- blockManagerIdByExecutor.get(executorId); From 3c21a5dcd7be7ad648d9e17b6643a72404cf34a4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 3 Nov 2014 14:38:34 -0800 Subject: [PATCH 13/13] Address review comments: - Rename ThreadDumpPage -> ExecutorThreadDumpPage - Make page private[ui] - Make TriggerThreadDump into a case object - Rename fields in ThreadStackTrace - Use ThreadMxBean to obtain thread dumps instead of Thread.getAllStackTraces() - Remove documentation of spark.ui.threadDumpsEnabled configuration, but leave the option as an internal configuration - Guard against exceptions in SparkContext.getExecutorThreadDump() - Disable thread dump page and button in history server. 
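As background for the ThreadMxBean change, here is a standalone sketch of the new dump logic
using only the JDK: dumpAllThreads(true, true) also reports locked monitors and synchronizers,
which Thread.getAllStackTraces() cannot, and its result may contain null entries for threads
that exited mid-dump, hence the filter. This mirrors the Utils.getThreadDump() change in the
diff below but prints rather than building ThreadStackTrace objects.

    import java.lang.management.ManagementFactory

    // Sketch of MX-bean based thread dumping.
    val infos = ManagementFactory.getThreadMXBean
      .dumpAllThreads(/* lockedMonitors = */ true, /* lockedSynchronizers = */ true)
      .filter(_ != null)
    infos.sortBy(_.getThreadId).foreach { info =>
      println(s"${info.getThreadId} ${info.getThreadName} (${info.getThreadState})")
      info.getStackTrace.foreach(frame => println(s"    at $frame"))
    }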
--- .../scala/org/apache/spark/SparkContext.scala | 29 +++++++++++++------ .../apache/spark/executor/ExecutorActor.scala | 4 +-- ...age.scala => ExecutorThreadDumpPage.scala} | 10 ++++--- .../apache/spark/ui/exec/ExecutorsTab.scala | 5 ++-- .../apache/spark/util/ThreadStackTrace.scala | 6 +++- .../scala/org/apache/spark/util/Utils.scala | 12 +++++--- docs/configuration.md | 7 ----- 7 files changed, 44 insertions(+), 29 deletions(-) rename core/src/main/scala/org/apache/spark/ui/exec/{ThreadDumpPage.scala => ExecutorThreadDumpPage.scala} (87%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1ae70d55298f8..40444c237b738 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -361,15 +361,26 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { override protected def childValue(parent: Properties): Properties = new Properties(parent) } - /** Called by the web UI to obtain executor thread dumps */ - private[spark] def getExecutorThreadDump(executorId: String): Array[ThreadStackTrace] = { - if (executorId == SparkContext.DRIVER_IDENTIFIER) { - Utils.getThreadDump() - } else { - val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get - val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem) - AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump(), actorRef, - AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)) + /** + * Called by the web UI to obtain executor thread dumps. This method may be expensive. + * Logs an error and returns None if we failed to obtain a thread dump, which could occur due + * to an executor being dead or unresponsive or due to network issues while sending the thread + * dump message back to the driver. + */ + private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = { + try { + if (executorId == SparkContext.DRIVER_IDENTIFIER) { + Some(Utils.getThreadDump()) + } else { + val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get + val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem) + Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef, + AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf))) + } + } catch { + case e: Exception => + logError(s"Exception getting thread dump from executor $executorId", e) + None } } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala index 6508c278e663c..41925f7e97e84 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.{Utils, ActorLogReceive} /** * Driver -> Executor message to trigger a thread dump. */ -private[spark] case class TriggerThreadDump() +private[spark] case object TriggerThreadDump /** * Actor that runs inside of executors to enable driver -> executor RPC. @@ -34,7 +34,7 @@ private[spark] class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging { override def receiveWithLogging = { - case TriggerThreadDump() => + case TriggerThreadDump => sender ! 
Utils.getThreadDump() } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala similarity index 87% rename from core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala rename to core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index 006d0c51532cd..e9c755e36f716 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -24,7 +24,7 @@ import scala.xml.{Text, Node} import org.apache.spark.ui.{UIUtils, WebUIPage} -class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { +private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { private val sc = parent.sc @@ -33,17 +33,19 @@ class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { return Text(s"Missing executorId parameter") } val time = System.currentTimeMillis() - val maybeThreadDump = Try(sc.get.getExecutorThreadDump(executorId)) + val maybeThreadDump = sc.get.getExecutorThreadDump(executorId) val content = maybeThreadDump.map { threadDump => val dumpRows = threadDump.map { thread => diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index e8e242d4e4b34..ba97630f025c1 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -27,12 +27,13 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { val listener = parent.executorsListener - val threadDumpEnabled = parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) val sc = parent.sc + val threadDumpEnabled = + sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) attachPage(new ExecutorsPage(this, threadDumpEnabled)) if (threadDumpEnabled) { - attachPage(new ThreadDumpPage(this)) + attachPage(new ExecutorThreadDumpPage(this)) } } diff --git a/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala index f9514097f2b89..d4e0ad93b966a 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala @@ -20,4 +20,8 @@ package org.apache.spark.util /** * Used for shipping per-thread stacktraces from the executors to driver. */ -private[spark] case class ThreadStackTrace(id: Long, name: String, state: String, trace: String) +private[spark] case class ThreadStackTrace( + threadId: Long, + threadName: String, + threadState: Thread.State, + stackTrace: String) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 49547abb1935d..6ab94af9f3739 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import java.io._ +import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer import java.util.jar.Attributes.Name @@ -1613,10 +1614,13 @@ private[spark] object Utils extends Logging { /** Return a thread dump of all threads' stacktraces. 
Used to capture dumps for the web UI */ def getThreadDump(): Array[ThreadStackTrace] = { - Thread.getAllStackTraces.toArray.sortBy(_._1.getId).map { - case (thread, stackElements) => - val stackTrace = stackElements.map(_.toString).mkString("\n") - ThreadStackTrace(thread.getId, thread.getName, thread.getState.toString, stackTrace) + // We need to filter out null values here because dumpAllThreads() may return null array + // elements for threads that are dead / don't exist. + val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null) + threadInfos.sortBy(_.getThreadId).map { case threadInfo => + val stackTrace = threadInfo.getStackTrace.map(_.toString).mkString("\n") + ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName, + threadInfo.getThreadState, stackTrace) } } diff --git a/docs/configuration.md b/docs/configuration.md index bc4ca0d760d25..685101ea5c9c9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -416,13 +416,6 @@ Apart from these, the following properties are also available, and may be useful Allows stages and corresponding jobs to be killed from the web ui. - - spark.ui.threadDumpsEnabled - true - - Allows executor and driver thread dumps to be collected and viewed from the web ui. - - spark.eventLog.enabled false
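A closing note on the Akka plumbing: the driver-to-executor lookup added in
AkkaUtils.makeExecutorRef reduces to the standard remote actor-selection pattern sketched below.
Assumptions: the executor actor system is named "sparkExecutor" and the actor is registered as
"ExecutorActor", as in the diffs above; the timeout is illustrative; this is a sketch, not the
verbatim AkkaUtils code.

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.{ActorRef, ActorSystem}

    // Build the akka.tcp URL for the executor's actor system and
    // synchronously resolve it to an ActorRef that can be asked for
    // a thread dump with TriggerThreadDump.
    def resolveExecutorActor(system: ActorSystem, host: String, port: Int): ActorRef = {
      val url = s"akka.tcp://sparkExecutor@$host:$port/user/ExecutorActor"
      Await.result(system.actorSelection(url).resolveOne(30.seconds), 30.seconds)
    }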