From 63cb660a0905a67eaf9aed0e2602a09269efcd1d Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 15 Nov 2019 13:33:26 -0800 Subject: [PATCH 1/6] Start using Netty ByteBuf in GetBlobOperation --- .../network/ResponseInfo.java | 12 +- .../BlobData.java | 49 +++- .../MessageFormatRecord.java | 17 +- .../ValidatingTransformer.java | 8 +- .../MessageFormatInputStreamTest.java | 22 +- .../MessageFormatRecordTest.java | 8 +- .../MessageFormatSendTest.java | 2 +- .../MessageSievingInputStreamTest.java | 27 +- .../SocketNetworkClientTest.java | 2 +- .../BlobIdTransformer.java | 16 +- .../com.github.ambry.router/DecryptJob.java | 13 +- .../GetBlobOperation.java | 275 +++++++++--------- .../ChunkFillTest.java | 3 +- .../CryptoJobHandlerTest.java | 10 +- .../GetBlobOperationTest.java | 39 ++- .../MockNetworkClient.java | 7 +- .../MockNetworkClientFactory.java | 4 +- .../com.github.ambry.router/MockSelector.java | 32 +- .../PutManagerTest.java | 5 +- .../ServerHardDeleteTest.java | 8 +- .../ServerTestUtil.java | 86 ++---- .../com.github.ambry.server/Verifier.java | 19 +- .../store/HardDeleteVerifier.java | 56 ++-- .../tools/admin/BlobValidator.java | 8 +- .../tools/admin/ServerAdminTool.java | 20 +- .../tools/perf/ServerReadPerformance.java | 10 +- .../ByteBufferInputStream.java | 12 +- .../java/com.github.ambry.utils/Utils.java | 78 ++++- .../NettyByteBufLeakHelper.java | 2 +- build.gradle | 1 + 30 files changed, 531 insertions(+), 320 deletions(-) rename {ambry-commons/src/test/java/com.github.ambry.commons => ambry-utils/src/test/java/com.github.ambry.utils}/NettyByteBufLeakHelper.java (99%) diff --git a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java index bfbd7952bd..b498c86690 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java +++ b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java @@ -27,8 +27,8 @@ public class ResponseInfo { private final RequestInfo requestInfo; private final NetworkClientErrorCode error; - private final Object response; private final DataNodeId dataNode; + private Object response; /** * Constructs a ResponseInfo with the given parameters. @@ -68,21 +68,13 @@ public Object getResponse() { return response; } - /** - * Increase the reference count of underlying response. - */ - public void retain() { - if (response != null) { - ReferenceCountUtil.retain(response); - } - } - /** * Decrease the reference count of underlying response. 
*/
  public void release() {
    if (response != null) {
      ReferenceCountUtil.release(response);
+      response = null;
    }
  }

diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
index eaa3e9c3fc..008e97050e 100644
--- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
+++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
@@ -14,6 +14,9 @@
 package com.github.ambry.messageformat;
 
 import com.github.ambry.utils.ByteBufferInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
 
 
 /**
@@ -22,7 +25,20 @@ public class BlobData {
   private final BlobType blobType;
   private final long size;
-  private final ByteBufferInputStream stream;
+  private ByteBuf byteBuf;
+  private ByteBufferInputStream stream = null;
+
+  /**
+   * The blob data contains the stream and other required info
+   * @param blobType {@link BlobType} of the blob
+   * @param size The size of the blob content.
+   * @param byteBuf The content of this blob in a {@link ByteBuf}.
+   */
+  public BlobData(BlobType blobType, long size, ByteBuf byteBuf) {
+    this.blobType = blobType;
+    this.size = size;
+    this.byteBuf = byteBuf;
+  }
 
   /**
    * The blob data contains the stream and other required info
@@ -30,9 +46,11 @@ public class BlobData {
    * @param size The size of the blob content.
    * @param stream The {@link ByteBufferInputStream} containing the blob content.
    */
+  @Deprecated
  public BlobData(BlobType blobType, long size, ByteBufferInputStream stream) {
    this.blobType = blobType;
    this.size = size;
+    this.byteBuf = Unpooled.wrappedBuffer(stream.getByteBuffer());
    this.stream = stream;
  }

@@ -53,7 +71,38 @@ public long getSize() {
  /**
   * @return the {@link ByteBufferInputStream} containing the blob content.
   */
+  @Deprecated
  public ByteBufferInputStream getStream() {
+    if (stream != null) {
+      return stream;
+    }
+    // The blob content is passed as a ByteBuf
+    if (byteBuf == null) {
+      return null;
+    }
+    ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes());
+    // Copy the readable bytes out of the ByteBuf, then flip the ByteBuffer for reading.
+    byteBuf.readBytes(temp);
+    temp.flip();
+    byteBuf.release();
+    byteBuf = null;
+    stream = new ByteBufferInputStream(temp);
    return stream;
  }
+
+  /**
+   * Return the netty {@link ByteBuf} and then transfer the ownership to the caller. It's not safe
+   * to call this method more than once.
+ */ + public ByteBuf getAndRelease() { + if (byteBuf == null) { + return null; + } + try { + return byteBuf.retainedDuplicate(); + } finally { + byteBuf.release(); + byteBuf = null; + } + } } diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatRecord.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatRecord.java index 21e68f9d0e..2e44256310 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatRecord.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatRecord.java @@ -15,11 +15,13 @@ import com.github.ambry.store.StoreKey; import com.github.ambry.store.StoreKeyFactory; -import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.ByteBufferDataInputStream; import com.github.ambry.utils.Crc32; import com.github.ambry.utils.CrcInputStream; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -243,7 +245,8 @@ static boolean isValidBlobRecordVersion(short blobRecordVersion) { */ public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory storeKeyFactory) throws IOException, MessageFormatException { - DataInputStream inputStream = new DataInputStream(stream); + DataInputStream inputStream = + stream instanceof DataInputStream ? (DataInputStream) stream : new DataInputStream(stream); short headerVersion = inputStream.readShort(); ByteBuffer headerBuf; MessageHeader_Format header; @@ -277,7 +280,7 @@ public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory sto MessageFormatErrorCodes.Unknown_Format_Version); } header.verifyHeader(); - StoreKey storeKey = storeKeyFactory.getStoreKey(new DataInputStream(stream)); + StoreKey storeKey = storeKeyFactory.getStoreKey(inputStream); ByteBuffer blobEncryptionKey = null; if (header.hasEncryptionKeyRecord()) { blobEncryptionKey = deserializeBlobEncryptionKey(stream); @@ -1671,7 +1674,7 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO if (dataSize > Integer.MAX_VALUE) { throw new IOException("We only support data of max size == MAX_INT. Error while reading blob from store"); } - ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize); + ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize); long crc = crcStream.getValue(); long streamCrc = dataStream.readLong(); if (crc != streamCrc) { @@ -1679,7 +1682,7 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO throw new MessageFormatException("corrupt data while parsing blob content", MessageFormatErrorCodes.Data_Corrupt); } - return new BlobData(BlobType.DataBlob, dataSize, output); + return new BlobData(BlobType.DataBlob, dataSize, byteBuf); } } @@ -1729,7 +1732,7 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO if (dataSize > Integer.MAX_VALUE) { throw new IOException("We only support data of max size == MAX_INT. 
Error while reading blob from store");
      }
-      ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize);
+      ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize);
      long crc = crcStream.getValue();
      long streamCrc = dataStream.readLong();
      if (crc != streamCrc) {
@@ -1737,7 +1740,7 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO
        throw new MessageFormatException("corrupt data while parsing blob content",
            MessageFormatErrorCodes.Data_Corrupt);
      }
-      return new BlobData(blobContentType, dataSize, output);
+      return new BlobData(blobContentType, dataSize, byteBuf);
    }
  }

diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java
index 4471ed8014..e47d0df82c 100644
--- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java
+++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java
@@ -20,6 +20,7 @@
 import com.github.ambry.store.StoreKeyFactory;
 import com.github.ambry.store.TransformationOutput;
 import com.github.ambry.store.Transformer;
+import io.netty.buffer.ByteBufInputStream;
 import java.io.DataInputStream;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -72,9 +73,12 @@ public TransformationOutput transform(Message message) {
        throw new IllegalStateException("Message cannot be a deleted record ");
      }
      if (msgInfo.getStoreKey().equals(keyInStream)) {
+        // ValidatingTransformer only exists on ambry-server, and we don't enable netty on ambry-server yet, so
+        // blobData.getAndRelease will return an Unpooled ByteBuf and it's fine not to release it.
+        // @todo when enabling netty in ambry-server, release this ByteBuf.
PutMessageFormatInputStream transformedStream = - new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, blobData.getStream(), - blobData.getSize(), blobData.getBlobType()); + new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, + new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); MessageInfo transformedMsgInfo = new MessageInfo(keyInStream, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java index 8d6c0cd2ea..3ddc142c82 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java @@ -22,9 +22,12 @@ import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Crc32; import com.github.ambry.utils.CrcInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -32,12 +35,14 @@ import java.util.Random; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; public class MessageFormatInputStreamTest { private static short messageFormatHeaderVersionSaved; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); @BeforeClass public static void saveMessageFormatHeaderVersionToUse() { @@ -49,6 +54,16 @@ public void resetMessageFormatHeaderVersionToUse() { MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved; } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + /** * Tests for {@link PutMessageFormatInputStream} in different versions. 
*/ @@ -240,7 +255,12 @@ private void messageFormatPutRecordsTest(short blobVersion, BlobType blobType, s } else { Assert.assertEquals(null, blobAll.getBlobEncryptionKey()); } - Assert.assertEquals(ByteBuffer.wrap(data), blobAll.getBlobData().getStream().getByteBuffer()); + ByteBuf byteBuf = blobAll.getBlobData().getAndRelease(); + try { + Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); + } finally { + byteBuf.release(); + } } /** diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java index 890cd4f046..0efb2e8d1e 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java @@ -173,7 +173,7 @@ public void deserializeTest() throws MessageFormatException, IOException { BlobData blobData = MessageFormatRecord.deserializeBlob(new ByteBufferInputStream(sData)); Assert.assertEquals(blobData.getSize(), 2000); byte[] verify = new byte[2000]; - blobData.getStream().read(verify); + blobData.getAndRelease().readBytes(verify); Assert.assertArrayEquals(verify, data.array()); // corrupt blob record V1 @@ -640,7 +640,7 @@ private void testBlobRecordV2(int blobSize, BlobType blobType) throws IOExceptio BlobData blobData = getBlobRecordV2(blobSize, blobType, blobContent, entireBlob); Assert.assertEquals("Blob size mismatch", blobSize, blobData.getSize()); byte[] verify = new byte[blobSize]; - blobData.getStream().read(verify); + blobData.getAndRelease().readBytes(verify); Assert.assertArrayEquals("BlobContent mismatch", blobContent.array(), verify); // corrupt blob record V2 @@ -694,7 +694,7 @@ public void testBlobRecordWithMetadataContentV2() throws IOException, MessageFor Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getStream().read(verify); + blobData.getAndRelease().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); // deserialize and check for metadata contents @@ -728,7 +728,7 @@ public void testBlobRecordWithMetadataContentV3() throws IOException, MessageFor BlobData blobData = getBlobRecordV2(metadataContentSize, BlobType.MetadataBlob, metadataContent, blob); Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getStream().read(verify); + blobData.getAndRelease().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); metadataContent.rewind(); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java index f7937d3061..469cf70921 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java @@ -468,7 +468,7 @@ private void doSendWriteCompositeMessagesTest(byte[][] blob, byte[][] userMetada Assert.assertEquals(BlobType.DataBlob, deserializedBlob.getBlobData().getBlobType()); Assert.assertEquals(blob[i].length, deserializedBlob.getBlobData().getSize()); byte[] readBlob = new byte[blob[i].length]; - deserializedBlob.getBlobData().getStream().read(readBlob); + 
deserializedBlob.getBlobData().getAndRelease().readBytes(readBlob); Assert.assertArrayEquals(blob[i], readBlob); if (headerVersions[i] == MessageFormatRecord.Message_Header_Version_V1) { diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java index 5a8be93dce..da672e8dfc 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java @@ -24,8 +24,12 @@ import com.github.ambry.store.TransformationOutput; import com.github.ambry.store.Transformer; import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -40,6 +44,7 @@ import java.util.Map; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +72,7 @@ public static List data() { private final EnumSet options; private final StoreKeyFactory storeKeyFactory; private final RandomKeyConverter randomKeyConverter; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); @BeforeClass public static void saveMessageFormatHeaderVersionToUse() { @@ -78,6 +84,16 @@ public void resetMessageFormatHeaderVersionToUse() { MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved; } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + public MessageSievingInputStreamTest(EnumSet options) throws Exception { this.options = options; this.storeKeyFactory = new MockIdFactory(); @@ -960,8 +976,13 @@ private void verifySievedTransformedMessage(MessageSievingInputStream sievedStre Assert.assertEquals(accountId, propsFromStream.getAccountId()); Assert.assertEquals(containerId, propsFromStream.getContainerId()); Assert.assertEquals(ByteBuffer.wrap(usermetadata), userMetadataFromStream); - Assert.assertEquals(ByteBuffer.wrap(data), blobDataFromStream.getStream().getByteBuffer()); Assert.assertEquals(blobType, blobDataFromStream.getBlobType()); + ByteBuf byteBuf = blobDataFromStream.getAndRelease(); + try { + Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); + } finally { + byteBuf.release(); + } } } @@ -1064,8 +1085,8 @@ public TransformationOutput transform(Message message) { } else { MessageInfo transformedMsgInfo; PutMessageFormatInputStream transformedStream = - new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, blobData.getStream(), - blobData.getSize(), blobData.getBlobType()); + new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, + new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); transformedMsgInfo = new MessageInfo(newKey, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git 
a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java
index eabb353785..1b25aa5df3 100644
--- a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java
+++ b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java
@@ -18,7 +18,7 @@
 import com.github.ambry.clustermap.MockClusterMap;
 import com.github.ambry.clustermap.MockDataNodeId;
 import com.github.ambry.clustermap.ReplicaId;
-import com.github.ambry.commons.NettyByteBufLeakHelper;
+import com.github.ambry.utils.NettyByteBufLeakHelper;
 import com.github.ambry.config.NetworkConfig;
 import com.github.ambry.config.VerifiableProperties;
 import com.github.ambry.utils.MockTime;
diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java
index ab090274ec..105936ebbf 100644
--- a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java
+++ b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java
@@ -32,6 +32,9 @@
 import com.github.ambry.store.Transformer;
 import com.github.ambry.utils.ByteBufferInputStream;
 import com.github.ambry.utils.Pair;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -154,14 +157,14 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo
      BlobProperties oldProperties = deserializeBlobProperties(inputStream);
      ByteBuffer userMetaData = deserializeUserMetadata(inputStream);
      BlobData blobData = deserializeBlob(inputStream);
-      ByteBufferInputStream blobDataBytes = blobData.getStream();
+      ByteBuf blobDataBytes = blobData.getAndRelease();
      long blobPropertiesSize = oldProperties.getBlobSize();

      //If the blob is a metadata blob its data chunk id list
      //will be rewritten with transformed IDs
      if (blobData.getBlobType().equals(BlobType.MetadataBlob)) {
-        ByteBuffer serializedMetadataContent = blobData.getStream().getByteBuffer();
+        ByteBuffer serializedMetadataContent = blobDataBytes.nioBuffer();
        CompositeBlobInfo compositeBlobInfo =
            MetadataContentSerDe.deserializeMetadataContentRecord(serializedMetadataContent, storeKeyFactory);
        Map convertedKeys = storeKeyConverter.convert(compositeBlobInfo.getKeys());
@@ -221,7 +224,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo
        }
        blobPropertiesSize = compositeBlobInfo.getTotalSize();
        metadataContent.flip();
-        blobDataBytes = new ByteBufferInputStream(metadataContent);
+        blobDataBytes = Unpooled.wrappedBuffer(metadataContent);
        blobData = new BlobData(blobData.getBlobType(), metadataContent.remaining(), blobDataBytes);
      }

@@ -231,9 +234,12 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo
          oldProperties.getCreationTimeInMs(), newBlobId.getAccountId(), newBlobId.getContainerId(),
          oldProperties.isEncrypted(), null);

+      // BlobIdTransformer only exists on ambry-server, and we don't enable netty on ambry-server yet, so
+      // blobData.getAndRelease will return an Unpooled ByteBuf and it's fine not to release it.
+      // @todo when enabling netty in ambry-server, release this ByteBuf.
PutMessageFormatInputStream putMessageFormatInputStream =
-          new PutMessageFormatInputStream(newKey, blobEncryptionKey, newProperties, userMetaData, blobDataBytes,
-              blobData.getSize(), blobData.getBlobType());
+          new PutMessageFormatInputStream(newKey, blobEncryptionKey, newProperties, userMetaData,
+              new ByteBufInputStream(blobDataBytes, true), blobData.getSize(), blobData.getBlobType());
      // Reuse the original CRC if present in the oldMessageInfo. This is important to ensure that messages that are
      // received via replication are sent to the store with proper CRCs (which the store needs to detect duplicate
      // messages). As an additional guard, here the original CRC is only reused if the key's ID in string form is the
diff --git a/ambry-router/src/main/java/com.github.ambry.router/DecryptJob.java b/ambry-router/src/main/java/com.github.ambry.router/DecryptJob.java
index 40ed083063..9e985fbd42 100644
--- a/ambry-router/src/main/java/com.github.ambry.router/DecryptJob.java
+++ b/ambry-router/src/main/java/com.github.ambry.router/DecryptJob.java
@@ -14,6 +14,7 @@
 package com.github.ambry.router;
 
 import com.github.ambry.commons.BlobId;
+import io.netty.buffer.ByteBuf;
 import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
 
@@ -23,7 +24,7 @@
  */
 class DecryptJob implements CryptoJob {
   private final BlobId blobId;
-  private final ByteBuffer encryptedBlobContent;
+  private final ByteBuf encryptedBlobContent;
   private final ByteBuffer encryptedUserMetadata;
   private final ByteBuffer encryptedPerBlobKey;
   private final Callback callback;
@@ -36,14 +37,14 @@ class DecryptJob implements CryptoJob {
    * {@link Callback}
    * @param blobId the {@link BlobId} for which decryption is requested
    * @param encryptedPerBlobKey encrypted per blob key
-   * @param encryptedBlobContent encrypted blob content. Could be {@null}
+   * @param encryptedBlobContent encrypted blob content. Could be {@code null}. Currently the blob content is in a {@link ByteBuf}.
    * @param encryptedUserMetadata encrypted user metadata. Could be {@null}
    * @param cryptoService the {@link CryptoService} instance to use
    * @param kms the {@link KeyManagementService} instance to use
    * @param decryptJobMetricsTracker metrics tracker to track the decryption job
    * @param callback {@link Callback} to be invoked on completion
    */
-  DecryptJob(BlobId blobId, ByteBuffer encryptedPerBlobKey, ByteBuffer encryptedBlobContent,
+  DecryptJob(BlobId blobId, ByteBuffer encryptedPerBlobKey, ByteBuf encryptedBlobContent,
      ByteBuffer encryptedUserMetadata, CryptoService cryptoService, KeyManagementService kms,
      CryptoJobMetricsTracker decryptJobMetricsTracker, Callback callback) {
    this.blobId = blobId;
@@ -73,7 +74,7 @@ public void run() {
      Object containerKey = kms.getKey(blobId.getAccountId(), blobId.getContainerId());
      Object perBlobKey = cryptoService.decryptKey(encryptedPerBlobKey, containerKey);
      if (encryptedBlobContent != null) {
-        decryptedBlobContent = cryptoService.decrypt(encryptedBlobContent, perBlobKey);
+        decryptedBlobContent = cryptoService.decrypt(encryptedBlobContent.nioBuffer(), perBlobKey);
      }
      if (encryptedUserMetadata != null) {
        decryptedUserMetadata = cryptoService.decrypt(encryptedUserMetadata, perBlobKey);
@@ -81,6 +82,10 @@ public void run() {
    } catch (Exception e) {
      exception = e;
    } finally {
+      // After decryption, release the ByteBuf.
+      if (encryptedBlobContent != null) {
+        encryptedBlobContent.release();
+      }
      decryptJobMetricsTracker.onJobProcessingComplete();
      callback.onCompletion(
          exception == null ?
new DecryptJobResult(blobId, decryptedBlobContent, decryptedUserMetadata) : null, diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index ca3e8e57eb..39621b2b0d 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -42,6 +42,9 @@ import com.github.ambry.store.MessageInfo; import com.github.ambry.store.StoreKey; import com.github.ambry.utils.Time; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -100,8 +103,7 @@ class GetBlobOperation extends GetOperation { // a list iterator to the chunk ids that need to be fetched for this operation, if this is a composite blob. private ListIterator chunkIdIterator; // chunk index to retrieved chunk buffer mapping. - private Map chunkIndexToBuffer; - private Map chunkIndexToResponseInfo; + private Map chunkIndexToBuf; // To find the GetChunk to hand over the response quickly. private final Map correlationIdToGetChunk = new HashMap<>(); // the blob info that is populated on OperationType.BlobInfo or OperationType.All @@ -142,6 +144,14 @@ class GetBlobOperation extends GetOperation { firstChunk = new FirstGetChunk(); } + private void setOperationCompleted() { + operationCompleted = true; + if (chunkIndexToBuf != null) { + chunkIndexToBuf.values().forEach(ReferenceCountUtil::release); + chunkIndexToBuf.clear(); + } + } + /** * {@inheritDoc} *
@@ -159,7 +169,7 @@ void abort(Exception abortCause) {
        blobDataChannel.completeRead();
      }
    }
-    operationCompleted = true;
+    setOperationCompleted();
  }

  /**
@@ -180,7 +190,7 @@ private void onChunkOperationComplete(GetChunk chunk) {
      // If this is an operation just to get the chunk ids, then these ids will be returned as part of the
      // result callback and no more chunks will be fetched, so mark the operation as complete to let the
      // GetManager remove this operation.
-      operationCompleted = true;
+      setOperationCompleted();
      List chunkIds = e == null && compositeBlobInfo != null ? compositeBlobInfo.getKeys() : null;
      operationResult = new GetBlobResultInternal(null, chunkIds);
    } else {
@@ -216,7 +226,7 @@ private void onChunkOperationComplete(GetChunk chunk) {
      if (options.getBlobOptions.getOperationType() != GetBlobOptions.OperationType.BlobInfo) {
        blobDataChannel = new BlobDataReadableStreamChannel();
      } else {
-        operationCompleted = true;
+        setOperationCompleted();
      }
      operationResult = new GetBlobResultInternal(new GetBlobResult(blobInfo, blobDataChannel), null);
    } else {
@@ -341,9 +351,9 @@ public void onCompletion(Long result, Exception exception) {
          setOperationException(exception);
        }
        int currentNumChunk = numChunksWrittenOut.get();
-        ResponseInfo responseInfo = chunkIndexToResponseInfo.remove(currentNumChunk);
-        if (responseInfo != null) {
-          responseInfo.release();
+        ByteBuf byteBuf = chunkIndexToBuf.remove(currentNumChunk);
+        if (byteBuf != null) {
+          byteBuf.release();
        }
        numChunksWrittenOut.incrementAndGet();
        routerCallback.onPollReady();
@@ -416,9 +426,9 @@ int getNumChunksWrittenOut() {
    private void maybeWriteToChannel() {
      // if there are chunks available to be written out, do now.
      if (firstChunk.isComplete() && readCalled) {
-        while (operationException.get() == null && chunkIndexToBuffer.containsKey(indexOfNextChunkToWriteOut)) {
-          ByteBuffer chunkBuf = chunkIndexToBuffer.remove(indexOfNextChunkToWriteOut);
-          asyncWritableChannel.write(chunkBuf, chunkAsyncWriteCallback);
+        while (operationException.get() == null && chunkIndexToBuf.containsKey(indexOfNextChunkToWriteOut)) {
+          ByteBuf byteBuf = chunkIndexToBuf.get(indexOfNextChunkToWriteOut);
+          asyncWritableChannel.write(byteBuf.nioBuffer(), chunkAsyncWriteCallback);
          indexOfNextChunkToWriteOut++;
        }
        if (operationException.get() != null || numChunksWrittenOut.get() == numChunksTotal) {
@@ -450,18 +460,18 @@ void completeRead() {
      }
      releaseResource();
    }
-    operationCompleted = true;
+    setOperationCompleted();
  }

  /**
-   * Release all the {@link ResponseInfo} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid
+   * Release all the {@link ByteBuf} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid
   * conflict with the release call in the chunk async callback.
   */
  private void releaseResource() {
-    for (Integer key : chunkIndexToResponseInfo.keySet()) {
-      ResponseInfo response = chunkIndexToResponseInfo.remove(key);
-      if (response != null) {
-        response.release();
+    for (Integer key : chunkIndexToBuf.keySet()) {
+      ByteBuf byteBuf = chunkIndexToBuf.remove(key);
+      if (byteBuf != null) {
+        byteBuf.release();
      }
    }
  }
@@ -642,13 +652,9 @@ protected void maybeProcessCallbacks() {
          decryptJobMetricsTracker.onJobResultProcessingStart();
          // We only need to call this method when the blob is encrypted; once decryption finishes, we no longer need
          // the response info.
- ResponseInfo responseInfo = chunkIndexToResponseInfo.remove(chunkIndex); - if (responseInfo != null) { - responseInfo.release(); - } if (decryptCallbackResultInfo.exception == null) { - chunkIndexToBuffer.put(chunkIndex, - filterChunkToRange(decryptCallbackResultInfo.result.getDecryptedBlobContent())); + chunkIndexToBuf.put(chunkIndex, + filterChunkToRange(Unpooled.wrappedBuffer(decryptCallbackResultInfo.result.getDecryptedBlobContent()))); numChunksRetrieved++; logger.trace("Decrypt result successfully updated for data chunk {}", chunkBlobId); progressTracker.setCryptoJobSuccess(); @@ -744,29 +750,31 @@ void checkAndMaybeComplete() { /** * Handle the body of the response: Deserialize and add to the list of chunk buffers. - * @param responseInfo the response received for a request sent out on behalf of this chunk. * @param payload the body of the response. * @param messageMetadata the {@link MessageMetadata} associated with the message. * @param messageInfo the {@link MessageInfo} associated with the message. * @throws IOException if there is an IOException while deserializing the body. * @throws MessageFormatException if there is a MessageFormatException while deserializing the body. */ - void handleBody(ResponseInfo responseInfo, InputStream payload, MessageMetadata messageMetadata, - MessageInfo messageInfo) throws IOException, MessageFormatException { + void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInfo messageInfo) + throws IOException, MessageFormatException { if (!successfullyDeserialized) { BlobData blobData = MessageFormatRecord.deserializeBlob(payload); ByteBuffer encryptionKey = messageMetadata == null ? null : messageMetadata.getEncryptionKey(); - ByteBuffer chunkBuffer = blobData.getStream().getByteBuffer(); - responseInfo.retain(); - chunkIndexToResponseInfo.put(chunkIndex, responseInfo); + ByteBuf chunkBuf = blobData.getAndRelease(); - boolean launchedJob = maybeLaunchCryptoJob(chunkBuffer, null, encryptionKey, chunkBlobId); - if (!launchedJob) { - chunkIndexToBuffer.put(chunkIndex, filterChunkToRange(chunkBuffer)); - numChunksRetrieved++; - } + try { + boolean launchedJob = maybeLaunchCryptoJob(chunkBuf, null, encryptionKey, chunkBlobId); + if (!launchedJob) { + chunkBuf = filterChunkToRange(chunkBuf); + chunkIndexToBuf.put(chunkIndex, chunkBuf.retainedDuplicate()); + numChunksRetrieved++; + } - successfullyDeserialized = true; + successfullyDeserialized = true; + } finally { + chunkBuf.release(); + } } else { // If successTarget > 1, then content reconciliation may have to be done. For now, ignore subsequent responses. } @@ -825,7 +833,7 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) { // we do not notify the ResponseHandler responsible for failure detection as this is an unexpected error. } else { try { - processGetBlobResponse(responseInfo, getRequestInfo, getResponse); + processGetBlobResponse(getRequestInfo, getResponse); } catch (IOException | MessageFormatException e) { // This should really not happen. Again, we do not notify the ResponseHandler responsible for failure // detection. @@ -845,13 +853,13 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) { /** * Launch a crypto job as needed. - * @param dataBuffer to buffer to encrypt or decrypt. + * @param dataBuf to buffer to encrypt or decrypt. * @param userMetadata userMetadata of the blob. * @param encryptionKey encryption key for the blob. Could be null for non encrypted blob. * @param targetBlobId the {@link BlobId} of the blob. 
* @return {@code true} if a crypto job was launched, otherwise {@code false}. */ - protected boolean maybeLaunchCryptoJob(ByteBuffer dataBuffer, byte[] userMetadata, ByteBuffer encryptionKey, + protected boolean maybeLaunchCryptoJob(ByteBuf dataBuf, byte[] userMetadata, ByteBuffer encryptionKey, BlobId targetBlobId) { // // Three cases to handle: @@ -870,7 +878,7 @@ protected boolean maybeLaunchCryptoJob(ByteBuffer dataBuffer, byte[] userMetadat decryptCallbackResultInfo = new DecryptCallBackResultInfo(); progressTracker.initializeCryptoJobTracker(CryptoJobType.DECRYPTION); decryptJobMetricsTracker.onJobSubmission(); - cryptoJobHandler.submitJob(new DecryptJob(targetBlobId, encryptionKey, dataBuffer, + cryptoJobHandler.submitJob(new DecryptJob(targetBlobId, encryptionKey, dataBuf.retainedDuplicate(), userMetadata != null ? ByteBuffer.wrap(userMetadata) : null, cryptoService, kms, decryptJobMetricsTracker, (DecryptJob.DecryptJobResult result, Exception exception) -> { routerMetrics.decryptTimeMs.update(System.currentTimeMillis() - startTimeMs); @@ -890,14 +898,13 @@ protected boolean maybeLaunchCryptoJob(ByteBuffer dataBuffer, byte[] userMetadat /** * Process the GetResponse extracted from a {@link ResponseInfo} - * @param responseInfo the response received for a request sent out on behalf of this chunk. * @param getRequestInfo the associated {@link RequestInfo} for which this response was received. * @param getResponse the {@link GetResponse} extracted from the {@link ResponseInfo} * @throws IOException if there is an error during deserialization of the GetResponse. * @throws MessageFormatException if there is an error during deserialization of the GetResponse. */ - private void processGetBlobResponse(ResponseInfo responseInfo, GetRequestInfo getRequestInfo, - GetResponse getResponse) throws IOException, MessageFormatException { + private void processGetBlobResponse(GetRequestInfo getRequestInfo, GetResponse getResponse) + throws IOException, MessageFormatException { ServerErrorCode getError = getResponse.getError(); if (getError == ServerErrorCode.No_Error) { int partitionsInResponse = getResponse.getPartitionResponseInfoList().size(); @@ -918,7 +925,7 @@ private void processGetBlobResponse(ResponseInfo responseInfo, GetRequestInfo ge } else { MessageMetadata messageMetadata = partitionResponseInfo.getMessageMetadataList().get(0); MessageInfo messageInfo = partitionResponseInfo.getMessageInfoList().get(0); - handleBody(responseInfo, getResponse.getInputStream(), messageMetadata, messageInfo); + handleBody(getResponse.getInputStream(), messageMetadata, messageInfo); chunkOperationTracker.onResponse(getRequestInfo.replicaId, TrackedRequestFinalState.SUCCESS); if (RouterUtils.isRemoteReplica(routerConfig, getRequestInfo.replicaId)) { logger.trace("Cross colo request successful for remote replica in {} ", @@ -990,16 +997,15 @@ void setChunkException(RouterException exception) { /** * Slice this chunk's data to only include the bytes within the operation's specified byte range. - * @param buf the {@link ByteBuffer} representing the content of this chunk. - * @return A {@link ByteBuffer} that only includes bytes within the operation's specified byte range. + * @param buf the {@link ByteBuf} representing the content of this chunk. + * @return A {@link ByteBuf} that only includes bytes within the operation's specified byte range. 
*/ - protected ByteBuffer filterChunkToRange(ByteBuffer buf) { + protected ByteBuf filterChunkToRange(ByteBuf buf) { if (options.getBlobOptions.getRange() == null) { return buf; } if (resolvedByteRange.getRangeSize() == 0) { - buf.position(0); - buf.limit(0); + buf.clear(); } else { long relativeOffset = offset; if (options.getBlobOptions.hasBlobSegmentIdx()) { @@ -1008,10 +1014,10 @@ protected ByteBuffer filterChunkToRange(ByteBuffer buf) { long startOffsetInThisChunk = chunkIndex == 0 ? resolvedByteRange.getStartOffset() - relativeOffset : 0; long endOffsetInThisChunkExclusive = chunkIndex == (numChunksTotal - 1) ? resolvedByteRange.getEndOffset() - relativeOffset + 1 : chunkSize; - buf.position((int) startOffsetInThisChunk); - buf.limit((int) endOffsetInThisChunkExclusive); + buf.setIndex(buf.readerIndex() + (int) startOffsetInThisChunk, + buf.readerIndex() + (int) endOffsetInThisChunkExclusive); } - return buf.slice(); + return buf; } /** @@ -1118,10 +1124,6 @@ MessageFormatFlags getOperationFlag() { @Override protected void maybeProcessCallbacks() { if (progressTracker.isCryptoJobRequired() && decryptCallbackResultInfo.decryptJobComplete) { - ResponseInfo responseInfo = chunkIndexToResponseInfo.remove(chunkIndex); - if (responseInfo != null) { - responseInfo.release(); - } decryptJobMetricsTracker.onJobResultProcessingStart(); if (decryptCallbackResultInfo.exception != null) { decryptJobMetricsTracker.incrementOperationError(); @@ -1150,7 +1152,8 @@ protected void maybeProcessCallbacks() { } totalSize = decryptCallbackResultInfo.result.getDecryptedBlobContent().remaining(); if (!resolveRange(totalSize)) { - chunkIndexToBuffer.put(0, filterChunkToRange(decryptCallbackResultInfo.result.getDecryptedBlobContent())); + chunkIndexToBuf.put(0, filterChunkToRange( + Unpooled.wrappedBuffer(decryptCallbackResultInfo.result.getDecryptedBlobContent()))); numChunksRetrieved = 1; progressTracker.setCryptoJobSuccess(); logger.trace("BlobContent available to process for simple blob {}", blobId); @@ -1170,8 +1173,8 @@ protected void maybeProcessCallbacks() { * or the only chunk of the blob. */ @Override - void handleBody(ResponseInfo responseInfo, InputStream payload, MessageMetadata messageMetadata, - MessageInfo messageInfo) throws IOException, MessageFormatException { + void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInfo messageInfo) + throws IOException, MessageFormatException { if (!successfullyDeserialized) { BlobData blobData; ByteBuffer encryptionKey; @@ -1210,18 +1213,19 @@ void handleBody(ResponseInfo responseInfo, InputStream payload, MessageMetadata } } blobType = blobData.getBlobType(); - chunkIndexToBuffer = new TreeMap<>(); - chunkIndexToResponseInfo = new ConcurrentHashMap<>(); + chunkIndexToBuf = new ConcurrentHashMap<>(); if (rawMode) { + if (blobData != null) { + // RawMode, release blob data. 
+ blobData.getAndRelease().release(); + } // Return the raw bytes from storage if (encryptionKey != null) { chunkIdIterator = null; dataChunks = null; chunkIndex = 0; numChunksTotal = 1; - chunkIndexToBuffer.put(0, rawPayloadBuffer); - responseInfo.retain(); - chunkIndexToResponseInfo.put(0, responseInfo); + chunkIndexToBuf.put(0, Unpooled.wrappedBuffer(rawPayloadBuffer)); numChunksRetrieved = 1; } else { setOperationException(new IllegalStateException("Only encrypted blobs supported in raw mode")); @@ -1230,7 +1234,7 @@ void handleBody(ResponseInfo responseInfo, InputStream payload, MessageMetadata if (blobType == BlobType.MetadataBlob) { handleMetadataBlob(blobData, userMetadata, encryptionKey); } else { - handleSimpleBlob(responseInfo, blobData, userMetadata, encryptionKey); + handleSimpleBlob(blobData, userMetadata, encryptionKey); } } successfullyDeserialized = true; @@ -1284,56 +1288,60 @@ RouterErrorCode processServerError(ServerErrorCode errorCode) { */ private void handleMetadataBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) throws IOException, MessageFormatException { - ByteBuffer serializedMetadataContent = blobData.getStream().getByteBuffer(); - compositeBlobInfo = - MetadataContentSerDe.deserializeMetadataContentRecord(serializedMetadataContent, blobIdFactory); - totalSize = compositeBlobInfo.getTotalSize(); - chunkMetadataList = compositeBlobInfo.getChunkMetadataList(); - boolean rangeResolutionFailure = false; + ByteBuf serializedMetadataContent = blobData.getAndRelease(); try { - if (options.getBlobOptions.hasBlobSegmentIdx()) { - int requestedSegment = options.getBlobOptions.getBlobSegmentIdx(); - if (requestedSegment < 0 || requestedSegment >= chunkMetadataList.size()) { - throw new IllegalArgumentException( - "Bad segment number: " + requestedSegment + ", num of keys: " + chunkMetadataList.size()); + compositeBlobInfo = + MetadataContentSerDe.deserializeMetadataContentRecord(serializedMetadataContent.nioBuffer(), blobIdFactory); + totalSize = compositeBlobInfo.getTotalSize(); + chunkMetadataList = compositeBlobInfo.getChunkMetadataList(); + boolean rangeResolutionFailure = false; + try { + if (options.getBlobOptions.hasBlobSegmentIdx()) { + int requestedSegment = options.getBlobOptions.getBlobSegmentIdx(); + if (requestedSegment < 0 || requestedSegment >= chunkMetadataList.size()) { + throw new IllegalArgumentException( + "Bad segment number: " + requestedSegment + ", num of keys: " + chunkMetadataList.size()); + } + chunkMetadataList = chunkMetadataList.subList(requestedSegment, requestedSegment + 1); } - chunkMetadataList = chunkMetadataList.subList(requestedSegment, requestedSegment + 1); - } - if (options.getBlobOptions.getRange() != null) { - resolvedByteRange = options.getBlobOptions.getRange().toResolvedByteRange(totalSize); - // Get only the chunks within the range. - if (!options.getBlobOptions.hasBlobSegmentIdx()) { - chunkMetadataList = compositeBlobInfo.getStoreKeysInByteRange(resolvedByteRange.getStartOffset(), - resolvedByteRange.getEndOffset()); + if (options.getBlobOptions.getRange() != null) { + resolvedByteRange = options.getBlobOptions.getRange().toResolvedByteRange(totalSize); + // Get only the chunks within the range. 
+ if (!options.getBlobOptions.hasBlobSegmentIdx()) { + chunkMetadataList = compositeBlobInfo.getStoreKeysInByteRange(resolvedByteRange.getStartOffset(), + resolvedByteRange.getEndOffset()); + } } + } catch (IllegalArgumentException e) { + onInvalidRange(e); + rangeResolutionFailure = true; } - } catch (IllegalArgumentException e) { - onInvalidRange(e); - rangeResolutionFailure = true; - } - if (!rangeResolutionFailure) { - if (options.getChunkIdsOnly || getOperationFlag() == MessageFormatFlags.Blob || encryptionKey == null) { - initializeDataChunks(); - } else { - // if blob is encrypted, then decryption is required only in case of GetBlobInfo and GetBlobAll (since user-metadata - // is expected to be encrypted). Incase of GetBlob, Metadata blob does not need any decryption even if BlobProperties says so - decryptCallbackResultInfo = new DecryptCallBackResultInfo(); - progressTracker.initializeCryptoJobTracker(CryptoJobType.DECRYPTION); - decryptJobMetricsTracker.onJobSubmission(); - logger.trace("Submitting decrypt job for Metadata chunk {}", blobId); - long startTimeMs = System.currentTimeMillis(); - cryptoJobHandler.submitJob( - new DecryptJob(blobId, encryptionKey, null, ByteBuffer.wrap(userMetadata), cryptoService, kms, - decryptJobMetricsTracker, (DecryptJob.DecryptJobResult result, Exception exception) -> { - routerMetrics.decryptTimeMs.update(System.currentTimeMillis() - startTimeMs); - decryptJobMetricsTracker.onJobCallbackProcessingStart(); - logger.trace("Handling decrypt job call back for Metadata chunk {} to set decrypt callback results", - blobId); - decryptCallbackResultInfo.setResultAndException(result, exception); - routerCallback.onPollReady(); - decryptJobMetricsTracker.onJobCallbackProcessingComplete(); - })); + if (!rangeResolutionFailure) { + if (options.getChunkIdsOnly || getOperationFlag() == MessageFormatFlags.Blob || encryptionKey == null) { + initializeDataChunks(); + } else { + // if blob is encrypted, then decryption is required only in case of GetBlobInfo and GetBlobAll (since user-metadata + // is expected to be encrypted). Incase of GetBlob, Metadata blob does not need any decryption even if BlobProperties says so + decryptCallbackResultInfo = new DecryptCallBackResultInfo(); + progressTracker.initializeCryptoJobTracker(CryptoJobType.DECRYPTION); + decryptJobMetricsTracker.onJobSubmission(); + logger.trace("Submitting decrypt job for Metadata chunk {}", blobId); + long startTimeMs = System.currentTimeMillis(); + cryptoJobHandler.submitJob( + new DecryptJob(blobId, encryptionKey, null, ByteBuffer.wrap(userMetadata), cryptoService, kms, + decryptJobMetricsTracker, (DecryptJob.DecryptJobResult result, Exception exception) -> { + routerMetrics.decryptTimeMs.update(System.currentTimeMillis() - startTimeMs); + decryptJobMetricsTracker.onJobCallbackProcessingStart(); + logger.trace("Handling decrypt job call back for Metadata chunk {} to set decrypt callback results", + blobId); + decryptCallbackResultInfo.setResultAndException(result, exception); + routerCallback.onPollReady(); + decryptJobMetricsTracker.onJobCallbackProcessingComplete(); + })); + } } + } finally { + serializedMetadataContent.release(); } } @@ -1360,36 +1368,37 @@ private void initializeDataChunks() { /** * Process a simple blob and extract the requested data from the blob. - * @param responseInfo the response received for a request sent out on behalf of this chunk. 
* @param blobData the simple blob's data * @param userMetadata userMetadata of the blob * @param encryptionKey encryption key for the blob. Could be null for non encrypted blob. */ - private void handleSimpleBlob(ResponseInfo responseInfo, BlobData blobData, byte[] userMetadata, - ByteBuffer encryptionKey) { - boolean rangeResolutionFailure = false; - if (encryptionKey == null) { - totalSize = blobData.getSize(); - rangeResolutionFailure = resolveRange(totalSize); - } else { - // for encrypted blobs, Blob data will not have the right size. Will have to wait until decryption is complete - } - if (!rangeResolutionFailure) { - chunkIdIterator = null; - numChunksTotal = 0; - dataChunks = null; - if (!options.getChunkIdsOnly) { - chunkIndex = 0; - numChunksTotal = 1; - ByteBuffer dataBuffer = blobData.getStream().getByteBuffer(); - responseInfo.retain(); - chunkIndexToResponseInfo.put(0, responseInfo); - boolean launchedJob = maybeLaunchCryptoJob(dataBuffer, userMetadata, encryptionKey, blobId); - if (!launchedJob) { - chunkIndexToBuffer.put(0, filterChunkToRange(dataBuffer)); - numChunksRetrieved = 1; + private void handleSimpleBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) { + ByteBuf chunkBuf = blobData.getAndRelease(); + try { + boolean rangeResolutionFailure = false; + if (encryptionKey == null) { + totalSize = blobData.getSize(); + rangeResolutionFailure = resolveRange(totalSize); + } else { + // for encrypted blobs, Blob data will not have the right size. Will have to wait until decryption is complete + } + if (!rangeResolutionFailure) { + chunkIdIterator = null; + numChunksTotal = 0; + dataChunks = null; + if (!options.getChunkIdsOnly) { + chunkIndex = 0; + numChunksTotal = 1; + boolean launchedJob = maybeLaunchCryptoJob(chunkBuf, userMetadata, encryptionKey, blobId); + if (!launchedJob) { + chunkBuf = filterChunkToRange(chunkBuf); + chunkIndexToBuf.put(0, chunkBuf.retainedDuplicate()); + numChunksRetrieved = 1; + } } } + } finally { + chunkBuf.release(); } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/ChunkFillTest.java b/ambry-router/src/test/java/com.github.ambry.router/ChunkFillTest.java index 8dbdd85849..80bba01eca 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/ChunkFillTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/ChunkFillTest.java @@ -28,6 +28,7 @@ import com.github.ambry.utils.MockTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -331,7 +332,7 @@ private void assertDataIdentity(ClusterMap clusterMap) throws IOException { for (int i = 0; i < numChunks; i++) { compositeBuffers[i].flip(); DecryptJob decryptJob = - new DecryptJob(compositeBlobIds[i], compositeEncryptionKeys[i], compositeBuffers[i], null, cryptoService, + new DecryptJob(compositeBlobIds[i], compositeEncryptionKeys[i], Unpooled.wrappedBuffer(compositeBuffers[i]), null, cryptoService, kms, new CryptoJobMetricsTracker(routerMetrics.decryptJobMetrics), (result, exception) -> { Assert.assertNull("Exception shouldn't have been thrown", exception); int chunkSize = result.getDecryptedBlobContent().remaining(); diff --git a/ambry-router/src/test/java/com.github.ambry.router/CryptoJobHandlerTest.java b/ambry-router/src/test/java/com.github.ambry.router/CryptoJobHandlerTest.java index 2f69749287..d1bd20d8ac 100644 --- 
a/ambry-router/src/test/java/com.github.ambry.router/CryptoJobHandlerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/CryptoJobHandlerTest.java @@ -23,6 +23,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.UtilsTest; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; @@ -244,7 +245,7 @@ public void testPendingDecryptJobs() throws InterruptedException, GeneralSecurit assertNotNull("Encrypted content should not be null", encryptJobResult.getEncryptedBlobContent()); assertNotNull("Encrypted key should not be null", encryptJobResult.getEncryptedKey()); decryptJobs.add(new DecryptJob(testData.blobId, encryptJobResult.getEncryptedKey(), - encryptJobResult.getEncryptedBlobContent(), encryptJobResult.getEncryptedUserMetadata(), + Unpooled.wrappedBuffer(encryptJobResult.getEncryptedBlobContent()), encryptJobResult.getEncryptedUserMetadata(), cryptoService, kms, new CryptoJobMetricsTracker(routerMetrics.decryptJobMetrics), (DecryptJob.DecryptJobResult decryptJobResult, Exception e) -> { if (e == null) { @@ -307,7 +308,7 @@ public void testCryptoJobHandlerClose() throws GeneralSecurityException { })); cryptoJobHandler.submitJob( - new DecryptJob(randomData.blobId, randomData.blobContent, randomData.blobContent, randomData.userMetadata, + new DecryptJob(randomData.blobId, randomData.blobContent, Unpooled.wrappedBuffer(randomData.blobContent), randomData.userMetadata, cryptoService, kms, new CryptoJobMetricsTracker(routerMetrics.decryptJobMetrics), (DecryptJob.DecryptJobResult result, Exception exception) -> { fail("Callback should not have been called since CryptoWorker is closed"); @@ -397,7 +398,7 @@ private void testFailureOnDecryption(SecretKeySpec perBlobKey, MockKeyManagement new GeneralSecurityException("Exception to test", new IllegalStateException())); } cryptoJobHandler.submitJob(new DecryptJob(testData.blobId, encryptJobResult.getEncryptedKey(), - encryptJobResult.getEncryptedBlobContent(), encryptJobResult.getEncryptedUserMetadata(), + Unpooled.wrappedBuffer(encryptJobResult.getEncryptedBlobContent()), encryptJobResult.getEncryptedUserMetadata(), cryptoService, kms, new CryptoJobMetricsTracker(routerMetrics.decryptJobMetrics), (DecryptJob.DecryptJobResult result, Exception e) -> { decryptCallBackCount.countDown(); @@ -444,8 +445,9 @@ public void onCompletion(EncryptJob.EncryptJobResult encryptJobResult, Exception assertNotNull("Encrypted userMetadata should not be null", encryptJobResult.getEncryptedUserMetadata()); } assertNotNull("Encrypted key should not be null", encryptJobResult.getEncryptedKey()); + ByteBuffer encryptedBlobContent = encryptJobResult.getEncryptedBlobContent(); cryptoJobHandler.submitJob( - new DecryptJob(blobId, encryptJobResult.getEncryptedKey(), encryptJobResult.getEncryptedBlobContent(), + new DecryptJob(blobId, encryptJobResult.getEncryptedKey(), encryptedBlobContent == null? 
null: Unpooled.wrappedBuffer(encryptedBlobContent), encryptJobResult.getEncryptedUserMetadata(), cryptoService, kms, new CryptoJobMetricsTracker(routerMetrics.decryptJobMetrics), decryptCallBackVerifier)); } else { diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index c84f98e814..2bcef8fde3 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -24,6 +24,7 @@ import com.github.ambry.commons.ByteBufferAsyncWritableChannel; import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.LoggingNotificationSystem; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.CryptoServiceConfig; import com.github.ambry.config.KMSConfig; @@ -54,6 +55,7 @@ import com.github.ambry.utils.MockTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -76,13 +78,13 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static com.github.ambry.router.PutManagerTest.*; import static com.github.ambry.router.RouterTestHelpers.*; -import static org.junit.Assert.*; import static org.junit.Assume.*; @@ -118,6 +120,7 @@ public class GetBlobOperationTest { private final RouterCallback routerCallback; private final String operationTrackerType; private final boolean testEncryption; + private final boolean networkUseNetty; private MockKeyManagementService kms = null; private MockCryptoService cryptoService = null; private CryptoJobHandler cryptoJobHandler = null; @@ -133,6 +136,7 @@ public class GetBlobOperationTest { private BlobProperties blobProperties; private byte[] userMetadata; private byte[] putContent; + private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); // Options which are passed into GetBlobOperations private GetBlobOptionsInternal options; @@ -155,6 +159,11 @@ public void testAndAssert(RouterErrorCode expectedError) throws Exception { } }; + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + @After public void after() { router.close(); @@ -162,6 +171,7 @@ public void after() { if (cryptoJobHandler != null) { cryptoJobHandler.close(); } + nettyByteBufLeakHelper.afterTest(); } /** @@ -171,9 +181,10 @@ public void after() { */ @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, - {AdaptiveOperationTracker.class.getSimpleName(), false}, - {AdaptiveOperationTracker.class.getSimpleName(), true}}); + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false, false}, + {SimpleOperationTracker.class.getSimpleName(), false, true}, + {AdaptiveOperationTracker.class.getSimpleName(), false, false}, + {AdaptiveOperationTracker.class.getSimpleName(), true, false}}); } /** @@ -182,9 +193,11 @@ public static List data() { * @param operationTrackerType the type of {@link OperationTracker} to use. 
* @param testEncryption {@code true} if blobs need to be tested w/ encryption. {@code false} otherwise */ - public GetBlobOperationTest(String operationTrackerType, boolean testEncryption) throws Exception { + public GetBlobOperationTest(String operationTrackerType, boolean testEncryption, boolean networkUseNetty) + throws Exception { this.operationTrackerType = operationTrackerType; this.testEncryption = testEncryption; + this.networkUseNetty = networkUseNetty; // Defaults. Tests may override these and do new puts as appropriate. maxChunkSize = random.nextInt(1024 * 1024) + 1; // a blob size that is greater than the maxChunkSize and is not a multiple of it. Will result in a composite blob. @@ -394,11 +407,15 @@ public void testCompositeBlobRawMode() throws Exception { // extract chunk ids BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(new ByteArrayInputStream(payload.array()), blobIdFactory); - ByteBuffer metadataBuffer = blobAll.getBlobData().getStream().getByteBuffer(); - CompositeBlobInfo compositeBlobInfo = - MetadataContentSerDe.deserializeMetadataContentRecord(metadataBuffer, blobIdFactory); - Assert.assertEquals("Total size didn't match", blobSize, compositeBlobInfo.getTotalSize()); - Assert.assertEquals("Chunk count didn't match", numChunks, compositeBlobInfo.getKeys().size()); + ByteBuf metadataBuffer = blobAll.getBlobData().getAndRelease(); + try { + CompositeBlobInfo compositeBlobInfo = + MetadataContentSerDe.deserializeMetadataContentRecord(metadataBuffer.nioBuffer(), blobIdFactory); + Assert.assertEquals("Total size didn't match", blobSize, compositeBlobInfo.getTotalSize()); + Assert.assertEquals("Chunk count didn't match", numChunks, compositeBlobInfo.getKeys().size()); + } finally { + metadataBuffer.release(); + } // TODO; test raw get on each chunk (needs changes to test framework) } else { @@ -1047,6 +1064,7 @@ public void testRangeRequestCompositeBlob() throws Exception { public void testEarlyReadableStreamChannelClose() throws Exception { for (int numChunksInBlob = 0; numChunksInBlob <= 4; numChunksInBlob++) { for (int numChunksToRead = 0; numChunksToRead < numChunksInBlob; numChunksToRead++) { + System.out.println("" + numChunksInBlob + " chunks but only read " + numChunksToRead); testEarlyReadableStreamChannelClose(numChunksInBlob, numChunksToRead); } } @@ -1606,6 +1624,7 @@ private Properties getDefaultNonBlockingRouterProperties(boolean excludeTimeout) properties.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); properties.setProperty("router.operation.tracker.terminate.on.not.found.enabled", "true"); properties.setProperty("router.get.blob.operation.share.memory", "true"); + properties.setProperty("network.use.netty.byte.buf", Boolean.toString(networkUseNetty)); return properties; } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClient.java b/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClient.java index 21f281fed5..534ecceb72 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClient.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClient.java @@ -16,6 +16,7 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.config.NetworkConfig; +import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.SocketNetworkClient; import com.github.ambry.network.NetworkMetrics; import com.github.ambry.network.RequestInfo; @@ -25,6 +26,7 @@ 
import com.github.ambry.utils.Time; import java.io.IOException; import java.util.List; +import java.util.Properties; import java.util.Set; @@ -41,8 +43,9 @@ class MockNetworkClient extends SocketNetworkClient { * Construct a MockNetworkClient with mock components. */ MockNetworkClient() throws IOException { - super(new MockSelector(new MockServerLayout(new MockClusterMap()), null, new MockTime()), null, - new NetworkMetrics(new MetricRegistry()), 0, 0, 0, new MockTime()); + super(new MockSelector(new MockServerLayout(new MockClusterMap()), null, new MockTime(), + new NetworkConfig(new VerifiableProperties(new Properties()))), null, new NetworkMetrics(new MetricRegistry()), + 0, 0, 0, new MockTime()); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClientFactory.java b/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClientFactory.java index cc65ac9db5..1702a60aac 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClientFactory.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockNetworkClientFactory.java @@ -66,7 +66,7 @@ class MockNetworkClientFactory extends SocketNetworkClientFactory { */ @Override public SocketNetworkClient getNetworkClient() throws IOException { - MockSelector selector = new MockSelector(serverLayout, state, time); + MockSelector selector = new MockSelector(serverLayout, state, time, networkConfig); return new SocketNetworkClient(selector, networkConfig, new NetworkMetrics(new MetricRegistry()), maxPortsPlainText, maxPortsSsl, checkoutTimeoutMs, time); } @@ -77,7 +77,7 @@ public SocketNetworkClient getNetworkClient() throws IOException { * @throws IOException if the selector could not be constructed. */ public MockNetworkClient getMockNetworkClient() throws IOException { - MockSelector selector = new MockSelector(serverLayout, state, time); + MockSelector selector = new MockSelector(serverLayout, state, time, networkConfig); return new MockNetworkClient(selector, networkConfig, new NetworkMetrics(new MetricRegistry()), maxPortsPlainText, maxPortsSsl, checkoutTimeoutMs, time); } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java index 141913c35f..2e04319ce6 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java @@ -14,6 +14,7 @@ package com.github.ambry.router; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.BoundedByteBufferReceive; @@ -23,9 +24,15 @@ import com.github.ambry.network.NetworkSend; import com.github.ambry.network.PortType; import com.github.ambry.network.Selector; +import com.github.ambry.utils.ByteBufferChannel; +import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; import java.io.IOException; +import java.io.SequenceInputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -48,6 +55,7 @@ class MockSelector extends Selector { private final AtomicReference state; private final MockServerLayout serverLayout; private boolean isOpen = true; + private final NetworkConfig 
config; /** * @@ -58,14 +66,15 @@ class MockSelector extends Selector { * @param time the Time instance to use. * @throws IOException if {@link Selector} throws. */ - MockSelector(MockServerLayout serverLayout, AtomicReference state, Time time) throws IOException { - super(new NetworkMetrics(new MetricRegistry()), time, null, - new NetworkConfig(new VerifiableProperties(new Properties()))); + MockSelector(MockServerLayout serverLayout, AtomicReference state, Time time, NetworkConfig config) + throws IOException { + super(new NetworkMetrics(new MetricRegistry()), time, null, config); // we don't need the actual selector, close it. super.close(); this.serverLayout = serverLayout; this.state = state == null ? new AtomicReference(MockSelectorState.Good) : state; this.time = time; + this.config = config; } /** @@ -122,7 +131,22 @@ public void poll(long timeoutMs, List sends) throws IOException { MockServer server = connIdToServer.get(send.getConnectionId()); BoundedByteBufferReceive receive = server.send(send.getPayload()); if (receive != null) { - receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); + if (!config.networkUseNettyByteBuf) { + receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); + } else { + // Convert a BoundedByteBufferReceive to BoundedNettyByteBufReceive + BoundedNettyByteBufReceive boundedNettyByteBufReceive = new BoundedNettyByteBufReceive(); + ByteBuffer buffer = (ByteBuffer) receive.getAndRelease(); + ByteBuffer sizeBuffer = ByteBuffer.allocate(Long.BYTES); + sizeBuffer.putLong(buffer.remaining() + Long.BYTES); + sizeBuffer.flip(); + ReadableByteChannel channel = Channels.newChannel( + new SequenceInputStream(new ByteBufferInputStream(sizeBuffer), new ByteBufferInputStream(buffer))); + while (!boundedNettyByteBufReceive.isReadComplete()) { + boundedNettyByteBufReceive.readFrom(channel); + } + receives.add(new NetworkReceive(send.getConnectionId(), boundedNettyByteBufReceive, time)); + } } } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java index 8da290ad01..2d09956099 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java @@ -44,6 +44,7 @@ import com.github.ambry.utils.ThrowingConsumer; import com.github.ambry.utils.Utils; import com.github.ambry.utils.UtilsTest; +import io.netty.buffer.Unpooled; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -1117,7 +1118,7 @@ private void verifyBlob(RequestAndResult requestAndResult, HashMap { assertNull("Exception should not be thrown", exception); assertEquals("BlobId mismatch", origBlobId, result.getBlobId()); @@ -1158,7 +1159,7 @@ private void verifyCompositeBlob(BlobProperties properties, byte[] originalPutCo // reason to directly call run() instead of spinning up a thread instead of calling start() is that, any exceptions or // assertion failures in non main thread will not fail the test. 
new DecryptJob(dataBlobPutRequest.getBlobId(), dataBlobPutRequest.getBlobEncryptionKey().duplicate(), - ByteBuffer.wrap(dataBlobContent), dataBlobPutRequest.getUsermetadata().duplicate(), cryptoService, kms, + Unpooled.wrappedBuffer(dataBlobContent), dataBlobPutRequest.getUsermetadata().duplicate(), cryptoService, kms, new CryptoJobMetricsTracker(metrics.decryptJobMetrics), (result, exception) -> { Assert.assertNull("Exception should not be thrown", exception); assertEquals("BlobId mismatch", dataBlobPutRequest.getBlobId(), result.getBlobId()); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java index 13a8ec3245..425bd8cabd 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java @@ -48,6 +48,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -436,7 +437,12 @@ void getAndVerify(ConnectedChannel channel, int blobsCount) throws Exception { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); Assert.assertEquals(properties.get(i).getBlobSize(), blobData.getSize()); byte[] dataOutput = new byte[(int) blobData.getSize()]; - blobData.getStream().read(dataOutput); + ByteBuf buffer = blobData.getAndRelease(); + try { + buffer.readBytes(dataOutput); + } finally { + buffer.release(); + } Assert.assertArrayEquals(dataOutput, data.get(i)); } } else { diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 598a601ff2..6596899509 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -94,6 +94,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -127,6 +128,17 @@ final class ServerTestUtil { + static byte[] getBlobDataAndRelease(BlobData blobData) { + byte[] actualBlobData = new byte[(int) blobData.getSize()]; + ByteBuf buffer = blobData.getAndRelease(); + try { + buffer.readBytes(actualBlobData); + } finally { + buffer.release(); + } + return actualBlobData; + } + static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster cluster, SSLConfig clientSSLConfig, SSLSocketFactory clientSSLSocketFactory, Properties routerProps, boolean testEncryption) { try { @@ -351,8 +363,7 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c GetResponse resp4 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); responseStream = resp4.getInputStream(); BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, blobIdFactory); - byte[] actualBlobData = new byte[(int) blobAll.getBlobData().getSize()]; - blobAll.getBlobData().getStream().getByteBuffer().get(actualBlobData); + byte[] actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); // verify content Assert.assertArrayEquals("Content mismatch", 
data, actualBlobData); if (testEncryption) { @@ -480,8 +491,7 @@ static void endToEndTest(Port targetPort, String routerDatacenter, MockCluster c resp1 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); responseStream = resp1.getInputStream(); blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, blobIdFactory); - actualBlobData = new byte[(int) blobAll.getBlobData().getSize()]; - blobAll.getBlobData().getStream().getByteBuffer().get(actualBlobData); + actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals("Content mismatch", data, actualBlobData); // delete a blob on a restarted store , which should succeed @@ -794,11 +804,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int interestedDat resp = GetResponse.readFrom(new DataInputStream(stream), clusterMap); try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); - byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobData); Assert.assertArrayEquals(data, blobout); if (testEncryption) { assertNotNull("MessageMetadata should not have been null", @@ -827,13 +833,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int interestedDat getExpiryTimeMs(blobAll.getBlobInfo().getBlobProperties())); assertEquals("Expiration time mismatch (MessageInfo)", expectedExpiryTimeMs, resp.getPartitionResponseInfoList().get(0).getMessageInfoList().get(0).getExpirationTimeInMs()); - byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - int readsize = 0; - while (readsize < blobAll.getBlobData().getSize()) { - readsize += blobAll.getBlobData() - .getStream() - .read(blobout, readsize, (int) blobAll.getBlobData().getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals(data, blobout); if (testEncryption) { Assert.assertNotNull("EncryptionKey should not ne null", blobAll.getBlobEncryptionKey()); @@ -1039,11 +1039,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int interestedDat } else { try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); - byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobData); Assert.assertArrayEquals(data, blobout); if (testEncryption) { assertNotNull("MessageMetadata should not have been null", @@ -1074,13 +1070,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int interestedDat } else { try { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp.getInputStream(), blobIdFactory); - byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - int readsize = 0; - while (readsize < blobAll.getBlobData().getSize()) { - readsize += blobAll.getBlobData() - .getStream() - .read(blobout, readsize, (int) blobAll.getBlobData().getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals(data, blobout); if (testEncryption) { Assert.assertNotNull("EncryptionKey should not ne null", blobAll.getBlobEncryptionKey()); @@ -1191,11 +1181,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int 
interestedDat } else { try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); - byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobData); Assert.assertArrayEquals(data, blobout); if (testEncryption) { assertNotNull("MessageMetadata should not have been null", @@ -1223,13 +1209,7 @@ static void endToEndReplicationWithMultiNodeMultiPartitionTest(int interestedDat } else { try { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp.getInputStream(), blobIdFactory); - byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - int readsize = 0; - while (readsize < blobAll.getBlobData().getSize()) { - readsize += blobAll.getBlobData() - .getStream() - .read(blobout, readsize, (int) blobAll.getBlobData().getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals(data, blobout); if (testEncryption) { Assert.assertNotNull("EncryptionKey should not ne null", blobAll.getBlobEncryptionKey()); @@ -1428,8 +1408,7 @@ public void onCompletion(String result, Exception exception) { GetResponse resp2 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); InputStream responseStream = resp2.getInputStream(); BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, blobIdFactory); - byte[] actualBlobData = new byte[(int) blobAll.getBlobData().getSize()]; - blobAll.getBlobData().getStream().getByteBuffer().get(actualBlobData); + byte[] actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals("Content mismatch", data, actualBlobData); // delete a blob on a restarted store , which should succeed @@ -1637,11 +1616,7 @@ static void endToEndReplicationWithMultiNodeSinglePartitionTest(String routerDat GetResponse resp3 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp3.getInputStream()); - byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobData); Assert.assertArrayEquals(dataList.get(0), blobout); if (testEncryption) { assertNotNull("MessageMetadata should not have been null", @@ -1666,13 +1641,7 @@ static void endToEndReplicationWithMultiNodeSinglePartitionTest(String routerDat GetResponse resp4 = GetResponse.readFrom(new DataInputStream(stream), clusterMap); try { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp4.getInputStream(), blobIdFactory); - byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - int readsize = 0; - while (readsize < blobAll.getBlobData().getSize()) { - readsize += blobAll.getBlobData() - .getStream() - .read(blobout, readsize, (int) blobAll.getBlobData().getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobAll.getBlobData()); Assert.assertArrayEquals(dataList.get(0), blobout); if (testEncryption) { assertNotNull("MessageMetadata should not have been null", blobAll.getBlobEncryptionKey()); @@ -2088,11 +2057,7 @@ private static void checkBlobContent(MockClusterMap clusterMap, BlobId blobId, C GetResponse resp = GetResponse.readFrom(new DataInputStream(stream), clusterMap); 
assertEquals(ServerErrorCode.No_Error, resp.getError()); BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); - byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); - } + byte[] blobout = getBlobDataAndRelease(blobData); Assert.assertArrayEquals(dataToCheck, blobout); if (encryptionKey != null) { Assert.assertNotNull("EncryptionKey should not have been null", @@ -2194,8 +2159,7 @@ static void checkTtlUpdateStatus(ConnectedChannel channel, ClusterMap clusterMap response = GetResponse.readFrom(new DataInputStream(stream), clusterMap); InputStream responseStream = response.getInputStream(); BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(responseStream, storeKeyFactory); - byte[] actualBlobData = new byte[(int) blobAll.getBlobData().getSize()]; - blobAll.getBlobData().getStream().getByteBuffer().get(actualBlobData); + byte[] actualBlobData = getBlobDataAndRelease(blobAll.getBlobData()); assertArrayEquals("Content mismatch", expectedBlobData, actualBlobData); messageInfo = response.getPartitionResponseInfoList().get(0).getMessageInfoList().get(0); assertEquals("Blob ID not as expected", blobId, messageInfo.getStoreKey()); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java index 279e47ba37..5607f9916b 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java @@ -33,6 +33,7 @@ import com.github.ambry.protocol.GetResponse; import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.DataInputStream; import java.io.InputStream; import java.nio.ByteBuffer; @@ -198,9 +199,11 @@ public void run() { try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); byte[] blobout = new byte[(int) blobData.getSize()]; - int readsize = 0; - while (readsize < blobData.getSize()) { - readsize += blobData.getStream().read(blobout, readsize, (int) blobData.getSize() - readsize); + ByteBuf buffer = blobData.getAndRelease(); + try { + buffer.readBytes(blobout); + } finally { + buffer.release(); } if (ByteBuffer.wrap(blobout).compareTo(ByteBuffer.wrap(payload.blob)) != 0) { throw new IllegalStateException(); @@ -228,11 +231,11 @@ public void run() { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp.getInputStream(), new BlobIdFactory(clusterMap)); byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - int readsize = 0; - while (readsize < blobAll.getBlobData().getSize()) { - readsize += blobAll.getBlobData() - .getStream() - .read(blobout, readsize, (int) blobAll.getBlobData().getSize() - readsize); + ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + try { + buffer.readBytes(blobout); + } finally { + buffer.release(); } if (ByteBuffer.wrap(blobout).compareTo(ByteBuffer.wrap(payload.blob)) != 0) { throw new IllegalStateException(); diff --git a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java index b9a13b0fef..2b0028d0e5 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java +++ 
b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java @@ -28,6 +28,8 @@ import com.github.ambry.tools.util.ToolUtils; import com.github.ambry.utils.CrcInputStream; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -456,28 +458,32 @@ private void verify(String dataDir) throws Exception { } if (isDeleted) { - if (!verifyZeroed(metadata.array()) || !verifyZeroed( - Utils.readBytesFromStream(output.getStream(), new byte[(int) output.getSize()], 0, - (int) output.getSize()))) { + ByteBuf byteBuf = output.getAndRelease(); + try { + if (!verifyZeroed(metadata.array()) || !verifyZeroed( + Utils.readBytesFromByteBuf(byteBuf, new byte[(int) output.getSize()], 0, + (int) output.getSize()))) { /* If the offset in the index is different from that in the log, hard delete wouldn't have been possible and we just saw a duplicate put for the same key, otherwise we missed a hard delete. */ - if (currentOffset == indexValue.getOriginalMessageOffset()) { - notHardDeletedErrorCount++; + if (currentOffset == indexValue.getOriginalMessageOffset()) { + notHardDeletedErrorCount++; + } else { + // the assumption here is that this put has been lost as far as the index is concerned due to + // a duplicate put. Of course, these shouldn't happen anymore, we are accounting for past + // bugs. + duplicatePuts++; + } } else { - // the assumption here is that this put has been lost as far as the index is concerned due to - // a duplicate put. Of course, these shouldn't happen anymore, we are accounting for past - // bugs. - duplicatePuts++; + hardDeletedPuts++; } - } else { - hardDeletedPuts++; + } finally { + byteBuf.release(); + byteBuf = null; } } else { unDeletedPuts++; } - } else if (MessageFormatRecord.deserializeUpdateRecord(streamlog) - .getType() - .equals(SubRecord.Type.DELETE)) { + } else if (MessageFormatRecord.deserializeUpdateRecord(streamlog).getType().equals(SubRecord.Type.DELETE)) { deletes++; } currentOffset += (header.getMessageSize() + buffer.capacity() + id.sizeInBytes()); @@ -681,8 +687,7 @@ private boolean deserializeUpdateRecord(InputStream streamlog, InputStream oldSt boolean caughtException = false; boolean caughtExceptionInOld = false; try { - isDeleteRecord = - MessageFormatRecord.deserializeUpdateRecord(streamlog).getType().equals(SubRecord.Type.DELETE); + isDeleteRecord = MessageFormatRecord.deserializeUpdateRecord(streamlog).getType().equals(SubRecord.Type.DELETE); } catch (Exception e) { caughtException = true; } @@ -730,22 +735,31 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr if (!caughtException) { if (isDeleted) { + ByteBuf byteBuf = blobData.getAndRelease(); try { asExpected = verifyZeroed(usermetadata.array()) && verifyZeroed( - Utils.readBytesFromStream(blobData.getStream(), new byte[(int) blobData.getSize()], 0, - (int) blobData.getSize())); + Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize())); } catch (IOException e) { asExpected = false; + } finally { + byteBuf.release(); + byteBuf = null; } } else { + ByteBuf byteBuf = blobData.getAndRelease(); + ByteBuf oldByteBuf = oldBlobData.getAndRelease(); try { asExpected = Arrays.equals(usermetadata.array(), oldUsermetadata.array()) && Arrays.equals( - Utils.readBytesFromStream(blobData.getStream(), new byte[(int) blobData.getSize()], 0, - (int) blobData.getSize()), - 
Utils.readBytesFromStream(oldBlobData.getStream(), new byte[(int) oldBlobData.getSize()], 0, + Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize()), + Utils.readBytesFromByteBuf(oldByteBuf, new byte[(int) oldBlobData.getSize()], 0, (int) oldBlobData.getSize())); } catch (IOException e) { asExpected = false; + } finally { + byteBuf.release(); + oldByteBuf.release(); + byteBuf = null; + oldByteBuf = null; } } return asExpected; diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java index a6fa74dee7..712664ac2f 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java @@ -37,6 +37,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Throttler; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -415,9 +416,10 @@ private ServerResponse getRecordFromNode(DataNodeId dataNodeId, BlobId blobId, G ServerErrorCode errorCode = response.getFirst(); if (errorCode == ServerErrorCode.No_Error) { BlobAll blobAll = response.getSecond(); - ByteBuffer buffer = blobAll.getBlobData().getStream().getByteBuffer(); - byte[] blobBytes = new byte[buffer.remaining()]; - buffer.get(blobBytes); + ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + byte[] blobBytes = new byte[buffer.readableBytes()]; + buffer.readBytes(blobBytes); + buffer.release(); serverResponse = new ServerResponse(errorCode, blobAll.getStoreKey(), blobAll.getBlobInfo().getBlobProperties(), blobAll.getBlobInfo().getUserMetadata(), blobBytes, blobAll.getBlobEncryptionKey()); } else { diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java index 255c6ce201..bee47864da 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java @@ -55,12 +55,11 @@ import com.github.ambry.server.ServerErrorCode; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.tools.util.ToolUtils; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; -import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; @@ -317,7 +316,12 @@ public static void main(String[] args) throws Exception { serverAdminTool.getBlob(dataNodeId, blobId, config.getOption, clusterMap); if (bResponse.getFirst() == ServerErrorCode.No_Error) { LOGGER.info("Blob type of {} from {} is {}", blobId, dataNodeId, bResponse.getSecond().getBlobType()); - writeBufferToFile(bResponse.getSecond().getStream().getByteBuffer(), outputFileStream); + ByteBuf buffer = bResponse.getSecond().getAndRelease(); + try { + writeByteBufToFile(buffer, outputFileStream); + } finally { + buffer.release(); + } LOGGER.info("Blob data for {} from {} written to {}", blobId, dataNodeId, config.dataOutputFilePath); } else { LOGGER.error("Failed to get blob data for {} from {} with option {}. 
Error code is {}", blobId, dataNodeId, @@ -457,6 +461,16 @@ private static void writeBufferToFile(ByteBuffer buffer, FileOutputStream output outputFileStream.write(bytes); } + /** + * Writes the content of {@code buffer} into {@link ServerAdminToolConfig#dataOutputFilePath}. + * @param buffer the {@link ByteBuf} whose content needs to be written. + * @param outputFileStream the {@link FileOutputStream} to write to. + * @throws IOException + */ + private static void writeByteBufToFile(ByteBuf buffer, FileOutputStream outputFileStream) throws IOException { + buffer.readBytes(outputFileStream, buffer.readableBytes()); + } + /** * Sends a {@link RequestControlAdminRequest} to {@code dataNodeId} to set enable status of {@code toControl} to * {@code enable} for {@code partitionId}. diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java index 9985cf5dfa..e12bad1f8c 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java @@ -42,6 +42,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Throttler; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.BufferedReader; import java.io.DataInputStream; import java.io.File; @@ -260,9 +261,12 @@ public void run() { long sizeRead = 0; byte[] outputBuffer = new byte[(int) blobData.getSize()]; ByteBufferOutputStream streamOut = new ByteBufferOutputStream(ByteBuffer.wrap(outputBuffer)); - while (sizeRead < blobData.getSize()) { - streamOut.write(blobData.getStream().read()); - sizeRead++; + ByteBuf buffer = blobData.getAndRelease(); + try { + buffer.readBytes(streamOut, (int) blobData.getSize()); + } finally { + buffer.release(); + buffer = null; } long latencyPerBlob = SystemTime.getInstance().nanoseconds() - startTimeGetBlob; totalTimeTaken.addAndGet(latencyPerBlob); diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/ByteBufferInputStream.java b/ambry-utils/src/main/java/com.github.ambry.utils/ByteBufferInputStream.java index 5c8712f5ea..8a3ba16fd1 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/ByteBufferInputStream.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/ByteBufferInputStream.java @@ -42,17 +42,7 @@ public ByteBufferInputStream(ByteBuffer byteBuffer) { * @throws IOException */ public ByteBufferInputStream(InputStream stream, int size) throws IOException { - this.byteBuffer = ByteBuffer.allocate(size); - int read = 0; - ReadableByteChannel readableByteChannel = Channels.newChannel(stream); - while (read < size) { - int sizeRead = readableByteChannel.read(byteBuffer); - if (sizeRead == 0 || sizeRead == -1) { - throw new IOException("Total size read " + read + " is less than the size to be read " + size); - } - read += sizeRead; - } - byteBuffer.flip(); + this.byteBuffer = Utils.getByteBufferFromInputStream(stream, size); this.mark = -1; this.readLimit = -1; } diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java index 088a338b94..9c19dc9fbb 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java @@ -254,6 +254,28 @@ public static ByteBufferInputStream getByteBufferInputStreamFromCrcInputStream(C return new ByteBufferInputStream(buffer); } + /** + * A 
helper method that reads a {@link ByteBuffer} of the given size from the given {@link InputStream}.
+   * @param stream The {@link InputStream} to read the {@link ByteBuffer} content from.
+   * @param dataSize The size of the {@link ByteBuffer}.
+   * @return The {@link ByteBuffer}
+   * @throws IOException Unexpected IO errors.
+   */
+  public static ByteBuffer getByteBufferFromInputStream(InputStream stream, int dataSize) throws IOException {
+    ByteBuffer output = ByteBuffer.allocate(dataSize);
+    int read = 0;
+    ReadableByteChannel readableByteChannel = Channels.newChannel(stream);
+    while (read < dataSize) {
+      int sizeRead = readableByteChannel.read(output);
+      if (sizeRead == 0 || sizeRead == -1) {
+        throw new IOException("Total size read " + read + " is less than the size to be read " + dataSize);
+      }
+      read += sizeRead;
+    }
+    output.flip();
+    return output;
+  }
+
   /**
    * Transfer {@code dataSize} bytes of data from the given crc stream to a newly create {@link ByteBuffer}. The method
    * would also update the crc value in the crc stream.
@@ -277,17 +299,36 @@ public static ByteBuffer readByteBufferFromCrcInputStream(CrcInputStream crcStre
       crcStream.updateCrc(output.duplicate());
       nettyByteBuf.readerIndex(startIndex + dataSize);
     } else {
-      output = ByteBuffer.allocate(dataSize);
-      int read = 0;
-      ReadableByteChannel readableByteChannel = Channels.newChannel(crcStream);
-      while (read < dataSize) {
-        int sizeRead = readableByteChannel.read(output);
-        if (sizeRead == 0 || sizeRead == -1) {
-          throw new IOException("Total size read " + read + " is less than the size to be read " + dataSize);
-        }
-        read += sizeRead;
-      }
-      output.flip();
+      output = getByteBufferFromInputStream(crcStream, dataSize);
+    }
+    return output;
+  }
+
+  /**
+   * Transfer {@code dataSize} bytes of data from the given crc stream to a newly created {@link ByteBuf}. The method
+   * would also update the crc value in the crc stream.
+   * @param crcStream The crc stream.
+   * @param dataSize The number of bytes to transfer.
+   * @return the newly created {@link ByteBuf} which contains the transferred data.
+   * @throws IOException Any I/O error.
+   */
+  public static ByteBuf readNettyByteBufFromCrcInputStream(CrcInputStream crcStream, int dataSize) throws IOException {
+    ByteBuf output;
+    InputStream inputStream = crcStream.getUnderlyingInputStream();
+    if (inputStream instanceof NettyByteBufDataInputStream) {
+      ByteBuf nettyByteBuf = ((NettyByteBufDataInputStream) inputStream).getBuffer();
+      // retain a slice of the underlying ByteBuf so the content does not have to be copied
+      int startIndex = nettyByteBuf.readerIndex();
+      output = nettyByteBuf.retainedSlice(startIndex, dataSize);
+      crcStream.updateCrc(output.nioBuffer());
+      nettyByteBuf.readerIndex(startIndex + dataSize);
+    } else if (inputStream instanceof ByteBufferDataInputStream) {
+      ByteBuffer buffer = getByteBufferFromByteBufferDataInputStream((ByteBufferDataInputStream) inputStream, dataSize);
+      crcStream.updateCrc(buffer.duplicate());
+      output = Unpooled.wrappedBuffer(buffer);
+    } else {
+      ByteBuffer buffer = getByteBufferFromInputStream(crcStream, dataSize);
+      output = Unpooled.wrappedBuffer(buffer);
     }
     return output;
   }
@@ -854,6 +895,21 @@ public static byte[] readBytesFromStream(InputStream stream, byte[] data, int of
     return data;
   }
 
+  /**
+   * Read {@code size} bytes from a netty {@link ByteBuf} into a byte array, starting at the given offset in the
+   * byte[]. If {@code size} bytes can't be read because the end of the buffer has been reached, an IOException is thrown.
This method does not block; the requested bytes are either already readable in the buffer or the read fails.
+   * @param buffer the {@link ByteBuf} from which data is to be read
+   * @param data byte[] into which the data has to be written
+   * @param offset starting offset in the byte[] at which the data has to be written
+   * @param size number of bytes to be read from the buffer
+   * @return byte[] which has the data that is read from the buffer. Same as {@code data}
+   * @throws IOException
+   */
+  public static byte[] readBytesFromByteBuf(ByteBuf buffer, byte[] data, int offset, int size) throws IOException {
+    return readBytesFromStream(new ByteBufInputStream(buffer), data, offset, size);
+  }
+
   /**
    * Create a {@link DataInputStream} from the given buffer, which has to be either a {@link ByteBuffer} or a {@link ByteBuf}.
    * This is equivalent to {@link #createDataInputStreamFromBuffer(Object, boolean)}, where the {@code shareMemory} is false.
diff --git a/ambry-commons/src/test/java/com.github.ambry.commons/NettyByteBufLeakHelper.java b/ambry-utils/src/test/java/com.github.ambry.utils/NettyByteBufLeakHelper.java
similarity index 99%
rename from ambry-commons/src/test/java/com.github.ambry.commons/NettyByteBufLeakHelper.java
rename to ambry-utils/src/test/java/com.github.ambry.utils/NettyByteBufLeakHelper.java
index fe30f0fa7a..14929d71dc 100644
--- a/ambry-commons/src/test/java/com.github.ambry.commons/NettyByteBufLeakHelper.java
+++ b/ambry-utils/src/test/java/com.github.ambry.utils/NettyByteBufLeakHelper.java
@@ -11,7 +11,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
-package com.github.ambry.commons;
+package com.github.ambry.utils;
 
 import io.netty.buffer.PoolArenaMetric;
 import io.netty.buffer.PooledByteBufAllocator;
diff --git a/build.gradle b/build.gradle
index ec8b646e66..6be39c4646 100644
--- a/build.gradle
+++ b/build.gradle
@@ -89,6 +89,7 @@ subprojects {
     systemProperty 'io.netty.allocator.smallCacheSize', '0'
     systemProperty 'io.netty.allocator.normalCacheSize', '0'
     systemProperty 'io.netty.allocator.maxCachedBufferCapacity', '0'
+    systemProperty 'io.netty.leakDetection.acquireAndReleaseOnly', 'true'
   }
 
   task intTest(type: Test) {

From cbc5dd62aa4dd43c208fa4679e09637580fb9b70 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Tue, 17 Dec 2019 10:18:21 -0800
Subject: [PATCH 2/6] fix

---
 .../ByteBufferAsyncWritableChannelTest.java | 1 +
 .../CopyingAsyncWritableChannelTest.java    | 1 +
 2 files changed, 2 insertions(+)

diff --git a/ambry-commons/src/test/java/com.github.ambry.commons/ByteBufferAsyncWritableChannelTest.java b/ambry-commons/src/test/java/com.github.ambry.commons/ByteBufferAsyncWritableChannelTest.java
index c0f926984a..6ca785d092 100644
--- a/ambry-commons/src/test/java/com.github.ambry.commons/ByteBufferAsyncWritableChannelTest.java
+++ b/ambry-commons/src/test/java/com.github.ambry.commons/ByteBufferAsyncWritableChannelTest.java
@@ -14,6 +14,7 @@
 package com.github.ambry.commons;
 
 import com.github.ambry.router.Callback;
+import com.github.ambry.utils.NettyByteBufLeakHelper;
 import com.github.ambry.utils.Utils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
diff --git a/ambry-commons/src/test/java/com.github.ambry.commons/CopyingAsyncWritableChannelTest.java b/ambry-commons/src/test/java/com.github.ambry.commons/CopyingAsyncWritableChannelTest.java
index bb3af35ae4..8c01116b87 100644
---
a/ambry-commons/src/test/java/com.github.ambry.commons/CopyingAsyncWritableChannelTest.java
+++ b/ambry-commons/src/test/java/com.github.ambry.commons/CopyingAsyncWritableChannelTest.java
@@ -19,6 +19,7 @@
 import com.github.ambry.rest.RestServiceException;
 import com.github.ambry.router.Callback;
 import com.github.ambry.router.FutureResult;
+import com.github.ambry.utils.NettyByteBufLeakHelper;
 import com.github.ambry.utils.TestUtils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;

From 54f96061e67cd977c51fe794289f35272aef89fd Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Mon, 6 Jan 2020 10:44:19 -0800
Subject: [PATCH 3/6] Rebase

---
 .../test/java/com.github.ambry.router/CryptoServiceTest.java    | 2 +-
 .../test/java/com.github.ambry.router/GCMCryptoServiceTest.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/ambry-router/src/test/java/com.github.ambry.router/CryptoServiceTest.java b/ambry-router/src/test/java/com.github.ambry.router/CryptoServiceTest.java
index ba9c777fb1..485848763b 100644
--- a/ambry-router/src/test/java/com.github.ambry.router/CryptoServiceTest.java
+++ b/ambry-router/src/test/java/com.github.ambry.router/CryptoServiceTest.java
@@ -14,8 +14,8 @@
 package com.github.ambry.router;
 
 import com.codahale.metrics.MetricRegistry;
-import com.github.ambry.commons.NettyByteBufLeakHelper;
 import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.utils.NettyByteBufLeakHelper;
 import com.github.ambry.utils.TestUtils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
diff --git a/ambry-router/src/test/java/com.github.ambry.router/GCMCryptoServiceTest.java b/ambry-router/src/test/java/com.github.ambry.router/GCMCryptoServiceTest.java
index bbe7e792bb..9b70732e3e 100644
--- a/ambry-router/src/test/java/com.github.ambry.router/GCMCryptoServiceTest.java
+++ b/ambry-router/src/test/java/com.github.ambry.router/GCMCryptoServiceTest.java
@@ -14,8 +14,8 @@
 package com.github.ambry.router;
 
 import com.codahale.metrics.MetricRegistry;
-import com.github.ambry.commons.NettyByteBufLeakHelper;
 import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.utils.NettyByteBufLeakHelper;
 import com.github.ambry.utils.TestUtils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;

From b1c48ce06ba49db5726df2446605310568477768 Mon Sep 17 00:00:00 2001
From: Justin Lin
Date: Thu, 30 Jan 2020 16:20:05 -0800
Subject: [PATCH 4/6] Fix some comments

---
 .../BlobData.java         |  5 +--
 .../GetBlobOperation.java | 35 ++++++++++---------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
index 008e97050e..1d83d49a51 100644
--- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
+++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java
@@ -76,14 +76,15 @@ public ByteBufferInputStream getStream() {
     if (stream != null) {
       return stream;
     }
-    // The blob content is passed as a ByteBuf
+    // The blob content is passed as a ByteBuf since the stream is null
     if (byteBuf == null) {
       return null;
     }
     ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes());
-    byteBuf.writeBytes(temp);
+    byteBuf.readBytes(temp);
     byteBuf.release();
     byteBuf = null;
+    temp.flip();
     stream = new ByteBufferInputStream(temp);
     return stream;
   }
diff --git
a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 39621b2b0d..f8ea11e3d9 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -144,12 +144,26 @@ class GetBlobOperation extends GetOperation { firstChunk = new FirstGetChunk(); } + + /** + * Release all the {@link ByteBuf} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid + * conflict with the release call in the chunk async callback. + */ + private void releaseResource() { + for (Integer key : chunkIndexToBuf.keySet()) { + ByteBuf byteBuf = chunkIndexToBuf.remove(key); + if (byteBuf != null) { + byteBuf.release(); + } + } + } + + /** + * Set the operation to be completed and release all the resources + */ private void setOperationCompleted() { operationCompleted = true; - if (chunkIndexToBuf != null) { - chunkIndexToBuf.values().forEach(ReferenceCountUtil::release); - chunkIndexToBuf.clear(); - } + releaseResource(); } /** @@ -463,19 +477,6 @@ void completeRead() { setOperationCompleted(); } - /** - * Release all the {@link ByteBuf} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid - * conflict with the release call in the chunk async callback. - */ - private void releaseResource() { - for (Integer key : chunkIndexToBuf.keySet()) { - ByteBuf byteBuf = chunkIndexToBuf.remove(key); - if (byteBuf != null) { - byteBuf.release(); - } - } - } - /** * Update chunking and size related metrics - blob size, chunk count, and whether the blob is simple or composite. */ From 0adb3fb62688d1c1e270e43a28a4f3293b549e26 Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Thu, 30 Jan 2020 16:32:42 -0800 Subject: [PATCH 5/6] Fix test error --- .../main/java/com.github.ambry.router/GetBlobOperation.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index f8ea11e3d9..1175ca9a9a 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -150,6 +150,9 @@ class GetBlobOperation extends GetOperation { * conflict with the release call in the chunk async callback. 
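+   * This is a no-op if {@code chunkIndexToBuf} has not been initialized yet.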
*/ private void releaseResource() { + if (chunkIndexToBuf == null) { + return; + } for (Integer key : chunkIndexToBuf.keySet()) { ByteBuf byteBuf = chunkIndexToBuf.remove(key); if (byteBuf != null) { From 81fb3a6b7e9334d69beaa133a587207078e92e9f Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Thu, 30 Jan 2020 21:39:55 -0800 Subject: [PATCH 6/6] Comments --- .../main/java/com.github.ambry.messageformat/BlobData.java | 2 +- .../main/java/com.github.ambry.router/GetBlobOperation.java | 1 - .../java/com.github.ambry.router/GetBlobOperationTest.java | 1 - ambry-utils/src/main/java/com.github.ambry.utils/Utils.java | 5 +++-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java index 1d83d49a51..fdb0c11e0e 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java @@ -90,7 +90,7 @@ public ByteBufferInputStream getStream() { } /** - * Return the netty {@link ByteBuf} and then transfer the ownship to the caller. It's not safe + * Return the netty {@link ByteBuf} and then transfer the ownership to the caller. It's not safe * to call this method more than once. */ public ByteBuf getAndRelease() { diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 1175ca9a9a..1dd29b3f08 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -44,7 +44,6 @@ import com.github.ambry.utils.Time; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index 2bcef8fde3..54be311bd9 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -1064,7 +1064,6 @@ public void testRangeRequestCompositeBlob() throws Exception { public void testEarlyReadableStreamChannelClose() throws Exception { for (int numChunksInBlob = 0; numChunksInBlob <= 4; numChunksInBlob++) { for (int numChunksToRead = 0; numChunksToRead < numChunksInBlob; numChunksToRead++) { - System.out.println("" + numChunksInBlob + " chunks but only read " + numChunksToRead); testEarlyReadableStreamChannelClose(numChunksInBlob, numChunksToRead); } } diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java index 9c19dc9fbb..e3400b1bb5 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java @@ -261,7 +261,7 @@ public static ByteBufferInputStream getByteBufferInputStreamFromCrcInputStream(C * @return The {@link ByteBuffer} * @throws IOException Unexpected IO errors. 
*/ - public static ByteBuffer getByteBufferFromInputStream(InputStream stream, int dataSize) throws IOException { + public static ByteBuffer getByteBufferFromInputStream(InputStream stream, int dataSize) throws IOException { ByteBuffer output = ByteBuffer.allocate(dataSize); int read = 0; ReadableByteChannel readableByteChannel = Channels.newChannel(stream); @@ -907,7 +907,8 @@ public static byte[] readBytesFromStream(InputStream stream, byte[] data, int of * @throws IOException */ public static byte[] readBytesFromByteBuf(ByteBuf buffer, byte[] data, int offset, int size) throws IOException { - return readBytesFromStream(new ByteBufInputStream(buffer), data, offset, size); + buffer.readBytes(data, offset, size); + return data; } /**