
Commit cb0a5a6

Fixed docs and styles.
tdas committed Mar 11, 2014
1 parent a24fefc commit cb0a5a6
Showing 4 changed files with 23 additions and 9 deletions.
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -30,7 +30,7 @@ private[spark] trait CleanerListener {
}

/**
- * Cleans RDDs and shuffle data. This should be instantiated only on the driver.
+ * Cleans RDDs and shuffle data.
*/
private[spark] class ContextCleaner(env: SparkEnv) extends Logging {

@@ -62,12 +62,13 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
cleaningThread.interrupt()
}

-/** Clean all data and metadata related to a RDD, including shuffle files and metadata */
+/** Clean (unpersist) RDD data. */
def cleanRDD(rdd: RDD[_]) {
enqueue(CleanRDD(rdd.sparkContext, rdd.id))
logDebug("Enqueued RDD " + rdd + " for cleaning up")
}

+/** Clean shuffle data. */
def cleanShuffle(shuffleId: Int) {
enqueue(CleanShuffle(shuffleId))
logDebug("Enqueued shuffle " + shuffleId + " for cleaning up")
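For context on the pattern behind cleanRDD and cleanShuffle: each call only enqueues a task, and a separate daemon thread drains the queue and does the real work, so cleanup never blocks the caller. Below is a minimal, self-contained sketch of that enqueue-and-drain pattern; SimpleCleaner and its task types are hypothetical stand-ins, not the actual ContextCleaner code.

```scala
import java.util.concurrent.LinkedBlockingQueue

sealed trait CleanupTask
case class CleanRDDTask(rddId: Int) extends CleanupTask
case class CleanShuffleTask(shuffleId: Int) extends CleanupTask

class SimpleCleaner {
  private val queue = new LinkedBlockingQueue[CleanupTask]()
  @volatile private var stopped = false

  // A daemon thread drains the queue; callers never wait for cleanup.
  private val cleaningThread = new Thread("simple-cleaner") {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          queue.take() match {
            case CleanRDDTask(id)     => println(s"Cleaning rdd $id")
            case CleanShuffleTask(id) => println(s"Cleaning shuffle $id")
          }
        }
      } catch {
        case _: InterruptedException => // interrupted by stop(); exit quietly
      }
    }
  }

  def start(): Unit = cleaningThread.start()
  def stop(): Unit = { stopped = true; cleaningThread.interrupt() }

  // Mirrors cleanRDD/cleanShuffle above: enqueue and return immediately.
  def cleanRDD(rddId: Int): Unit = queue.put(CleanRDDTask(rddId))
  def cleanShuffle(shuffleId: Int): Unit = queue.put(CleanShuffleTask(shuffleId))
}
```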
@@ -102,16 +103,16 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {

/** Perform RDD cleaning */
private def doCleanRDD(sc: SparkContext, rddId: Int) {
-logDebug("Cleaning rdd "+ rddId)
+logDebug("Cleaning rdd " + rddId)
sc.env.blockManager.master.removeRdd(rddId, false)
sc.persistentRdds.remove(rddId)
listeners.foreach(_.rddCleaned(rddId))
-logInfo("Cleaned rdd "+ rddId)
+logInfo("Cleaned rdd " + rddId)
}

/** Perform shuffle cleaning */
private def doCleanShuffle(shuffleId: Int) {
-logDebug("Cleaning shuffle "+ shuffleId)
+logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
blockManager.master.removeShuffle(shuffleId)
listeners.foreach(_.shuffleCleaned(shuffleId))
@@ -123,4 +124,4 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
private def blockManager = env.blockManager

private def isStopped = synchronized { stopped }
-}
\ No newline at end of file
+}
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -54,6 +54,11 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
}
}

+/**
+ * Class that keeps track of the location of the map output of a stage.
+ * This is abstract because different versions of MapOutputTracker
+ * (driver and worker) use different HashMaps to store their metadata.
+ */
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {

private val timeout = AkkaUtils.askTimeout(conf)
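The new comment says the driver and worker trackers differ mainly in the map that backs their metadata. A hedged sketch of that shape, with simplified types (Tracker, DriverTracker, WorkerTracker, and this MapStatus are illustrative stand-ins, not Spark's definitions):

```scala
import scala.collection.mutable

case class MapStatus(location: String) // stand-in for Spark's MapStatus

abstract class Tracker {
  // Subclasses choose the backing map (authoritative, bounded, TTL-based, ...).
  protected val mapStatuses: mutable.Map[Int, Array[MapStatus]]

  def getStatuses(shuffleId: Int): Option[Array[MapStatus]] =
    mapStatuses.get(shuffleId)

  def registerStatuses(shuffleId: Int, statuses: Array[MapStatus]): Unit =
    mapStatuses(shuffleId) = statuses
}

class DriverTracker extends Tracker {
  // Driver side: the authoritative store (Spark uses a TimeStampedHashMap here).
  protected val mapStatuses = mutable.HashMap.empty[Int, Array[MapStatus]]
}

class WorkerTracker extends Tracker {
  // Worker side: just a cache of recently fetched statuses
  // (Spark uses a BoundedHashMap here).
  protected val mapStatuses = mutable.HashMap.empty[Int, Array[MapStatus]]
}
```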
@@ -181,6 +186,10 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

+/**
+ * MapOutputTracker for the workers. This uses a BoundedHashMap to keep track of
+ * a limited amount of the most recently used map output information.
+ */
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {

/**
@@ -192,7 +201,10 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](MAX_MAP_STATUSES, true)
}

-
+/**
+ * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
+ * output information, which allows old output information to be removed based on a TTL.
+ */
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -50,7 +50,8 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
override def get(key: A): Option[B] = {
val timeStampedValue = internalMap.get(key)
if (updateTimeStampOnGet && timeStampedValue != null) {
-internalJavaMap.replace(key, timeStampedValue, TimeStampedValue(currentTime, timeStampedValue.value))
+internalJavaMap.replace(key, timeStampedValue,
+  TimeStampedValue(currentTime, timeStampedValue.value))
}
Option(timeStampedValue).map(_.value)
}
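The change above is only a line-length fix, but the logic is worth spelling out: when updateTimeStampOnGet is set, get() atomically swaps in a fresh timestamp via replace(key, oldValue, newValue), so a concurrent update to the entry is never clobbered. A minimal sketch of that idea (TtlMap and Stamped are hypothetical names, not Spark's classes):

```scala
import java.util.concurrent.ConcurrentHashMap

case class Stamped[V](timestamp: Long, value: V)

class TtlMap[K, V](updateTimeStampOnGet: Boolean = false) {
  private val internal = new ConcurrentHashMap[K, Stamped[V]]()

  def put(key: K, value: V): Unit =
    internal.put(key, Stamped(System.currentTimeMillis(), value))

  def get(key: K): Option[V] = {
    val stamped = internal.get(key)
    if (updateTimeStampOnGet && stamped != null) {
      // replace(k, old, new) only refreshes the timestamp if the entry was
      // not concurrently changed, mirroring the code above.
      internal.replace(key, stamped, Stamped(System.currentTimeMillis(), stamped.value))
    }
    Option(stamped).map(_.value)
  }

  // Drop entries whose timestamp is older than the given threshold,
  // in the spirit of clearOldValues.
  def clearOldValues(threshold: Long): Unit = {
    val it = internal.entrySet().iterator()
    while (it.hasNext) {
      if (it.next().getValue.timestamp < threshold) it.remove()
    }
  }
}
```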
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
@@ -33,7 +33,7 @@ private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: Wea
* A map that stores the timestamp of when a key was inserted along with the value,
* while ensuring that the values are weakly referenced. If the value is garbage collected and
* the weak reference is null, get() operation returns the key be non-existent. However,
-the key is actually not remmoved in the current implementation. Key-value pairs whose
+the key is actually not removed in the current implementation. Key-value pairs whose
* timestamps are older than a particular threshold time can then be removed using the
* clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a
* drop-in replacement for Scala HashMaps.
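The fixed comment describes slightly unusual get() semantics: once the JVM collects a weakly referenced value, the key looks absent even though its entry is still stored. A hedged sketch of just those semantics (WeakValueMap and TimeStampedWeak are illustrative names, not the Spark class):

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

case class TimeStampedWeak[V](timestamp: Long, ref: WeakReference[V])

class WeakValueMap[K, V <: AnyRef] {
  private val internal = new TrieMap[K, TimeStampedWeak[V]]()

  def put(key: K, value: V): Unit =
    internal.put(key, TimeStampedWeak(System.currentTimeMillis(), new WeakReference(value)))

  // If the referent was garbage collected, ref.get() returns null and we
  // report the key as non-existent, even though the entry is still stored.
  def get(key: K): Option[V] =
    internal.get(key).flatMap(w => Option(w.ref.get()))

  // Entries older than the threshold can be purged, like clearOldValues.
  def clearOldValues(threshold: Long): Unit =
    internal.foreach { case (k, w) => if (w.timestamp < threshold) internal.remove(k) }
}
```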
