
Commit

For comments.
viirya committed Oct 16, 2015
1 parent 448774f commit dc58498
Showing 8 changed files with 18 additions and 17 deletions.
@@ -550,7 +550,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return whether this RDD has been checkpointed and materialized or not
*/
def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized
private[spark] def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized

/**
* Gets the name of the file to which this RDD was checkpointed
core/src/main/scala/org/apache/spark/rdd/RDD.scala (9 changes: 2 additions & 7 deletions)
@@ -1546,18 +1546,13 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD is marked for checkpointing, either reliably or locally.
*/
def isCheckpointed: Boolean = {
checkpointData match {
case Some(_: RDDCheckpointData[_]) => true
case _ => false
}
}
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)

/**
* Return whether this RDD is marked for checkpointing and materialized,
* either reliably or locally.
*/
def isCheckpointedAndMaterialized: Boolean = checkpointData.exists(_.isCheckpointed)
private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed

/**
* Return whether this RDD is marked for local checkpointing.
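Taken together, the two replacement one-liners settle the semantics: isCheckpointed reports true only once the checkpoint data has actually been written, and isCheckpointedAndMaterialized remains as a private[spark] alias for it. Below is a minimal sketch of the resulting behaviour, mirroring the test updates further down; the master URL, application name, and checkpoint directory are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the post-change behaviour: the flag only flips to true after an
// action materializes the checkpoint. Paths and names below are placeholders.
object CheckpointStateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-state-sketch")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/checkpoint-state-sketch")  // hypothetical directory

    val rdd = sc.parallelize(1 to 5)
    println(rdd.isCheckpointed)   // false: no checkpoint requested yet

    rdd.checkpoint()
    println(rdd.isCheckpointed)   // still false: marked, but nothing written yet

    rdd.count()                   // the first action forces the checkpoint to be written
    println(rdd.isCheckpointed)   // true: checkpointed and materialized

    sc.stop()
  }
}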
core/src/test/java/org/apache/spark/JavaAPISuite.java (8 changes: 6 additions & 2 deletions)
@@ -1419,11 +1419,13 @@ public String call(Integer t) {
public void checkpointAndComputation() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertFalse(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointedAndMaterialized());
rdd.checkpoint();
Assert.assertTrue(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointedAndMaterialized());
rdd.count(); // Forces the DAG to cause a checkpoint
Assert.assertTrue(rdd.isCheckpointed());
Assert.assertTrue(rdd.isCheckpointedAndMaterialized());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
}
@@ -1432,11 +1434,13 @@ public void checkpointAndComputation() {
public void checkpointAndRestore() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertFalse(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointedAndMaterialized());
rdd.checkpoint();
Assert.assertTrue(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointed());
Assert.assertFalse(rdd.isCheckpointedAndMaterialized());
rdd.count(); // Forces the DAG to cause a checkpoint
Assert.assertTrue(rdd.isCheckpointed());
Assert.assertTrue(rdd.isCheckpointedAndMaterialized());

Assert.assertTrue(rdd.getCheckpointFile().isPresent());
core/src/test/scala/org/apache/spark/CheckpointSuite.scala (4 changes: 3 additions & 1 deletion)
@@ -240,11 +240,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean =>
val rdd = new BlockRDD[Int](sc, Array[BlockId]())
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
checkpoint(rdd, reliableCheckpoint)
assert(rdd.isCheckpointed === true)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
assert(rdd.isCheckpointedAndMaterialized === true)
assert(rdd.partitions.size === 0)
}
@@ -76,8 +76,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
partitionsRDD.checkpoint()
}

override def isCheckpointedAndMaterialized: Boolean = {
firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointedAndMaterialized
override def isCheckpointed: Boolean = {
firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
}

override def getCheckpointFile: Option[String] = {
@@ -71,7 +71,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}

override def isCheckpointed: Boolean = {
vertices.isCheckpointedAndMaterialized && replicatedVertexView.edges.isCheckpointedAndMaterialized
vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
}

override def getCheckpointFiles: Seq[String] = {
@@ -77,8 +77,8 @@ class VertexRDDImpl[VD] private[graphx] (
partitionsRDD.checkpoint()
}

override def isCheckpointedAndMaterialized: Boolean = {
firstParent[ShippableVertexPartition[VD]].isCheckpointedAndMaterialized
override def isCheckpointed: Boolean = {
firstParent[ShippableVertexPartition[VD]].isCheckpointed
}

override def getCheckpointFile: Option[String] = {
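The three GraphX hunks (EdgeRDDImpl, GraphImpl, VertexRDDImpl) all answer checkpoint-state queries by asking the RDD they wrap instead of this. A minimal sketch of that delegation follows, using a hypothetical WrappedRDD that stands in for the partitionsRDD-backed implementations; it is not part of the patch, and it holds the wrapped RDD directly because the firstParent helper the GraphX classes use is protected[spark].

import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper mirroring the delegation pattern above: checkpoint requests and
// checkpoint-state queries are forwarded to the underlying RDD rather than handled here.
class WrappedRDD[T: ClassTag](underlying: RDD[T])
  extends RDD[T](underlying.sparkContext, List(new OneToOneDependency(underlying))) {

  override protected def getPartitions: Array[Partition] = underlying.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    underlying.iterator(split, context)

  // Mark the wrapped RDD for checkpointing instead of this thin wrapper.
  override def checkpoint(): Unit = underlying.checkpoint()

  // Report the wrapped RDD's state, as EdgeRDDImpl and VertexRDDImpl do via firstParent.
  override def isCheckpointed: Boolean = underlying.isCheckpointed

  override def getCheckpointFile: Option[String] = underlying.getCheckpointFile
}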
@@ -81,7 +81,7 @@ private[mllib] class PeriodicRDDCheckpointer[T](

override protected def checkpoint(data: RDD[T]): Unit = data.checkpoint()

override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointedAndMaterialized
override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointed

override protected def persist(data: RDD[T]): Unit = {
if (data.getStorageLevel == StorageLevel.NONE) {
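The mllib change matters because PeriodicRDDCheckpointer uses this check to decide when older checkpoint files can be deleted safely, so it has to mean that the newer checkpoint is actually on disk. A rough sketch of that periodic pattern for an iterative job follows; the interval, the map step, and all paths are illustrative, and the private[mllib] helper itself is not used.

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Rough sketch of periodic checkpointing in an iterative job: checkpoint every few
// iterations, and delete the previous checkpoint's files only once the newer RDD
// reports isCheckpointed, i.e. its files have actually been written.
object PeriodicCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("periodic-checkpoint"))
    sc.setCheckpointDir("/tmp/periodic-checkpoint-sketch")  // hypothetical directory

    val checkpointInterval = 3
    var current: RDD[Int] = sc.parallelize(1 to 1000)
    var lastCheckpointed: Option[RDD[Int]] = None

    for (iteration <- 1 to 10) {
      current = current.map(_ + 1)                 // stand-in for a real iterative update
      current.persist(StorageLevel.MEMORY_ONLY)    // keep the latest state cached
      if (iteration % checkpointInterval == 0) {
        current.checkpoint()                       // only marks; nothing written yet
      }
      current.count()                              // action: materializes any pending checkpoint

      if (current.isCheckpointed) {                // the new checkpoint is on disk
        for (previous <- lastCheckpointed; file <- previous.getCheckpointFile) {
          FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
        }
        lastCheckpointed = Some(current)
      }
    }

    sc.stop()
  }
}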
