From 918529bc58985c6dd8991abe5398b52771e1a702 Mon Sep 17 00:00:00 2001 From: Bo Zhang Date: Wed, 24 Jul 2024 16:49:15 -0700 Subject: [PATCH] [SPARK-47764][FOLLOW-UP] Change to use ShuffleDriverComponents.removeShuffle to remove shuffle properly ### What changes were proposed in this pull request? This is a follow-up for https://github.com/apache/spark/pull/45930, where we introduced ShuffleCleanupMode and implemented cleaning up of shuffle dependencies. There was a bug where `ShuffleManager.unregisterShuffle` was used on Driver, and in non-local mode it is not effective at all. This change fixed the bug by changing to use `ShuffleDriverComponents.removeShuffle` instead. ### Why are the changes needed? This is to address the comments in https://github.com/apache/spark/pull/45930#discussion_r1584223064 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated unit tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46302 from bozhang2820/spark-47764-1. Authored-by: Bo Zhang Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 5 ++++- .../org/apache/spark/sql/execution/QueryExecutionSuite.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 7c03bad90ebbc..58fff2d4a1a29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -177,7 +177,10 @@ object SQLExecution extends Logging { shuffleIds.foreach { shuffleId => queryExecution.shuffleCleanupMode match { case RemoveShuffleFiles => - SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) + // Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister + // the shuffle on MapOutputTracker, so that stage retries would be triggered. + // Set blocking to Utils.isTesting to deflake unit tests. + sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting) case SkipMigration => SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId) case _ => // this should not happen diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 3608e7c920767..974be2f627998 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -319,7 +319,7 @@ class QueryExecutionSuite extends SharedSparkSession { val blockManager = spark.sparkContext.env.blockManager blockManager.diskBlockManager.getAllBlocks().foreach { case ShuffleIndexBlockId(shuffleId, _, _) => - spark.sparkContext.env.shuffleManager.unregisterShuffle(shuffleId) + spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true) case _ => } }