[SPARK-47764][CORE][SQL] Cleanup shuffle dependencies based on ShuffleCleanupMode #45930
Conversation
```scala
/**
 * Mark a shuffle that should not be migrated.
 */
def addShuffleToSkip(shuffleId: Int): Unit
```
Let's add a default implementation.
Done.
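A default no-op body keeps existing resolver implementations source-compatible while letting one implementation opt in. A minimal, self-contained sketch of that pattern — the trait and class names here are stand-ins for illustration, not Spark's actual `ShuffleBlockResolver`:

```scala
// Stand-in sketch: a trait method with a default no-op body, so existing
// implementations compile unchanged and only interested ones override it.
trait ShuffleMigrationSupport {
  /** Mark a shuffle that should not be migrated. Default: do nothing. */
  def addShuffleToSkip(shuffleId: Int): Unit = {}
}

// An implementation that opts in overrides the default.
class TrackingResolver extends ShuffleMigrationSupport {
  val skipped = scala.collection.mutable.Set.empty[Int]
  override def addShuffleToSkip(shuffleId: Int): Unit = skipped += shuffleId
}
```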
```scala
val SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
  buildConf("spark.sql.shuffleDependency.fileCleanup.enabled")
    .doc("When enabled, shuffle dependency files will be cleaned up at the end of SQL " +
```
Suggested change:

```diff
-    .doc("When enabled, shuffle dependency files will be cleaned up at the end of SQL " +
+    .doc("When enabled, shuffle files will be cleaned up at the end of SQL " +
```
Updated.
```diff
@@ -108,7 +108,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
-        val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
+        val qe = sessionWithConfigsOff.sessionState.executePlan(
+          planToCache, shuffleCleanupMode = DoNotCleanup)
```
isn't this the default?
Tried to be explicit here. Removed the unnecessary argument.
```scala
    logicalPlan: LogicalPlan,
    shuffleCleanupMode: ShuffleCleanupMode): DataFrame =
  sparkSession.withActive {
    val qe = sparkSession.sessionState.executePlan(
```
Can we `new QueryExecution` here? Then we don't need to touch the session state builder.
Good idea. Done.
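Constructing the `QueryExecution` directly avoids threading a new parameter through the session state builder, and defaulting the new constructor parameter keeps every existing call site unchanged. A self-contained sketch of that shape with stand-in types (Spark's real `QueryExecution` takes a `SparkSession` and a `LogicalPlan`):

```scala
// Stand-in cleanup modes, mirroring the trait introduced by the PR.
sealed trait ShuffleCleanupMode
case object DoNotCleanup extends ShuffleCleanupMode
case object RemoveShuffleFiles extends ShuffleCleanupMode
case object SkipMigration extends ShuffleCleanupMode

// Defaulting the new parameter means existing `new QueryExecution(...)`
// call sites need no changes; only new callers pass a mode explicitly.
class QueryExecutionSketch(
    val plan: String,
    val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup)

val qe = new QueryExecutionSketch("plan", shuffleCleanupMode = RemoveShuffleFiles)
```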
```scala
        Some(ShuffleBlockInfo(shuffleId, mapId))
      case _ =>
        None
    }
  }

  private val shuffleIdsToSkip = Collections.newSetFromMap[Int](new ConcurrentHashMap)
```
What's the life cycle of it?
Updated to remove from this Set when the shuffle is unregistered.
```diff
@@ -187,6 +187,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
         shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
       }
     }
+    shuffleBlockResolver.removeShuffleToSkip(shuffleId)
```
This is a weird place to do cleanup. Shall we cover all shuffle manager implementations? Shall we do it in the caller of this `unregisterShuffle` function?
Yeah this is a bit weird... Changed to use a Guava cache with a fixed maximum size (1000) instead, so that we do not need to do cleanups for shufflesToSkip.
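The merged code uses Guava's `CacheBuilder.newBuilder().maximumSize(1000)`, so old shuffle IDs age out on their own and no explicit removal hook is needed. The same size-bounded idea can be illustrated with only the JDK via `LinkedHashMap.removeEldestEntry` — a sketch of the concept, not the code merged here:

```scala
import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
import java.util.Map.{Entry => JEntry}

// A size-bounded set of shuffle IDs: once maxSize is exceeded, the
// oldest inserted ID is silently evicted, so nothing has to clean the
// set up when a shuffle is unregistered.
def boundedIdSet(maxSize: Int): java.util.Set[Integer] =
  Collections.newSetFromMap(new JLinkedHashMap[Integer, java.lang.Boolean]() {
    override def removeEldestEntry(e: JEntry[Integer, java.lang.Boolean]): Boolean =
      size() > maxSize
  })
```

(As noted below, Guava's cache rejects `null` values, which is why the merged version stores a dummy `Boolean` value.)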
```scala
        Some(ShuffleBlockInfo(shuffleId, mapId))
      case _ =>
        None
    }
  }

  private val shuffleIdsToSkip =
    CacheBuilder.newBuilder().maximumSize(1000).build[java.lang.Integer, java.lang.Boolean]()
```
If the value does not matter, shall we just use the `Object` type and always pass null?
Unfortunately Guava cache won't accept null values...
```diff
@@ -869,6 +874,8 @@ case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) {
    */
   val stageCache: TrieMap[SparkPlan, ExchangeQueryStageExec] =
     new TrieMap[SparkPlan, ExchangeQueryStageExec]()
+
+  val shuffleIds: TrieMap[Int, Boolean] = new TrieMap[Int, Boolean]()
```
What does the value mean? BTW, `stageCache` uses `TrieMap` because the key is `SparkPlan`. For an int key, I think a normal hash map works fine.
I think a concurrent hash map is still required, since the context is shared between the main query and all subqueries?
yea, concurrent hash map with int key should be good here.
Updated
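Following the suggestion above, a concurrent set keyed by shuffle ID can be built directly from `java.util.concurrent` — a sketch of the idea; the actual field in `AdaptiveExecutionContext` may be shaped differently:

```scala
import java.util.concurrent.ConcurrentHashMap

// A thread-safe set of shuffle IDs. It is safe to mutate concurrently
// from the main query and its subqueries, which share the same
// AdaptiveExecutionContext.
val shuffleIds: java.util.Set[Integer] = ConcurrentHashMap.newKeySet[Integer]()
```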
thanks, merging to master!
```scala
shuffleIds.foreach { shuffleId =>
  queryExecution.shuffleCleanupMode match {
    case RemoveShuffleFiles =>
      SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
```
Shall we call `shuffleDriverComponents.removeShuffle`? We are at the driver side; `shuffleManager.unregisterShuffle` would do nothing in non-local mode.
Thanks for catching this! Will fix this in a follow-up asap.
Created #46302.
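The follow-up swaps the driver-side call so cleanup actually reaches executors in non-local mode. A stand-in sketch of the dispatch — the trait here mimics, but is not, Spark's `ShuffleDriverComponents`:

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait ShuffleCleanupMode
case object DoNotCleanup extends ShuffleCleanupMode
case object RemoveShuffleFiles extends ShuffleCleanupMode

// Stand-in for ShuffleDriverComponents: the driver-side API that can
// remove shuffle data cluster-wide, unlike ShuffleManager.unregisterShuffle,
// which on the driver is a no-op in non-local mode.
trait DriverComponentsSketch {
  def removeShuffle(shuffleId: Int, blocking: Boolean): Unit
}

def cleanupShuffles(
    mode: ShuffleCleanupMode,
    ids: Seq[Int],
    driver: DriverComponentsSketch): Unit =
  mode match {
    case RemoveShuffleFiles =>
      // The buggy version called shuffleManager.unregisterShuffle(id) here.
      ids.foreach(id => driver.removeShuffle(id, blocking = true))
    case DoNotCleanup => ()
  }
```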
```diff
@@ -161,6 +165,24 @@ object SQLExecution extends Logging {
       case e =>
         Utils.exceptionString(e)
     }
+    if (queryExecution.shuffleCleanupMode != DoNotCleanup
+        && isExecutedPlanAvailable) {
+      val shuffleIds = queryExecution.executedPlan match {
```
It seems the root node can be a command. Shall we collect all the `AdaptiveSparkPlanExec` nodes inside the plan?
Oh this is a good catch! I think we should. cc @bozhang2820
I could be wrong, but I thought `DataFrame`s for commands are created in `SparkConnectPlanner`, and the ones for queries are only created in `SparkConnectPlanExecution`?
Ideally we should clean up shuffles for CTAS and INSERT as well, as they also run queries.
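Spark plans are tree nodes, so gathering every `AdaptiveSparkPlanExec` (rather than pattern-matching only the root, which may be a command wrapping the query) is one `plan.collect` away. A toy tree illustrating the pattern — node names and the recursive `collect` here are stand-ins for Spark's `TreeNode` API:

```scala
// A minimal plan tree with a depth-first collect, like TreeNode.collect.
sealed trait PlanNode {
  def children: Seq[PlanNode]
  def collect[T](pf: PartialFunction[PlanNode, T]): Seq[T] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}
case class CommandNode(children: Seq[PlanNode]) extends PlanNode
case class AdaptiveNode(shuffleIds: Seq[Int], children: Seq[PlanNode] = Nil)
  extends PlanNode

// The root is a command, but the adaptive plans (and their shuffle IDs)
// sit inside it; collecting over the whole tree still finds them.
val plan: PlanNode = CommandNode(Seq(AdaptiveNode(Seq(1, 2)), AdaptiveNode(Seq(3))))
val collectedIds = plan.collect { case a: AdaptiveNode => a.shuffleIds }.flatten
```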
…eCleanupMode

### What changes were proposed in this pull request?
This change adds a new trait, `ShuffleCleanupMode`, under `QueryExecution`, and two new configs, `spark.sql.shuffleDependency.skipMigration.enabled` and `spark.sql.shuffleDependency.fileCleanup.enabled`.

For Spark Connect query executions, `ShuffleCleanupMode` is controlled by the two new configs, and shuffle dependency cleanup is performed accordingly.

When `spark.sql.shuffleDependency.fileCleanup.enabled` is `true`, shuffle dependency files will be cleaned up at the end of query executions.

When `spark.sql.shuffleDependency.skipMigration.enabled` is `true`, shuffle dependencies will be skipped during shuffle data migration for node decommissions.

### Why are the changes needed?
This is to: 1. speed up shuffle data migration at decommissions and 2. possibly (when file cleanup mode is enabled) release disk space occupied by unused shuffle files.

### Does this PR introduce _any_ user-facing change?
Yes. This change adds two new configs, `spark.sql.shuffleDependency.skipMigration.enabled` and `spark.sql.shuffleDependency.fileCleanup.enabled`, to control the cleanup behaviors.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45930 from bozhang2820/spark-47764.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…Shuffle to remove shuffle properly

### What changes were proposed in this pull request?
This is a follow-up for #45930, where we introduced `ShuffleCleanupMode` and implemented cleaning up of shuffle dependencies.

There was a bug where `ShuffleManager.unregisterShuffle` was used on the driver, and in non-local mode it is not effective at all. This change fixed the bug by changing to use `ShuffleDriverComponents.removeShuffle` instead.

### Why are the changes needed?
This is to address the comments in #45930 (comment).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46302 from bozhang2820/spark-47764-1.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>