Import, comments, and style fixes (minor)
andrewor14 committed Mar 28, 2014
1 parent c92e4d9 commit 0d17060
Showing 20 changed files with 42 additions and 48 deletions.
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -112,8 +112,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}

/**
* Called from executors to get the server URIs and
* output sizes of the map outputs of a given shuffle
* Called from executors to get the server URIs and output sizes of the map outputs of
* a given shuffle.
*/
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId).orNull
@@ -218,10 +218,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
private var cacheEpoch = epoch

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses
* in the master, so that statuses are dropped only by explicit deregistering or
* by TTL-based cleaning (if set). Other than these two
* scenarios, nothing should be dropped from this HashMap.
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
* so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
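The comment above describes Spark's internal TimeStampedHashMap: entries leave the map only through explicit deregistration or TTL-based cleaning. A minimal self-contained sketch of that idea, with illustrative names rather than the actual Spark utility:

import scala.collection.concurrent.TrieMap

// Illustrative sketch: each entry is stamped with its insertion time so a
// periodic sweep can drop entries older than a TTL, mirroring the behaviour
// described for mapStatuses above. Names and details are simplified.
class TtlMap[K, V](ttlMillis: Long) {
  private val underlying = TrieMap.empty[K, (Long, V)]

  def put(key: K, value: V): Unit =
    underlying.put(key, (System.currentTimeMillis(), value))

  def get(key: K): Option[V] = underlying.get(key).map(_._2)

  // Explicit deregistration: remove a single entry by key.
  def remove(key: K): Unit = underlying.remove(key)

  // TTL-based cleaning: drop every entry stamped before the cutoff.
  def clearOldValues(): Unit = {
    val cutoff = System.currentTimeMillis() - ttlMillis
    for ((key, (timestamp, _)) <- underlying if timestamp < cutoff) {
      underlying.remove(key)
    }
  }
}
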
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -230,6 +230,7 @@ class SparkContext(

private[spark] val cleaner = new ContextCleaner(this)
cleaner.start()

postEnvironmentUpdate()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
@@ -773,7 +774,7 @@ class SparkContext(
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
var key = ""
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -185,6 +185,7 @@ object SparkEnv extends Logging {
} else {
new MapOutputTrackerWorker(conf)
}

// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerActor = registerOrLookup(
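The comment in this hunk notes that trackerActor can only be assigned after the MapOutputTracker exists, because the actor needs a reference back to the tracker. A simplified sketch of that two-phase wiring pattern, using made-up classes rather than the Spark ones:

// Illustrative only: the handler needs the tracker, and the tracker needs the
// handler, so the tracker exposes a var that is filled in after both exist.
class Tracker {
  var handler: Handler = null          // assigned after construction
  def report(msg: String): Unit = handler.handle(msg)
}

class Handler(tracker: Tracker) {      // requires the tracker itself
  def handle(msg: String): Unit = println(s"handling '$msg' for $tracker")
}

object WiringDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new Tracker
    tracker.handler = new Handler(tracker)  // wire up after initialization
    tracker.report("shuffle 0 complete")
  }
}
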
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
* entire Spark job.
*/
trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def unbroadcast(id: Long, removeFromDriver: Boolean)
def stop(): Unit
def stop()
}
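One of the style tweaks in this trait toggles between procedure syntax and an explicit `: Unit` result type. A generic Scala 2 illustration of the two forms, not part of the commit:

// Generic Scala 2 illustration: for an abstract member, procedure syntax and an
// explicit ": Unit" result type declare the same thing.
trait Worker {
  def start(): Unit   // explicit Unit result type
  def stop()          // procedure syntax; the result type is Unit
}

class NoopWorker extends Worker {
  override def start(): Unit = println("started")
  override def stop(): Unit = println("stopped")
}
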
@@ -90,7 +90,7 @@ private[spark] object HttpBroadcast extends Logging {
private var securityManager: SecurityManager = null

// TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
val files = new TimeStampedHashSet[String]
private val files = new TimeStampedHashSet[String]
private var cleaner: MetadataCleaner = null

private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
@@ -195,7 +195,7 @@ private[spark] object HttpBroadcast extends Logging {
def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
if (removeFromDriver) {
val file = new File(broadcastDir, BroadcastBlockId(id).name)
val file = getFile(id)
files.remove(file.toString)
deleteBroadcastFile(file)
}
@@ -217,10 +217,9 @@
}
}

/** Delete the given broadcast file. */
private def deleteBroadcastFile(file: File) {
try {
if (!file.exists()) {
if (!file.exists) {
logWarning("Broadcast file to be deleted does not exist: %s".format(file))
} else if (file.delete()) {
logInfo("Deleted broadcast file: %s".format(file))
@@ -229,7 +228,7 @@
}
} catch {
case e: Exception =>
logWarning("Exception while deleting broadcast file: %s".format(file), e)
logError("Exception while deleting broadcast file: %s".format(file), e)
}
}

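The deleteBroadcastFile hunk above distinguishes a missing file, a successful delete, a failed delete, and an exception. A self-contained sketch of that defensive pattern, with plain println in place of Spark's loggers and an illustrative helper name:

import java.io.File

object FileCleanup {
  // Illustrative helper, not the Spark method: delete a file defensively and
  // report which of the four outcomes occurred.
  def deleteQuietly(file: File): Unit = {
    try {
      if (!file.exists()) {
        println(s"File to be deleted does not exist: $file")
      } else if (file.delete()) {
        println(s"Deleted file: $file")
      } else {
        println(s"Could not delete file: $file")
      }
    } catch {
      case e: Exception =>
        println(s"Exception while deleting file: $file (${e.getMessage})")
    }
  }
}
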
@@ -72,7 +72,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
}

/**
* Remove all persisted state associated with this HTTP broadcast.
* Remove all persisted state associated with this Torrent broadcast.
* @param removeFromDriver Whether to remove state from the driver.
*/
override def unpersist(removeFromDriver: Boolean) {
@@ -177,13 +177,12 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
}

private[spark] object TorrentBroadcast extends Logging {
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null

lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024

def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
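In this hunk, BLOCK_SIZE is a lazy val that reads spark.broadcast.blockSize from conf, and conf itself is only assigned inside initialize(). A simplified sketch of why the laziness matters, with made-up names standing in for the Spark code:

// Illustrative sketch of the initialization-order issue a lazy val avoids:
// `conf` is only assigned in initialize(), so an eager val reading it at
// construction time would throw; `lazy val` defers the read to first use.
object BlockSizeDemo {
  private var conf: Map[String, String] = null

  private lazy val blockSizeBytes: Int =
    conf.getOrElse("blockSize", "4096").toInt * 1024

  def initialize(settings: Map[String, String]): Unit = {
    conf = settings
  }

  def main(args: Array[String]): Unit = {
    initialize(Map("blockSize" -> "64"))
    println(s"block size = $blockSizeBytes bytes")  // prints 65536
  }
}
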
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -158,7 +158,7 @@ abstract class RDD[T: ClassTag](
*/
def unpersist(blocking: Boolean = true): RDD[T] = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(this.id, blocking)
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
this
}
@@ -1128,4 +1128,5 @@ abstract class RDD[T: ClassTag](
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
}

}
@@ -1090,7 +1090,6 @@ class DAGScheduler(
eventProcessActor ! StopDAGScheduler
}
taskScheduler.stop()
listenerBus.stop()
}
}

@@ -49,8 +49,8 @@ private[spark] class BlockManager(
maxMemory: Long,
val conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker
) extends Logging {
mapOutputTracker: MapOutputTracker)
extends Logging {

val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
@@ -82,7 +82,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log

/**
* Check if block manager master has a block. Note that this can be used to check for only
* those blocks that are expected to be reported to block manager master.
* those blocks that are reported to block manager master.
*/
def contains(blockId: BlockId) = {
!getLocations(blockId).isEmpty
@@ -167,7 +167,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
// TODO(aor): Consolidate usages of <driver>
// TODO: Consolidate usages of <driver>
val removeMsg = RemoveBroadcast(broadcastId)
blockManagerInfo.values
.filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
@@ -350,7 +350,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
// Note that this logic will select the same node multiple times if there aren't enough peers
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}

}


@@ -22,11 +22,9 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef

private[storage] object BlockManagerMessages {

//////////////////////////////////////////////////////////////////////////////////
// Messages from the master to slaves.
//////////////////////////////////////////////////////////////////////////////////

sealed trait ToBlockManagerSlave

// Remove a block from the slaves that have it. This can only be used to remove
@@ -50,7 +48,6 @@ private[storage] object BlockManagerMessages {
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
//////////////////////////////////////////////////////////////////////////////////

sealed trait ToBlockManagerMaster

case class RegisterBlockManager(
@@ -122,5 +119,4 @@ private[storage] object BlockManagerMessages {

// For testing. Have the master ask all slaves for the given block's storage level.
case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster

}
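BlockManagerMessages groups its actor messages under sealed traits (ToBlockManagerSlave, ToBlockManagerMaster). A generic sketch of that pattern with illustrative names, showing why sealing helps the receiving side match exhaustively:

// Illustrative protocol, not the Spark one: a sealed trait per message
// direction lets the receiver pattern-match over the whole protocol and get
// compiler warnings if a message type is left unhandled.
sealed trait ToWorker
case class RemoveBlock(blockId: String) extends ToWorker
case object Shutdown extends ToWorker

object ProtocolDemo {
  def handle(msg: ToWorker): String = msg match {
    case RemoveBlock(id) => s"removing block $id"
    case Shutdown        => "shutting down"
  }

  def main(args: Array[String]): Unit = {
    println(handle(RemoveBlock("rdd_0_1")))
    println(handle(Shutdown))
  }
}
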
@@ -90,7 +90,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD

def getFile(blockId: BlockId): File = getFile(blockId.name)

/** Check if disk block manager has a block */
/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getBlockLocation(blockId).file.exists()
}
@@ -169,13 +169,13 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
throw new IllegalStateException("Failed to find shuffle block: " + id)
}

/** Remove all the blocks / files and metadata related to a particular shuffle */
/** Remove all the blocks / files and metadata related to a particular shuffle. */
def removeShuffle(shuffleId: ShuffleId) {
removeShuffleBlocks(shuffleId)
shuffleStates.remove(shuffleId)
}

/** Remove all the blocks / files related to a particular shuffle */
/** Remove all the blocks / files related to a particular shuffle. */
private def removeShuffleBlocks(shuffleId: ShuffleId) {
shuffleStates.get(shuffleId) match {
case Some(state) =>
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
val block = (1 to blockSize).map(_ => Random.nextInt())
val level = randomLevel()
val startTime = System.currentTimeMillis()
manager.put(blockId, block.iterator, level, true)
manager.put(blockId, block.iterator, level, tellMaster = true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
queue.add((blockId, block))
}
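The change above passes the boolean as tellMaster = true rather than a bare true. A tiny generic example of the named-argument style, using a hypothetical put method rather than BlockManager's:

object NamedArgDemo {
  // Hypothetical method standing in for a put with a trailing Boolean flag.
  def put(id: String, data: Iterator[Int], tellMaster: Boolean): Unit =
    println(s"put $id (tellMaster = $tellMaster, ${data.size} elements)")

  def main(args: Array[String]): Unit = {
    put("block-1", (1 to 3).iterator, tellMaster = true)  // the name documents what `true` means
  }
}
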
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -78,15 +78,16 @@ private[spark] object MetadataCleaner {
conf.getInt("spark.cleaner.ttl", -1)
}

def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
{
conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
.toInt
def getDelaySeconds(
conf: SparkConf,
cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt
}

def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
delay: Int)
{
def setDelaySeconds(
conf: SparkConf,
cleanerType: MetadataCleanerType.MetadataCleanerType,
delay: Int) {
conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}

@@ -17,14 +17,14 @@

package org.apache.spark.util

import scala.collection.{JavaConversions, immutable}

import java.util
import java.lang.ref.WeakReference
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions

import org.apache.spark.Logging
import java.util.concurrent.atomic.AtomicInteger

private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) {
def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value))
@@ -96,7 +96,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
assert(tracker.getServerStatuses(10, 0).isEmpty)
}

test("master register shuffle and unregister mapoutput and fetch") {
test("master register shuffle and unregister map output and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor =
@@ -28,8 +28,7 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._

import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
@@ -17,8 +17,8 @@

package org.apache.spark.util

import java.util
import java.lang.ref.WeakReference
import java.util

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Random
