Remove java related
witgo committed Dec 6, 2014
1 parent 4d1e249 commit a70c500
Showing 3 changed files with 15 additions and 29 deletions.
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1210,7 +1210,8 @@ abstract class RDD[T: ClassTag](
   /**
    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
    * directory set with SparkContext.setCheckpointDir() and all references to its parent
-   * RDDs will be removed. It is strongly recommended that this RDD is persisted in
+   * RDDs will be removed. This function must be called before any job has been
+   * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
    */
   def checkpoint() {
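
For context, a minimal usage sketch of the contract the updated scaladoc describes. This is not code from the commit; it assumes an existing SparkContext `sc`, and the path and data are illustrative.

    // Assumes an existing SparkContext `sc`; path and data are illustrative.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val rdd = sc.parallelize(1 to 4).flatMap(x => 1 to x)
    rdd.cache()       // recommended: otherwise writing the checkpoint recomputes the RDD
    rdd.checkpoint()  // must be called before the first job runs on this RDD
    rdd.count()       // the first action materializes the RDD and writes the checkpoint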
@@ -1278,20 +1279,21 @@ abstract class RDD[T: ClassTag](
   }

   // Avoid handling doCheckpoint multiple times to prevent excessive recursion
-  @transient private var doCheckpointCalled = 0
+  @transient private var doCheckpointCalled = false

   /**
    * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
    * has completed (therefore the RDD has been materialized and potentially stored in memory).
    * doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
-    if (checkpointData == None && doCheckpointCalled == 0) {
-      dependencies.foreach(_.rdd.doCheckpoint())
-      doCheckpointCalled = 1
-    } else if (checkpointData.isDefined && doCheckpointCalled < 2) {
-      checkpointData.get.doCheckpoint()
-      doCheckpointCalled = 2
+    if (!doCheckpointCalled) {
+      doCheckpointCalled = true
+      if (checkpointData.isDefined) {
+        checkpointData.get.doCheckpoint()
+      } else {
+        dependencies.foreach(_.rdd.doCheckpoint())
+      }
     }
   }
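
A small sketch (not from the commit) of what the restored control flow implies, assuming an existing `sc` with a checkpoint directory set: each RDD handles doCheckpoint() at most once, and the recursion stops at an RDD that has checkpointData, so ancestors above a checkpoint boundary are never visited.

    // Hypothetical lineage; the names are illustrative.
    val parent = sc.parallelize(1 to 100).map(_ * 2)
    val child = parent.filter(_ % 4 == 0)
    child.checkpoint()  // requested before any job, per the updated scaladoc
    child.count()       // doCheckpoint(): child has checkpointData, so parent is skipped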

11 changes: 0 additions & 11 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -55,17 +55,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(flatMappedRDD.collect() === result)
   }

-  test("After call count method, checkpoint should also work") {
-    val parCollection = sc.makeRDD(1 to 4)
-    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
-    flatMappedRDD.count
-    flatMappedRDD.checkpoint()
-    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
-    val result = flatMappedRDD.collect()
-    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
-    assert(flatMappedRDD.collect() === result)
-  }
-
   test("RDDs with one-to-one dependencies") {
     testRDD(_.map(x => x.toString))
     testRDD(_.flatMap(x => 1 to x))
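
A hypothetical test, not part of this commit, that would pin down the semantics the deleted test contradicted: with the boolean guard, a checkpoint requested only after a job has run is never acted on, because doCheckpointCalled is already true by then.

    test("checkpoint requested after a job has run is ignored") {
      val parCollection = sc.makeRDD(1 to 4)
      val flatMappedRDD = parCollection.flatMap(x => 1 to x)
      flatMappedRDD.count()
      flatMappedRDD.checkpoint()
      flatMappedRDD.collect()
      assert(flatMappedRDD.dependencies.head.rdd == parCollection)
      assert(!flatMappedRDD.isCheckpointed)
    }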
15 changes: 5 additions & 10 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -376,18 +376,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val rdd = sc.parallelize(ring)
       val graph = Graph.fromEdges(rdd, 1.0F)
       graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
       val edgesDependencies = graph.edges.partitionsRDD.dependencies
       val verticesDependencies = graph.vertices.partitionsRDD.dependencies
-      val edges = graph.edges.collect().map(_.attr)
-      val vertices = graph.vertices.collect().map(_._2)
-
-      graph.vertices.count()
-      graph.edges.count()
-
-      assert(graph.edges.partitionsRDD.dependencies != edgesDependencies)
-      assert(graph.vertices.partitionsRDD.dependencies != verticesDependencies)
-      assert(graph.vertices.collect().map(_._2) === vertices)
-      assert(graph.edges.collect().map(_.attr) === edges)
+      assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+      assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
     }
   }
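
For context, a minimal sketch (not from the commit) of the graph checkpointing pattern the updated test exercises, assuming an existing `sc` with a checkpoint directory set; the edge data is illustrative.

    import org.apache.spark.graphx.{Edge, Graph}

    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0F), Edge(2L, 3L, 1.0F)))
    val graph = Graph.fromEdges(edges, 1.0F)
    graph.checkpoint()
    graph.edges.map(_.attr).count()   // a job on the edge side materializes its checkpoint
    graph.vertices.map(_._2).count()  // a job on the vertex side materializes its checkpoint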

