Expose SparkListeners and relevant classes as DeveloperApi
Hopefully this can go into 1.0, as a few people on the user list have asked for this.

Author: Andrew Or <andrewor14@gmail.com>

Closes #648 from andrewor14/expose-listeners and squashes the following commits:

e45e1ef [Andrew Or] Add missing colons (minor)
350d643 [Andrew Or] Expose SparkListeners and relevant classes as DeveloperApi
(cherry picked from commit ea10b31)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
andrewor14 authored and pwendell committed May 6, 2014
1 parent 01e3ff0 commit a5f765c
Showing 11 changed files with 78 additions and 33 deletions.
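Now that the listener classes below are public, an application can define its own SparkListener and register it on a SparkContext. A minimal sketch of the use case driving this change, not part of the diff itself (the app name, master, and toy job are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Every SparkListener callback has a no-op default, so a custom listener
// only overrides the events it cares about.
class StageLoggingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    val info = stageCompleted.stageInfo
    println("Stage " + info.stageId + " (" + info.name + ") completed")
  }
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task ended in stage " + taskEnd.stageId)
  }
}

object ListenerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("listener-example").setMaster("local[2]"))
    sc.addSparkListener(new StageLoggingListener)
    sc.parallelize(1 to 100).map(_ * 2).count()  // triggers stage and task events
    sc.stop()
  }
}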
core/src/main/scala/org/apache/spark/storage/BlockId.scala (17 additions, 7 deletions)

@@ -19,14 +19,18 @@ package org.apache.spark.storage

import java.util.UUID

+import org.apache.spark.annotation.DeveloperApi

/**
+ * :: DeveloperApi ::
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
-private[spark] sealed abstract class BlockId {
+@DeveloperApi
+sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String

@@ -44,24 +48,29 @@ private[spark] sealed abstract class BlockId {
}
}

-private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
+@DeveloperApi
+case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
def name = "rdd_" + rddId + "_" + splitIndex
}

-private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
+@DeveloperApi
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

-private[spark] case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
+@DeveloperApi
+case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

-private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
+@DeveloperApi
+case class TaskResultBlockId(taskId: Long) extends BlockId {
def name = "taskresult_" + taskId
}

-private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+@DeveloperApi
+case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
def name = "input-" + streamId + "-" + uniqueId
}

@@ -75,7 +84,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id
}

-private[spark] object BlockId {
+@DeveloperApi
+object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
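For context on the naming scheme above: BlockId.apply parses a block's string name back into the matching subclass via those regexes, so names round-trip. A small sketch:

import org.apache.spark.storage.{BlockId, RDDBlockId}

val id: BlockId = BlockId("rdd_42_3")  // matches the RDD pattern
assert(id == RDDBlockId(42, 3))
assert(id.name == "rdd_42_3")          // name reproduces the original string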
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

@@ -20,17 +20,20 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

/**
+ * :: DeveloperApi ::
* This class represents a unique identifier for a BlockManager.
- * The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private to ensure that parameters cannot
- * be modified from outside this class.
*
+ * The first 2 constructors of this class are made private to ensure that BlockManagerId objects
+ * can be created only using the apply method in the companion object. This allows de-duplication
+ * of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
+ * modified from outside this class.
*/
-private[spark] class BlockManagerId private (
+@DeveloperApi
+class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
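The pattern that comment describes (private constructor, companion apply, cached instances) is worth spelling out. A simplified sketch of the interning idiom; InternedId is a hypothetical name, not a Spark class:

import java.util.concurrent.ConcurrentHashMap

// Callers can only go through apply(), which hands back one canonical
// instance per distinct value, so identical IDs share a single object.
class InternedId private (val executorId: String, val host: String, val port: Int) {
  override def equals(that: Any): Boolean = that match {
    case id: InternedId => executorId == id.executorId && host == id.host && port == id.port
    case _ => false
  }
  override def hashCode: Int = (executorId, host, port).hashCode
}

object InternedId {
  private val cache = new ConcurrentHashMap[InternedId, InternedId]()
  def apply(executorId: String, host: String, port: Int): InternedId = {
    val candidate = new InternedId(executorId, host, port)
    val existing = cache.putIfAbsent(candidate, candidate)  // de-duplicate
    if (existing == null) candidate else existing
  }
}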
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask

import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -411,7 +412,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}

-private[spark] case class BlockStatus(
+@DeveloperApi
+case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
core/src/main/scala/org/apache/spark/storage/StorageLevel.scala (10 additions, 7 deletions)

@@ -22,14 +22,17 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import org.apache.spark.annotation.DeveloperApi

/**
+ * :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon, whether to
* keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
* multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
+@DeveloperApi
class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
@@ -54,9 +57,9 @@ class StorageLevel private(
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")

if (useOffHeap) {
-require(useDisk == false, "Off-heap storage level does not support using disk")
-require(useMemory == false, "Off-heap storage level does not support using heap memory")
-require(deserialized == false, "Off-heap storage level does not support deserialized storage")
+require(!useDisk, "Off-heap storage level does not support using disk")
+require(!useMemory, "Off-heap storage level does not support using heap memory")
+require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}

@@ -146,7 +149,7 @@ object StorageLevel {

/**
* :: DeveloperApi ::
- * Create a new StorageLevel object without setting useOffHeap
+ * Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
@@ -155,7 +158,7 @@

/**
* :: DeveloperApi ::
- * Create a new StorageLevel object
+ * Create a new StorageLevel object.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean,
@@ -164,15 +167,15 @@

/**
* :: DeveloperApi ::
- * Create a new StorageLevel object from its integer representation
+ * Create a new StorageLevel object from its integer representation.
*/
@DeveloperApi
def apply(flags: Int, replication: Int): StorageLevel =
getCachedStorageLevel(new StorageLevel(flags, replication))

/**
* :: DeveloperApi ::
- * Read StorageLevel object from ObjectInput stream
+ * Read StorageLevel object from ObjectInput stream.
*/
@DeveloperApi
def apply(in: ObjectInput): StorageLevel = {
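For context, a hedged sketch of the factory usage the class doc mentions, using the four-argument apply above (the local master and toy job are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("storage-level-example").setMaster("local[2]"))

// Disk plus memory, deserialized in memory, replicated to two nodes;
// arguments match apply(useDisk, useMemory, deserialized, replication).
val diskMemory2x = StorageLevel(useDisk = true, useMemory = true, deserialized = true, replication = 2)

sc.parallelize(1 to 1000).persist(diskMemory2x).count()
sc.stop()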
core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

@@ -19,12 +19,15 @@ package org.apache.spark.storage

import scala.collection.mutable

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._

/**
- * A SparkListener that maintains executor storage status
+ * :: DeveloperApi ::
+ * A SparkListener that maintains executor storage status.
*/
-private[spark] class StorageStatusListener extends SparkListener {
+@DeveloperApi
+class StorageStatusListener extends SparkListener {
private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

def storageStatusList = executorIdToStorageStatus.values.toSeq
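A short sketch of what exposing this listener enables: register it, run a cached job, then read storageStatusList (events arrive asynchronously, so a real consumer should tolerate some lag):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageStatusListener

val sc = new SparkContext(new SparkConf().setAppName("storage-status").setMaster("local[2]"))
val listener = new StorageStatusListener
sc.addSparkListener(listener)

sc.parallelize(1 to 1000).cache().count()  // materializes cached blocks

// One StorageStatus per executor: its BlockManagerId, max memory, and blocks.
listener.storageStatusList.foreach { status =>
  println(status.blockManagerId.host + ": maxMem=" + status.maxMem +
    ", blocks=" + status.blocks.size)
}
sc.stop()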
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

@@ -21,9 +21,14 @@ import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi

-/** Storage information for each BlockManager. */
-private[spark] class StorageStatus(
+/**
+ * :: DeveloperApi ::
+ * Storage information for each BlockManager.
+ */
+@DeveloperApi
+class StorageStatus(
val blockManagerId: BlockManagerId,
val maxMem: Long,
val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala

@@ -17,6 +17,7 @@

package org.apache.spark.ui.env

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.ui._

@@ -30,9 +31,11 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
}

/**
+ * :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the EnvironmentTab
*/
-private[ui] class EnvironmentListener extends SparkListener {
+@DeveloperApi
+class EnvironmentListener extends SparkListener {
var jvmInformation = Seq[(String, String)]()
var sparkProperties = Seq[(String, String)]()
var systemProperties = Seq[(String, String)]()
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

@@ -20,6 +20,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap

import org.apache.spark.ExceptionFailure
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.{SparkUI, WebUITab}
@@ -34,9 +35,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
}

/**
+ * :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class ExecutorsListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

val executorToTasksActive = HashMap[String, Int]()
core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala

@@ -17,8 +17,14 @@

package org.apache.spark.ui.jobs

-/** class for reporting aggregated metrics for each executors in stageUI */
-private[ui] class ExecutorSummary {
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class for reporting aggregated metrics for each executor in stage UI.
+ */
+@DeveloperApi
+class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

@@ -20,19 +20,22 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable.{HashMap, ListBuffer}

import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId

/**
+ * :: DeveloperApi ::
* Tracks task-level information to be displayed in the UI.
*
* All access to the data structures in this class must be synchronized on the
* class, since the UI thread and the EventBus loop may otherwise be reading and
* updating the internal data structures concurrently.
*/
-private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
+@DeveloperApi
+class JobProgressListener(conf: SparkConf) extends SparkListener {

import JobProgressListener._

@@ -246,7 +249,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {

}

-private[ui] case class TaskUIData(
+@DeveloperApi
+case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
exception: Option[ExceptionFailure] = None)
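The synchronization contract in JobProgressListener's doc applies to outside readers too. A hedged sketch of a thread-safe snapshot (assumes an existing SparkContext sc; the activeStages field name is an assumption about this version's internals):

import org.apache.spark.ui.jobs.JobProgressListener

val listener = new JobProgressListener(sc.getConf)
sc.addSparkListener(listener)

// From any thread other than the event bus, hold the listener's lock while
// reading, and copy the data out before releasing it.
val activeStageNames = listener.synchronized {
  listener.activeStages.values.map(_.name).toSeq  // `activeStages` assumed
}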
core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

@@ -19,6 +19,7 @@ package org.apache.spark.ui.storage

import scala.collection.mutable

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ui._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
@@ -35,9 +36,11 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
}

/**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the BlockManagerUI.
*/
-private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
