From 7429a985cef1c3530fb147f68eabf12aae613a4a Mon Sep 17 00:00:00 2001 From: Matt Massie Date: Fri, 12 Jun 2015 12:13:36 -0700 Subject: [PATCH] Update tests to check that BufferReleasingStream is closing delegate InputStream --- .../org/apache/spark/shuffle/hash/HashShuffleReader.scala | 3 +++ .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 7 +++++-- .../spark/storage/ShuffleBlockFetcherIteratorSuite.scala | 6 +++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 23bae9223bb7c..ca6eddf8d5c12 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -51,6 +51,9 @@ private[spark] class HashShuffleReader[K, C]( // Create a key/value iterator for each stream val recordIter = wrappedStreams.flatMap { wrappedStream => + // Note: the asKeyValueIterator below wraps a key/value iterator inside of a + // NextIterator. The NextIterator makes sure that close() is called on the + // underlying InputStream when all records have been read. serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a1376c8f4e484..78361f2df6d3a 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -314,9 +314,12 @@ final class ShuffleBlockFetcherIterator( } } -/** Helper class that ensures a ManagerBuffer is released upon InputStream.close() */ +/** + * Helper class that ensures a ManagedBuffer is release upon InputStream.close() + * Note: the delegate parameter is private[storage] to make it available to tests. + */ private class BufferReleasingInputStream( - delegate: InputStream, + private[storage] val delegate: InputStream, iterator: ShuffleBlockFetcherIterator) extends InputStream { private var closed = false diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 89f6713946b4e..4657caf332c5c 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -110,12 +110,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite { // Make sure we release buffers when a wrapped input stream is closed. val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId)) - val wrappedInputStream = new BufferReleasingInputStream(inputStream.get, iterator) + // Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream + val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream] verify(mockBuf, times(0)).release() + verify(wrappedInputStream.delegate, times(0)).close() wrappedInputStream.close() verify(mockBuf, times(1)).release() + verify(wrappedInputStream.delegate, times(1)).close() wrappedInputStream.close() // close should be idempotent verify(mockBuf, times(1)).release() + verify(wrappedInputStream.delegate, times(1)).close() } // 3 local blocks, and 2 remote blocks