SPARK-2792. Fix reading too much or too little data from each stream in ExternalMap / Sorter #1722
All these changes are from @mridulm's work in apache#1609, but extracted here to fix this specific issue. This particular set of changes is to make sure that we read exactly the right range of bytes from each spill file in EAOM: some serializers can write bytes after the last object (e.g. the TC_RESET flag in Java serialization) and that would confuse the previous code into reading it as part of the next batch. There are also improvements to cleanup to make sure files are closed.
Modified ExternalSorterSuite to also set a low object stream reset and batch size, and verified that it failed before the changes and succeeded after.
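The trailing-byte problem described above can be reproduced with plain Java serialization. The sketch below is not Spark's code: `BatchRangeDemo`, `writeBatches`, and `readBatch` are hypothetical names, and an in-memory byte array stands in for a spill file. It records the byte length of every batch at write time (including the TC_RESET marker that `reset()` emits after the last object) and then reads each batch back from exactly that byte range.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical demo object; names are illustrative and not part of Spark.
object BatchRangeDemo {

  /** Writes every batch into one shared buffer and returns the bytes plus each batch's byte length. */
  def writeBatches(batches: Seq[Seq[String]]): (Array[Byte], Seq[Int]) = {
    val file = new ByteArrayOutputStream() // stands in for the spill file
    val sizes = batches.map { batch =>
      val before = file.size()
      val out = new ObjectOutputStream(file)
      batch.foreach(s => out.writeObject(s))
      out.reset() // emits a TC_RESET byte after the last object of the batch
      out.flush() // push buffered bytes into `file` so the size is accurate
      file.size() - before
    }
    (file.toByteArray, sizes)
  }

  /** Reads `count` objects from the byte range [start, start + length) only. */
  def readBatch(bytes: Array[Byte], start: Int, length: Int, count: Int): Seq[String] = {
    // Limiting the input to the batch's own range keeps trailing bytes
    // of this batch from being mistaken for the start of the next one.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes, start, length))
    Seq.fill(count)(in.readObject().asInstanceOf[String])
  }
}
```

With the recorded sizes, the start offset of each batch is the running sum of the sizes before it (for example `sizes.scanLeft(0)(_ + _)`), which plays the same role as the `end - start` limit in the diff below.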
QA tests have started for PR 1722. This patch merges cleanly.
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
ser.deserializeStream(compressedStream)
One delta w.r.t. your patch, @mridulm: you used to do ser = serializer.newInstance before this, but this should not be necessary; our serializers support reading multiple streams concurrently (though, confusingly, not writing them as far as I can see; they can share an output buffer there). I removed that because creating a new instance is actually fairly expensive for Kryo.
So that is something I was not sure of, particularly with Kryo (not Java).
We were seeing the input buffer getting stepped on by various threads. This was specifically in the context of the 2G fixes though, where we had to modify the way the buffer was created anyway. I don't know if the initialization changes something else.
Jenkins, test this please
private val fileStream = new FileInputStream(file)
private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
extends Iterator[(K, C)]
{
Here I also removed some of the more paranoid asserts about batchSizes
Those asserts caught the bugs :-) But yeah, some of them might have been expensive.
QA tests have started for PR 1722. This patch merges cleanly.
QA results for PR 1722:
LGTM, thanks Matei!
Oh wait, is the Java serializer change also ported?
+0, I have not actually reviewed this, I only did a cursory pass-through. When it LGTM to @mridulm, we can merge.
Ah, good point. I've now pushed the JavaSerializer change.
Jenkins, test this please
QA tests have started for PR 1722. This patch merges cleanly.
QA results for PR 1722:
LGTM!
This makes it precise -- before we'd only reset after (reset + 1) writes
I just fixed objectStreamReset slightly so that 1 means "reset after every object" (that's what it was intended to be originally)
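To make the counting semantics concrete, here is a minimal sketch, assuming a simplified wrapper around `ObjectOutputStream`. `ResetDemo`, `ResettingStream`, and `countResets` are hypothetical names for illustration and are not Spark's classes; the `counter` and `counterReset` logic mirrors the fragment from the diff further down, where the counter is incremented before the comparison so that the object just written is included.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical demo object; not Spark's JavaSerializationStream.
object ResetDemo {

  /** Resets the wrapped stream after every `counterReset` objects; 0 or less disables resets. */
  class ResettingStream(out: ObjectOutputStream, counterReset: Int) {
    private var counter = 0
    var resets = 0 // exposed only so the demo can observe the behavior

    def writeObject(obj: AnyRef): this.type = {
      out.writeObject(obj)
      counter += 1 // count the object that was just written
      if (counterReset > 0 && counter >= counterReset) {
        out.reset()
        counter = 0
        resets += 1
      }
      this
    }
  }

  /** Writes `writes` small objects and reports how many resets happened. */
  def countResets(counterReset: Int, writes: Int): Int = {
    val stream = new ResettingStream(
      new ObjectOutputStream(new ByteArrayOutputStream()), counterReset)
    (1 to writes).foreach(i => stream.writeObject(s"record-$i"))
    stream.resets
  }
}
```

Under these assumptions, a reset interval of 1 resets after every write, an interval of 2 resets after every second write, and 0 never resets.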
Jenkins, test this please
QA tests have started for PR 1722. This patch merges cleanly.
QA results for PR 1722:
Jenkins, test this please
QA tests have started for PR 1722. This patch merges cleanly.
QA results for PR 1722:
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
counter += 1
if (counterReset > 0 && counter >= counterReset) {
This is the right behavior, but it is a slight change... I don't think anyone is expecting the earlier behavior though!
LGTM!
Alright, I've merged this in. Thanks for looking over it!
…in ExternalMap / Sorter

All these changes are from mridulm's work in #1609, but extracted here to fix this specific issue and make it easier to merge into 1.1. This particular set of changes is to make sure that we read exactly the right range of bytes from each spill file in EAOM: some serializers can write bytes after the last object (e.g. the TC_RESET flag in Java serialization) and that would confuse the previous code into reading it as part of the next batch. There are also improvements to cleanup to make sure files are closed.

In addition to bringing in the changes to ExternalAppendOnlyMap, I also copied them to the corresponding code in ExternalSorter and updated its test suite to test for the same issues.

Author: Matei Zaharia <matei@databricks.com>

Closes #1722 from mateiz/spark-2792 and squashes the following commits:

5d4bfb5 [Matei Zaharia] Make objectStreamReset counter count the last object written too
18fe865 [Matei Zaharia] Update docs on objectStreamReset
576ee83 [Matei Zaharia] Allow objectStreamReset to be 0
0374217 [Matei Zaharia] Remove super paranoid code to close file handles
bda37bb [Matei Zaharia] Implement Mridul's ExternalAppendOnlyMap fixes in ExternalSorter too
0d6dad7 [Matei Zaharia] Added Mridul's test changes for ExternalAppendOnlyMap
9a78e4b [Matei Zaharia] Add @mridulm's fixes to ExternalAppendOnlyMap for batch sizes