Changed ContextCleaner to use ReferenceQueue instead of finalizer
tdas committed Mar 25, 2014
1 parent e1fba5f commit f2881fd
Showing 4 changed files with 123 additions and 94 deletions.
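The commit swaps finalize()-based cleanup hooks for the java.lang.ref machinery. As background (an illustrative sketch, not part of the commit): a WeakReference constructed with a ReferenceQueue does not keep its referent alive, and after the referent is garbage collected the JVM enqueues the reference, letting a consumer thread pick up cleanup work:

import java.lang.ref.{ReferenceQueue, WeakReference}

object ReferenceQueueDemo extends App {
  val queue = new ReferenceQueue[AnyRef]

  var referent: AnyRef = new Array[Byte](1 << 20)
  // The weak reference does not keep `referent` alive; it is registered
  // with `queue` so the JVM enqueues it after the referent is collected.
  val ref = new WeakReference[AnyRef](referent, queue)

  referent = null  // drop the last strong reference
  System.gc()      // request a collection (best effort, may need retries)

  // remove(timeout) blocks until a reference is enqueued or the timeout
  // (in milliseconds) elapses, returning null on timeout.
  val enqueued = queue.remove(1000)
  println(if (enqueued eq ref) "referent collected" else "not collected yet")
}

The new ContextCleaner below is exactly this consumer loop, with each weak reference carrying the cleanup task to run.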
80 changes: 47 additions & 33 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark
 
+import java.lang.ref.{ReferenceQueue, WeakReference}
+
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
 
 /** Listener class used for testing when any item has been cleaned by the Cleaner class */
 private[spark] trait CleanerListener {
@@ -34,20 +35,27 @@ private[spark] trait CleanerListener {
 private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /** Classes to represent cleaning tasks */
-  private sealed trait CleaningTask
-  private case class CleanRDD(rddId: Int) extends CleaningTask
-  private case class CleanShuffle(shuffleId: Int) extends CleaningTask
+  private sealed trait CleanupTask
+  private case class CleanRDD(rddId: Int) extends CleanupTask
+  private case class CleanShuffle(shuffleId: Int) extends CleanupTask
   // TODO: add CleanBroadcast
 
-  private val queue = new LinkedBlockingQueue[CleaningTask]
+  private val referenceBuffer = new ArrayBuffer[WeakReferenceWithCleanupTask]
+    with SynchronizedBuffer[WeakReferenceWithCleanupTask]
+  private val referenceQueue = new ReferenceQueue[AnyRef]
 
-  protected val listeners = new ArrayBuffer[CleanerListener]
+  private val listeners = new ArrayBuffer[CleanerListener]
+    with SynchronizedBuffer[CleanerListener]
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
+  private val REF_QUEUE_POLL_TIMEOUT = 100
+
   @volatile private var stopped = false
 
+  private class WeakReferenceWithCleanupTask(referent: AnyRef, val task: CleanupTask)
+    extends WeakReference(referent, referenceQueue)
+
   /** Start the cleaner */
   def start() {
     cleaningThread.setDaemon(true)
@@ -62,21 +70,27 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /**
-   * Schedule cleanup of RDD data. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
+   * Register a RDD for cleanup when it is garbage collected.
    */
-  def scheduleRDDCleanup(rddId: Int) {
-    enqueue(CleanRDD(rddId))
-    logDebug("Enqueued RDD " + rddId + " for cleaning up")
+  def registerRDDForCleanup(rdd: RDD[_]) {
+    registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
   /**
-   * Schedule cleanup of shuffle data. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
+   * Register a shuffle dependency for cleanup when it is garbage collected.
    */
-  def scheduleShuffleCleanup(shuffleId: Int) {
-    enqueue(CleanShuffle(shuffleId))
-    logDebug("Enqueued shuffle " + shuffleId + " for cleaning up")
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
+    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
   }
 
+  /** Cleanup RDD. */
+  def cleanupRDD(rdd: RDD[_]) {
+    doCleanupRDD(rdd.id)
+  }
+
+  /** Cleanup shuffle. */
+  def cleanupShuffle(shuffleDependency: ShuffleDependency[_, _]) {
+    doCleanupShuffle(shuffleDependency.shuffleId)
+  }
+
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -91,24 +105,23 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     sc.persistentRdds.remove(rddId)
   }
 
-  /**
-   * Enqueue a cleaning task. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
-   */
-  private def enqueue(task: CleaningTask) {
-    queue.put(task)
+  /** Register an object for cleanup. */
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
+    referenceBuffer += new WeakReferenceWithCleanupTask(objectForCleanup, task)
   }
 
   /** Keep cleaning RDDs and shuffle data */
   private def keepCleaning() {
     while (!isStopped) {
       try {
-        val taskOpt = Option(queue.poll(100, TimeUnit.MILLISECONDS))
-        taskOpt.foreach { task =>
-          logDebug("Got cleaning task " + taskOpt.get)
+        val reference = Option(referenceQueue.remove(REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[WeakReferenceWithCleanupTask])
+        reference.map(_.task).foreach { task =>
+          logDebug("Got cleaning task " + task)
+          referenceBuffer -= reference.get
           task match {
-            case CleanRDD(rddId) => doCleanRDD(rddId)
-            case CleanShuffle(shuffleId) => doCleanShuffle(shuffleId)
+            case CleanRDD(rddId) => doCleanupRDD(rddId)
+            case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId)
           }
         }
       } catch {
@@ -119,8 +132,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /** Perform RDD cleaning */
-  private def doCleanRDD(rddId: Int) {
+  /** Perform RDD cleanup. */
+  private def doCleanupRDD(rddId: Int) {
     try {
       logDebug("Cleaning RDD " + rddId)
       unpersistRDD(rddId, false)
@@ -131,8 +144,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /** Perform shuffle cleaning */
-  private def doCleanShuffle(shuffleId: Int) {
+  /** Perform shuffle cleanup. */
+  private def doCleanupShuffle(shuffleId: Int) {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -144,7 +157,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+  private def mapOutputTrackerMaster =
+    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 
   private def blockManagerMaster = sc.env.blockManager.master
 
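Condensed, the new design fits in one screen. The following self-contained sketch (simplified names; println stands in for the real unpersistRDD/unregisterShuffle calls) mirrors the structure above:

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

object CleanerSketch {
  sealed trait CleanupTask
  case class CleanRDD(rddId: Int) extends CleanupTask
  case class CleanShuffle(shuffleId: Int) extends CleanupTask

  private val referenceQueue = new ReferenceQueue[AnyRef]
  // Keeps the WeakReference wrappers strongly reachable: a Reference that
  // becomes unreachable itself may be collected without ever being enqueued.
  private val referenceBuffer = mutable.Set.empty[TaskWeakRef]

  private class TaskWeakRef(referent: AnyRef, val task: CleanupTask)
    extends WeakReference[AnyRef](referent, referenceQueue)

  @volatile private var stopped = false

  def register(obj: AnyRef, task: CleanupTask): Unit =
    referenceBuffer.synchronized { referenceBuffer += new TaskWeakRef(obj, task) }

  private val cleaningThread = new Thread {
    override def run(): Unit = while (!stopped) {
      // Wait up to 100 ms (cf. REF_QUEUE_POLL_TIMEOUT) for a reference
      // whose referent has been garbage collected.
      Option(referenceQueue.remove(100)).foreach { r =>
        val ref = r.asInstanceOf[TaskWeakRef]
        referenceBuffer.synchronized { referenceBuffer -= ref }
        ref.task match {
          case CleanRDD(id)     => println(s"would unpersist RDD $id")
          case CleanShuffle(id) => println(s"would unregister shuffle $id")
        }
      }
    }
  }

  def start(): Unit = { cleaningThread.setDaemon(true); cleaningThread.start() }
  def stop(): Unit = stopped = true
}

Note the two supporting pieces: referenceBuffer holds strong references to the wrappers so they cannot themselves be collected before being enqueued, and the 100 ms bound on remove() lets the daemon thread periodically observe the stopped flag instead of blocking forever.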
19 changes: 1 addition & 18 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -53,24 +53,7 @@ class ShuffleDependency[K, V](
 
   val shuffleId: Int = rdd.context.newShuffleId()
 
-  override def finalize() {
-    try {
-      if (rdd != null) {
-        rdd.sparkContext.cleaner.scheduleShuffleCleanup(shuffleId)
-      }
-    } catch {
-      case t: Throwable =>
-        // Paranoia - If logError throws error as well, report to stderr.
-        try {
-          logError("Error in finalize", t)
-        } catch {
-          case _ : Throwable =>
-            System.err.println("Error in finalize (and could not write to logError): " + t)
-        }
-    } finally {
-      super.finalize()
-    }
-  }
+  rdd.sparkContext.cleaner.registerShuffleForCleanup(this)
 }
 
 
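The deleted finalize() override and the single registration line that replaces it fire at the same point in the object's life cycle, but with different machinery. A sketch of the trade-off (hypothetical classes, reusing CleanerSketch from above):

// Finalizer style (removed): cleanup code runs on the JVM's single
// finalizer thread, the object survives at least one extra GC cycle,
// and a slow finalize() delays finalization of every other object.
class ShuffleStateWithFinalizer(val shuffleId: Int) {
  override def finalize(): Unit = println(s"cleaning shuffle $shuffleId")
}

// Registration style (added): the constructor registers a weak reference;
// cleanup runs on the cleaner's own thread after collection, and the
// object itself carries no cleanup logic.
class ShuffleStateWithRegistration(val shuffleId: Int) {
  CleanerSketch.register(this, CleanerSketch.CleanShuffle(shuffleId))
}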
18 changes: 1 addition & 17 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -147,6 +147,7 @@ abstract class RDD[T: ClassTag](
     }
     storageLevel = newLevel
     // Register the RDD with the SparkContext
+    sc.cleaner.registerRDDForCleanup(this)
     sc.persistentRdds(id) = this
     this
   }
@@ -1102,21 +1103,4 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
-
-  override def finalize() {
-    try {
-      sc.cleaner.scheduleRDDCleanup(id)
-    } catch {
-      case t: Throwable =>
-        // Paranoia - If logError throws error as well, report to stderr.
-        try {
-          logError("Error in finalize", t)
-        } catch {
-          case _ : Throwable =>
-            System.err.println("Error in finalize (and could not write to logError): " + t)
-        }
-    } finally {
-      super.finalize()
-    }
-  }
 }
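At the driver level, the visible effect of the RDD change is that cached blocks can now be reclaimed just by dropping references. A hypothetical session (illustrative only; actual cleanup timing depends on when the GC runs):

val sc = new SparkContext("local", "cleaner-demo")

var cached = sc.parallelize(1 to 1000).persist()  // persist() now registers the RDD
cached.count()                                    // materialize the cached blocks

cached = null  // drop the last strong reference to the RDD
System.gc()    // once the RDD is collected, its weak reference is enqueued and
               // the ContextCleaner thread unpersists the blocks for that rddId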