From a011bfabddc0ca6705a6d59a9112cd4216d0241c Mon Sep 17 00:00:00 2001 From: Matt Massie Date: Thu, 18 Jun 2015 14:51:19 -0700 Subject: [PATCH] Use PrivateMethodTester on check that delegate stream is closed --- .../spark/storage/ShuffleBlockFetcherIterator.scala | 6 +++--- .../storage/ShuffleBlockFetcherIteratorSuite.scala | 11 +++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 78361f2df6d3a..6a9771777776d 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -319,10 +319,10 @@ final class ShuffleBlockFetcherIterator( * Note: the delegate parameter is private[storage] to make it available to tests. */ private class BufferReleasingInputStream( - private[storage] val delegate: InputStream, - iterator: ShuffleBlockFetcherIterator) + private val delegate: InputStream, + private val iterator: ShuffleBlockFetcherIterator) extends InputStream { - private var closed = false + private[this] var closed = false override def read(): Int = delegate.read() diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 4657caf332c5c..9ced4148d7206 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -27,6 +27,7 @@ import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.PrivateMethodTester import org.apache.spark.{SparkFunSuite, TaskContextImpl} import org.apache.spark.network._ @@ -34,7 +35,7 @@ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.shuffle.BlockFetchingListener -class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite { +class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester { // Some of the tests are quite tricky because we are testing the cleanup behavior // in the presence of faults. @@ -113,13 +114,15 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite { // Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream] verify(mockBuf, times(0)).release() - verify(wrappedInputStream.delegate, times(0)).close() + val delegateAccess = PrivateMethod[InputStream]('delegate) + + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close() wrappedInputStream.close() verify(mockBuf, times(1)).release() - verify(wrappedInputStream.delegate, times(1)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() wrappedInputStream.close() // close should be idempotent verify(mockBuf, times(1)).release() - verify(wrappedInputStream.delegate, times(1)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() } // 3 local blocks, and 2 remote blocks