Skip to content

Commit

Permalink
Fixed tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin authored and aarondav committed Oct 10, 2014
1 parent d68f328 commit 1bdd7ee
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] wit
val listener = outstandingRequests.get(blockId)
if (listener == null) {
logWarning(s"Got a response for block $blockId from $server but it is not outstanding")
buf.release()
} else {
outstandingRequests.remove(blockId)
listener.onBlockFetchSuccess(blockId, buf)
buf.release()
}
case BlockFetchFailure(blockId, errorMsg) =>
val listener = outstandingRequests.get(blockId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
}
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}

override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,39 @@ class ProtocolSuite extends FunSuite {
assert(msg === serverChannel.readInbound())
}

test("server to client protocol") {
test("server to client protocol - BlockFetchSuccess(\"a1234\", new TestManagedBuffer(10))") {
testServerToClient(BlockFetchSuccess("a1234", new TestManagedBuffer(10)))
}

test("server to client protocol - BlockFetchSuccess(\"\", new TestManagedBuffer(0))") {
testServerToClient(BlockFetchSuccess("", new TestManagedBuffer(0)))
}

test("server to client protocol - BlockFetchFailure(\"abcd\", \"this is an error\")") {
testServerToClient(BlockFetchFailure("abcd", "this is an error"))
}

test("server to client protocol - BlockFetchFailure(\"\", \"\")") {
testServerToClient(BlockFetchFailure("", ""))
}

test("client to server protocol") {
test("client to server protocol - BlockFetchRequest(Seq.empty[String])") {
testClientToServer(BlockFetchRequest(Seq.empty[String]))
}

test("client to server protocol - BlockFetchRequest(Seq(\"b1\"))") {
testClientToServer(BlockFetchRequest(Seq("b1")))
}

test("client to server protocol - BlockFetchRequest(Seq(\"b1\", \"b2\", \"b3\"))") {
testClientToServer(BlockFetchRequest(Seq("b1", "b2", "b3")))
}

ignore("client to server protocol - BlockUploadRequest(\"\", new TestManagedBuffer(0))") {
testClientToServer(BlockUploadRequest("", new TestManagedBuffer(0)))
}

ignore("client to server protocol - BlockUploadRequest(\"b_upload\", new TestManagedBuffer(10))") {
testClientToServer(BlockUploadRequest("b_upload", new TestManagedBuffer(10)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {
}

override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
data.retain()
receivedBlockIds.add(blockId)
receivedBuffers.add(data)
sem.release()
Expand All @@ -130,34 +131,39 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {
assert(blockIds === Set(bufferBlockId))
assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference))
assert(failBlockIds.isEmpty)
buffers.foreach(_.release())
}

test("fetch a FileSegment block via zero-copy send") {
val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(fileBlockId))
assert(blockIds === Set(fileBlockId))
assert(buffers.map(_.convertToNetty()) === Set(fileBlockReference))
assert(failBlockIds.isEmpty)
buffers.foreach(_.release())
}

test("fetch a non-existent block") {
val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq("random-block"))
assert(blockIds.isEmpty)
assert(buffers.isEmpty)
assert(failBlockIds === Set("random-block"))
buffers.foreach(_.release())
}

test("fetch both ByteBuffer block and FileSegment block") {
val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(bufferBlockId, fileBlockId))
assert(blockIds === Set(bufferBlockId, fileBlockId))
assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference, fileBlockReference))
assert(failBlockIds.isEmpty)
buffers.foreach(_.release())
}

test("fetch both ByteBuffer block and a non-existent block") {
val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(bufferBlockId, "random-block"))
assert(blockIds === Set(bufferBlockId))
assert(buffers.map(_.convertToNetty()) === Set(byteBufferBlockReference))
assert(failBlockIds === Set("random-block"))
buffers.foreach(_.release())
}

test("shutting down server should also close client") {
Expand Down

0 comments on commit 1bdd7ee

Please sign in to comment.