From e68272441cd7af46faa77d134cb0fc4a4ad18b28 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Sun, 28 Sep 2014 23:21:34 +0800 Subject: [PATCH 1/3] Graph should support the checkpoint operation --- .../main/scala/org/apache/spark/rdd/RDD.scala | 15 +++++------ .../org/apache/spark/CheckpointSuite.scala | 11 ++++++++ .../scala/org/apache/spark/graphx/Graph.scala | 2 ++ .../spark/graphx/impl/EdgeRDDImpl.scala | 4 +++ .../apache/spark/graphx/impl/GraphImpl.scala | 5 ++++ .../spark/graphx/impl/VertexRDDImpl.scala | 4 +++ .../org/apache/spark/graphx/GraphSuite.scala | 26 +++++++++++++++++++ 7 files changed, 59 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 214f22bc5b603..5518fc520295a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1279,7 +1279,7 @@ abstract class RDD[T: ClassTag]( } // Avoid handling doCheckpoint multiple times to prevent excessive recursion - @transient private var doCheckpointCalled = false + @transient private var doCheckpointCalled = 0 /** * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD @@ -1287,13 +1287,12 @@ abstract class RDD[T: ClassTag]( * doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint() { - if (!doCheckpointCalled) { - doCheckpointCalled = true - if (checkpointData.isDefined) { - checkpointData.get.doCheckpoint() - } else { - dependencies.foreach(_.rdd.doCheckpoint()) - } + if (checkpointData == None && doCheckpointCalled == 0) { + dependencies.foreach(_.rdd.doCheckpoint()) + doCheckpointCalled = 1 + } else if (checkpointData.isDefined && doCheckpointCalled < 2) { + checkpointData.get.doCheckpoint() + doCheckpointCalled = 2 } } diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 3b10b3a042317..b62744eb69396 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -55,6 +55,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { assert(flatMappedRDD.collect() === result) } + test("After call count method, checkpoint should also work") { + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + flatMappedRDD.count + flatMappedRDD.checkpoint() + assert(flatMappedRDD.dependencies.head.rdd == parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) + } + test("RDDs with one-to-one dependencies") { testRDD(_.map(x => x.toString)) testRDD(_.flatMap(x => 1 to x)) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 637791543514c..810064193e7e6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -96,6 +96,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def cache(): Graph[VD, ED] + def checkpoint():Unit + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. This method can be used to diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 897c7ee12a436..d090b2c01b2ff 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -64,6 +64,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( this } + override def checkpoint(): Unit = { + partitionsRDD.checkpoint() + } + /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ override def cache(): this.type = { partitionsRDD.persist(targetStorageLevel) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 0eae2a673874a..a617d84aea9d4 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } + override def checkpoint(): Unit = { + vertices.checkpoint() + replicatedVertexView.edges.checkpoint() + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 9732c5b00c6d9..a219ce765f127 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -65,6 +65,10 @@ class VertexRDDImpl[VD] private[graphx] ( this } + override def checkpoint(): Unit = { + partitionsRDD.checkpoint() + } + /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ override def cache(): this.type = { partitionsRDD.persist(targetStorageLevel) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index a05d1ddb21295..3991de30b4131 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import com.google.common.io.Files + import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -365,4 +367,28 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("checkpoint") { + val checkpointDir = Files.createTempDir() + checkpointDir.deleteOnExit() + withSpark { sc => + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} + val rdd = sc.parallelize(ring) + val graph = Graph.fromEdges(rdd, 1.0F) + graph.checkpoint() + val edgesDependencies = graph.edges.partitionsRDD.dependencies + val verticesDependencies = graph.vertices.partitionsRDD.dependencies + val edges = graph.edges.collect().map(_.attr) + val vertices = graph.vertices.collect().map(_._2) + + graph.vertices.count() + graph.edges.count() + + assert(graph.edges.partitionsRDD.dependencies != edgesDependencies) + assert(graph.vertices.partitionsRDD.dependencies != verticesDependencies) + assert(graph.vertices.collect().map(_._2) === vertices) + assert(graph.edges.collect().map(_.attr) === edges) + } + } + } From 4d1e249461d9f162ca82926d260ad7ef136b388f Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Wed, 22 Oct 2014 22:34:04 +0800 Subject: [PATCH 2/3] Add comments --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 3 +-- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 8 +++++++- .../scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 ---- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 ---- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 5518fc520295a..55a8f8a921c73 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1210,8 +1210,7 @@ abstract class RDD[T: ClassTag]( /** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with SparkContext.setCheckpointDir() and all references to its parent - * RDDs will be removed. This function must be called before any job has been - * executed on this RDD. It is strongly recommended that this RDD is persisted in + * RDDs will be removed. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 810064193e7e6..23538b71562de 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -96,7 +96,13 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def cache(): Graph[VD, ED] - def checkpoint():Unit + /** + * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. It is strongly recommended that this Graph is persisted in + * memory, otherwise saving it on a file will require recomputation. + */ + def checkpoint(): Unit /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index d090b2c01b2ff..897c7ee12a436 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -64,10 +64,6 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( this } - override def checkpoint(): Unit = { - partitionsRDD.checkpoint() - } - /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ override def cache(): this.type = { partitionsRDD.persist(targetStorageLevel) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index a219ce765f127..9732c5b00c6d9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -65,10 +65,6 @@ class VertexRDDImpl[VD] private[graphx] ( this } - override def checkpoint(): Unit = { - partitionsRDD.checkpoint() - } - /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ override def cache(): this.type = { partitionsRDD.persist(targetStorageLevel) From a70c5001977b7ab0a10716f69190ed0a6a797d5d Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Sat, 6 Dec 2014 09:51:30 +0800 Subject: [PATCH 3/3] Remove java related --- .../main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++++++++-------- .../org/apache/spark/CheckpointSuite.scala | 11 ----------- .../org/apache/spark/graphx/GraphSuite.scala | 15 +++++---------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 55a8f8a921c73..214f22bc5b603 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1210,7 +1210,8 @@ abstract class RDD[T: ClassTag]( /** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with SparkContext.setCheckpointDir() and all references to its parent - * RDDs will be removed. It is strongly recommended that this RDD is persisted in + * RDDs will be removed. This function must be called before any job has been + * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() { @@ -1278,7 +1279,7 @@ abstract class RDD[T: ClassTag]( } // Avoid handling doCheckpoint multiple times to prevent excessive recursion - @transient private var doCheckpointCalled = 0 + @transient private var doCheckpointCalled = false /** * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD @@ -1286,12 +1287,13 @@ abstract class RDD[T: ClassTag]( * doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint() { - if (checkpointData == None && doCheckpointCalled == 0) { - dependencies.foreach(_.rdd.doCheckpoint()) - doCheckpointCalled = 1 - } else if (checkpointData.isDefined && doCheckpointCalled < 2) { - checkpointData.get.doCheckpoint() - doCheckpointCalled = 2 + if (!doCheckpointCalled) { + doCheckpointCalled = true + if (checkpointData.isDefined) { + checkpointData.get.doCheckpoint() + } else { + dependencies.foreach(_.rdd.doCheckpoint()) + } } } diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index b62744eb69396..3b10b3a042317 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -55,17 +55,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { assert(flatMappedRDD.collect() === result) } - test("After call count method, checkpoint should also work") { - val parCollection = sc.makeRDD(1 to 4) - val flatMappedRDD = parCollection.flatMap(x => 1 to x) - flatMappedRDD.count - flatMappedRDD.checkpoint() - assert(flatMappedRDD.dependencies.head.rdd == parCollection) - val result = flatMappedRDD.collect() - assert(flatMappedRDD.dependencies.head.rdd != parCollection) - assert(flatMappedRDD.collect() === result) - } - test("RDDs with one-to-one dependencies") { testRDD(_.map(x => x.toString)) testRDD(_.flatMap(x => 1 to x)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 3991de30b4131..9da0064104fb6 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -376,18 +376,13 @@ class GraphSuite extends FunSuite with LocalSparkContext { val rdd = sc.parallelize(ring) val graph = Graph.fromEdges(rdd, 1.0F) graph.checkpoint() + graph.edges.map(_.attr).count() + graph.vertices.map(_._2).count() + val edgesDependencies = graph.edges.partitionsRDD.dependencies val verticesDependencies = graph.vertices.partitionsRDD.dependencies - val edges = graph.edges.collect().map(_.attr) - val vertices = graph.vertices.collect().map(_._2) - - graph.vertices.count() - graph.edges.count() - - assert(graph.edges.partitionsRDD.dependencies != edgesDependencies) - assert(graph.vertices.partitionsRDD.dependencies != verticesDependencies) - assert(graph.vertices.collect().map(_._2) === vertices) - assert(graph.edges.collect().map(_.attr) === edges) + assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) } }