Skip to content

Commit

Permalink
Update tests to check that BufferReleasingStream is closing delegate …
Browse files Browse the repository at this point in the history
…InputStream
  • Loading branch information
massie committed Jun 12, 2015
1 parent f458489 commit 7429a98
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ private[spark] class HashShuffleReader[K, C](

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { wrappedStream =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,12 @@ final class ShuffleBlockFetcherIterator(
}
}

/** Helper class that ensures a ManagerBuffer is released upon InputStream.close() */
/**
* Helper class that ensures a ManagedBuffer is release upon InputStream.close()
* Note: the delegate parameter is private[storage] to make it available to tests.
*/
private class BufferReleasingInputStream(
delegate: InputStream,
private[storage] val delegate: InputStream,
iterator: ShuffleBlockFetcherIterator)
extends InputStream {
private var closed = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {

// Make sure we release buffers when a wrapped input stream is closed.
val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
val wrappedInputStream = new BufferReleasingInputStream(inputStream.get, iterator)
// Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream
val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream]
verify(mockBuf, times(0)).release()
verify(wrappedInputStream.delegate, times(0)).close()
wrappedInputStream.close()
verify(mockBuf, times(1)).release()
verify(wrappedInputStream.delegate, times(1)).close()
wrappedInputStream.close() // close should be idempotent
verify(mockBuf, times(1)).release()
verify(wrappedInputStream.delegate, times(1)).close()
}

// 3 local blocks, and 2 remote blocks
Expand Down

0 comments on commit 7429a98

Please sign in to comment.