[SPARK-611] Display executor thread dumps in web UI #2944

Closed · wants to merge 16 commits
29 changes: 26 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions

import java.io._
import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
* to an executor being dead or unresponsive or due to network issues while sending the thread
* dump message back to the driver.
*/
private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
Some(Utils.getThreadDump())
} else {
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
}
} catch {
case e: Exception =>
logError(s"Exception getting thread dump from executor $executorId", e)
None
}
}

private[spark] def getLocalProperties: Properties = localProperties.get()

private[spark] def setLocalProperties(props: Properties) {
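A minimal sketch of consuming the new getExecutorThreadDump API above (the executor id and printing are illustrative, not part of the patch; the caller must live inside the org.apache.spark package since the method is private[spark]):

// Hypothetical caller: ask executor "3" for a dump and print a summary.
sc.getExecutorThreadDump("3") match {
  case Some(threads) =>
    threads.foreach(t => println(s"Thread ${t.threadId}: ${t.threadName} (${t.threadState})"))
  case None =>
    println("No dump: the executor may be dead, unresponsive, or unreachable")
}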
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Create a new ActorSystem using driver's Spark properties to run the backend.
val driverConf = new SparkConf().setAll(props)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
SparkEnv.executorActorSystemName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, we had this variable but we just never used it.

hostname, port, driverConf, new SecurityManager(driverConf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

-import akka.actor.ActorSystem
+import akka.actor.{Props, ActorSystem}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
}
}

// Create an actor for receiving RPCs from the driver
private val executorActor = env.actorSystem.actorOf(
Props(new ExecutorActor(executorId)), "ExecutorActor")

// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
@@ -131,6 +135,7 @@

def stop() {
env.metricsSystem.report()
env.actorSystem.stop(executorActor)
isStopped = true
threadPool.shutdown()
if (!isLocal) {
41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import akka.actor.Actor
import org.apache.spark.Logging

import org.apache.spark.util.{Utils, ActorLogReceive}

/**
* Driver -> Executor message to trigger a thread dump.
*/
private[spark] case object TriggerThreadDump

/**
* Actor that runs inside of executors to enable driver -> executor RPC.
*/
private[spark]
class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case TriggerThreadDump =>
sender ! Utils.getThreadDump()
}

}
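Since ExecutorActor is a plain Akka actor, its contract can be exercised with the standard ask pattern. A minimal local sketch (the test actor system and timeout are assumptions for illustration; Spark itself resolves the remote actor and goes through AkkaUtils.askWithReply, as in the SparkContext change above):

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import org.apache.spark.util.ThreadStackTrace

// Hypothetical local test: host the actor and ask it for a thread dump.
val system = ActorSystem("test")
val executorActor = system.actorOf(Props(new ExecutorActor("0")), "ExecutorActor")
implicit val timeout = Timeout(10.seconds)

val dump = Await.result(
  (executorActor ? TriggerThreadDump).mapTo[Array[ThreadStackTrace]],
  timeout.duration)
dump.foreach(t => println(s"${t.threadName}: ${t.threadState}"))
system.shutdown()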
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -88,6 +88,10 @@ class BlockManagerMaster(
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
}

/**
* Remove a block from the slaves that have it. This can only be used to remove
* blocks that the driver knows about.
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetActorSystemHostPortForExecutor(executorId) =>
sender ! getActorSystemHostPortForExecutor(executorId)

case GetMemoryStatus =>
sender ! memoryStatus

@@ -412,6 +415,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
Seq.empty
}
}

/**
* Returns the hostname and port of an executor's actor system, based on the Akka address of its
* BlockManagerSlaveActor.
*/
private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
Contributor (author): This verbosely-named method is the big "hack" for enabling driver -> executor RPC. Basically, I needed to have a way to address the remote ExecutorActor from the driver. Here, we rely on the fact that every executor registers a BlockManager actor with the BlockManagerMasterActor and that there is only one actor system per executor / driver.

for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- blockManagerInfo.get(blockManagerId);
host <- info.slaveActor.path.address.host;
port <- info.slaveActor.path.address.port
) yield {
(host, port)
}
}
}
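To make the addressing trick concrete, here is a hypothetical slave-actor path and what the for-comprehension above pulls out of it (host, port, and actor name are made up):

// Hypothetical remote path of an executor's BlockManagerSlaveActor:
//   akka.tcp://sparkExecutor@192.168.1.5:43210/user/BlockManagerSlaveActor
// Given an ActorRef `slaveActor` with that path:
//   slaveActor.path.address.host == Some("192.168.1.5")
//   slaveActor.path.address.port == Some(43210)
// Only remote refs carry a transport address; for purely local refs,
// host and port are None, which is why the result is an Option.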

@DeveloperApi
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {

case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

case object StopBlockManagerMaster extends ToBlockManagerMaster
73 changes: 73 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui.exec

import javax.servlet.http.HttpServletRequest

import scala.util.Try
import scala.xml.{Text, Node}

import org.apache.spark.ui.{UIUtils, WebUIPage}

private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {

private val sc = parent.sc

def render(request: HttpServletRequest): Seq[Node] = {
val executorId = Option(request.getParameter("executorId")).getOrElse {
return Text(s"Missing executorId parameter")
}
val time = System.currentTimeMillis()
val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

val content = maybeThreadDump.map { threadDump =>
val dumpRows = threadDump.map { thread =>
<div class="accordion-group">
<div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
<a class="accordion-toggle">
Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
</a>
</div>
<div class="accordion-body hidden">
<div class="accordion-inner">
<pre>{thread.stackTrace}</pre>
</div>
</div>
</div>
}

<div class="row-fluid">
<p>Updated at {UIUtils.formatDate(time)}</p>
{
// scalastyle:off
<p><a class="expandbutton"
onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
Expand All
</a></p>
<p><a class="expandbutton hidden"
onClick="$('.accordion-body').addClass('hidden'); $('.expandbutton').toggleClass('hidden')">
Collapse All
</a></p>
// scalastyle:on
}
<div class="accordion">{dumpRows}</div>
</div>
}.getOrElse(Text("Error fetching thread dump"))
UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
}
}
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo(
totalShuffleWrite: Long,
maxMemory: Long)

-private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+private[ui] class ExecutorsPage(
+    parent: ExecutorsTab,
+    threadDumpEnabled: Boolean)
+  extends WebUIPage("") {
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
@@ -75,6 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
Shuffle Write
</span>
</th>
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
@@ -133,6 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
if (threadDumpEnabled) {
<td>
<a href={s"threadDump/?executorId=${info.id}"}>Thread Dump</a>
</td>
} else {
Seq.empty
}
}
</tr>
}

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -27,8 +27,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
val sc = parent.sc
val threadDumpEnabled =
sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

-  attachPage(new ExecutorsPage(this))
+  attachPage(new ExecutorsPage(this, threadDumpEnabled))
if (threadDumpEnabled) {
attachPage(new ExecutorThreadDumpPage(this))
}
}
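Thread dump pages are on by default and gated by the spark.ui.threadDumpsEnabled flag read above. A minimal sketch of opting out (the app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application setup that disables the thread dump UI.
val conf = new SparkConf()
  .setAppName("example-app")
  .setMaster("local[*]")
  .set("spark.ui.threadDumpsEnabled", "false")
val sc = new SparkContext(conf)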

14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -212,4 +212,18 @@ private[spark] object AkkaUtils extends Logging {
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}

def makeExecutorRef(
name: String,
conf: SparkConf,
host: String,
port: Int,
actorSystem: ActorSystem): ActorRef = {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

/**
* Used for shipping per-thread stacktraces from the executors to driver.
*/
private[spark] case class ThreadStackTrace(
threadId: Long,
threadName: String,
threadState: Thread.State,
stackTrace: String)
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util

import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.jar.Attributes.Name
@@ -1611,6 +1612,18 @@
s"$className: $desc\n$st"
}

/** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
def getThreadDump(): Array[ThreadStackTrace] = {
// We need to filter out null values here because dumpAllThreads() may return null array
// elements for threads that are dead / don't exist.
val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
threadInfos.sortBy(_.getThreadId).map { case threadInfo =>
val stackTrace = threadInfo.getStackTrace.map(_.toString).mkString("\n")
ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName,
threadInfo.getThreadState, stackTrace)
}
}

/**
* Convert all spark properties set in the given SparkConf to a sequence of java options.
*/
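As a rough end-to-end check, the Utils.getThreadDump helper above can be exercised locally. A sketch assuming the caller sits inside the org.apache.spark package (the helper is private[spark]):

import org.apache.spark.util.Utils

// Dump the current JVM's threads, roughly as the web UI renders them.
Utils.getThreadDump().foreach { t =>
  println(s"Thread ${t.threadId}: ${t.threadName} (${t.threadState})")
  println(t.stackTrace)
  println()
}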