diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 250d9d55c6211..54e08d7866f75 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests. */
+   * This is set to true only for tests.
+   */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", false)
 
@@ -133,7 +134,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform RDD cleanup. */
-  private def doCleanupRDD(rddId: Int, blocking: Boolean) {
+  def doCleanupRDD(rddId: Int, blocking: Boolean) {
     try {
       logDebug("Cleaning RDD " + rddId)
       sc.unpersistRDD(rddId, blocking)
@@ -145,7 +146,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform shuffle cleanup, asynchronously. */
-  private def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
+  def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
     try {
       logDebug("Cleaning shuffle " + shuffleId)
       mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -158,7 +159,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Perform broadcast cleanup. */
-  private def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
+  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
       logDebug("Cleaning broadcast " + broadcastId)
       broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -175,18 +176,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   // Used for testing. These methods explicitly blocks until cleanup is completed
   // to ensure that more reliable testing.
-
-  def cleanupRDD(rdd: RDD[_]) {
-    doCleanupRDD(rdd.id, blocking = true)
-  }
-
-  def cleanupShuffle(shuffleDependency: ShuffleDependency[_, _]) {
-    doCleanupShuffle(shuffleDependency.shuffleId, blocking = true)
-  }
-
-  def cleanupBroadcast[T](broadcast: Broadcast[T]) {
-    doCleanupBroadcast(broadcast.id, blocking = true)
-  }
 }
 
 private object ContextCleaner {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dfc173357c12b..d7124616d3bfb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -233,11 +233,13 @@ class SparkContext(
   @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()
 
-  private[spark] val cleaner: Option[ContextCleaner] =
+  private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
-    } else None
-
+    } else {
+      None
+    }
+  }
   cleaner.foreach(_.start())
 
   postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index a7867bcaabfc2..c7f7c59cfb449 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
  * entire Spark job.
  */
 trait BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
+  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean)
-  def stop()
+  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
+  def stop(): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 51399bb980fcd..f6a8a8af91e4b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -86,11 +86,9 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
           val start = System.nanoTime
           value_ = HttpBroadcast.read[T](id)
           /*
-           * Storing the broadcast data in BlockManager so that all
-           * so that all subsequent tasks using the broadcast variable
-           * does not need to fetch it again. The BlockManagerMaster
-           * does not need to be told about this block as no one
-           * needs to know about this data block.
+           * We cache broadcast data in the BlockManager so that subsequent tasks using it
+           * do not need to re-fetch. This data is only used locally and no other node
+           * needs to fetch this block, so we don't notify the master.
            */
           SparkEnv.get.blockManager.putSingle(
             blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a9e3e48767b1b..b021564477c47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -220,13 +220,16 @@ private[spark] class BlockManager(
     }
   }
 
-  /** Get the BlockStatus for the block identified by the given ID, if it exists. */
+  /**
+   * Get the BlockStatus for the block identified by the given ID, if it exists.
+   * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+   */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
     blockInfo.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
-      val tachyonSize = if (tachyonStore.contains(blockId)) tachyonStore.getSize(blockId) else 0L
-      BlockStatus(info.level, memSize, diskSize, tachyonSize)
+      // Assume that block is not in Tachyon
+      BlockStatus(info.level, memSize, diskSize, 0L)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 866c52150a48e..7a24c8f57f43b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -95,7 +95,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     getBlockLocation(blockId).file.exists()
   }
 
-  /** List all the blocks currently stored in disk by the disk manager. */
+  /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
     // Get all the files inside the array of array of directories
     subDirs.flatten.filter(_ != null).flatMap { dir =>
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 5c239329588d8..8de75ba9a9c92 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -108,7 +108,6 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
     }
   }
 
-  // Should we return previous value directly or as Option?
   def putIfAbsent(key: A, value: B): Option[B] = {
     val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
     Option(prev).map(_.value)
@@ -148,5 +147,4 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
   def getTimestamp(key: A): Option[Long] = {
     getTimeStampedValue(key).map(_.timestamp)
   }
-
 }
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 5b6120d965c5c..e50981cf6fb20 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -57,7 +57,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
 
     // Explicit cleanup
-    cleaner.cleanupRDD(rdd)
+    cleaner.doCleanupRDD(rdd.id, blocking = true)
     tester.assertCleanup()
 
     // Verify that RDDs can be re-executed after cleaning up
@@ -70,7 +70,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
 
     // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
+    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
    tester.assertCleanup()
 
     // Verify that shuffles can be re-executed after cleaning up
@@ -82,7 +82,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
 
     // Explicit cleanup
-    cleaner.cleanupBroadcast(broadcast)
+    cleaner.doCleanupBroadcast(broadcast.id, blocking = true)
     tester.assertCleanup()
   }
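
Usage sketch (not part of the patch): with the test-only cleanupRDD/cleanupShuffle/cleanupBroadcast wrappers removed, callers inside the org.apache.spark package drive the now-public doCleanup* methods directly with blocking = true, as the updated ContextCleanerSuite does above. The object name CleanerBlockingSketch and the sample RDD below are made up for illustration; the conf key, the cleaner field, and the method signatures come from the diff.

package org.apache.spark

// A minimal sketch, assuming it lives in the org.apache.spark package so it can
// see the private[spark] SparkContext.cleaner field (as ContextCleanerSuite does).
object CleanerBlockingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("cleaner-blocking-sketch")
      .set("spark.cleaner.referenceTracking", "true") // default; shown for clarity
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100, 2).persist()
    rdd.count()

    // Block until the RDD's blocks are actually unpersisted, mirroring the
    // updated ContextCleanerSuite now that the test-only wrappers are gone.
    sc.cleaner.foreach(_.doCleanupRDD(rdd.id, blocking = true))

    sc.stop()
  }
}

The blocking = true flag threads through to sc.unpersistRDD (and, for the other cleanup paths, to broadcastManager.unbroadcast and the shuffle/block-manager removals), so assertCleanup-style checks can run immediately after the call returns.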