[SPARK-48928] Log Warning for Calling .unpersist() on Locally Checkpointed RDDs

### What changes were proposed in this pull request?

This pull request proposes logging a warning message when the `.unpersist()` method is called on RDDs that have been locally checkpointed. The goal is to inform users about the potential risks associated with unpersisting locally checkpointed RDDs without changing the current behavior of the method.

### Why are the changes needed?

Local checkpointing truncates the lineage of an RDD, preventing it from being recomputed from its source. If a locally checkpointed RDD is unpersisted, it loses its data and cannot be regenerated, potentially leading to job failures if subsequent actions or transformations are attempted on the RDD (which was seen on some user workloads). Logging a warning message helps users avoid such pitfalls and aids in debugging.
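
The scenario described above can be reproduced with a minimal sketch like the one below. It assumes a local Spark installation on the classpath; the object name, app name, and sample data are hypothetical and chosen only for illustration. It uses the public `localCheckpoint()`, `count()`, and `unpersist()` methods of `RDD`. The result of the final action is reported rather than asserted, since it depends on whether the checkpointed blocks are still available.

```scala
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

// Hypothetical driver program illustrating the pitfall; not part of the patch.
object LocalCheckpointUnpersistSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                        // assumption: run locally
      .setAppName("local-checkpoint-unpersist-sketch")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100).map(_ * 2)

      // Mark the RDD for local checkpointing; the checkpoint is
      // materialized by the first action that runs on it.
      rdd.localCheckpoint()
      rdd.count()

      // The lineage is now truncated. With this change, unpersist()
      // logs a warning here but otherwise behaves as before.
      rdd.unpersist()

      // A later action needs the removed blocks and cannot fall back
      // to the lineage, so it may fail. Report the outcome instead of
      // assuming it.
      val outcome = Try(rdd.count())
      println(s"count after unpersist succeeded: ${outcome.isSuccess}")
    } finally {
      sc.stop()
    }
  }
}
```

If the data is still needed later, one option is simply not to call `unpersist()` on the locally checkpointed RDD until all downstream work has finished.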

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds a warning log message when `.unpersist()` is called on a locally checkpointed RDD; it does not alter any existing behavior.

### How was this patch tested?

This PR does not change any existing behavior and therefore no tests are added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47391 from mingkangli-db/warning_unpersist.

Authored-by: Mingkang Li <mingkang.li@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
mingkangli-db authored and Mridul Muralidharan committed Jul 23, 2024
1 parent c69f02e commit 118167f
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -211,6 +211,11 @@ abstract class RDD[T: ClassTag](
    * @return This RDD.
    */
   def unpersist(blocking: Boolean = false): this.type = {
+    if (isLocallyCheckpointed) {
+      // This means its lineage has been truncated and cannot be recomputed once unpersisted.
+      logWarning(log"RDD ${MDC(RDD_ID, id)} was locally checkpointed, its lineage has been" +
+        log" truncated and cannot be recomputed after unpersisting")
+    }
     logInfo(log"Removing RDD ${MDC(RDD_ID, id)} from persistence list")
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE