Fixed issue with Tachyon + new BlockManager methods.
tdas committed Apr 8, 2014
1 parent f489fdc commit 61b8d6e
Showing 8 changed files with 26 additions and 36 deletions.
21 changes: 5 additions & 16 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/**
* Whether the cleaning thread will block on cleanup tasks.
* This is set to true only for tests. */
* This is set to true only for tests.
*/
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", false)

@@ -133,7 +134,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform RDD cleanup. */
private def doCleanupRDD(rddId: Int, blocking: Boolean) {
def doCleanupRDD(rddId: Int, blocking: Boolean) {
try {
logDebug("Cleaning RDD " + rddId)
sc.unpersistRDD(rddId, blocking)
@@ -145,7 +146,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform shuffle cleanup, asynchronously. */
private def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -158,7 +159,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform broadcast cleanup. */
private def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
try {
logDebug("Cleaning broadcast " + broadcastId)
broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -175,18 +176,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

// Used for testing. These methods explicitly block until cleanup is completed
// to ensure more reliable testing.

def cleanupRDD(rdd: RDD[_]) {
doCleanupRDD(rdd.id, blocking = true)
}

def cleanupShuffle(shuffleDependency: ShuffleDependency[_, _]) {
doCleanupShuffle(shuffleDependency.shuffleId, blocking = true)
}

def cleanupBroadcast[T](broadcast: Broadcast[T]) {
doCleanupBroadcast(broadcast.id, blocking = true)
}
}

private object ContextCleaner {
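With the test-only wrapper methods removed, callers invoke the doCleanup* methods directly and request blocking behaviour explicitly. Below is a minimal sketch of the new calling pattern, mirroring the ContextCleanerSuite changes further down; it assumes code living in the org.apache.spark package (sc.cleaner is private[spark]), a SparkContext sc with reference tracking enabled, and placeholder rdd, shuffleDep, and broadcastVar values.

    // Illustrative only: the same pattern the updated test suite uses.
    val cleaner = sc.cleaner.get                                      // Option[ContextCleaner], defined when referenceTracking is on
    cleaner.doCleanupRDD(rdd.id, blocking = true)                     // unpersist the RDD and wait for completion
    cleaner.doCleanupShuffle(shuffleDep.shuffleId, blocking = true)   // unregister map outputs, remove shuffle blocks, and wait
    cleaner.doCleanupBroadcast(broadcastVar.id, blocking = true)      // remove the broadcast data and wait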
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -233,11 +233,13 @@ class SparkContext(
@volatile private[spark] var dagScheduler = new DAGScheduler(this)
dagScheduler.start()

private[spark] val cleaner: Option[ContextCleaner] =
private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else None

} else {
None
}
}
cleaner.foreach(_.start())

postEnvironmentUpdate()
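Whether sc.cleaner ends up as Some or None is decided solely by the configuration key above. A small, self-contained sketch of disabling it in a local-mode application (the app name is arbitrary; the key defaults to true):

    import org.apache.spark.{SparkConf, SparkContext}

    // Turning off reference tracking means no ContextCleaner is ever created or started.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("cleaner-config-example")
      .set("spark.cleaner.referenceTracking", "false")
    val sc = new SparkContext(conf)
    // sc.cleaner is private[spark]; only code inside the spark package could observe that it is None here.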
core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -27,8 +27,8 @@ import org.apache.spark.SparkConf
* entire Spark job.
*/
trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager)
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean)
def stop()
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
def stop(): Unit
}
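Making the Unit return types explicit turns the trait into an unambiguous contract for implementors. The skeleton below is a sketch of what an implementation has to provide; NoopBroadcastFactory is a made-up name, and the sketch assumes it is compiled inside the org.apache.spark.broadcast package, since SecurityManager is package-private.

    import org.apache.spark.{SecurityManager, SparkConf}

    class NoopBroadcastFactory extends BroadcastFactory {
      override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit = ()
      override def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
        throw new UnsupportedOperationException("sketch only; a real factory returns a usable Broadcast[T]")
      override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = ()
      override def stop(): Unit = ()
    }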
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -86,11 +86,9 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
val start = System.nanoTime
value_ = HttpBroadcast.read[T](id)
/*
* Storing the broadcast data in BlockManager so that all
* so that all subsequent tasks using the broadcast variable
* does not need to fetch it again. The BlockManagerMaster
* does not need to be told about this block as no one
* needs to know about this data block.
* We cache broadcast data in the BlockManager so that subsequent tasks using it
* do not need to re-fetch. This data is only used locally and no other node
* needs to fetch this block, so we don't notify the master.
*/
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
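Because the block is stored with tellMaster = false, only the local BlockManager knows about it, so a later task on the same executor can read the value back locally instead of re-fetching it over HTTP. A rough sketch of that read-back path, assuming getSingle is the local lookup (names follow the surrounding class and are illustrative):

    // Reuse the locally cached broadcast value if present; otherwise fetch it again from the driver.
    SparkEnv.get.blockManager.getSingle(blockId) match {
      case Some(cached) => value_ = cached.asInstanceOf[T]     // local hit, no network traffic
      case None         => value_ = HttpBroadcast.read[T](id)  // miss, go back to the HTTP server
    }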
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -220,13 +220,16 @@ private[spark] class BlockManager(
}
}

/** Get the BlockStatus for the block identified by the given ID, if it exists. */
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
val tachyonSize = if (tachyonStore.contains(blockId)) tachyonStore.getSize(blockId) else 0L
BlockStatus(info.level, memSize, diskSize, tachyonSize)
// Assume that block is not in Tachyon
BlockStatus(info.level, memSize, diskSize, 0L)
}
}

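Since getStatus now always reports the Tachyon size as 0L, callers (mostly tests) should only draw conclusions from the memory and disk figures. A hedged example of querying it for the first partition of a cached RDD; rdd is a placeholder, and the lookup assumes access to the node's BlockManager through SparkEnv:

    import org.apache.spark.storage.RDDBlockId

    // Illustrative: find out whether one RDD block is stored on this node and at what level.
    val blockId = RDDBlockId(rdd.id, 0)
    SparkEnv.get.blockManager.getStatus(blockId) match {
      case Some(status) => println(s"$blockId is stored: $status")   // memSize/diskSize are meaningful; tachyonSize is always 0
      case None         => println(s"$blockId is not stored locally")
    }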
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -95,7 +95,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
getBlockLocation(blockId).file.exists()
}

/** List all the blocks currently stored in disk by the disk manager. */
/** List all the blocks currently stored on disk by the disk manager. */
def getAllBlocks(): Seq[BlockId] = {
// Get all the files inside the array of arrays of directories
subDirs.flatten.filter(_ != null).flatMap { dir =>
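getAllBlocks rebuilds a BlockId from every file found under the disk manager's local directories, so tests can enumerate exactly what has been written to disk. A small usage sketch, assuming the node's BlockManager exposes its DiskBlockManager as diskBlockManager:

    import org.apache.spark.storage.{BlockId, ShuffleBlockId}

    // Illustrative: list everything currently on disk and count the shuffle blocks among them.
    val onDisk: Seq[BlockId] = SparkEnv.get.blockManager.diskBlockManager.getAllBlocks()
    val shuffleBlocks = onDisk.collect { case id: ShuffleBlockId => id }
    println(s"${onDisk.size} blocks on disk, ${shuffleBlocks.size} of them from shuffles")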
core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -108,7 +108,6 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
}
}

// Should we return previous value directly or as Option?
def putIfAbsent(key: A, value: B): Option[B] = {
val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
Option(prev).map(_.value)
@@ -148,5 +147,4 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
def getTimestamp(key: A): Option[Long] = {
getTimeStampedValue(key).map(_.timestamp)
}

}
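putIfAbsent keeps ConcurrentHashMap's atomic insert-if-missing behaviour but wraps the previous value in an Option, which settles the question raised by the comment removed above. A small sketch of the semantics; the class is private[spark], so assume the snippet lives inside the spark package:

    import org.apache.spark.util.TimeStampedHashMap

    val m = new TimeStampedHashMap[String, Int]()
    val first  = m.putIfAbsent("key", 1)   // None: the key was absent, so 1 is stored
    val second = m.putIfAbsent("key", 2)   // Some(1): the key already existed, the stored value stays 1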
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -57,7 +57,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))

// Explicit cleanup
cleaner.cleanupRDD(rdd)
cleaner.doCleanupRDD(rdd.id, blocking = true)
tester.assertCleanup()

// Verify that RDDs can be re-executed after cleaning up
@@ -70,7 +70,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))

// Explicit cleanup
shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
tester.assertCleanup()

// Verify that shuffles can be re-executed after cleaning up
@@ -82,7 +82,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))

// Explicit cleanup
cleaner.cleanupBroadcast(broadcast)
cleaner.doCleanupBroadcast(broadcast.id, blocking = true)
tester.assertCleanup()
}
