From 1994da0976d93d4bc258510494f361c59e05cd50 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Wed, 20 Nov 2019 14:58:33 -0800 Subject: [PATCH] Add more tests --- .../java/com.github.ambry.router/GetBlobOperation.java | 10 +++------- .../com.github.ambry.router/GetBlobOperationTest.java | 6 +++--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 7ce6f789f7..0d8422c7f2 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -55,6 +55,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -340,8 +341,7 @@ public void onCompletion(Long result, Exception exception) { if (exception != null) { setOperationException(exception); } - int currentNumChunk = numChunksWrittenOut.addAndGet(1); - System.out.println("release " + currentNumChunk + " buffer"); + int currentNumChunk = numChunksWrittenOut.getAndIncrement(); ByteBuf chunk = chunkIndexToBuf.remove(currentNumChunk); if (chunk != null) { chunk.release(); @@ -418,7 +418,6 @@ private void maybeWriteToChannel() { // if there are chunks available to be written out, do now. if (firstChunk.isComplete() && readCalled) { while (operationException.get() == null && chunkIndexToBuf.containsKey(indexOfNextChunkToWriteOut)) { - System.out.println("Writing " + indexOfNextChunkToWriteOut + "chunk to async writable channel"); ByteBuf chunkBuf = chunkIndexToBuf.get(indexOfNextChunkToWriteOut); asyncWritableChannel.write(chunkBuf.nioBuffer(), chunkAsyncWriteCallback); indexOfNextChunkToWriteOut++; @@ -434,8 +433,6 @@ private void maybeWriteToChannel() { */ void completeRead() { if (readIntoCallbackCalled.compareAndSet(false, true)) { - chunkIndexToBuf.values().forEach(ReferenceCountUtil::release); - chunkIndexToBuf.clear(); Exception e = operationException.get(); readIntoFuture.done(bytesWritten.get(), e); if (readIntoCallback != null) { @@ -747,7 +744,6 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf boolean launchedJob = maybeLaunchCryptoJob(chunkBuffer, null, encryptionKey, chunkBlobId); if (!launchedJob) { chunkBuffer = filterChunkToRange(chunkBuffer); - System.out.println("Getting " + chunkIndex + " chunk from server"); chunkIndexToBuf.put(chunkIndex, chunkBuffer.retainedDuplicate()); numChunksRetrieved++; } @@ -1194,7 +1190,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf } } blobType = blobData.getBlobType(); - chunkIndexToBuf = new TreeMap<>(); + chunkIndexToBuf = new ConcurrentHashMap<>(); if (rawMode) { // Return the raw bytes from storage if (encryptionKey != null) { diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index cce4fe6cff..03aa1b563d 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -173,7 +173,7 @@ public void after() { */ @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{//{SimpleOperationTracker.class.getSimpleName(), false, false}, + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false, false}, {SimpleOperationTracker.class.getSimpleName(), false, true}, {AdaptiveOperationTracker.class.getSimpleName(), false, false}, {AdaptiveOperationTracker.class.getSimpleName(), true, false}}); @@ -418,7 +418,7 @@ public void testZeroSizedBlobGetSuccess() throws Exception { */ @Test public void testCompositeBlobChunkSizeMultipleGetSuccess() throws Exception { - for (int i = 9; i < 10; i++) { + for (int i = 2; i < 10; i++) { blobSize = maxChunkSize * i; doPut(); getAndAssertSuccess(); @@ -1461,7 +1461,6 @@ private void assertBlobReadSuccess(GetBlobOptions options, Future readInto int readBytes = 0; do { ByteBuffer buf = asyncWritableChannel.getNextChunk(); - asyncWritableChannel.resolveOldestChunk(null); int bufLength = buf.remaining(); Assert.assertTrue("total content read should not be greater than length of put content", readBytes + bufLength <= bytesToRead); @@ -1469,6 +1468,7 @@ private void assertBlobReadSuccess(GetBlobOptions options, Future readInto Assert.assertEquals("Get and Put blob content should match", putContentBuf.get(), buf.get()); readBytes++; } + asyncWritableChannel.resolveOldestChunk(null); Assert.assertTrue("ReadyForPollCallback should have been invoked as writable channel callback was called", mockNetworkClient.getAndClearWokenUpStatus()); } while (readBytes < bytesToRead);