From 118167f444f05db26e8b1a8b52dd741720ed2447 Mon Sep 17 00:00:00 2001 From: Mingkang Li Date: Tue, 23 Jul 2024 14:41:47 -0500 Subject: [PATCH] [SPARK-48928] Log Warning for Calling .unpersist() on Locally Checkpointed RDDs ### What changes were proposed in this pull request? This pull request proposes logging a warning message when the `.unpersist()` method is called on RDDs that have been locally checkpointed. The goal is to inform users about the potential risks associated with unpersisting locally checkpointed RDDs without changing the current behavior of the method. ### Why are the changes needed? Local checkpointing truncates the lineage of an RDD, preventing it from being recomputed from its source. If a locally checkpointed RDD is unpersisted, it loses its data and cannot be regenerated, potentially leading to job failures if subsequent actions or transformations are attempted on the RDD (as has been observed in some user workloads). Logging a warning message helps users avoid such pitfalls and aids in debugging. ### Does this PR introduce _any_ user-facing change? Yes, this PR adds a warning log message when `.unpersist()` is called on a locally checkpointed RDD, but it does not alter any existing behavior. ### How was this patch tested? This PR does not change any existing behavior; therefore, no tests were added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47391 from mingkangli-db/warning_unpersist. Authored-by: Mingkang Li Signed-off-by: Mridul Muralidharan gmail.com> --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ac93abf3fe7a0..0db0133f632bf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -211,6 +211,11 @@ abstract class RDD[T: ClassTag]( * @return This RDD. 
*/ def unpersist(blocking: Boolean = false): this.type = { + if (isLocallyCheckpointed) { + // This means its lineage has been truncated and cannot be recomputed once unpersisted. + logWarning(log"RDD ${MDC(RDD_ID, id)} was locally checkpointed, its lineage has been" + + log" truncated and cannot be recomputed after unpersisting") + } logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE