diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 5d996ed34dff5..d499af20502d0 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark
 
+import java.lang.ref.{ReferenceQueue, WeakReference}
+
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
 
 /** Listener class used for testing when any item has been cleaned by the Cleaner class */
 private[spark] trait CleanerListener {
@@ -34,20 +35,27 @@ private[spark] trait CleanerListener {
 
 private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /** Classes to represent cleaning tasks */
-  private sealed trait CleaningTask
-  private case class CleanRDD(rddId: Int) extends CleaningTask
-  private case class CleanShuffle(shuffleId: Int) extends CleaningTask
+  private sealed trait CleanupTask
+  private case class CleanRDD(rddId: Int) extends CleanupTask
+  private case class CleanShuffle(shuffleId: Int) extends CleanupTask
   // TODO: add CleanBroadcast
 
-  private val queue = new LinkedBlockingQueue[CleaningTask]
+  private val referenceBuffer = new ArrayBuffer[WeakReferenceWithCleanupTask]
+    with SynchronizedBuffer[WeakReferenceWithCleanupTask]
+  private val referenceQueue = new ReferenceQueue[AnyRef]
 
-  protected val listeners = new ArrayBuffer[CleanerListener]
+  private val listeners = new ArrayBuffer[CleanerListener]
     with SynchronizedBuffer[CleanerListener]
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
+  private val REF_QUEUE_POLL_TIMEOUT = 100
+
   @volatile private var stopped = false
 
+  private class WeakReferenceWithCleanupTask(referent: AnyRef, val task: CleanupTask)
+    extends WeakReference(referent, referenceQueue)
+
   /** Start the cleaner */
   def start() {
     cleaningThread.setDaemon(true)
@@ -62,21 +70,27 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /**
-   * Schedule cleanup of RDD data. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
+   * Register an RDD for cleanup when it is garbage collected.
    */
-  def scheduleRDDCleanup(rddId: Int) {
-    enqueue(CleanRDD(rddId))
-    logDebug("Enqueued RDD " + rddId + " for cleaning up")
+  def registerRDDForCleanup(rdd: RDD[_]) {
+    registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
   /**
-   * Schedule cleanup of shuffle data. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
+   * Register a shuffle dependency for cleanup when it is garbage collected.
   */
-  def scheduleShuffleCleanup(shuffleId: Int) {
-    enqueue(CleanShuffle(shuffleId))
-    logDebug("Enqueued shuffle " + shuffleId + " for cleaning up")
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
+    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
+  }
+
+  /** Clean up the given RDD, by explicit request. */
+  def cleanupRDD(rdd: RDD[_]) {
+    doCleanupRDD(rdd.id)
+  }
+
+  /** Clean up the given shuffle, by explicit request. */
+  def cleanupShuffle(shuffleDependency: ShuffleDependency[_, _]) {
+    doCleanupShuffle(shuffleDependency.shuffleId)
   }
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -91,24 +105,23 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     sc.persistentRdds.remove(rddId)
   }
 
-  /**
-   * Enqueue a cleaning task. Do not perform any time or resource intensive
-   * computation in this function as this is called from a finalize() function.
-   */
-  private def enqueue(task: CleaningTask) {
-    queue.put(task)
+  /** Register an object for cleanup. */
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
+    referenceBuffer += new WeakReferenceWithCleanupTask(objectForCleanup, task)
   }
 
   /** Keep cleaning RDDs and shuffle data */
   private def keepCleaning() {
     while (!isStopped) {
       try {
-        val taskOpt = Option(queue.poll(100, TimeUnit.MILLISECONDS))
-        taskOpt.foreach { task =>
-          logDebug("Got cleaning task " + taskOpt.get)
+        val reference = Option(referenceQueue.remove(REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[WeakReferenceWithCleanupTask])
+        reference.map(_.task).foreach { task =>
+          logDebug("Got cleaning task " + task)
+          referenceBuffer -= reference.get
           task match {
-            case CleanRDD(rddId) => doCleanRDD(rddId)
-            case CleanShuffle(shuffleId) => doCleanShuffle(shuffleId)
+            case CleanRDD(rddId) => doCleanupRDD(rddId)
+            case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId)
           }
         }
       } catch {
@@ -119,8 +132,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /** Perform RDD cleaning */
-  private def doCleanRDD(rddId: Int) {
+  /** Perform RDD cleanup. */
+  private def doCleanupRDD(rddId: Int) {
     try {
       logDebug("Cleaning RDD " + rddId)
       unpersistRDD(rddId, false)
@@ -131,8 +144,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  /** Perform shuffle cleaning */
-  private def doCleanShuffle(shuffleId: Int) {
+  /** Perform shuffle cleanup. */
+  private def doCleanupShuffle(shuffleId: Int) {
    try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -144,7 +157,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+  private def mapOutputTrackerMaster =
+    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 
   private def blockManagerMaster = sc.env.blockManager.master
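The core pattern above: every registered object is wrapped in a WeakReference subclass that also carries its CleanupTask, and the wrappers are held strongly in referenceBuffer so they survive until the JVM enqueues them into referenceQueue. Once the referent is collected, get() returns null, so the attached task is the only record of what to clean. A minimal, self-contained sketch of the same mechanism (all names here are illustrative, not Spark's):

    import java.lang.ref.{ReferenceQueue, WeakReference}

    import scala.collection.mutable

    // Illustrative stand-ins for the patch's CleanupTask hierarchy.
    sealed trait CleanupTask
    case class CleanRDD(rddId: Int) extends CleanupTask

    object WeakRefDemo {
      // The JVM enqueues references here once their referents are collected.
      private val referenceQueue = new ReferenceQueue[AnyRef]

      // The WeakReference wrappers must themselves be strongly held,
      // or they could be collected before they are ever enqueued.
      private val referenceBuffer = mutable.Set.empty[TaskWeakReference]

      // Pairs a weak reference with the task describing what to clean up,
      // since ref.get returns null by the time the reference is dequeued.
      class TaskWeakReference(referent: AnyRef, val task: CleanupTask)
        extends WeakReference[AnyRef](referent, referenceQueue)

      def main(args: Array[String]) {
        var referent: AnyRef = new Object
        referenceBuffer += new TaskWeakReference(referent, CleanRDD(42))

        referent = null // drop the only strong reference to the referent
        System.gc()     // advisory: *usually* triggers a collection

        // remove(timeout) blocks until a collected reference is enqueued,
        // and returns null if nothing arrives within the timeout.
        val ref = referenceQueue.remove(10000).asInstanceOf[TaskWeakReference]
        if (ref != null) {
          referenceBuffer -= ref
          println("Would now handle " + ref.task) // e.g. CleanRDD(42)
        }
      }
    }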
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 557d424d7a786..132468ebdb4f8 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -53,24 +53,7 @@ class ShuffleDependency[K, V](
 
   val shuffleId: Int = rdd.context.newShuffleId()
 
-  override def finalize() {
-    try {
-      if (rdd != null) {
-        rdd.sparkContext.cleaner.scheduleShuffleCleanup(shuffleId)
-      }
-    } catch {
-      case t: Throwable =>
-        // Paranoia - If logError throws error as well, report to stderr.
-        try {
-          logError("Error in finalize", t)
-        } catch {
-          case _ : Throwable =>
-            System.err.println("Error in finalize (and could not write to logError): " + t)
-        }
-    } finally {
-      super.finalize()
-    }
-  }
+  rdd.sparkContext.cleaner.registerShuffleForCleanup(this)
 }
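With finalize() gone, registration is a side effect of constructing the ShuffleDependency, so cleanup is driven purely by reachability. A sketch of the automatic path this enables, mirroring the "automatically cleanup shuffle" test further down (assumes an active SparkContext `sc`; illustrative driver-side code, not part of the patch):

    val pairs = sc.makeRDD(1 to 10).map(_ -> 1)
    var reduced = pairs.reduceByKey(_ + _) // constructing this registers its ShuffleDependency
    reduced.count()                        // materializes the shuffle output

    reduced = null                         // drop the only strong reference to the child RDD,
                                           // making its ShuffleDependency unreachable too
    System.gc()                            // advisory; once a collection actually runs, the
                                           // cleaner thread unregisters the shuffle's map
                                           // output state and removes its blocks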
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a75bca42257d4..364156a8e0779 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -147,6 +147,7 @@ abstract class RDD[T: ClassTag](
     }
     storageLevel = newLevel
     // Register the RDD with the SparkContext
+    sc.cleaner.registerRDDForCleanup(this)
     sc.persistentRdds(id) = this
     this
   }
@@ -1102,21 +1103,4 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
-
-  override def finalize() {
-    try {
-      sc.cleaner.scheduleRDDCleanup(id)
-    } catch {
-      case t: Throwable =>
-        // Paranoia - If logError throws error as well, report to stderr.
-        try {
-          logError("Error in finalize", t)
-        } catch {
-          case _ : Throwable =>
-            System.err.println("Error in finalize (and could not write to logError): " + t)
-        }
-    } finally {
-      super.finalize()
-    }
-  }
 }
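On the RDD side, registration happens in persist(), alongside the existing entry in sc.persistentRdds. The explicit path, mirroring the "cleanup RDD" test below (sketch only; `cleaner` is Spark-internal, so this works from code in the org.apache.spark package such as the suite):

    val rdd = sc.makeRDD(1 to 10).persist() // persist() also registers the RDD with the cleaner
    val before = rdd.collect().toList

    sc.cleaner.cleanupRDD(rdd)              // explicit, immediate cleanup: unpersists the
                                            // blocks and drops the sc.persistentRdds entry

    assert(rdd.collect().toList === before) // the RDD stays usable; its lineage recomputes
                                            // the data on the next action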
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 8556888c96e06..a5f17309b4ec5 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -9,7 +9,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{ShuffleCoGroupSplitDep, RDD}
 
 import scala.util.Random
 import java.lang.ref.WeakReference
@@ -23,18 +23,28 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
   test("cleanup RDD") {
     val rdd = newRDD.persist()
-    rdd.count()
+    val collected = rdd.collect().toList
     val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
-    cleaner.scheduleRDDCleanup(rdd.id)
+
+    // Explicit cleanup
+    cleaner.cleanupRDD(rdd)
     tester.assertCleanup
+
+    // Verify that RDDs can be re-executed after cleaning up
+    assert(rdd.collect().toList === collected)
   }
 
   test("cleanup shuffle") {
-    val rdd = newShuffleRDD
-    rdd.count()
-    val tester = new CleanerTester(sc, shuffleIds = Seq(0))
-    cleaner.scheduleShuffleCleanup(0)
+    val (rdd, shuffleDeps) = newRDDWithShuffleDependencies
+    val collected = rdd.collect().toList
+    val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
+
+    // Explicit cleanup
+    shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
     tester.assertCleanup
+
+    // Verify that shuffles can be re-executed after cleaning up
+    assert(rdd.collect().toList === collected)
   }
 
   test("automatically cleanup RDD") {
@@ -43,7 +53,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // test that GC does not cause RDD cleanup due to a strong reference
     val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
-    doGC()
+    runGC()
     intercept[Exception] {
       preGCTester.assertCleanup(timeout(1000 millis))
     }
@@ -51,7 +61,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // test that GC causes RDD cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
     rdd = null // make RDD out of scope
-    doGC()
+    runGC()
     postGCTester.assertCleanup
   }
 
@@ -61,7 +71,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // test that GC does not cause shuffle cleanup due to a strong reference
     val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    doGC()
+    runGC()
     intercept[Exception] {
       preGCTester.assertCleanup(timeout(1000 millis))
     }
@@ -69,7 +79,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // test that GC causes shuffle cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
     rdd = null // make RDD out of scope, so that corresponding shuffle goes out of scope
-    doGC()
+    runGC()
     postGCTester.assertCleanup
   }
 
@@ -87,7 +97,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     }
 
     val buffer = new ArrayBuffer[RDD[_]]
-    for (i <- 1 to 1000) {
+    for (i <- 1 to 500) {
       buffer += randomRDD
     }
 
@@ -95,34 +105,47 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val shuffleIds = 0 until sc.newShuffleId
 
     val preGCTester = new CleanerTester(sc, rddIds, shuffleIds)
+    runGC()
     intercept[Exception] {
       preGCTester.assertCleanup(timeout(1000 millis))
     }
 
-    // test that GC causes shuffle cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, rddIds, shuffleIds)
     buffer.clear()
-    doGC()
+    runGC()
     postGCTester.assertCleanup
   }
 
-  // TODO (TD): Test that cleaned up RDD and shuffle can be recomputed again correctly.
-
   def newRDD = sc.makeRDD(1 to 10)
   def newPairRDD = newRDD.map(_ -> 1)
   def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
 
-  def doGC() {
+  def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _]]) = {
+    def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
+      rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
+        getAllDependencies(dep.rdd)
+      }
+    }
+    val rdd = newShuffleRDD
+
+    // Get all the shuffle dependencies
+    val shuffleDeps = getAllDependencies(rdd).filter(_.isInstanceOf[ShuffleDependency[_, _]])
+      .map(_.asInstanceOf[ShuffleDependency[_, _]])
+    (rdd, shuffleDeps)
+  }
+
+  /** Run GC and make sure it has actually run */
+  def runGC() {
     val weakRef = new WeakReference(new Object())
     val startTime = System.currentTimeMillis
     System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
-    System.runFinalization() // Make a best effort to call finalizer on all cleaned objects.
+    // Wait until a weak reference object has been GCed
     while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
       System.gc()
       System.runFinalization()
-      Thread.sleep(100)
+      Thread.sleep(200)
    }
  }
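The validation that follows cannot assert immediately, because the cleaner thread works asynchronously. The rewritten postCleanupValidate (below) therefore polls its assertions. A generalized form of that loop, as a hypothetical helper that is not part of the patch:

    // Rerun a block of assertions until it passes or the attempt budget is
    // exhausted; cleanup happens asynchronously on the cleaner thread.
    def retryAssert(maxAttempts: Int, intervalMs: Long)(assertions: => Unit) {
      var attempts = 0
      while (attempts < maxAttempts) {
        attempts += 1
        try {
          assertions
          return                              // everything passed
        } catch {
          case t: Throwable =>
            if (attempts >= maxAttempts) {
              throw t                         // out of attempts: surface the failure
            }
            Thread.sleep(intervalMs)          // give the cleaner thread more time
        }
      }
    }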
@@ -149,10 +172,14 @@ class CleanerTester(sc: SparkContext, rddIds: Seq[Int] = Nil, shuffleIds: Seq[In
     }
   }
 
+  val MAX_VALIDATION_ATTEMPTS = 10
+  val VALIDATION_ATTEMPT_INTERVAL = 100
+
   logInfo("Attempting to validate before cleanup:\n" + uncleanedResourcesToString)
   preCleanupValidate()
   sc.cleaner.attachListener(cleanerListener)
 
+  /** Assert that all the registered resources have been cleaned up */
   def assertCleanup(implicit waitTimeout: Eventually.Timeout) {
     try {
       eventually(waitTimeout, interval(10 millis)) {
@@ -165,6 +192,7 @@ class CleanerTester(sc: SparkContext, rddIds: Seq[Int] = Nil, shuffleIds: Seq[In
     }
   }
 
+  /** Verify that RDDs, shuffles, etc. occupy resources */
   private def preCleanupValidate() {
     assert(rddIds.nonEmpty || shuffleIds.nonEmpty, "Nothing to cleanup")
@@ -181,14 +209,34 @@ class CleanerTester(sc: SparkContext, rddIds: Seq[Int] = Nil, shuffleIds: Seq[In
       "One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")
   }
 
+  /**
+   * Verify that RDDs, shuffles, etc. do not occupy resources. Tries multiple times, as there is
+   * no guarantee on how long it will take to clean up the resources.
+   */
   private def postCleanupValidate() {
-    // Verify all the RDDs have been persisted
-    assert(rddIds.forall(!sc.persistentRdds.contains(_)))
-    assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId))))
-
-    // Verify all the shuffle have been deregistered and cleaned up
-    assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_)))
-    assert(shuffleIds.forall(shuffleId => !diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
+    var attempts = 0
+    while (attempts < MAX_VALIDATION_ATTEMPTS) {
+      attempts += 1
+      logInfo("Attempt: " + attempts)
+      try {
+        // Verify all the RDDs have been unpersisted
+        assert(rddIds.forall(!sc.persistentRdds.contains(_)))
+        assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId))))
+
+        // Verify all the shuffles have been deregistered and cleaned up
+        assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_)))
+        assert(shuffleIds.forall(shuffleId =>
+          !diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
+        return
+      } catch {
+        case t: Throwable =>
+          if (attempts >= MAX_VALIDATION_ATTEMPTS) {
+            throw t
+          } else {
+            Thread.sleep(VALIDATION_ATTEMPT_INTERVAL)
+          }
+      }
+    }
   }
 
   private def uncleanedResourcesToString = {