Imports, comments, and code formatting (minor)
andrewor14 committed Mar 11, 2014
1 parent 77ba283 commit dc93915
Showing 35 changed files with 124 additions and 118 deletions.
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -33,7 +33,6 @@ import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}


/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
@@ -251,7 +250,7 @@ object SparkEnv extends Logging {
/**
* Return a map representation of jvm information, Spark properties, system properties, and
* class paths. Map keys define the category, and map values represent the corresponding
* attributes as a sequence of KV pairs.
* attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
*/
private[spark]
def environmentDetails(
@@ -274,12 +273,11 @@
}
val sparkProperties = conf.getAll.sorted ++ additionalFields

// System properties that are not java classpaths
val systemProperties = System.getProperties.iterator.toSeq
val classPathProperty = systemProperties.find { case (k, v) =>
k == "java.class.path"
}.getOrElse(("", ""))

// System properties that are not java classpaths
val otherProperties = systemProperties.filter { case (k, v) =>
k != "java.class.path" && !k.startsWith("spark.")
}.sorted
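For context, the documented return shape above is a map from category name to a sequence of key-value pairs, and the hunk separates the class path from all other non-Spark system properties. A minimal, self-contained sketch of that split (the category names below are illustrative, not taken from this commit):

import scala.collection.JavaConverters._

object EnvironmentDetailsSketch {
  def main(args: Array[String]): Unit = {
    // Split system properties the same way the hunk above does: report the
    // class path separately from all other non-Spark properties.
    val systemProperties = System.getProperties.asScala.toSeq
    val classPathProperty = systemProperties
      .find { case (k, _) => k == "java.class.path" }
      .getOrElse(("java.class.path", ""))
    val otherProperties = systemProperties
      .filter { case (k, _) => k != "java.class.path" && !k.startsWith("spark.") }
      .sorted

    // Category -> sequence of KV pairs, mirroring the documented return shape.
    val details: Map[String, Seq[(String, String)]] = Map(
      "Classpath Entries" -> Seq(classPathProperty),
      "System Properties" -> otherProperties)
    details.foreach { case (category, kvs) => println(s"$category: ${kvs.size} entries") }
  }
}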
@@ -618,8 +618,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// If application events are logged, use them to rebuild the UI
startPersistedSparkUI(app).map { ui =>
app.desc.appUiUrl = ui.basePath
webUi.attachUI(ui)
appIdToUI(app.id) = ui
webUi.attachUI(ui)
}.getOrElse {
// Avoid broken links if the UI is not reconstructed
app.desc.appUiUrl = ""
@@ -640,7 +640,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}

/**
* Start a new SparkUI rendered from persisted storage. If unsuccessful for any reason,
* Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason,
* return None. Otherwise return the reconstructed UI.
*/
def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
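In the hunk above, startPersistedSparkUI returns an Option and the caller handles both outcomes with map/getOrElse. A small stand-alone sketch of that pattern, using placeholder types rather than the actual Master internals:

object PersistedUiSketch {
  case class Ui(basePath: String)
  class AppDesc(var appUiUrl: String)

  // Placeholder for startPersistedSparkUI: succeeds only when events were logged.
  def reconstructUi(appId: String, eventsLogged: Boolean): Option[Ui] =
    if (eventsLogged) Some(Ui(s"/history/$appId")) else None

  def rebuild(appId: String, desc: AppDesc, eventsLogged: Boolean): Unit = {
    reconstructUi(appId, eventsLogged).map { ui =>
      desc.appUiUrl = ui.basePath // point the application at the rebuilt UI
    }.getOrElse {
      desc.appUiUrl = ""          // avoid broken links if the UI is not reconstructed
    }
  }

  def main(args: Array[String]): Unit = {
    val desc = new AppDesc("")
    rebuild("app-0001", desc, eventsLogged = true)
    println(desc.appUiUrl)
  }
}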
@@ -32,14 +32,13 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
private val host = Utils.localHostName()
private val port = requestedPort

val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
var server: Option[Server] = None
var boundPort: Option[Int] = None

private val host = Utils.localHostName()
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)

@@ -553,11 +553,11 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach { handleJobCancellation }
jobIds.foreach(handleJobCancellation)

case AllJobsCancelled =>
// Cancel all running jobs.
runningStages.map(_.jobId).foreach { handleJobCancellation }
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...

@@ -1094,11 +1094,11 @@ class DAGScheduler(
"stageToInfos" -> stageToInfos,
"jobIdToStageIds" -> jobIdToStageIds,
"stageIdToJobIds" -> stageIdToJobIds).
foreach { case(s, t) => {
val sizeBefore = t.size
t.clearOldValues(cleanupTime)
logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
}}
foreach { case(s, t) =>
val sizeBefore = t.size
t.clearOldValues(cleanupTime)
logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
}
}

def stop() {
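The two DAGScheduler hunks above drop the braces around a bare method argument to foreach. Both forms compile; a tiny sketch of the difference in emphasis (the body here is a placeholder, not the scheduler's logic):

object ForeachStyleSketch {
  def main(args: Array[String]): Unit = {
    def handleJobCancellation(jobId: Int): Unit = println(s"cancelling job $jobId")
    val jobIds = Seq(1, 2, 3)
    jobIds.foreach(handleJobCancellation)    // preferred here: a plain method reference
    jobIds.foreach { handleJobCancellation } // equivalent, but braces suggest a multi-line block
  }
}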
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* A SparkListenerEvent bus that relays events to its listeners.
* A SparkListenerEvent bus that relays events to its listeners
*/
private[spark] trait EventBus {

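The EventBus trait body is truncated in this hunk, so as a rough illustration only: a bus of this kind keeps a list of listeners and relays each posted event to all of them. The names below are stand-ins, not the actual EventBus API:

import scala.collection.mutable.ArrayBuffer

object EventBusSketch {
  trait Listener {
    def onEvent(event: String): Unit
  }

  class SimpleBus {
    private val listeners = new ArrayBuffer[Listener]
    def addListener(listener: Listener): Unit = listeners += listener
    // Relay the event to every registered listener, in registration order.
    def post(event: String): Unit = listeners.foreach(_.onEvent(event))
  }

  def main(args: Array[String]): Unit = {
    val bus = new SimpleBus
    bus.addListener(new Listener {
      def onEvent(event: String): Unit = println(s"received $event")
    })
    bus.post("SparkListenerJobStart")
  }
}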
@@ -32,10 +32,15 @@ import org.apache.spark.executor.TaskMetrics
* for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass
* of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext
* is created. Note that each JobLogger only works for one SparkContext
*
* NOTE: The functionality of this class is heavily stripped down to accommodate for a general
* refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced
* to log application information as SparkListenerEvents through the SparkUI. To enable this
* functionality, set spark.eventLog.enabled to true.
*/

class JobLogger(val user: String, val logDirName: String)
extends SparkListener with Logging {
@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {

def this() = this(System.getProperty("user.name", "<unknown>"),
String.valueOf(System.currentTimeMillis()))
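Per the note above, the replacement for JobLogger is driven by configuration rather than by adding a listener by hand. A minimal sketch of enabling it (spark.eventLog.enabled and spark.eventLog.compress are the settings named in this commit; the local master and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object EventLogConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My Application")
      .set("spark.eventLog.enabled", "true")   // turn on event logging
      .set("spark.eventLog.compress", "false") // compression flag read by SparkReplayerBus below
    val sc = new SparkContext(conf)
    sc.stop()
  }
}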
@@ -21,10 +21,10 @@ import java.util.Properties

import scala.collection.Map

import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.{BlockManagerId, StorageStatus}
import org.apache.spark.storage.BlockManagerId

sealed trait SparkListenerEvent

@@ -37,8 +37,13 @@ case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends Spar

case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

case class SparkListenerTaskEnd(stageId: Int, taskType: String, reason: TaskEndReason,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) extends SparkListenerEvent
case class SparkListenerTaskEnd(
stageId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
taskMetrics: TaskMetrics)
extends SparkListenerEvent

case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
extends SparkListenerEvent
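With SparkListenerTaskEnd now written one field per line, its shape is easy to read off. A brief sketch that pattern matches on the case classes shown in this hunk (the summary strings are illustrative, not part of this commit):

import org.apache.spark.scheduler._

object ListenerEventSketch {
  def describe(event: SparkListenerEvent): String = event match {
    case SparkListenerTaskEnd(stageId, taskType, _, _, _) =>
      s"task ended in stage $stageId ($taskType)"
    case SparkListenerJobStart(jobId, stageIds, _) =>
      s"job $jobId started with ${stageIds.size} stage(s)"
    case _ =>
      "some other event"
  }
}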
@@ -25,12 +25,12 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.util.{Utils, JsonProtocol}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}

/**
* An EventBus that replays logged events from persisted storage.
* An EventBus that replays logged events from persisted storage
*/
private[spark] class SparkReplayerBus(conf: SparkConf) extends EventBus with Logging {
private val compressed = conf.getBoolean("spark.eventLog.compress", false)
@@ -32,8 +32,7 @@ class StageInfo(
val name: String,
val numTasks: Int,
val rddInfo: RDDInfo,
val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
mutable.Buffer[(TaskInfo, TaskMetrics)]()) {
val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer.empty) {

/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.{Command, ApplicationDescription}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.util.Utils
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
@@ -122,11 +122,11 @@ private[spark] class BlockManager(
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(
execId: String,
actorSystem: ActorSystem,
master: BlockManagerMaster,
serializer: Serializer,
conf: SparkConf) = {
execId: String,
actorSystem: ActorSystem,
master: BlockManagerMaster,
serializer: Serializer,
conf: SparkConf) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf)
}

@@ -242,8 +242,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
case None =>
blockManagerIdByExecutor(id.executorId) = id
}
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxMemSize, slaveActor)
blockManagerInfo(id) =
new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
val blockManagerGained = SparkListenerBlockManagerGained(id, maxMemSize)
statusListener.foreach(_.onBlockManagerGained(blockManagerGained))
@@ -17,7 +17,6 @@

package org.apache.spark.storage

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler._
@@ -284,5 +284,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

private case class ResultWithDroppedBlocks(success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])
private case class ResultWithDroppedBlocks(
success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])
@@ -17,13 +17,12 @@

package org.apache.spark.storage

import scala.collection.mutable
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.util.Utils


private[spark]
class StorageStatus(
val blockManagerId: BlockManagerId,
@@ -74,7 +73,7 @@ object StorageUtils {
/** Returns RDD-level information from a list of StorageStatus objects and SparkContext */
def rddInfoFromStorageStatus(
storageStatusList: Seq[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
sc: SparkContext): Array[RDDInfo] = {
val blockStatusMap = blockStatusMapFromStorageStatus(storageStatusList)
val rddInfoList = rddInfoFromSparkContext(blockStatusMap.keys.toSeq, sc)
val rddInfoMap = rddInfoList.map { info => (info.id, info) }.toMap
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -21,7 +21,7 @@ import org.eclipse.jetty.server.{Handler, Server}
import org.eclipse.jetty.server.handler.ContextHandlerCollection

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.scheduler.{EventLoggingListener, EventLoggingInfo, SparkReplayerBus}
import org.apache.spark.scheduler.{EventLoggingInfo, EventLoggingListener, SparkReplayerBus}
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.EnvironmentUI
@@ -38,6 +38,8 @@ private[spark] class SparkUI(
val basePath: String = "")
extends Logging {

import SparkUI._

def this(sc: SparkContext) = this(sc, sc.conf, sc.appName)
def this(conf: SparkConf, appName: String) = this(null, conf, appName)
def this(conf: SparkConf, appName: String, basePath: String) =
@@ -47,7 +49,7 @@
val live = sc != null

private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
private val port = conf.get("spark.ui.port", DEFAULT_PORT).toInt
private var boundPort: Option[Int] = None
private var server: Option[Server] = None
private var started = false
@@ -69,7 +71,7 @@
exec.getHandlers ++
metricsServletHandlers ++
Seq[(String, Handler)](
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
("/static", createStaticHandler(STATIC_RESOURCE_DIR)),
("/", createRedirectHandler("/stages", basePath))
)
}
@@ -165,6 +167,5 @@ private[spark] class SparkUI(

private[spark] object SparkUI {
val DEFAULT_PORT = "4040"
val DEFAULT_PERSISTED_PORT = "14040"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}
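The import SparkUI._ added above lets the class body refer to DEFAULT_PORT and STATIC_RESOURCE_DIR without the SparkUI. prefix. A toy, self-contained illustration of the same idiom, importing an object's members so they can be used unqualified (the names here are placeholders, not Spark classes):

object Defaults {
  val DEFAULT_PORT = "4040"
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}

class WebServer {
  import Defaults._
  val port: Int = DEFAULT_PORT.toInt       // no Defaults. prefix needed after the import
  val staticDir: String = STATIC_RESOURCE_DIR
}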
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIReloader.scala
@@ -25,13 +25,13 @@ import org.apache.spark.SparkConf
object UIReloader {
def main(args: Array[String]) {
if (args.length < 1) {
println("Usage: ./bin/spark-class org.apache.spark.ui.UIReloader [log path] [port]")
println("Usage: ./bin/spark-class org.apache.spark.ui.UIReloader [log path]")
System.exit(1)
}

val conf = new SparkConf()
conf.set("spark.ui.port", "14040")
val ui = new SparkUI(conf, "Reloaded Application")
val ui = new SparkUI(conf, "My Application")
ui.bind()
ui.start()
val success = ui.renderFromPersistedStorage(args(0))
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -21,6 +21,7 @@ import scala.xml.Node

/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils {

import Page._

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
@@ -62,8 +63,8 @@
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<link rel="stylesheet" href={prependBaseUri(basePath, "/static/bootstrap.min.css")}
type="text/css" />
<link rel="stylesheet"
href={prependBaseUri(basePath, "/static/webui.css")} type="text/css" />
<link rel="stylesheet" href={prependBaseUri(basePath, "/static/webui.css")}
type="text/css" />
<script src={prependBaseUri(basePath, "/static/sorttable.js")} ></script>
<title>{appName} - {title}</title>
</head>
@@ -36,7 +36,7 @@ private[ui] class EnvironmentUI(parent: SparkUI) {
lazy val listener = _listener.get

def start() {
_listener = Some(new EnvironmentListener())
_listener = Some(new EnvironmentListener)
}

def getHandlers = Seq[(String, Handler)](
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui._
import org.apache.spark.ui.{SparkUI, UIUtils}
import org.apache.spark.util.Utils

private[ui] class ExecutorsUI(parent: SparkUI) {
@@ -67,11 +67,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
</ul>
</div>
</div>
<div class = "row">
<div class="span12">
{execTable}
</div>
</div>;
<div class = "row">
<div class="span12">
{execTable}
</div>
</div>;

UIUtils.headerSparkPage(
content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
