diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e1a273593cce5..c45c5c90048f3 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -112,8 +112,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   }
 
   /**
-   * Called from executors to get the server URIs and
-   * output sizes of the map outputs of a given shuffle
+   * Called from executors to get the server URIs and output sizes of the map outputs of
+   * a given shuffle.
    */
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
@@ -218,10 +218,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private var cacheEpoch = epoch
 
   /**
-   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses
-   * in the master, so that statuses are dropped only by explicit deregistering or
-   * by TTL-based cleaning (if set). Other than these two
-   * scenarios, nothing should be dropped from this HashMap.
+   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
+   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe84b812ba8d0..79574c271cfb6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -230,6 +230,7 @@ class SparkContext(
 
   private[spark] val cleaner = new ContextCleaner(this)
   cleaner.start()
 
+  postEnvironmentUpdate()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
@@ -773,7 +774,7 @@ class SparkContext(
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
    */
   def addJar(path: String) {
-    if (path == null) { 
+    if (path == null) {
       logWarning("null specified as parameter to addJar")
     } else {
       var key = ""
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 521182021dd4b..62398dc930993 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -185,6 +185,7 @@
     } else {
       new MapOutputTrackerWorker(conf)
     }
+
     // Have to assign trackerActor after initialization as MapOutputTrackerActor
     // requires the MapOutputTracker itself
     mapOutputTracker.trackerActor = registerOrLookup(
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 850650951e603..9ff1675e76a5e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
  * entire Spark job.
  */
 trait BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
+  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def unbroadcast(id: Long, removeFromDriver: Boolean)
-  def stop(): Unit
+  def stop()
 }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index d5e3d60a5b2b7..d8981bb42e684 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -90,7 +90,7 @@ private[spark] object HttpBroadcast extends Logging {
   private var securityManager: SecurityManager = null
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
-  val files = new TimeStampedHashSet[String]
+  private val files = new TimeStampedHashSet[String]
   private var cleaner: MetadataCleaner = null
 
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
@@ -195,7 +195,7 @@ private[spark] object HttpBroadcast extends Logging {
   def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
     if (removeFromDriver) {
-      val file = new File(broadcastDir, BroadcastBlockId(id).name)
+      val file = getFile(id)
       files.remove(file.toString)
       deleteBroadcastFile(file)
     }
@@ -217,10 +217,9 @@ private[spark] object HttpBroadcast extends Logging {
     }
   }
 
-  /** Delete the given broadcast file. */
   private def deleteBroadcastFile(file: File) {
     try {
-      if (!file.exists()) {
+      if (!file.exists) {
        logWarning("Broadcast file to be deleted does not exist: %s".format(file))
       } else if (file.delete()) {
         logInfo("Deleted broadcast file: %s".format(file))
@@ -229,7 +228,7 @@ private[spark] object HttpBroadcast extends Logging {
       }
     } catch {
       case e: Exception =>
-        logWarning("Exception while deleting broadcast file: %s".format(file), e)
+        logError("Exception while deleting broadcast file: %s".format(file), e)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index ace71575f5390..ab280fad4e28f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -72,7 +72,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
   }
 
   /**
-   * Remove all persisted state associated with this HTTP broadcast.
+   * Remove all persisted state associated with this Torrent broadcast.
    * @param removeFromDriver Whether to remove state from the driver.
    */
   override def unpersist(removeFromDriver: Boolean) {
@@ -177,13 +177,12 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
 }
 
 private[spark] object TorrentBroadcast extends Logging {
+  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
 
-  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
-
   def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
         initialized = true
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e5638d0132e88..e8d36e6bfc810 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -158,7 +158,7 @@ abstract class RDD[T: ClassTag](
    */
   def unpersist(blocking: Boolean = true): RDD[T] = {
     logInfo("Removing RDD " + id + " from persistence list")
-    sc.unpersistRDD(this.id, blocking)
+    sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
     this
   }
@@ -1128,4 +1128,5 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index edef40e7309f6..f31f0580c36fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1090,7 +1090,6 @@ class DAGScheduler(
 
       eventProcessActor ! StopDAGScheduler
     }
     taskScheduler.stop()
-    listenerBus.stop()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 78dc32b4b1525..24ec8d3ab44bf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -49,8 +49,8 @@ private[spark] class BlockManager(
     maxMemory: Long,
     val conf: SparkConf,
     securityManager: SecurityManager,
-    mapOutputTracker: MapOutputTracker
-  ) extends Logging {
+    mapOutputTracker: MapOutputTracker)
+  extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 674322e3034c8..5c9ea88d6b1a4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -82,7 +82,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   /**
    * Check if block manager master has a block. Note that this can be used to check for only
-   * those blocks that are expected to be reported to block manager master.
+   * those blocks that are reported to block manager master.
    */
   def contains(blockId: BlockId) = {
     !getLocations(blockId).isEmpty
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index f83c26dafe2e9..3271d4f1375ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -167,7 +167,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
-    // TODO(aor): Consolidate usages of
+    // TODO: Consolidate usages of
     val removeMsg = RemoveBroadcast(broadcastId)
     blockManagerInfo.values
       .filter { info => removeFromDriver || info.blockManagerId.executorId != "" }
@@ -350,7 +350,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
 
     // Note that this logic will select the same node multiple times if there aren't enough peers
     Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 
-
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 1d3e94c4b6533..9a29c39a28ab1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -22,11 +22,9 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import akka.actor.ActorRef
 
 private[storage] object BlockManagerMessages {
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
   //////////////////////////////////////////////////////////////////////////////////
-
   sealed trait ToBlockManagerSlave
 
   // Remove a block from the slaves that have it. This can only be used to remove
@@ -50,7 +48,6 @@ private[storage] object BlockManagerMessages {
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
-
   sealed trait ToBlockManagerMaster
 
   case class RegisterBlockManager(
@@ -122,5 +119,4 @@ private[storage] object BlockManagerMessages {
 
   // For testing. Have the master ask all slaves for the given block's storage level.
   case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster
-
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index a57e6f710305a..fcad84669c79a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -90,7 +90,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
-  /** Check if disk block manager has a block */
+  /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
     getBlockLocation(blockId).file.exists()
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index cf83a60ffb9e8..06233153c56d4 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -169,13 +169,13 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
-  /** Remove all the blocks / files and metadata related to a particular shuffle */
+  /** Remove all the blocks / files and metadata related to a particular shuffle. */
   def removeShuffle(shuffleId: ShuffleId) {
     removeShuffleBlocks(shuffleId)
     shuffleStates.remove(shuffleId)
   }
 
-  /** Remove all the blocks / files related to a particular shuffle */
+  /** Remove all the blocks / files related to a particular shuffle. */
   private def removeShuffleBlocks(shuffleId: ShuffleId) {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 7b75215846a9a..a107c5182b3be 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
         val block = (1 to blockSize).map(_ => Random.nextInt())
         val level = randomLevel()
         val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, true)
+        manager.put(blockId, block.iterator, level, tellMaster = true)
         println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
         queue.add((blockId, block))
       }
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 2ef853710a554..7ebed5105b9fd 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -78,15 +78,16 @@
     conf.getInt("spark.cleaner.ttl", -1)
   }
 
-  def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
-  {
-    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
-      .toInt
+  def getDelaySeconds(
+      conf: SparkConf,
+      cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
+    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt
   }
 
-  def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
-    delay: Int)
-  {
+  def setDelaySeconds(
+      conf: SparkConf,
+      cleanerType: MetadataCleanerType.MetadataCleanerType,
+      delay: Int) {
     conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
index 09a6faf33ec60..9f3247a27ba38 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.util
 
-import scala.collection.{JavaConversions, immutable}
-
-import java.util
 import java.lang.ref.WeakReference
+import java.util
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions
 
 import org.apache.spark.Logging
-import java.util.concurrent.atomic.AtomicInteger
 
 private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) {
   def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b83033c35f6b7..6b2571cd9295e 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -96,7 +96,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(tracker.getServerStatuses(10, 0).isEmpty)
   }
 
-  test("master register shuffle and unregister mapoutput and fetch") {
+  test("master register shuffle and unregister map output and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
     tracker.trackerActor =
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 04e64ee7a45b3..1f5bcca64fc39 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,8 +28,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
diff --git a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
index 0b9847174ac84..f6e6a4c77c820 100644
--- a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.util
 
-import java.util
 import java.lang.ref.WeakReference
+import java.util
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Random