From 1bdd7eec5d9ddb5a9eb33c9733878aea3ca26ba6 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 29 Sep 2014 12:07:53 -0700 Subject: [PATCH] Fixed tests. --- .../network/netty/BlockClientHandler.scala | 2 ++ .../storage/ShuffleBlockFetcherIterator.scala | 2 +- .../spark/network/netty/ProtocolSuite.scala | 25 +++++++++++++++++-- .../netty/ServerClientIntegrationSuite.scala | 6 +++++ 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/BlockClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/BlockClientHandler.scala index 1a74c6649f28a..466ece99b9b96 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/BlockClientHandler.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/BlockClientHandler.scala @@ -86,9 +86,11 @@ class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] wit val listener = outstandingRequests.get(blockId) if (listener == null) { logWarning(s"Got a response for block $blockId from $server but it is not outstanding") + buf.release() } else { outstandingRequests.remove(blockId) listener.onBlockFetchSuccess(blockId, buf) + buf.release() } case BlockFetchFailure(blockId, errorMsg) => val listener = outstandingRequests.get(blockId) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 38486c1ded9ea..d095452a261db 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -155,7 +155,7 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.remoteBytesRead += buf.size shuffleMetrics.remoteBlocksFetched += 1 } - logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) + logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { diff --git a/core/src/test/scala/org/apache/spark/network/netty/ProtocolSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/ProtocolSuite.scala index 72034634a5bd2..46604ea1fb624 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/ProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/ProtocolSuite.scala @@ -71,18 +71,39 @@ class ProtocolSuite extends FunSuite { assert(msg === serverChannel.readInbound()) } - test("server to client protocol") { + test("server to client protocol - BlockFetchSuccess(\"a1234\", new TestManagedBuffer(10))") { testServerToClient(BlockFetchSuccess("a1234", new TestManagedBuffer(10))) + } + + test("server to client protocol - BlockFetchSuccess(\"\", new TestManagedBuffer(0))") { testServerToClient(BlockFetchSuccess("", new TestManagedBuffer(0))) + } + + test("server to client protocol - BlockFetchFailure(\"abcd\", \"this is an error\")") { testServerToClient(BlockFetchFailure("abcd", "this is an error")) + } + + test("server to client protocol - BlockFetchFailure(\"\", \"\")") { testServerToClient(BlockFetchFailure("", "")) } - test("client to server protocol") { + test("client to server protocol - BlockFetchRequest(Seq.empty[String])") { testClientToServer(BlockFetchRequest(Seq.empty[String])) + } + + test("client to server protocol - BlockFetchRequest(Seq(\"b1\"))") { testClientToServer(BlockFetchRequest(Seq("b1"))) + } + + test("client to server protocol - BlockFetchRequest(Seq(\"b1\", \"b2\", \"b3\"))") { testClientToServer(BlockFetchRequest(Seq("b1", "b2", "b3"))) + } + + ignore("client to server protocol - BlockUploadRequest(\"\", new TestManagedBuffer(0))") { testClientToServer(BlockUploadRequest("", new TestManagedBuffer(0))) + } + + ignore("client to server protocol - BlockUploadRequest(\"b_upload\", new TestManagedBuffer(10))") { testClientToServer(BlockUploadRequest("b_upload", new TestManagedBuffer(10))) } } diff --git a/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala index 98e896221f910..35ff90a2dabc5 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala @@ -112,6 +112,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { + data.retain() receivedBlockIds.add(blockId) receivedBuffers.add(data) sem.release() @@ -130,6 +131,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { assert(blockIds === Set(bufferBlockId)) assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference)) assert(failBlockIds.isEmpty) + buffers.foreach(_.release()) } test("fetch a FileSegment block via zero-copy send") { @@ -137,6 +139,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { assert(blockIds === Set(fileBlockId)) assert(buffers.map(_.convertToNetty()) === Set(fileBlockReference)) assert(failBlockIds.isEmpty) + buffers.foreach(_.release()) } test("fetch a non-existent block") { @@ -144,6 +147,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { assert(blockIds.isEmpty) assert(buffers.isEmpty) assert(failBlockIds === Set("random-block")) + buffers.foreach(_.release()) } test("fetch both ByteBuffer block and FileSegment block") { @@ -151,6 +155,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { assert(blockIds === Set(bufferBlockId, fileBlockId)) assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference, fileBlockReference)) assert(failBlockIds.isEmpty) + buffers.foreach(_.release()) } test("fetch both ByteBuffer block and a non-existent block") { @@ -158,6 +163,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll { assert(blockIds === Set(bufferBlockId)) assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference)) assert(failBlockIds === Set("random-block")) + buffers.foreach(_.release()) } test("shutting down server should also close client") {