Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
justinlin-linkedin committed Nov 20, 2019
1 parent 7f3d23c commit 1994da0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -340,8 +341,7 @@ public void onCompletion(Long result, Exception exception) {
if (exception != null) {
setOperationException(exception);
}
int currentNumChunk = numChunksWrittenOut.addAndGet(1);
System.out.println("release " + currentNumChunk + " buffer");
int currentNumChunk = numChunksWrittenOut.getAndIncrement();
ByteBuf chunk = chunkIndexToBuf.remove(currentNumChunk);
if (chunk != null) {
chunk.release();
Expand Down Expand Up @@ -418,7 +418,6 @@ private void maybeWriteToChannel() {
// if there are chunks available to be written out, do now.
if (firstChunk.isComplete() && readCalled) {
while (operationException.get() == null && chunkIndexToBuf.containsKey(indexOfNextChunkToWriteOut)) {
System.out.println("Writing " + indexOfNextChunkToWriteOut + "chunk to async writable channel");
ByteBuf chunkBuf = chunkIndexToBuf.get(indexOfNextChunkToWriteOut);
asyncWritableChannel.write(chunkBuf.nioBuffer(), chunkAsyncWriteCallback);
indexOfNextChunkToWriteOut++;
Expand All @@ -434,8 +433,6 @@ private void maybeWriteToChannel() {
*/
void completeRead() {
if (readIntoCallbackCalled.compareAndSet(false, true)) {
chunkIndexToBuf.values().forEach(ReferenceCountUtil::release);
chunkIndexToBuf.clear();
Exception e = operationException.get();
readIntoFuture.done(bytesWritten.get(), e);
if (readIntoCallback != null) {
Expand Down Expand Up @@ -747,7 +744,6 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf
boolean launchedJob = maybeLaunchCryptoJob(chunkBuffer, null, encryptionKey, chunkBlobId);
if (!launchedJob) {
chunkBuffer = filterChunkToRange(chunkBuffer);
System.out.println("Getting " + chunkIndex + " chunk from server");
chunkIndexToBuf.put(chunkIndex, chunkBuffer.retainedDuplicate());
numChunksRetrieved++;
}
Expand Down Expand Up @@ -1194,7 +1190,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf
}
}
blobType = blobData.getBlobType();
chunkIndexToBuf = new TreeMap<>();
chunkIndexToBuf = new ConcurrentHashMap<>();
if (rawMode) {
// Return the raw bytes from storage
if (encryptionKey != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void after() {
*/
@Parameterized.Parameters
public static List<Object[]> data() {
return Arrays.asList(new Object[][]{//{SimpleOperationTracker.class.getSimpleName(), false, false},
return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false, false},
{SimpleOperationTracker.class.getSimpleName(), false, true},
{AdaptiveOperationTracker.class.getSimpleName(), false, false},
{AdaptiveOperationTracker.class.getSimpleName(), true, false}});
Expand Down Expand Up @@ -418,7 +418,7 @@ public void testZeroSizedBlobGetSuccess() throws Exception {
*/
@Test
public void testCompositeBlobChunkSizeMultipleGetSuccess() throws Exception {
for (int i = 9; i < 10; i++) {
for (int i = 2; i < 10; i++) {
blobSize = maxChunkSize * i;
doPut();
getAndAssertSuccess();
Expand Down Expand Up @@ -1461,14 +1461,14 @@ private void assertBlobReadSuccess(GetBlobOptions options, Future<Long> readInto
int readBytes = 0;
do {
ByteBuffer buf = asyncWritableChannel.getNextChunk();
asyncWritableChannel.resolveOldestChunk(null);
int bufLength = buf.remaining();
Assert.assertTrue("total content read should not be greater than length of put content",
readBytes + bufLength <= bytesToRead);
while (buf.hasRemaining()) {
Assert.assertEquals("Get and Put blob content should match", putContentBuf.get(), buf.get());
readBytes++;
}
asyncWritableChannel.resolveOldestChunk(null);
Assert.assertTrue("ReadyForPollCallback should have been invoked as writable channel callback was called",
mockNetworkClient.getAndClearWokenUpStatus());
} while (readBytes < bytesToRead);
Expand Down

0 comments on commit 1994da0

Please sign in to comment.