
[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle #2824

Closed
wants to merge 3 commits into from

Conversation

jerryshao
Contributor

A kernel 2.6.32 bug leads to unexpected behavior of transferTo in copyStream, which can corrupt the shuffle output file in sort-based shuffle and surface as PARSING_ERROR(2), deserialization errors, or offset-out-of-range errors. This patch fixes the issue by opening the output file with the append flag and adding position-checking code. Details can be seen in SPARK-3948.
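The two safeguards described above (append mode plus a position check) can be sketched roughly as follows. This is an illustrative Java sketch, not the actual Scala `Utils.copyStream` code; the method and variable names are hypothetical.

```java
import java.io.*;
import java.nio.channels.FileChannel;

public class TransferToSketch {
    // Copy src into dst with NIO transferTo, opening dst in append mode and
    // verifying that the channel position advanced by the expected amount.
    static long copyFileNio(File src, File dst) throws IOException {
        try (FileInputStream in = new FileInputStream(src);
             // append = true: on buggy kernels transferTo may not advance the
             // position, so appending avoids overwriting earlier output.
             FileOutputStream out = new FileOutputStream(dst, true)) {
            FileChannel inChannel = in.getChannel();
            FileChannel outChannel = out.getChannel();
            long initialPos = outChannel.position();
            long size = inChannel.size();
            long count = 0;
            while (count < size) {
                count += inChannel.transferTo(count, size - count, outChannel);
            }
            // Position check: fail loudly instead of silently corrupting output.
            long finalPos = outChannel.position();
            if (finalPos != initialPos + size) {
                throw new IOException("Current position " + finalPos
                        + " does not equal expected position " + (initialPos + size));
            }
            return count;
        }
    }

    public static void main(String[] args) throws IOException {
        File src = File.createTempFile("shuffle-src", ".tmp");
        File dst = File.createTempFile("shuffle-dst", ".tmp");
        try (FileOutputStream fos = new FileOutputStream(src)) {
            fos.write("hello shuffle".getBytes());
        }
        System.out.println(copyFileNio(src, dst));
    }
}
```

On an affected kernel, the position check would throw at copy time instead of letting a corrupted file reach the reduce side.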

// Position will not be increased to the expected length after calling transferTo in
// kernel version 2.6.32, this issue can be seen in
// scalastyle:off
// https://bugs.openjdk.java.net/browse/JDK-7052359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel)
Contributor


Hmm... I guess this line will trigger a scalastyle checker error.

Contributor Author


I'm not sure; I found code in KafkaInputDStream that also uses this scalastyle:off to turn off the scalastyle check.

Contributor


I see, I didn't notice that line. I think that should be fine.

Contributor


You can remove the URL part after the '?' here.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21805/
Test FAILed.

@SparkQA

SparkQA commented Oct 16, 2014

QA tests have started for PR 2824 at commit e17ada2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 16, 2014

QA tests have finished for PR 2824 at commit e17ada2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21807/
Test PASSed.

@adrian-wang
Contributor

The patch works fine for me, on my 2.6.32 cluster. Thanks!
@dbtsai You should also try this, for your SPARK-3630 issue.

@pwendell
Contributor

@adrian-wang, Just so I understand - were you seeing the issue before applying this patch, and then the patch made it go away?

@jerryshao Could we also have a branch-1.1 version of this that simply logs an error instead of throwing an exception (via assert)? I just want to be more conservative with the existing branch and not cause programs to terminate.

@JoshRosen
Contributor

Maybe this is being overly-conservative, but could we add an undocumented configuration option that allows users to bypass the transferTo here? If we ship 1.1.1 or 1.2 and this bug resurfaces, it would be nice if there was a way for users to revert back to the older version of this code without having to recompile. A lot of users can't simply upgrade their kernel, so I think we need to offer an easy workaround for them.

@adrian-wang
Contributor

@pwendell yes, I was hitting this when running certain queries over sort-based shuffle, just like the discussion in SPARK-3630, and with this patch the issue is gone. Thanks! -- My cluster is a 4-node Red Hat 6.2 setup with kernel 2.6.32.

@jerryshao
Contributor Author

I think if the bug occurs while running a job, even if we do not throw an exception here we will still hit other exceptions on the reduce side, so I use an assert here. Besides, I think an undocumented config is a good idea, but since we need a SparkConf object to pass it, other code paths that use copyStream would also need to change.

@jerryshao
Contributor Author

Hi @JoshRosen, I just added a configuration that can bypass the NIO way of copying streams. Would you mind taking a look at it?

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have started for PR 2824 at commit a82b184.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 17, 2014

QA tests have finished for PR 2824 at commit a82b184.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21854/
Test PASSed.

val finalPos = outChannel.position()
assert(finalPos == initialPos + size,
s"""
|Current position $finalPos do not equal to expected position ${initialPos + count}
Contributor


This assert checks whether finalPos is initialPos + size, but this error message uses initialPos + count; could this lead to confusion?

I suppose that count >= size here, so it's probably fine, but it might be confusing if count was ever greater than size.

Contributor Author


I think size would normally be equal to count. I will change it to size to keep things consistent.

@SparkQA

SparkQA commented Oct 20, 2014

QA tests have started for PR 2824 at commit be0533a.

  • This patch merges cleanly.

@jerryshao
Contributor Author

Hi @JoshRosen, I just set transferToEnabled to false as the default value; unless users explicitly set it to true, transferTo will not be enabled.

Currently, only ExternalSorter uses this API for file-to-file copying, controlled by the configuration spark.file.transferTo; other uses of copyStream in the Spark code are not file-to-file copies, so this parameter does not affect them.

For future uses of copyStream, users have to get transferToEnabled from the configuration; I added some usage notes here. A user can still bypass spark.file.transferTo and set this parameter to true directly, but then they are responsible for using it correctly.

The reason I didn't take SparkConf as a parameter to control the behavior is that it would require modifying a lot of the current code to obtain a SparkConf wherever copyStream is called.

So what is your opinion? Thanks a lot.
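The design described above, where the caller reads the flag once and passes a boolean rather than threading SparkConf through every call site, could be sketched like this. This is an illustrative Java sketch under assumed names (the real utility is Scala's `Utils.copyStream`; `copyStream` and its fallback here are hypothetical).

```java
import java.io.*;
import java.nio.channels.FileChannel;

public class CopyStreamSketch {
    // The caller reads the flag (e.g. from "spark.file.transferTo") and passes
    // it in explicitly, so this utility never needs a SparkConf itself.
    static long copyStream(InputStream in, OutputStream out,
                           boolean transferToEnabled) throws IOException {
        if (transferToEnabled
                && in instanceof FileInputStream
                && out instanceof FileOutputStream) {
            // Fast path: NIO transferTo between file channels.
            FileChannel inCh = ((FileInputStream) in).getChannel();
            FileChannel outCh = ((FileOutputStream) out).getChannel();
            long size = inCh.size();
            long copied = 0;
            while (copied < size) {
                copied += inCh.transferTo(copied, size - copied, outCh);
            }
            return copied;
        }
        // Fallback: plain buffered copy, unaffected by the kernel transferTo bug.
        byte[] buf = new byte[8192];
        long copied = 0;
        for (int n = in.read(buf); n != -1; n = in.read(buf)) {
            out.write(buf, 0, n);
            copied += n;
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "shuffle data".getBytes();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long n = copyStream(new ByteArrayInputStream(data), sink, false);
        System.out.println(n + " " + sink.toString());
    }
}
```

With the flag defaulting to false, users on affected kernels always get the safe fallback unless they opt in.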

@JoshRosen
Contributor

Hi @jerryshao,

Changing the default is exactly what I had in mind. This looks good to me! (Going to bed now; I'll merge this tomorrow and backport to branch-1.1)

@jerryshao
Contributor Author

Thanks a lot :).

@SparkQA

SparkQA commented Oct 20, 2014

QA tests have finished for PR 2824 at commit be0533a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21906/
Test PASSed.

@asfgit asfgit closed this in c7aeecd Oct 20, 2014
@JoshRosen
Contributor

I've merged this into master and branch-1.1.

Thanks a lot :).

Thank YOU (and @mridulm) for helping to diagnose this really subtle bug!

asfgit pushed a commit that referenced this pull request Oct 20, 2014
A kernel 2.6.32 bug leads to unexpected behavior of transferTo in copyStream, which can corrupt the shuffle output file in sort-based shuffle and surface as PARSING_ERROR(2), deserialization errors, or offset-out-of-range errors. This patch fixes the issue by opening the output file with the append flag and adding position-checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).

Author: jerryshao <saisai.shao@intel.com>

Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits:

be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo

(cherry picked from commit c7aeecd)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/util/Utils.scala