From 124aab0da39ef9024ff37eb6459afcd2ef18ed82 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 12 Oct 2015 14:33:40 +0800 Subject: [PATCH 1/4] We should not allow local checkpointing after the RDD is materialized and checkpointed. --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a56e542242d5f..846700bb1defa 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1521,8 +1521,15 @@ abstract class RDD[T: ClassTag]( } checkpointData match { - case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning( - "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") + case Some(reliable: ReliableRDDCheckpointData[_]) => + if (isCheckpointed) { + logWarning( + "RDD was already materialized and checkpointed: can't override with local checkpoint.") + return this + } else { + logWarning( + "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") + } case _ => } checkpointData = Some(new LocalRDDCheckpointData(this)) From 448774f719d41d02a0cf2ac6876857001951802e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 16 Oct 2015 00:45:36 +0800 Subject: [PATCH 2/4] Add isCheckpointedAndMaterialized to RDD. --- .../apache/spark/api/java/JavaRDDLike.scala | 5 ++ .../main/scala/org/apache/spark/rdd/RDD.scala | 46 +++++++++++++------ .../java/org/apache/spark/JavaAPISuite.java | 12 +++-- .../org/apache/spark/CheckpointSuite.scala | 6 ++- .../spark/graphx/impl/EdgeRDDImpl.scala | 4 +- .../apache/spark/graphx/impl/GraphImpl.scala | 2 +- .../spark/graphx/impl/VertexRDDImpl.scala | 4 +- .../mllib/impl/PeriodicRDDCheckpointer.scala | 2 +- 8 files changed, 55 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index fc817cdd6a3f8..7e130cd1e656b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -547,6 +547,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def isCheckpointed: Boolean = rdd.isCheckpointed + /** + * Return whether this RDD has been checkpointed and materialized or not + */ + def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized + /** * Gets the name of the file to which this RDD was checkpointed */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 846700bb1defa..9ed866f2e151d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag]( */ private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { - if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) + if (isCheckpointedAndMaterialized) { + firstParent[T].iterator(split, context) + } else { + compute(split, context) + } } /** @@ -1520,26 +1524,40 @@ abstract class RDD[T: ClassTag]( persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true) } - checkpointData match { - case Some(reliable: ReliableRDDCheckpointData[_]) => - if (isCheckpointed) { - logWarning( - "RDD was already materialized and checkpointed: can't override with local checkpoint.") - return this - } else { - logWarning( - "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") - } - case _ => + // If this RDD is already checkpointed and materialized, its lineage is already truncated. + // We must not override our `checkpointData` in this case because it is needed to recover + // the checkpointed data. If it is overridden, next time materializing on this RDD will + // cause error. + if (isCheckpointedAndMaterialized) { + logWarning("Not marking RDD for local checkpoint because it was already " + + "checkpointed and materialized") + } else { + // Lineage is not truncated yet, so just override any existing checkpoint data with ours + checkpointData match { + case Some(_: ReliableRDDCheckpointData[_]) => logWarning( + "RDD was already marked for reliable checkpointing: overriding with local checkpoint.") + case _ => + } + checkpointData = Some(new LocalRDDCheckpointData(this)) } - checkpointData = Some(new LocalRDDCheckpointData(this)) this } /** * Return whether this RDD is marked for checkpointing, either reliably or locally. */ - def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed) + def isCheckpointed: Boolean = { + checkpointData match { + case Some(_: RDDCheckpointData[_]) => true + case _ => false + } + } + + /** + * Return whether this RDD is marked for checkpointing and materialized, + * either reliably or locally. + */ + def isCheckpointedAndMaterialized: Boolean = checkpointData.exists(_.isCheckpointed) /** * Return whether this RDD is marked for local checkpointing. diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index fd8f7f39b7cc8..314f243f7e25c 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1419,10 +1419,12 @@ public String call(Integer t) { public void checkpointAndComputation() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); - Assert.assertFalse(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - rdd.count(); // Forces the DAG to cause a checkpoint Assert.assertTrue(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); + rdd.count(); // Forces the DAG to cause a checkpoint + Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect()); } @@ -1430,10 +1432,12 @@ public void checkpointAndComputation() { public void checkpointAndRestore() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); - Assert.assertFalse(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - rdd.count(); // Forces the DAG to cause a checkpoint Assert.assertTrue(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); + rdd.count(); // Forces the DAG to cause a checkpoint + Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertTrue(rdd.getCheckpointFile().isPresent()); JavaRDD recovered = sc.checkpointFile(rdd.getCheckpointFile().get()); diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 4d70bfed909b6..1000c43320253 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -240,10 +240,12 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => val rdd = new BlockRDD[Int](sc, Array[BlockId]()) assert(rdd.partitions.size === 0) - assert(rdd.isCheckpointed === false) + assert(rdd.isCheckpointedAndMaterialized === false) checkpoint(rdd, reliableCheckpoint) - assert(rdd.count() === 0) assert(rdd.isCheckpointed === true) + assert(rdd.isCheckpointedAndMaterialized === false) + assert(rdd.count() === 0) + assert(rdd.isCheckpointedAndMaterialized === true) assert(rdd.partitions.size === 0) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index c88b2f65a86cd..00b2eae3c3f4d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -76,8 +76,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( partitionsRDD.checkpoint() } - override def isCheckpointed: Boolean = { - firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed + override def isCheckpointedAndMaterialized: Boolean = { + firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointedAndMaterialized } override def getCheckpointFile: Option[String] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index da95314440d86..0640f93fbc216 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -71,7 +71,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def isCheckpointed: Boolean = { - vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed + vertices.isCheckpointedAndMaterialized && replicatedVertexView.edges.isCheckpointedAndMaterialized } override def getCheckpointFiles: Seq[String] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 7f4e7e9d79d6b..5a32f14b49e28 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -77,8 +77,8 @@ class VertexRDDImpl[VD] private[graphx] ( partitionsRDD.checkpoint() } - override def isCheckpointed: Boolean = { - firstParent[ShippableVertexPartition[VD]].isCheckpointed + override def isCheckpointedAndMaterialized: Boolean = { + firstParent[ShippableVertexPartition[VD]].isCheckpointedAndMaterialized } override def getCheckpointFile: Option[String] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala index f31ed2aa90a64..cfe5463abc90a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala @@ -81,7 +81,7 @@ private[mllib] class PeriodicRDDCheckpointer[T]( override protected def checkpoint(data: RDD[T]): Unit = data.checkpoint() - override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointed + override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointedAndMaterialized override protected def persist(data: RDD[T]): Unit = { if (data.getStorageLevel == StorageLevel.NONE) { From dc5849865232965c6f25605fcc8cc9e1e0e46849 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 16 Oct 2015 10:43:46 +0800 Subject: [PATCH 3/4] For comments. --- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 9 ++------- core/src/test/java/org/apache/spark/JavaAPISuite.java | 8 ++++++-- .../test/scala/org/apache/spark/CheckpointSuite.scala | 4 +++- .../scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 ++-- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 2 +- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 ++-- .../spark/mllib/impl/PeriodicRDDCheckpointer.scala | 2 +- 8 files changed, 18 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 7e130cd1e656b..5c7548d56cdf3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -550,7 +550,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return whether this RDD has been checkpointed and materialized or not */ - def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized + private[spark] def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized /** * Gets the name of the file to which this RDD was checkpointed diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9ed866f2e151d..141d17a561b0a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1546,18 +1546,13 @@ abstract class RDD[T: ClassTag]( /** * Return whether this RDD is marked for checkpointing, either reliably or locally. */ - def isCheckpointed: Boolean = { - checkpointData match { - case Some(_: RDDCheckpointData[_]) => true - case _ => false - } - } + def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed) /** * Return whether this RDD is marked for checkpointing and materialized, * either reliably or locally. */ - def isCheckpointedAndMaterialized: Boolean = checkpointData.exists(_.isCheckpointed) + private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed /** * Return whether this RDD is marked for local checkpointing. diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 314f243f7e25c..0818c50499fe4 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1419,11 +1419,13 @@ public String call(Integer t) { public void checkpointAndComputation() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); + Assert.assertFalse(rdd.isCheckpointed()); Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - Assert.assertTrue(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointed()); Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.count(); // Forces the DAG to cause a checkpoint + Assert.assertTrue(rdd.isCheckpointed()); Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect()); } @@ -1432,11 +1434,13 @@ public void checkpointAndComputation() { public void checkpointAndRestore() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); + Assert.assertFalse(rdd.isCheckpointed()); Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - Assert.assertTrue(rdd.isCheckpointed()); + Assert.assertFalse(rdd.isCheckpointed()); Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.count(); // Forces the DAG to cause a checkpoint + Assert.assertTrue(rdd.isCheckpointed()); Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertTrue(rdd.getCheckpointFile().isPresent()); diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 1000c43320253..119e5fc28e412 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -240,11 +240,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => val rdd = new BlockRDD[Int](sc, Array[BlockId]()) assert(rdd.partitions.size === 0) + assert(rdd.isCheckpointed === false) assert(rdd.isCheckpointedAndMaterialized === false) checkpoint(rdd, reliableCheckpoint) - assert(rdd.isCheckpointed === true) + assert(rdd.isCheckpointed === false) assert(rdd.isCheckpointedAndMaterialized === false) assert(rdd.count() === 0) + assert(rdd.isCheckpointed === true) assert(rdd.isCheckpointedAndMaterialized === true) assert(rdd.partitions.size === 0) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 00b2eae3c3f4d..c88b2f65a86cd 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -76,8 +76,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( partitionsRDD.checkpoint() } - override def isCheckpointedAndMaterialized: Boolean = { - firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointedAndMaterialized + override def isCheckpointed: Boolean = { + firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed } override def getCheckpointFile: Option[String] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 0640f93fbc216..da95314440d86 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -71,7 +71,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def isCheckpointed: Boolean = { - vertices.isCheckpointedAndMaterialized && replicatedVertexView.edges.isCheckpointedAndMaterialized + vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed } override def getCheckpointFiles: Seq[String] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 5a32f14b49e28..7f4e7e9d79d6b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -77,8 +77,8 @@ class VertexRDDImpl[VD] private[graphx] ( partitionsRDD.checkpoint() } - override def isCheckpointedAndMaterialized: Boolean = { - firstParent[ShippableVertexPartition[VD]].isCheckpointedAndMaterialized + override def isCheckpointed: Boolean = { + firstParent[ShippableVertexPartition[VD]].isCheckpointed } override def getCheckpointFile: Option[String] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala index cfe5463abc90a..f31ed2aa90a64 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala @@ -81,7 +81,7 @@ private[mllib] class PeriodicRDDCheckpointer[T]( override protected def checkpoint(data: RDD[T]): Unit = data.checkpoint() - override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointedAndMaterialized + override protected def isCheckpointed(data: RDD[T]): Boolean = data.isCheckpointed override protected def persist(data: RDD[T]): Unit = { if (data.getStorageLevel == StorageLevel.NONE) { From 3958bc2aa307ad3397c9b0408997883b8b5c4252 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 17 Oct 2015 09:17:57 +0800 Subject: [PATCH 4/4] For comments. --- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 5 ----- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 ++++--- core/src/test/java/org/apache/spark/JavaAPISuite.java | 8 -------- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 5c7548d56cdf3..fc817cdd6a3f8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -547,11 +547,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def isCheckpointed: Boolean = rdd.isCheckpointed - /** - * Return whether this RDD has been checkpointed and materialized or not - */ - private[spark] def isCheckpointedAndMaterialized: Boolean = rdd.isCheckpointedAndMaterialized - /** * Gets the name of the file to which this RDD was checkpointed */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 141d17a561b0a..a97bb174438a5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1544,13 +1544,14 @@ abstract class RDD[T: ClassTag]( } /** - * Return whether this RDD is marked for checkpointing, either reliably or locally. + * Return whether this RDD is checkpointed and materialized, either reliably or locally. */ def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed) /** - * Return whether this RDD is marked for checkpointing and materialized, - * either reliably or locally. + * Return whether this RDD is checkpointed and materialized, either reliably or locally. + * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the + * return value. Exposed for testing. */ private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 0818c50499fe4..fd8f7f39b7cc8 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1420,13 +1420,9 @@ public void checkpointAndComputation() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertFalse(rdd.isCheckpointed()); - Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - Assert.assertFalse(rdd.isCheckpointed()); - Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.count(); // Forces the DAG to cause a checkpoint Assert.assertTrue(rdd.isCheckpointed()); - Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect()); } @@ -1435,13 +1431,9 @@ public void checkpointAndRestore() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); sc.setCheckpointDir(tempDir.getAbsolutePath()); Assert.assertFalse(rdd.isCheckpointed()); - Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.checkpoint(); - Assert.assertFalse(rdd.isCheckpointed()); - Assert.assertFalse(rdd.isCheckpointedAndMaterialized()); rdd.count(); // Forces the DAG to cause a checkpoint Assert.assertTrue(rdd.isCheckpointed()); - Assert.assertTrue(rdd.isCheckpointedAndMaterialized()); Assert.assertTrue(rdd.getCheckpointFile().isPresent()); JavaRDD recovered = sc.checkpointFile(rdd.getCheckpointFile().get());