
[WIP][SPARK-27562][Shuffle]Complete the verification mechanism for shuffle transmitted data #24447

Closed
wants to merge 2 commits

Conversation

turboFei
Member

@turboFei turboFei commented Apr 24, 2019

What changes were proposed in this pull request?

We've seen some shuffle data corruption during the shuffle read phase.

As described in SPARK-26089, Spark only checked small shuffle blocks before PR #23453, which was proposed by ankuriitg.

Two changes/improvements were made in PR #23453:

  1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way to smaller blocks, so if a
    large block is corrupt at the beginning, that block will be re-fetched, and if that also fails, a
    FetchFailedException will be thrown.
  2. If a large block is corrupt beyond the first maxBytesInFlight/3 bytes, then any IOException thrown while
    reading the stream will be converted to a FetchFailedException. This is slightly more aggressive
    than originally intended, but since the consumer of the stream may have already read and processed some records, we can't just re-fetch the block; we need to fail the whole task. We also considered adding a new type of TaskEndReason, which would retry the task a couple of times before failing the previous stage, but given the complexity involved in that solution we decided not to proceed in that direction.

However, I think some problems still exist in the current verification mechanism for shuffle transmitted data:

  • For a large block, only the first maxBytesInFlight/3 bytes are checked when fetching shuffle data, so if a large block is corrupt beyond that point, the corruption cannot be detected during the fetch phase. This is described in the previous section.
  • Only compressed or wrapped blocks are checked; I think we should also check the blocks that are not wrapped.

This PR completes the verification mechanism for shuffle transmitted data:

Firstly, CRC32 is chosen as the checksum for verifying shuffle data.

CRC is also used for checksum verification in Hadoop; it is simple and fast.
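One property that makes CRC32 a good fit here is that it can be computed incrementally: feeding the data in small chunks yields the same digest as a single pass over the whole block, which is what allows the streaming, small-buffer computation described below. A minimal sketch with `java.util.zip.CRC32` (the byte array and chunk split are illustrative):

```java
import java.util.zip.CRC32;

public class Crc32IncrementalDemo {
    public static void main(String[] args) {
        byte[] data = "some shuffle partition bytes".getBytes();

        // One-shot checksum over the whole array.
        CRC32 whole = new CRC32();
        whole.update(data, 0, data.length);

        // The same bytes fed in two chunks, as a buffered reader would.
        CRC32 chunked = new CRC32();
        chunked.update(data, 0, 5);
        chunked.update(data, 5, data.length - 5);

        // Incremental updates produce the identical digest.
        System.out.println(whole.getValue() == chunked.getValue()); // prints "true"
    }
}
```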

During the shuffle write phase, after completing the partitioned file, we compute the CRC32 value for each partition and then write these digests, along with the offsets (indexes), into the shuffle index file.

For SortShuffleWriter and the unsafe shuffle writer, there is only one partitioned file per shuffle map task, so the computation of digests (one per partition, based on the offsets of this partitioned file) is cheap.

For the bypass shuffle writer, the number of reduce partitions is less than the bypass merge threshold, so the cost of computing the digests is acceptable.
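The write-phase computation above can be sketched as follows. This is a hypothetical helper, not Spark's actual code: it derives one CRC32 digest per partition from the partitioned data file, using the same partition offsets that the shuffle index file records.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

public class PartitionDigests {
    // Hypothetical helper: offsets[i]..offsets[i+1] delimit partition i in
    // the data file, mirroring the offsets stored in the shuffle index file.
    static long[] computeDigests(Path dataFile, long[] offsets) throws IOException {
        long[] digests = new long[offsets.length - 1];
        byte[] buf = new byte[8192];
        try (RandomAccessFile raf = new RandomAccessFile(dataFile.toFile(), "r")) {
            for (int i = 0; i < digests.length; i++) {
                CRC32 crc = new CRC32();
                raf.seek(offsets[i]);
                long remaining = offsets[i + 1] - offsets[i];
                while (remaining > 0) {
                    int n = raf.read(buf, 0, (int) Math.min(buf.length, remaining));
                    if (n < 0) break;
                    crc.update(buf, 0, n);
                    remaining -= n;
                }
                digests[i] = crc.getValue();
            }
        }
        return digests;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("shuffle", ".data");
        byte[] data = "partition0partition1".getBytes();
        Files.write(tmp, data);

        // Two partitions of 10 bytes each.
        long[] digests = computeDigests(tmp, new long[] {0, 10, 20});

        // Each digest matches a CRC32 computed directly over that slice.
        CRC32 p1 = new CRC32();
        p1.update(data, 10, 10);
        System.out.println(digests[1] == p1.getValue()); // prints "true"
        Files.delete(tmp);
    }
}
```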

During the shuffle read phase, the digest value is passed along with the block data.

We then recompute the digest of the fetched data and compare it with the original digest value.
Recomputing the digest needs only a small additional buffer (2048 bytes) for the CRC32 computation.
After recomputing, we reset the fetched data's input stream: if the stream supports mark/reset, we only need to reset it; otherwise it is a FileSegmentManagedBuffer stream, and we need to recreate it.
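The read-side check can be sketched like this (the method name is illustrative, and the recreation path for a non-markable FileSegmentManagedBuffer stream is elided): the stream is marked, fully consumed through a CRC32 with a 2048-byte buffer, then reset so the consumer can read it from the start.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;

public class DigestVerifier {
    // Hypothetical sketch: recompute the CRC32 of a fetched block and
    // compare it with the digest shipped alongside the block.
    static boolean verify(InputStream in, long expected, int blockSize) throws IOException {
        if (!in.markSupported()) {
            // A non-markable stream would be recreated instead; omitted here.
            throw new IllegalArgumentException("stream must support mark/reset");
        }
        in.mark(blockSize);
        CRC32 crc = new CRC32();
        byte[] buf = new byte[2048];  // the small additional buffer mentioned above
        int n;
        while ((n = in.read(buf)) != -1) {
            crc.update(buf, 0, n);
        }
        in.reset();  // rewind so the consumer re-reads from the start
        return crc.getValue() == expected;
    }

    public static void main(String[] args) throws IOException {
        byte[] block = "shuffle block payload".getBytes();
        CRC32 origin = new CRC32();
        origin.update(block);
        long expected = origin.getValue();

        // An intact block verifies successfully.
        System.out.println(verify(new ByteArrayInputStream(block), expected, block.length));

        // A single flipped byte is detected.
        block[0] ^= 0xFF;
        System.out.println(verify(new ByteArrayInputStream(block), expected, block.length));
    }
}
```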

So, I think this proposed verification mechanism for shuffle transmitted data is both efficient and complete.

How was this patch tested?

Unit test.

@turboFei
Member Author

@squito Could you help to review this?

@turboFei turboFei force-pushed the SPARK-26089 branch 4 times, most recently from 6c7916e to b52e1aa Compare April 24, 2019 03:43
@turboFei
Member Author

@attilapiros @srowen

@squito
Contributor

squito commented Apr 24, 2019

Hi @turboFei, thanks for posting this. Have you looked at SPARK-26089 yet? That actually addresses two out of your three concerns:

  • only check the first maxBytesInFlight/3 bytes.
  • need additional memory.

I'm not saying there is no value here -- that change does not handle the 3rd one, "only detect the compressed or wrapped data." I have also always worried about whether it really makes sense to rely on the codec to detect corruption, so using a digest could also make sense. But this is a large change, so the case should be made clearly.

@squito
Contributor

squito commented Apr 24, 2019

@ankuriitg

@squito
Contributor

squito commented Apr 24, 2019

Oops, I just noticed you actually used that jira for this change. So clearly, if we were going to do this, you should open another jira and explain the case for having a checksum. I'm also happy to hear if there is some mistake in the change already committed for SPARK-26089: 688b0c0

@turboFei
Member Author

Oops, I just noticed you actually used that jira for this change. So clearly, if we were going to do this, you should open another jira and explain the case for having a checksum. I'm also happy to hear if there is some mistake in the change already committed for SPARK-26089: 688b0c0

Thanks, I will open another jira and make the case clear.

@turboFei turboFei changed the title [SPARK-26089] Checking the shuffle transmitted data to handle large corrupt shuffle blocks [SPARK-27562][Shuffle]Complete the verification mechanism for shuffle transmitted data Apr 25, 2019
@turboFei
Member Author

@squito I have created a new jira [SPARK-27562] and described the scheme.

@turboFei
Member Author

@jerryshao

@turboFei turboFei force-pushed the SPARK-26089 branch 9 times, most recently from 8e2baff to 853542e Compare April 27, 2019 05:09
@turboFei
Member Author

turboFei commented Apr 27, 2019

@cloud-fan Could you help review this? I think this PR can efficiently guarantee the integrity of shuffle data transmission.

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 30, 2019
@github-actions github-actions bot closed this Dec 31, 2019
@jerryshao
Contributor

This seems to be a quite useful feature in heavy-load production environments. We're also suffering from such shuffle issues, and I think this is a valuable improvement.

@turboFei will you continue to work on this PR? If so, I'm going to reopen and help to review it.

@cloud-fan
Contributor

This is a nice feature. We need to think about how to integrate it with the batch shuffle fetch though.

@turboFei
Member Author

I will try to finish it. Please help reopen it, thanks.

@cloud-fan cloud-fan reopened this May 12, 2020
@github-actions github-actions bot closed this May 13, 2020
@jerryshao
Contributor

Not sure why this PR is closed again; I'm gonna reopen it. @turboFei it seems there are lots of conflicts in the code, can you please bring this up to date with the latest?

@jerryshao jerryshao reopened this May 13, 2020
@AmplabJenkins

Can one of the admins verify this patch?

@turboFei
Member Author

I will.

@turboFei
Member Author

turboFei commented May 13, 2020

It seems that we have to recompute the crc32 value when fetching continuous block data.

@turboFei turboFei changed the title [SPARK-27562][Shuffle]Complete the verification mechanism for shuffle transmitted data [WIP][SPARK-27562][Shuffle]Complete the verification mechanism for shuffle transmitted data May 13, 2020
@github-actions github-actions bot closed this May 14, 2020
@turboFei
Member Author

The behavior of the bot is strange; I have created a new PR, #28525.
