Skip to content

Commit

Permalink
Add NettyByteBuf to GetBlobOperation (#1314)
Browse files Browse the repository at this point in the history
This starts to use Netty ByteBuf in GetBlobOperation.

This PR seeks the least intrusive way to introduce Netty ByteBuf to GetBlobOperation, so it ignores a lot of places where the ByteBuffer can be replaced by Netty ByteBuf.

The major change is to change the buffer/inputstream in BlobData to use Netty ByteBuf.
  • Loading branch information
justinlin-linkedin authored Jan 31, 2020
1 parent 4e2b8dd commit 3cdb994
Show file tree
Hide file tree
Showing 34 changed files with 547 additions and 330 deletions.
12 changes: 2 additions & 10 deletions ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
public class ResponseInfo {
private final RequestInfo requestInfo;
private final NetworkClientErrorCode error;
private final Object response;
private final DataNodeId dataNode;
private Object response;

/**
* Constructs a ResponseInfo with the given parameters.
Expand Down Expand Up @@ -68,21 +68,13 @@ public Object getResponse() {
return response;
}

/**
* Increase the reference count of underlying response.
*/
public void retain() {
if (response != null) {
ReferenceCountUtil.retain(response);
}
}

/**
* Decrease the reference count of underlying response.
*/
public void release() {
if (response != null) {
ReferenceCountUtil.release(response);
response = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.ambry.commons;

import com.github.ambry.router.Callback;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.router.Callback;
import com.github.ambry.router.FutureResult;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.TestUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package com.github.ambry.messageformat;

import com.github.ambry.utils.ByteBufferInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;


/**
Expand All @@ -22,17 +25,32 @@
public class BlobData {
private final BlobType blobType;
private final long size;
private final ByteBufferInputStream stream;
private ByteBuf byteBuf;
private ByteBufferInputStream stream = null;

/**
* The blob data contains the stream and other required info
* @param blobType {@link BlobType} of the blob
* @param size The size of the blob content.
* @param byteBuf The content of this blob in a {@link ByteBuf}.
*/
public BlobData(BlobType blobType, long size, ByteBuf byteBuf) {
this.blobType = blobType;
this.size = size;
this.byteBuf = byteBuf;
}

/**
* The blob data contains the stream and other required info
* @param blobType {@link BlobType} of the blob
* @param size The size of the blob content.
* @param stream The {@link ByteBufferInputStream} containing the blob content.
*/
@Deprecated
public BlobData(BlobType blobType, long size, ByteBufferInputStream stream) {
this.blobType = blobType;
this.size = size;
this.byteBuf = Unpooled.wrappedBuffer(stream.getByteBuffer());
this.stream = stream;
}

Expand All @@ -53,7 +71,37 @@ public long getSize() {
/**
* @return the {@link ByteBufferInputStream} containing the blob content.
*/
@Deprecated
public ByteBufferInputStream getStream() {
if (stream != null) {
return stream;
}
// The blob content is passed as a ByteBuf since the stream is nulle
if (byteBuf == null) {
return null;
}
ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes());
byteBuf.readBytes(temp);
byteBuf.release();
byteBuf = null;
temp.flip();
stream = new ByteBufferInputStream(temp);
return stream;
}

/**
* Return the netty {@link ByteBuf} and then transfer the ownership to the caller. It's not safe
* to call this method more than once.
*/
public ByteBuf getAndRelease() {
if (byteBuf == null) {
return null;
}
try {
return byteBuf.retainedDuplicate();
} finally {
byteBuf.release();
byteBuf = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.ByteBufferDataInputStream;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.NettyByteBufDataInputStream;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -243,7 +245,8 @@ static boolean isValidBlobRecordVersion(short blobRecordVersion) {
*/
public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory storeKeyFactory)
throws IOException, MessageFormatException {
DataInputStream inputStream = new DataInputStream(stream);
DataInputStream inputStream =
stream instanceof DataInputStream ? (DataInputStream) stream : new DataInputStream(stream);
short headerVersion = inputStream.readShort();
ByteBuffer headerBuf;
MessageHeader_Format header;
Expand Down Expand Up @@ -277,7 +280,7 @@ public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory sto
MessageFormatErrorCodes.Unknown_Format_Version);
}
header.verifyHeader();
StoreKey storeKey = storeKeyFactory.getStoreKey(new DataInputStream(stream));
StoreKey storeKey = storeKeyFactory.getStoreKey(inputStream);
ByteBuffer blobEncryptionKey = null;
if (header.hasEncryptionKeyRecord()) {
blobEncryptionKey = deserializeBlobEncryptionKey(stream);
Expand Down Expand Up @@ -1671,15 +1674,15 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO
if (dataSize > Integer.MAX_VALUE) {
throw new IOException("We only support data of max size == MAX_INT. Error while reading blob from store");
}
ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize);
ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize);
long crc = crcStream.getValue();
long streamCrc = dataStream.readLong();
if (crc != streamCrc) {
logger.error("corrupt data while parsing blob content expectedcrc {} actualcrc {}", crc, streamCrc);
throw new MessageFormatException("corrupt data while parsing blob content",
MessageFormatErrorCodes.Data_Corrupt);
}
return new BlobData(BlobType.DataBlob, dataSize, output);
return new BlobData(BlobType.DataBlob, dataSize, byteBuf);
}
}

Expand Down Expand Up @@ -1729,15 +1732,15 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO
if (dataSize > Integer.MAX_VALUE) {
throw new IOException("We only support data of max size == MAX_INT. Error while reading blob from store");
}
ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize);
ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize);
long crc = crcStream.getValue();
long streamCrc = dataStream.readLong();
if (crc != streamCrc) {
logger.error("corrupt data while parsing blob content expectedcrc {} actualcrc {}", crc, streamCrc);
throw new MessageFormatException("corrupt data while parsing blob content",
MessageFormatErrorCodes.Data_Corrupt);
}
return new BlobData(blobContentType, dataSize, output);
return new BlobData(blobContentType, dataSize, byteBuf);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import io.netty.buffer.ByteBufInputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -72,9 +73,12 @@ public TransformationOutput transform(Message message) {
throw new IllegalStateException("Message cannot be a deleted record ");
}
if (msgInfo.getStoreKey().equals(keyInStream)) {
// ValidatingTransformer only exists on ambry-server and we don't enable netty on ambry server yet. So And blobData.getAndRelease
// will return an Unpooled ByteBuf, it's not not to release it.
// @todo, when enabling netty in ambry-server, release this ByteBuf.
PutMessageFormatInputStream transformedStream =
new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, blobData.getStream(),
blobData.getSize(), blobData.getBlobType());
new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata,
new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType());
MessageInfo transformedMsgInfo =
new MessageInfo(keyInStream, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(),
msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;


public class MessageFormatInputStreamTest {
private static short messageFormatHeaderVersionSaved;
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();

@BeforeClass
public static void saveMessageFormatHeaderVersionToUse() {
Expand All @@ -49,6 +54,16 @@ public void resetMessageFormatHeaderVersionToUse() {
MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved;
}

@Before
public void before() {
nettyByteBufLeakHelper.beforeTest();
}

@After
public void after() {
nettyByteBufLeakHelper.afterTest();
}

/**
* Tests for {@link PutMessageFormatInputStream} in different versions.
*/
Expand Down Expand Up @@ -240,7 +255,12 @@ private void messageFormatPutRecordsTest(short blobVersion, BlobType blobType, s
} else {
Assert.assertEquals(null, blobAll.getBlobEncryptionKey());
}
Assert.assertEquals(ByteBuffer.wrap(data), blobAll.getBlobData().getStream().getByteBuffer());
ByteBuf byteBuf = blobAll.getBlobData().getAndRelease();
try {
Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf);
} finally {
byteBuf.release();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void deserializeTest() throws MessageFormatException, IOException {
BlobData blobData = MessageFormatRecord.deserializeBlob(new ByteBufferInputStream(sData));
Assert.assertEquals(blobData.getSize(), 2000);
byte[] verify = new byte[2000];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals(verify, data.array());

// corrupt blob record V1
Expand Down Expand Up @@ -640,7 +640,7 @@ private void testBlobRecordV2(int blobSize, BlobType blobType) throws IOExceptio
BlobData blobData = getBlobRecordV2(blobSize, blobType, blobContent, entireBlob);
Assert.assertEquals("Blob size mismatch", blobSize, blobData.getSize());
byte[] verify = new byte[blobSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("BlobContent mismatch", blobContent.array(), verify);

// corrupt blob record V2
Expand Down Expand Up @@ -694,7 +694,7 @@ public void testBlobRecordWithMetadataContentV2() throws IOException, MessageFor

Assert.assertEquals(metadataContentSize, blobData.getSize());
byte[] verify = new byte[metadataContentSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify);

// deserialize and check for metadata contents
Expand Down Expand Up @@ -728,7 +728,7 @@ public void testBlobRecordWithMetadataContentV3() throws IOException, MessageFor
BlobData blobData = getBlobRecordV2(metadataContentSize, BlobType.MetadataBlob, metadataContent, blob);
Assert.assertEquals(metadataContentSize, blobData.getSize());
byte[] verify = new byte[metadataContentSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify);

metadataContent.rewind();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ private void doSendWriteCompositeMessagesTest(byte[][] blob, byte[][] userMetada
Assert.assertEquals(BlobType.DataBlob, deserializedBlob.getBlobData().getBlobType());
Assert.assertEquals(blob[i].length, deserializedBlob.getBlobData().getSize());
byte[] readBlob = new byte[blob[i].length];
deserializedBlob.getBlobData().getStream().read(readBlob);
deserializedBlob.getBlobData().getAndRelease().readBytes(readBlob);
Assert.assertArrayEquals(blob[i], readBlob);

if (headerVersions[i] == MessageFormatRecord.Message_Header_Version_V1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -40,6 +44,7 @@
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -67,6 +72,7 @@ public static List<Object[]> data() {
private final EnumSet<TransformerOptions> options;
private final StoreKeyFactory storeKeyFactory;
private final RandomKeyConverter randomKeyConverter;
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();

@BeforeClass
public static void saveMessageFormatHeaderVersionToUse() {
Expand All @@ -78,6 +84,16 @@ public void resetMessageFormatHeaderVersionToUse() {
MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved;
}

@Before
public void before() {
nettyByteBufLeakHelper.beforeTest();
}

@After
public void after() {
nettyByteBufLeakHelper.afterTest();
}

public MessageSievingInputStreamTest(EnumSet<TransformerOptions> options) throws Exception {
this.options = options;
this.storeKeyFactory = new MockIdFactory();
Expand Down Expand Up @@ -960,8 +976,13 @@ private void verifySievedTransformedMessage(MessageSievingInputStream sievedStre
Assert.assertEquals(accountId, propsFromStream.getAccountId());
Assert.assertEquals(containerId, propsFromStream.getContainerId());
Assert.assertEquals(ByteBuffer.wrap(usermetadata), userMetadataFromStream);
Assert.assertEquals(ByteBuffer.wrap(data), blobDataFromStream.getStream().getByteBuffer());
Assert.assertEquals(blobType, blobDataFromStream.getBlobType());
ByteBuf byteBuf = blobDataFromStream.getAndRelease();
try {
Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf);
} finally {
byteBuf.release();
}
}
}

Expand Down Expand Up @@ -1064,8 +1085,8 @@ public TransformationOutput transform(Message message) {
} else {
MessageInfo transformedMsgInfo;
PutMessageFormatInputStream transformedStream =
new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, blobData.getStream(),
blobData.getSize(), blobData.getBlobType());
new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata,
new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType());
transformedMsgInfo =
new MessageInfo(newKey, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(),
msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(),
Expand Down
Loading

0 comments on commit 3cdb994

Please sign in to comment.