From f201a8d3c2f3c95da986760ac7ce4acb199f4e71 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 27 Mar 2014 15:39:51 -0700
Subject: [PATCH] Test broadcast cleanup in ContextCleanerSuite + remove
 BoundedHashMap

---
 .../apache/spark/util/BoundedHashMap.scala    |  67 --------
 .../apache/spark/ContextCleanerSuite.scala    | 147 +++++++++++-------
 .../spark/util/WrappedJavaHashMapSuite.scala  |   5 -
 3 files changed, 94 insertions(+), 125 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala

diff --git a/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala b/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala
deleted file mode 100644
index 888a06b2408c9..0000000000000
--- a/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedMap}
-
-import java.util.{Collections, LinkedHashMap}
-import java.util.Map.{Entry => JMapEntry}
-import scala.reflect.ClassTag
-
-/**
- * A map that upper bounds the number of key-value pairs present in it. It can be configured to
- * drop the least recently user pair or the earliest inserted pair. It exposes a
- * scala.collection.mutable.Map interface to allow it to be a drop-in replacement for Scala
- * HashMaps.
- *
- * Internally, a Java LinkedHashMap is used to get insert-order or access-order behavior.
- * Note that the LinkedHashMap is not thread-safe and hence, it is wrapped in a
- * Collections.synchronizedMap. However, getting the Java HashMap's iterator and
- * using it can still lead to ConcurrentModificationExceptions. Hence, the iterator()
- * function is overridden to copy the all pairs into an ArrayBuffer and then return the
- * iterator to the ArrayBuffer. Also, the class apply the trait SynchronizedMap which
- * ensures that all calls to the Scala Map API are synchronized. This together ensures
- * that ConcurrentModificationException is never thrown.
- *
- * @param bound max number of key-value pairs
- * @param useLRU true = least recently used/accessed will be dropped when bound is reached,
- *               false = earliest inserted will be dropped
- */
-private[spark] class BoundedHashMap[A, B](bound: Int, useLRU: Boolean)
-  extends WrappedJavaHashMap[A, B, A, B] with SynchronizedMap[A, B] {
-
-  private[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B](
-    bound / 8, (0.75).toFloat, useLRU) {
-    override protected def removeEldestEntry(eldest: JMapEntry[A, B]): Boolean = {
-      size() > bound
-    }
-  })
-
-  private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
-    new BoundedHashMap[K1, V1](bound, useLRU)
-  }
-
-  /**
-   * Overriding iterator to make sure that the internal Java HashMap's iterator
-   * is not concurrently modified. This can be a performance issue and this should be overridden
-   * if it is known that this map will not be used in a multi-threaded environment.
-   */
-  override def iterator: Iterator[(A, B)] = {
-    (new ArrayBuffer[(A, B)] ++= super.iterator).iterator
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 77d9825434706..6a12cb6603700 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.lang.ref.WeakReference
 
-import scala.collection.mutable.{ArrayBuffer, HashSet, SynchronizedSet}
+import scala.collection.mutable.{HashSet, SynchronizedSet}
 import scala.util.Random
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}
+import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}
 
 class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
@@ -46,9 +46,9 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // Explicit cleanup
     cleaner.cleanupRDD(rdd)
-    tester.assertCleanup
+    tester.assertCleanup()
 
-    // verify that RDDs can be re-executed after cleaning up
+    // Verify that RDDs can be re-executed after cleaning up
     assert(rdd.collect().toList === collected)
   }
 
@@ -59,87 +59,101 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     // Explicit cleanup
     shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
-    tester.assertCleanup
+    tester.assertCleanup()
 
     // Verify that shuffles can be re-executed after cleaning up
     assert(rdd.collect().toList === collected)
   }
 
+  test("cleanup broadcast") {
+    val broadcast = newBroadcast
+    val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
+
+    // Explicit cleanup
+    cleaner.cleanupBroadcast(broadcast)
+    tester.assertCleanup()
+  }
+
   test("automatically cleanup RDD") {
     var rdd = newRDD.persist()
     rdd.count()
 
-    // test that GC does not cause RDD cleanup due to a strong reference
+    // Test that GC does not cause RDD cleanup due to a strong reference
     val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1000 millis))
     }
 
-    // test that GC causes RDD cleanup after dereferencing the RDD
+    // Test that GC causes RDD cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
-    rdd = null // make RDD out of scope
+    rdd = null // Make RDD out of scope
     runGC()
-    postGCTester.assertCleanup
+    postGCTester.assertCleanup()
   }
 
   test("automatically cleanup shuffle") {
     var rdd = newShuffleRDD
     rdd.count()
 
-    // test that GC does not cause shuffle cleanup due to a strong reference
-    val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
+    // Test that GC does not cause shuffle cleanup due to a strong reference
+    val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1000 millis))
     }
 
-    // test that GC causes shuffle cleanup after dereferencing the RDD
+    // Test that GC causes shuffle cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    rdd = null // make RDD out of scope, so that corresponding shuffle goes out of scope
+    rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
     runGC()
-    postGCTester.assertCleanup
+    postGCTester.assertCleanup()
   }
 
-  test("automatically cleanup RDD + shuffle") {
+  test("automatically cleanup broadcast") {
+    var broadcast = newBroadcast
 
-    def randomRDD: RDD[_] = {
-      val rdd: RDD[_] = Random.nextInt(3) match {
-        case 0 => newRDD
-        case 1 => newShuffleRDD
-        case 2 => newPairRDD.join(newPairRDD)
-      }
-      if (Random.nextBoolean()) rdd.persist()
-      rdd.count()
-      rdd
+    // Test that GC does not cause broadcast cleanup due to a strong reference
+    val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
+    runGC()
+    intercept[Exception] {
+      preGCTester.assertCleanup()(timeout(1000 millis))
     }
 
-    val buffer = new ArrayBuffer[RDD[_]]
-    for (i <- 1 to 500) {
-      buffer += randomRDD
-    }
+    // Test that GC causes broadcast cleanup after dereferencing the broadcast variable
+    val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
+    broadcast = null // Make broadcast variable out of scope
+    runGC()
+    postGCTester.assertCleanup()
+  }
+  test("automatically cleanup RDD + shuffle + broadcast") {
+    val numRdds = 100
+    val numBroadcasts = 4 // Broadcasts are more costly
+    val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
+    val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
     val rddIds = sc.persistentRdds.keys.toSeq
     val shuffleIds = 0 until sc.newShuffleId
+    val broadcastIds = 0L until numBroadcasts
-    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds)
+    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1000 millis))
    }
-    // test that GC causes shuffle cleanup after dereferencing the RDD
-    val postGCTester = new CleanerTester(sc, rddIds, shuffleIds)
-    buffer.clear()
+
+    // Test that GC triggers the cleanup of all variables after dereferencing them
+    val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
+    broadcastBuffer.clear()
+    rddBuffer.clear()
     runGC()
-    postGCTester.assertCleanup
+    postGCTester.assertCleanup()
   }
 
   def newRDD = sc.makeRDD(1 to 10)
-
   def newPairRDD = newRDD.map(_ -> 1)
-
   def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
-
+  def newBroadcast = sc.broadcast(1 to 100)
   def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _]]) = {
     def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
       rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
@@ -149,11 +163,27 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
 
     val rdd = newShuffleRDD
     // Get all the shuffle dependencies
-    val shuffleDeps = getAllDependencies(rdd).filter(_.isInstanceOf[ShuffleDependency[_, _]])
+    val shuffleDeps = getAllDependencies(rdd)
+      .filter(_.isInstanceOf[ShuffleDependency[_, _]])
       .map(_.asInstanceOf[ShuffleDependency[_, _]])
     (rdd, shuffleDeps)
   }
 
+  def randomRdd = {
+    val rdd: RDD[_] = Random.nextInt(3) match {
+      case 0 => newRDD
+      case 1 => newShuffleRDD
+      case 2 => newPairRDD.join(newPairRDD)
+    }
+    if (Random.nextBoolean()) rdd.persist()
+    rdd.count()
+    rdd
+  }
+
+  def randomBroadcast = {
+    sc.broadcast(Random.nextInt(Int.MaxValue))
+  }
+
   /** Run GC and make sure it actually has run */
   def runGC() {
     val weakRef = new WeakReference(new Object())
@@ -208,7 +238,7 @@ class CleanerTester(
   sc.cleaner.attachListener(cleanerListener)
 
   /** Assert that all the stuff has been cleaned up */
-  def assertCleanup(implicit waitTimeout: Eventually.Timeout) {
+  def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
     try {
       eventually(waitTimeout, interval(10 millis)) {
         assert(isAllCleanedUp)
@@ -222,7 +252,7 @@ class CleanerTester(
 
   /** Verify that RDDs, shuffles, etc. occupy resources */
   private def preCleanupValidate() {
-    assert(rddIds.nonEmpty || shuffleIds.nonEmpty, "Nothing to cleanup")
+    assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
 
     // Verify the RDDs have been persisted and blocks are present
     assert(rddIds.forall(sc.persistentRdds.contains),
@@ -233,8 +263,12 @@ class CleanerTester(
     // Verify the shuffle ids are registered and blocks are present
     assert(shuffleIds.forall(mapOutputTrackerMaster.containsShuffle),
       "One or more shuffles have not been registered cannot start cleaner test")
-    assert(shuffleIds.forall(shuffleId => diskBlockManager.containsBlock(shuffleBlockId(shuffleId))),
+    assert(shuffleIds.forall(sid => diskBlockManager.containsBlock(shuffleBlockId(sid))),
       "One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")
+
+    // Verify that the broadcast is in the driver's block manager
+    assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
+      "One or more broadcasts have not been persisted in the driver's block manager")
   }
 
   /**
@@ -247,14 +281,19 @@ class CleanerTester(
         attempts += 1
         logInfo("Attempt: " + attempts)
         try {
-          // Verify all the RDDs have been unpersisted
+          // Verify all RDDs have been unpersisted
           assert(rddIds.forall(!sc.persistentRdds.contains(_)))
           assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId))))
 
-          // Verify all the shuffle have been deregistered and cleaned up
+          // Verify all shuffles have been deregistered and cleaned up
           assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_)))
-          assert(shuffleIds.forall(shuffleId =>
-            !diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
+          assert(shuffleIds.forall(sid => !diskBlockManager.containsBlock(shuffleBlockId(sid))))
+
+          // Verify all broadcasts have been unpersisted
+          assert(broadcastIds.forall { bid =>
+            blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
+          })
+
           return
         } catch {
           case t: Throwable =>
@@ -271,18 +310,20 @@ class CleanerTester(
     s"""
       |\tRDDs = ${toBeCleanedRDDIds.mkString("[", ", ", "]")}
      |\tShuffles = ${toBeCleanedShuffleIds.mkString("[", ", ", "]")}
+      |\tBroadcasts = ${toBeCleanedBroadcstIds.mkString("[", ", ", "]")}
     """.stripMargin
   }
 
-  private def isAllCleanedUp = toBeCleanedRDDIds.isEmpty && toBeCleanedShuffleIds.isEmpty
-
-  private def shuffleBlockId(shuffleId: Int) = ShuffleBlockId(shuffleId, 0, 0)
+  private def isAllCleanedUp =
+    toBeCleanedRDDIds.isEmpty &&
+    toBeCleanedShuffleIds.isEmpty &&
+    toBeCleanedBroadcstIds.isEmpty
 
   private def rddBlockId(rddId: Int) = RDDBlockId(rddId, 0)
+  private def shuffleBlockId(shuffleId: Int) = ShuffleBlockId(shuffleId, 0, 0)
+  private def broadcastBlockId(broadcastId: Long) = BroadcastBlockId(broadcastId)
 
   private def blockManager = sc.env.blockManager
-
   private def diskBlockManager = blockManager.diskBlockManager
-
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
index e446c7f75dc0b..0b9847174ac84 100644
--- a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
@@ -33,11 +33,6 @@ class WrappedJavaHashMapSuite extends FunSuite {
     // Test a simple WrappedJavaHashMap
     testMap(new TestMap[String, String]())
 
-    // Test BoundedHashMap
-    testMap(new BoundedHashMap[String, String](100, true))
-
-    testMapThreadSafety(new BoundedHashMap[String, String](100, true))
-
     // Test TimeStampedHashMap
     testMap(new TimeStampedHashMap[String, String])