Skip to content

Commit

Permalink
[SPARK-26265][CORE][FOLLOWUP] Put freePage into a finally block
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Based on the [comment](#23272 (comment)), it seems to be better to put `freePage` into a `finally` block. This patch as a follow-up to do so.

## How was this patch tested?

Existing tests.

Closes #23294 from viirya/SPARK-26265-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
viirya authored and HyukjinKwon committed Dec 15, 2018
1 parent d25e443 commit 1b604c1
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,36 +262,39 @@ private void advanceToNextPage() {
// reference to the page to free and free it after releasing the lock of `MapIterator`.
MemoryBlock pageToFree = null;

synchronized (this) {
int nextIdx = dataPages.indexOf(currentPage) + 1;
if (destructive && currentPage != null) {
dataPages.remove(currentPage);
pageToFree = currentPage;
nextIdx --;
}
if (dataPages.size() > nextIdx) {
currentPage = dataPages.get(nextIdx);
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
offsetInPage += UnsafeAlignedOffset.getUaoSize();
} else {
currentPage = null;
if (reader != null) {
handleFailedDelete();
try {
synchronized (this) {
int nextIdx = dataPages.indexOf(currentPage) + 1;
if (destructive && currentPage != null) {
dataPages.remove(currentPage);
pageToFree = currentPage;
nextIdx--;
}
try {
Closeables.close(reader, /* swallowIOException = */ false);
reader = spillWriters.getFirst().getReader(serializerManager);
recordsInPage = -1;
} catch (IOException e) {
// Scala iterator does not handle exception
Platform.throwException(e);
if (dataPages.size() > nextIdx) {
currentPage = dataPages.get(nextIdx);
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
offsetInPage += UnsafeAlignedOffset.getUaoSize();
} else {
currentPage = null;
if (reader != null) {
handleFailedDelete();
}
try {
Closeables.close(reader, /* swallowIOException = */ false);
reader = spillWriters.getFirst().getReader(serializerManager);
recordsInPage = -1;
} catch (IOException e) {
// Scala iterator does not handle exception
Platform.throwException(e);
}
}
}
}
if (pageToFree != null) {
freePage(pageToFree);
} finally {
if (pageToFree != null) {
freePage(pageToFree);
}
}
}

Expand Down

0 comments on commit 1b604c1

Please sign in to comment.