Add NettyByteBuf to GetBlobOperation #1314

Merged · 6 commits · Jan 31, 2020
12 changes: 2 additions & 10 deletions ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java
@@ -27,8 +27,8 @@
public class ResponseInfo {
private final RequestInfo requestInfo;
private final NetworkClientErrorCode error;
private final Object response;
private final DataNodeId dataNode;
private Object response;

/**
* Constructs a ResponseInfo with the given parameters.
@@ -68,21 +68,13 @@ public Object getResponse() {
return response;
}

/**
* Increase the reference count of underlying response.
*/
public void retain() {
if (response != null) {
ReferenceCountUtil.retain(response);
}
}

/**
* Decrease the reference count of underlying response.
*/
public void release() {
if (response != null) {
ReferenceCountUtil.release(response);
response = null;
Contributor:
Should this only be set to null when the refcount reaches zero?

Collaborator Author:
Now that we've removed the retain function, we can be sure that we only need to release it once.

}
}
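The release-once pattern agreed on in the thread above can be illustrated with a minimal, self-contained sketch; the `Unpooled` buffer here merely stands in for the real reference-counted network response held by `ResponseInfo`:

```java
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;

public class ReleaseOnceDemo {
  // Stands in for the reference-counted response held by ResponseInfo.
  private Object response = Unpooled.buffer(16);

  // Mirrors ResponseInfo.release(): nulling the field after releasing makes a
  // second call a no-op, so this holder can never decrement the refcount twice.
  public void release() {
    if (response != null) {
      ReferenceCountUtil.release(response);
      response = null;
    }
  }

  public static void main(String[] args) {
    ReleaseOnceDemo demo = new ReleaseOnceDemo();
    demo.release();
    demo.release(); // safe: the second call does nothing
  }
}
```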

@@ -14,6 +14,7 @@
package com.github.ambry.commons;

import com.github.ambry.router.Callback;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -19,6 +19,7 @@
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.router.Callback;
import com.github.ambry.router.FutureResult;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.TestUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -14,6 +14,9 @@
package com.github.ambry.messageformat;

import com.github.ambry.utils.ByteBufferInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;


/**
@@ -22,17 +25,32 @@
public class BlobData {
private final BlobType blobType;
private final long size;
private final ByteBufferInputStream stream;
private ByteBuf byteBuf;
private ByteBufferInputStream stream = null;

/**
* The blob data contains the content {@link ByteBuf} and other required info
* @param blobType {@link BlobType} of the blob
* @param size The size of the blob content.
* @param byteBuf The content of this blob in a {@link ByteBuf}.
*/
public BlobData(BlobType blobType, long size, ByteBuf byteBuf) {
this.blobType = blobType;
this.size = size;
this.byteBuf = byteBuf;
}

/**
* The blob data contains the stream and other required info
* @param blobType {@link BlobType} of the blob
* @param size The size of the blob content.
* @param stream The {@link ByteBufferInputStream} containing the blob content.
*/
@Deprecated
public BlobData(BlobType blobType, long size, ByteBufferInputStream stream) {
this.blobType = blobType;
this.size = size;
this.byteBuf = Unpooled.wrappedBuffer(stream.getByteBuffer());
this.stream = stream;
}

@@ -53,7 +71,37 @@ public long getSize() {
/**
* @return the {@link ByteBufferInputStream} containing the blob content.
*/
@Deprecated
public ByteBufferInputStream getStream() {
if (stream != null) {
return stream;
}
// The stream is null, so the blob content was passed in as a ByteBuf; convert it to a stream here.
if (byteBuf == null) {
return null;
}
ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes());
byteBuf.readBytes(temp);
byteBuf.release();
byteBuf = null;
temp.flip();
stream = new ByteBufferInputStream(temp);
return stream;
}

/**
* Return the netty {@link ByteBuf} and transfer its ownership to the caller. It is not safe
* to call this method more than once.
*/
public ByteBuf getAndRelease() {
if (byteBuf == null) {
return null;
}
try {
return byteBuf.retainedDuplicate();
} finally {
byteBuf.release();
byteBuf = null;
}
}
}
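For callers of the new API, a minimal usage sketch; `consumeBlob` and its wrapper class are hypothetical, while `getAndRelease` and the caller-side `release` follow the contract documented above:

```java
import com.github.ambry.messageformat.BlobData;
import io.netty.buffer.ByteBuf;

public class BlobDataUsageSketch {
  // Hypothetical caller: copies the blob content out and releases the buffer it now owns.
  static byte[] consumeBlob(BlobData blobData) {
    ByteBuf content = blobData.getAndRelease(); // ownership transfers to this caller
    try {
      byte[] bytes = new byte[content.readableBytes()];
      content.readBytes(bytes);
      return bytes;
    } finally {
      content.release(); // the caller is responsible for releasing exactly once
    }
  }
}
```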
@@ -15,11 +15,13 @@

import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.ByteBufferDataInputStream;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.NettyByteBufDataInputStream;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -243,7 +245,8 @@ static boolean isValidBlobRecordVersion(short blobRecordVersion) {
*/
public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory storeKeyFactory)
throws IOException, MessageFormatException {
DataInputStream inputStream = new DataInputStream(stream);
DataInputStream inputStream =
stream instanceof DataInputStream ? (DataInputStream) stream : new DataInputStream(stream);
short headerVersion = inputStream.readShort();
ByteBuffer headerBuf;
MessageHeader_Format header;
@@ -277,7 +280,7 @@ public static BlobAll deserializeBlobAll(InputStream stream, StoreKeyFactory sto
MessageFormatErrorCodes.Unknown_Format_Version);
}
header.verifyHeader();
StoreKey storeKey = storeKeyFactory.getStoreKey(new DataInputStream(stream));
StoreKey storeKey = storeKeyFactory.getStoreKey(inputStream);
ByteBuffer blobEncryptionKey = null;
if (header.hasEncryptionKeyRecord()) {
blobEncryptionKey = deserializeBlobEncryptionKey(stream);
@@ -1671,15 +1674,15 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO
if (dataSize > Integer.MAX_VALUE) {
throw new IOException("We only support data of max size == MAX_INT. Error while reading blob from store");
}
ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize);
ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize);
long crc = crcStream.getValue();
long streamCrc = dataStream.readLong();
if (crc != streamCrc) {
logger.error("corrupt data while parsing blob content expectedcrc {} actualcrc {}", crc, streamCrc);
throw new MessageFormatException("corrupt data while parsing blob content",
MessageFormatErrorCodes.Data_Corrupt);
}
return new BlobData(BlobType.DataBlob, dataSize, output);
return new BlobData(BlobType.DataBlob, dataSize, byteBuf);
}
}

@@ -1729,15 +1732,15 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream) throws IO
if (dataSize > Integer.MAX_VALUE) {
throw new IOException("We only support data of max size == MAX_INT. Error while reading blob from store");
}
ByteBufferInputStream output = Utils.getByteBufferInputStreamFromCrcInputStream(crcStream, (int) dataSize);
ByteBuf byteBuf = Utils.readNettyByteBufFromCrcInputStream(crcStream, (int) dataSize);
long crc = crcStream.getValue();
long streamCrc = dataStream.readLong();
if (crc != streamCrc) {
logger.error("corrupt data while parsing blob content expectedcrc {} actualcrc {}", crc, streamCrc);
throw new MessageFormatException("corrupt data while parsing blob content",
MessageFormatErrorCodes.Data_Corrupt);
}
return new BlobData(blobContentType, dataSize, output);
return new BlobData(blobContentType, dataSize, byteBuf);
}
}

@@ -20,6 +20,7 @@
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import io.netty.buffer.ByteBufInputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -72,9 +73,12 @@ public TransformationOutput transform(Message message) {
throw new IllegalStateException("Message cannot be a deleted record ");
}
if (msgInfo.getStoreKey().equals(keyInStream)) {
// ValidatingTransformer only exists on ambry-server, and netty is not enabled on ambry-server yet, so
// blobData.getAndRelease will return an Unpooled ByteBuf and it's fine not to release it.
// TODO: when netty is enabled in ambry-server, release this ByteBuf.
PutMessageFormatInputStream transformedStream =
new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, blobData.getStream(),
blobData.getSize(), blobData.getBlobType());
new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata,
new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType());
MessageInfo transformedMsgInfo =
new MessageInfo(keyInStream, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(),
msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(),
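A small illustrative sketch of the ownership hand-off used above: constructing `ByteBufInputStream` with `releaseOnClose = true` makes closing the stream release the wrapped buffer, so the transformer does not have to track the refcount itself (the demo buffer below is just an example, not part of this diff):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;

public class ReleaseOnCloseDemo {
  public static void main(String[] args) throws IOException {
    ByteBuf content = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
    // releaseOnClose = true: closing the stream releases the wrapped ByteBuf.
    try (InputStream in = new ByteBufInputStream(content, true)) {
      System.out.println("first byte = " + in.read());
    }
    // The try-with-resources close released the buffer.
    System.out.println("refCnt after close = " + content.refCnt()); // prints 0
  }
}
```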
@@ -22,22 +22,27 @@
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;


public class MessageFormatInputStreamTest {
private static short messageFormatHeaderVersionSaved;
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();

@BeforeClass
public static void saveMessageFormatHeaderVersionToUse() {
@@ -49,6 +54,16 @@ public void resetMessageFormatHeaderVersionToUse() {
MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved;
}

@Before
public void before() {
nettyByteBufLeakHelper.beforeTest();
}

@After
public void after() {
nettyByteBufLeakHelper.afterTest();
}
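For context, a hedged sketch of the kind of check such a leak helper can perform: compare the pooled allocator's active-allocation counts before and after each test. The real NettyByteBufLeakHelper may be implemented differently; this only illustrates the idea behind the beforeTest/afterTest calls above:

```java
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;

public class LeakCheckSketch {
  private long activeBefore;

  // Sum of active (not yet released) allocations across all pooled arenas.
  private static long activeAllocations() {
    long total = 0;
    for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.metric().heapArenas()) {
      total += arena.numActiveAllocations();
    }
    for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.metric().directArenas()) {
      total += arena.numActiveAllocations();
    }
    return total;
  }

  public void beforeTest() {
    activeBefore = activeAllocations();
  }

  public void afterTest() {
    long leaked = activeAllocations() - activeBefore;
    if (leaked > 0) {
      throw new AssertionError("Possible ByteBuf leak: " + leaked + " allocation(s) not released");
    }
  }
}
```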

/**
* Tests for {@link PutMessageFormatInputStream} in different versions.
*/
@@ -240,7 +255,12 @@ private void messageFormatPutRecordsTest(short blobVersion, BlobType blobType, s
} else {
Assert.assertEquals(null, blobAll.getBlobEncryptionKey());
}
Assert.assertEquals(ByteBuffer.wrap(data), blobAll.getBlobData().getStream().getByteBuffer());
ByteBuf byteBuf = blobAll.getBlobData().getAndRelease();
try {
Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf);
} finally {
byteBuf.release();
}
}

/**
@@ -173,7 +173,7 @@ public void deserializeTest() throws MessageFormatException, IOException {
BlobData blobData = MessageFormatRecord.deserializeBlob(new ByteBufferInputStream(sData));
Assert.assertEquals(blobData.getSize(), 2000);
byte[] verify = new byte[2000];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals(verify, data.array());

// corrupt blob record V1
@@ -640,7 +640,7 @@ private void testBlobRecordV2(int blobSize, BlobType blobType) throws IOExceptio
BlobData blobData = getBlobRecordV2(blobSize, blobType, blobContent, entireBlob);
Assert.assertEquals("Blob size mismatch", blobSize, blobData.getSize());
byte[] verify = new byte[blobSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("BlobContent mismatch", blobContent.array(), verify);

// corrupt blob record V2
@@ -694,7 +694,7 @@ public void testBlobRecordWithMetadataContentV2() throws IOException, MessageFor

Assert.assertEquals(metadataContentSize, blobData.getSize());
byte[] verify = new byte[metadataContentSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify);

// deserialize and check for metadata contents
@@ -728,7 +728,7 @@ public void testBlobRecordWithMetadataContentV3() throws IOException, MessageFor
BlobData blobData = getBlobRecordV2(metadataContentSize, BlobType.MetadataBlob, metadataContent, blob);
Assert.assertEquals(metadataContentSize, blobData.getSize());
byte[] verify = new byte[metadataContentSize];
blobData.getStream().read(verify);
blobData.getAndRelease().readBytes(verify);
Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify);

metadataContent.rewind();
@@ -468,7 +468,7 @@ private void doSendWriteCompositeMessagesTest(byte[][] blob, byte[][] userMetada
Assert.assertEquals(BlobType.DataBlob, deserializedBlob.getBlobData().getBlobType());
Assert.assertEquals(blob[i].length, deserializedBlob.getBlobData().getSize());
byte[] readBlob = new byte[blob[i].length];
deserializedBlob.getBlobData().getStream().read(readBlob);
deserializedBlob.getBlobData().getAndRelease().readBytes(readBlob);
Assert.assertArrayEquals(blob[i], readBlob);

if (headerVersions[i] == MessageFormatRecord.Message_Header_Version_V1) {
@@ -24,8 +24,12 @@
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.NettyByteBufLeakHelper;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -40,6 +44,7 @@
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,6 +72,7 @@ public static List<Object[]> data() {
private final EnumSet<TransformerOptions> options;
private final StoreKeyFactory storeKeyFactory;
private final RandomKeyConverter randomKeyConverter;
private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper();

@BeforeClass
public static void saveMessageFormatHeaderVersionToUse() {
@@ -78,6 +84,16 @@ public void resetMessageFormatHeaderVersionToUse() {
MessageFormatRecord.headerVersionToUse = messageFormatHeaderVersionSaved;
}

@Before
public void before() {
nettyByteBufLeakHelper.beforeTest();
}

@After
public void after() {
nettyByteBufLeakHelper.afterTest();
}

public MessageSievingInputStreamTest(EnumSet<TransformerOptions> options) throws Exception {
this.options = options;
this.storeKeyFactory = new MockIdFactory();
@@ -960,8 +976,13 @@ private void verifySievedTransformedMessage(MessageSievingInputStream sievedStre
Assert.assertEquals(accountId, propsFromStream.getAccountId());
Assert.assertEquals(containerId, propsFromStream.getContainerId());
Assert.assertEquals(ByteBuffer.wrap(usermetadata), userMetadataFromStream);
Assert.assertEquals(ByteBuffer.wrap(data), blobDataFromStream.getStream().getByteBuffer());
Assert.assertEquals(blobType, blobDataFromStream.getBlobType());
ByteBuf byteBuf = blobDataFromStream.getAndRelease();
try {
Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf);
} finally {
byteBuf.release();
}
}
}

@@ -1064,8 +1085,8 @@ public TransformationOutput transform(Message message) {
} else {
MessageInfo transformedMsgInfo;
PutMessageFormatInputStream transformedStream =
new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, blobData.getStream(),
blobData.getSize(), blobData.getBlobType());
new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata,
new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType());
transformedMsgInfo =
new MessageInfo(newKey, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(),
msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(),