fix: Disable Comet shuffle with AQE coalesce partitions enabled #380
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
These tests are sensitive to partitioning: once coalesce partitions is disabled, they won't produce the same results as Spark. So I added the COMET_SHUFFLE_ENFORCE_MODE_ENABLED config to enforce Comet shuffle even when COALESCE_PARTITIONS_ENABLED is enabled.
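For context, the gating this comment describes can be sketched as a plain predicate. This is a minimal, self-contained model and not the code in this PR: the `ShuffleGate` object, the `Map`-based configuration, the defaults, and the key strings for the Comet settings are assumptions for illustration. Only the shuffle manager class name is taken from the diff in this PR.

```scala
// Hypothetical stand-alone model of the shuffle gating discussed in this PR.
// A Map[String, String] stands in for SQLConf. The Comet key strings and the
// defaults below are assumed for illustration and may not match CometConf.
object ShuffleGate {
  val CometShuffleEnabled = "spark.comet.exec.shuffle.enabled" // assumed key
  val EnforceMode = "spark.comet.shuffle.enforceMode.enabled"  // assumed key
  val CoalescePartitions = "spark.sql.adaptive.coalescePartitions.enabled"
  val ShuffleManager = "spark.shuffle.manager"
  val CometManagerClass =
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"

  // Reads a boolean flag, falling back to the given default when unset.
  private def flag(conf: Map[String, String], key: String, default: Boolean): Boolean =
    conf.get(key).map(_.toBoolean).getOrElse(default)

  // Comet shuffle is used only when it is switched on, the Comet shuffle
  // manager is configured, and either coalesce partitions is off or the
  // enforce-mode override is on.
  def isCometShuffleEnabled(conf: Map[String, String]): Boolean =
    flag(conf, CometShuffleEnabled, default = false) &&
      conf.get(ShuffleManager).contains(CometManagerClass) &&
      (!flag(conf, CoalescePartitions, default = true) ||
        flag(conf, EnforceMode, default = false))
}
```

With coalesce partitions left at its default, the predicate in this sketch is false unless the enforce-mode flag is set, which is why the partitioning-sensitive tests opt in explicitly.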
test("Disable Comet shuffle with AQE coalesce partitions enabled") {
  withSQLConf(
    CometConf.COMET_EXEC_ENABLED.key -> "true",
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
    val df = sql(
      "SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1 FULL JOIN " +
        "testData2 t2 ON t1.key = t2.a")
    checkShuffleAnswer(df, 0)
  }
}
This test can reproduce the Java Arrow memory leak issue.
@@ -916,7 +916,10 @@ object CometSparkSessionExtensions extends Logging {
   private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
     COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
       (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
-        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
+      // TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
Coalesce partitions is a great feature of AQE, and it is enabled by default in Spark. It would be better to handle the combined case in Comet shuffle rather than disable Comet shuffle when coalesce partitions is enabled. Do you have any clue why there's a memory leak?
As I mentioned in the ticket, when coalesce partitions is enabled, Spark will combine the partitions of multiple reducers. I suspect that causes the Java Arrow StreamReader to read an incorrect format.
We should investigate this further to unblock Comet shuffle with coalesce partitions. But for now, I think it is better to disable it temporarily for the cases where we know it causes issues.
+1 for putting this behind a config and disabling by default until we have a solution
Is there an issue logged for the followup?
> But for now, I think it is better to disable it temporarily for the cases we know it will cause some issues.
Ah, yeah. It makes sense to disable it first if debugging and fixing it later will take a lot of time and resources.
The issue you described seems similar to a problem I encountered when adding support for CometRowToColumnar in #206, so I went ahead and did a quick investigation based on your branch. It seems that we cannot close the allocator prematurely, as the record might still be in use on the native side. See these comments for more details: https://github.com/apache/datafusion-comet/pull/206/files#diff-04037044481f9a656275a63ebb6a3a63badf866f19700d4a6909d2e17c8d7b72R37-R46
I also submitted a new commit (advancedxy@48517ef) as a potential fix in my branch, hoping it helps you out. The test code is modified only for demonstration purposes.
> Is there an issue logged for the followup?
Let me create one.
Created #387 to track it.
Thanks @advancedxy. I debugged this issue but didn't find a quick fix, so I decided to disable it temporarily.
I took a look at your branch. The Java allocator instance reports a memory leak on close if its allocated memory size is larger than zero. So, as you did, if we find a non-zero amount (getAllocatedMemory > 0) and don't close the allocator, it won't report the leak.
However, I'm not sure whether that is the correct fix or whether we would be ignoring a real memory leak. Maybe it is a false positive. But if it is a real memory leak and we ignore it, it will be a potential issue.
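The trade-off described above can be sketched with a toy allocator. This is a minimal model under stated assumptions: `ToyAllocator` and `CloseStrategies` are hypothetical names, and the class only imitates the behavior described in this thread (closing with outstanding memory is reported as a leak). It is not the arrow-java API and not the code in either branch.

```scala
// Toy stand-in for an Arrow-style allocator. Hypothetical; NOT arrow-java.
final class ToyAllocator {
  private var allocated: Long = 0L
  private var closed = false

  def allocate(bytes: Long): Unit = allocated += bytes
  def release(bytes: Long): Unit = allocated -= bytes
  def getAllocatedMemory: Long = allocated
  def isClosed: Boolean = closed

  // Closing while memory is still outstanding is reported as a leak.
  def close(): Unit = {
    if (allocated > 0) {
      throw new IllegalStateException(s"Memory leaked: $allocated bytes")
    }
    closed = true
  }
}

object CloseStrategies {
  // Strict: always close, so any outstanding buffer surfaces as an error.
  def strictClose(a: ToyAllocator): Unit = a.close()

  // Lenient: skip close() while memory is outstanding. No error is raised,
  // but a genuine leak is skipped silently as well. Returns true if closed.
  def lenientClose(a: ToyAllocator): Boolean =
    if (a.getAllocatedMemory > 0) false
    else { a.close(); true }
}
```

In this model the lenient path cannot tell a buffer that is still legitimately in use elsewhere from one that was never released, which is the concern raised in the comment.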
The allocator is force-closed in two places: the task completion callback and the CompletionIterator for each input stream. The memory leak should be reported if the Arrow buffers have not been released by the time either of these runs.
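As an illustration of cleanup wired to two triggers, here is a minimal sketch in plain Scala. `OnceCleanup` and `CleanupOnExhaustion` are hypothetical names introduced for this example. The wrapper is similar in spirit to Spark's CompletionIterator but is not that class, and the real task completion callback is represented only by a direct call to `run()`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: a cleanup action shared by two triggers that runs
// at most once, whichever trigger fires first.
final class OnceCleanup(action: () => Unit) {
  private val done = new AtomicBoolean(false)
  def run(): Unit = if (done.compareAndSet(false, true)) action()
}

// Wraps an iterator and fires the cleanup as soon as the input is exhausted.
final class CleanupOnExhaustion[A](underlying: Iterator[A], cleanup: OnceCleanup)
    extends Iterator[A] {
  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) cleanup.run()
    more
  }
  override def next(): A = underlying.next()
}
```

A caller would hand the same `OnceCleanup` to both the iterator wrapper and the task-level hook, so a stream that is drained early releases its resources immediately and one that is abandoned is still cleaned up when the task ends.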
> Maybe it is a false positive. But if it is a real memory leak and we ignore it, it will be a potential issue.
Ah, yeah. I'm not 100 percent sure that the memory leak report is a false positive, as I haven't verified it on the native side with the JVM running (that might be quite tricky). Based on previous experience, though, the allocator could be closed without failure at task completion.
😂😂 It comes back to our previous conclusion that, in the long term, we may need to bridge the Java side with arrow-rs instead of arrow-java. The allocator API in arrow-java is easy to misuse.
LGTM. Thanks @viirya
lgtm
Thanks for the review. I'm going to merge this, and we can look at the memory leak issue further in the created ticket. Thank you.
…he#380)
* fix: Disable Comet shuffle with AQE coalesce partitions enabled
* Update plan stability
* Fix
* Fix
* Remove debug info
* Refine test
Which issue does this PR close?
Closes #381.
Rationale for this change
What changes are included in this PR?
How are these changes tested?