Remove StatusAPI mixin trait.
This makes binary compatibility easier to reason about and might avoid
some pitfalls that I’ve run into while attempting to refactor other
parts of SparkContext to use mixin traits (see #3071, for example).

Requiring users to access the status API methods through `sc.statusAPI.*`
also avoids SparkContext bloat and gives us extra freedom to add parallel
higher- and lower-level APIs.
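
A minimal sketch of the new call pattern (not part of the commit; it assumes a locally-constructed SparkContext, and the job group and app names are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("status-api-sketch"))
sc.setJobGroup("demo-group", "status API demo")   // hypothetical group name
sc.parallelize(1 to 1000).count()                 // run a job so there is something to report on

// Status methods are now reached through sc.statusAPI rather than SparkContext itself.
val jobIds = sc.statusAPI.getJobIdsForGroup("demo-group")
for (id <- jobIds; info <- sc.statusAPI.getJobInfo(id)) {
  println(s"job $id: status=${info.status()}, stages=${info.stageIds().mkString(",")}")
}
sc.stop()
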
JoshRosen committed Nov 10, 2014
1 parent c6f4e70 commit c47e294
Showing 5 changed files with 95 additions and 102 deletions.
68 changes: 67 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -61,7 +62,7 @@ import org.apache.spark.util._
* this config overrides the default configs as well as system properties.
*/

class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
class SparkContext(config: SparkConf) extends Logging {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -228,6 +229,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)

val statusAPI = SparkStatusAPI(this)

// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1001,6 +1004,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}

/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}

/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}

/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}

/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}

/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}

/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
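
As a quick illustration of the status/storage methods that this hunk moves onto SparkContext itself (a sketch, not from the commit, assuming a running context named sc):

// Per-executor memory: max available for caching vs. remaining, both in bytes.
sc.getExecutorMemoryStatus.foreach { case (hostPort, (maxMem, remainingMem)) =>
  println(s"$hostPort: ${remainingMem / (1024 * 1024)} MB free of ${maxMem / (1024 * 1024)} MB")
}

// RDDs marked persistent vs. RDDs that actually hold cached blocks right now.
val persistent = sc.getPersistentRDDs
val cachedInfo = sc.getRDDStorageInfo   // @DeveloperApi, so subject to change
println(s"${persistent.size} RDDs marked persistent, ${cachedInfo.length} with cached blocks")
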
92 changes: 18 additions & 74 deletions core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -17,83 +17,21 @@

package org.apache.spark

import scala.collection.Map
import scala.collection.JavaConversions._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}

/**
* Trait that implements Spark's status APIs. This trait is designed to be mixed into
* SparkContext; it allows the status API code to live in its own file.
* Low-level status reporting APIs for monitoring job and stage progress.
*
* These APIs intentionally provide very weak consistency semantics; consumers of these APIs should
* be prepared to handle empty / missing information. For example, a job's stage ids may be known
* but the status API may not have any information about the details of those stages, so
* `getStageInfo` could potentially return `None` for a valid stage id.
*
* To limit memory usage, these APIs only provide information on recent jobs / stages. These APIs
* will provide information for the last `spark.ui.retainedStages` stages and
* `spark.ui.retainedJobs` jobs.
*/
private[spark] trait SparkStatusAPI { this: SparkContext =>

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}

/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}

/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}

/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}

/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}

/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
class SparkStatusAPI private (sc: SparkContext) {

private val jobProgressListener = sc.jobProgressListener

/**
* Return a list of all known jobs in a particular job group. The returned list may contain
@@ -140,3 +78,9 @@ private[spark] trait SparkStatusAPI { this: SparkContext =>
}
}
}

private[spark] object SparkStatusAPI {
def apply(sc: SparkContext): SparkStatusAPI = {
new SparkStatusAPI(sc)
}
}
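
The weak-consistency caveat in the Scaladoc above is worth making concrete. The helper below is not part of the commit; it is a sketch of how a caller might tolerate missing information, assuming a live SparkContext and a job id obtained from `getJobIdsForGroup`:

// Every lookup is treated as optional: a valid stage id may still yield no info,
// and old jobs/stages drop out once spark.ui.retainedJobs / retainedStages are exceeded.
def describeJob(sc: org.apache.spark.SparkContext, jobId: Int): String = {
  sc.statusAPI.getJobInfo(jobId) match {
    case None => s"job $jobId: no information retained"
    case Some(job) =>
      val stageLines = job.stageIds().map { stageId =>
        sc.statusAPI.getStageInfo(stageId) match {
          case Some(stage) =>
            s"  stage $stageId: ${stage.numCompletedTasks()}/${stage.numTasks()} tasks complete"
          case None =>
            s"  stage $stageId: details not available (yet)"
        }
      }
      (s"job $jobId: ${job.status()}" +: stageLines).mkString("\n")
  }
}
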
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -105,6 +105,8 @@ class JavaSparkContext(val sc: SparkContext)

private[spark] val env = sc.env

def statusAPI = JavaSparkStatusAPI(sc)

def isLocal: java.lang.Boolean = sc.isLocal

def sparkUser: String = sc.sparkUser
@@ -134,25 +136,6 @@ class JavaSparkContext(val sc: SparkContext)
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions


/**
* Return a list of all known jobs in a particular job group. The returned list may contain
* running, failed, and completed jobs, and may vary across invocations of this method. This
* method does not guarantee the order of the elements in its result.
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)

/**
* Returns job information, or `null` if the job info could not be found or was garbage collected.
*/
def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull

/**
* Returns stage information, or `null` if the stage info could not be found or was
* garbage collected.
*/
def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull

/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
12 changes: 6 additions & 6 deletions core/src/test/scala/org/apache/spark/StatusAPISuite.scala
@@ -37,20 +37,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
jobIds.head
}
val jobInfo = eventually(timeout(10 seconds)) {
sc.getJobInfo(jobId).get
sc.statusAPI.getJobInfo(jobId).get
}
jobInfo.status() should not be FAILED
val stageIds = jobInfo.stageIds()
stageIds.size should be(2)

val firstStageInfo = eventually(timeout(10 seconds)) {
sc.getStageInfo(stageIds(0)).get
sc.statusAPI.getStageInfo(stageIds(0)).get
}
firstStageInfo.stageId() should be(stageIds(0))
firstStageInfo.currentAttemptId() should be(0)
firstStageInfo.numTasks() should be(2)
eventually(timeout(10 seconds)) {
val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
val updatedFirstStageInfo = sc.statusAPI.getStageInfo(stageIds(0)).get
updatedFirstStageInfo.numCompletedTasks() should be(2)
updatedFirstStageInfo.numActiveTasks() should be(0)
updatedFirstStageInfo.numFailedTasks() should be(0)
@@ -59,20 +59,20 @@ class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {

test("getJobIdsForGroup()") {
sc.setJobGroup("my-job-group", "description")
sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq.empty)
val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
val firstJobId = eventually(timeout(10 seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
sc.statusAPI.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
}
val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
val secondJobId = eventually(timeout(10 seconds)) {
secondJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
sc.statusAPI.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
}
}
}
examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
@@ -58,8 +58,8 @@ public static void main(String[] args) throws Exception {
continue;
}
int currentJobId = jobIds.get(jobIds.size() - 1);
SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
SparkJobInfo jobInfo = sc.statusAPI().getJobInfo(currentJobId);
SparkStageInfo stageInfo = sc.statusAPI().getStageInfo(jobInfo.stageIds()[0]);
System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
" active, " + stageInfo.numCompletedTasks() + " complete");
}
