Skip to content

Commit

Permalink
Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Apr 17, 2015
1 parent e5949c2 commit b08b3c9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
Expand All @@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
def checkpointCleaned(rddId: Long)
}
12 changes: 10 additions & 2 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,14 @@ class CleanerTester(
sc: SparkContext,
rddIds: Seq[Int] = Seq.empty,
shuffleIds: Seq[Int] = Seq.empty,
broadcastIds: Seq[Long] = Seq.empty)
broadcastIds: Seq[Long] = Seq.empty,
checkpointIds: Seq[Long] = Seq.empty)
extends Logging {

val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal

val cleanerListener = new CleanerListener {
Expand All @@ -433,6 +435,11 @@ class CleanerTester(
def accumCleaned(accId: Long): Unit = {
logInfo("Cleaned accId " + accId + " cleaned")
}

def checkpointCleaned(rddId: Long): Unit = {
toBeCheckpointIds -= rddId
logInfo("checkpoint rddId " + rddId + " cleaned")
}
}

val MAX_VALIDATION_ATTEMPTS = 10
Expand Down Expand Up @@ -547,7 +554,8 @@ class CleanerTester(
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty
toBeCleanedBroadcstIds.isEmpty &&
toBeCheckpointIds.isEmpty

private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
Expand Down

0 comments on commit b08b3c9

Please sign in to comment.