From 620eca349808befa6c339bc5acc351c484495557 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Mar 2014 13:05:47 -0700 Subject: [PATCH] Changes based on PR comments. --- .../org/apache/spark/MapOutputTracker.scala | 4 ++-- .../storage/BlockManagerMasterActor.scala | 2 +- .../apache/spark/util/BoundedHashMap.scala | 4 ++-- .../spark/util/TimeStampedHashMap.scala | 4 ++-- .../util/TimeStampedWeakValueHashMap.scala | 19 ++++++++----------- .../spark/util/WrappedJavaHashMap.scala | 10 +++++++--- .../spark/util/WrappedJavaHashMapSuite.scala | 10 +++++----- 7 files changed, 27 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ffdf9115e1aae..ad9ee73e6b2e0 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -70,7 +70,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * their cache of map output locations if this happens. */ protected var epoch: Long = 0 - protected val epochLock = new java.lang.Object + protected val epochLock = new AnyRef /** Remembers which map output locations are currently being fetched on a worker */ private val fetching = new HashSet[Int] @@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) cachedSerializedStatuses.clear() } - protected def cleanup(cleanupTime: Long) { + private def cleanup(cleanupTime: Long) { mapStatuses.clearOldValues(cleanupTime) cachedSerializedStatuses.clearOldValues(cleanupTime) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 64cbedc8afcd3..cefbd28511bfd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -150,7 +150,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeShuffle(shuffleId: Int) { // Nothing to do in the BlockManagerMasterActor data structures val removeMsg = RemoveShuffle(shuffleId) - blockManagerInfo.values.map { bm => + blockManagerInfo.values.foreach { bm => bm.slaveActor ! removeMsg } } diff --git a/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala b/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala index c4f7df1ee0a7b..888a06b2408c9 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala @@ -45,14 +45,14 @@ import scala.reflect.ClassTag private[spark] class BoundedHashMap[A, B](bound: Int, useLRU: Boolean) extends WrappedJavaHashMap[A, B, A, B] with SynchronizedMap[A, B] { - protected[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B]( + private[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B]( bound / 8, (0.75).toFloat, useLRU) { override protected def removeEldestEntry(eldest: JMapEntry[A, B]): Boolean = { size() > bound } }) - protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { + private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { new BoundedHashMap[K1, V1](bound, useLRU) } diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 60901c5e36130..c4d770fecdf74 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -39,9 +39,9 @@ private[util] case class TimeStampedValue[T](timestamp: Long, value: T) private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false) extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging { - protected[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]() + private[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]() - protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { + private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { new TimeStampedHashMap[K1, V1]() } diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala index bd86d78b8010f..09a6faf33ec60 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala @@ -51,11 +51,11 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]() /** Counter for counting the number of inserts */ private val insertCounts = new AtomicInteger(0) - protected[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = { + private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = { new ConcurrentHashMap[A, TimeStampedWeakValue[B]]() } - protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { + private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { new TimeStampedWeakValueHashMap[K1, V1]() } @@ -68,15 +68,12 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]() } override def get(key: A): Option[B] = { - Option(internalJavaMap.get(key)) match { - case Some(weakValue) => - val value = weakValue.weakValue.get - if (value == null) { - internalJavaMap.remove(key) - } - Option(value) - case None => - None + Option(internalJavaMap.get(key)).flatMap { weakValue => + val value = weakValue.weakValue.get + if (value == null) { + internalJavaMap.remove(key) + } + Option(value) } } diff --git a/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala b/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala index 59e35c3abf172..6cc3007f5d7ac 100644 --- a/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala @@ -46,11 +46,15 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V] /* Methods that must be defined. */ - /** Internal Java HashMap that is being wrapped. */ - protected[util] val internalJavaMap: JMap[IK, IV] + /** + * Internal Java HashMap that is being wrapped. + * Scoped private[util] so that rest of Spark code cannot + * directly access the internal map. + */ + private[util] val internalJavaMap: JMap[IK, IV] /** Method to get a new instance of the internal Java HashMap. */ - protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] + private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] /* Methods that convert between internal and external types. These implementations diff --git a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala index 37c1f748a6f3d..e446c7f75dc0b 100644 --- a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala @@ -17,12 +17,12 @@ package org.apache.spark.util -import scala.collection.mutable.{ArrayBuffer, HashMap, Map} -import scala.util.Random - import java.util import java.lang.ref.WeakReference +import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.util.Random + import org.scalatest.FunSuite class WrappedJavaHashMapSuite extends FunSuite { @@ -203,9 +203,9 @@ class WrappedJavaHashMapSuite extends FunSuite { } class TestMap[A, B] extends WrappedJavaHashMap[A, B, A, B] { - protected[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]() + private[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]() - protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { + private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { new TestMap[K1, V1] } }