diff --git a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java index bd5192fee8..3ec83bb721 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java @@ -32,8 +32,6 @@ public class NetworkConfig { public static final String SELECTOR_EXECUTOR_POOL_SIZE = "selector.executor.pool.size"; public static final String SELECTOR_MAX_KEY_TO_PROCESS = "selector.max.key.to.process"; public static final String SELECTOR_USE_DIRECT_BUFFERS = "selector.use.direct.buffers"; - public static final String NETWORK_USE_NETTY_BYTE_BUF = "network.use.netty.byte.buf"; - public static final String NETWORK_PUT_REQUEST_SHARE_MEMORY = "network.put.request.share.memory"; /** * The number of io threads that the server uses for carrying out network requests @@ -121,14 +119,6 @@ public class NetworkConfig { @Default("false") public final boolean selectorUseDirectBuffers; - @Config(NETWORK_USE_NETTY_BYTE_BUF) - @Default("false") - public final boolean networkUseNettyByteBuf; - - @Config(NETWORK_PUT_REQUEST_SHARE_MEMORY) - @Default("false") - public final boolean networkPutRequestShareMemory; - public NetworkConfig(VerifiableProperties verifiableProperties) { numIoThreads = verifiableProperties.getIntInRange(NUM_IO_THREADS, 8, 1, Integer.MAX_VALUE); queuedMaxRequests = verifiableProperties.getIntInRange(QUEUED_MAX_REQUESTS, 500, 1, Integer.MAX_VALUE); @@ -147,7 +137,5 @@ public NetworkConfig(VerifiableProperties verifiableProperties) { selectorMaxKeyToProcess = verifiableProperties.getIntInRange(SELECTOR_MAX_KEY_TO_PROCESS, -1, -1, Integer.MAX_VALUE); selectorUseDirectBuffers = verifiableProperties.getBoolean(SELECTOR_USE_DIRECT_BUFFERS, false); - networkUseNettyByteBuf = verifiableProperties.getBoolean(NETWORK_USE_NETTY_BYTE_BUF, false); - networkPutRequestShareMemory = verifiableProperties.getBoolean(NETWORK_PUT_REQUEST_SHARE_MEMORY, false); } } diff --git a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java index 6eb79742e7..5482c6379a 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java @@ -94,7 +94,6 @@ public class RouterConfig { "router.operation.tracker.histogram.cache.timeout.ms"; public static final String ROUTER_MAX_IN_MEM_PUT_CHUNKS = "router.max.in.mem.put.chunks"; public static final String ROUTER_MAX_IN_MEM_GET_CHUNKS = "router.max.in.mem.get.chunks"; - public static final String ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY = "router.get.blob.operation.share.memory"; public static final String ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED = "router.get.eligible.replicas.by.state.enabled"; public static final String ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET = "router.put.use.dynamic.success.target"; @@ -440,13 +439,6 @@ public class RouterConfig { @Default("4") public final int routerMaxInMemGetChunks; - /** - * If {@code true}, the blob data shares memory with networking buffer in GetBlobOperation - */ - @Config(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY) - @Default("false") - public final boolean routerGetBlobOperationShareMemory; - /** * if {@code true}, operation tracker will get replicas in required states based on the type of operation. This helps * dynamically manage replicas in cluster (i.e. add/remove/move replicas) without restarting frontends. @@ -558,7 +550,6 @@ public RouterConfig(VerifiableProperties verifiableProperties) { Integer.MAX_VALUE / routerMaxPutChunkSizeBytes); routerMaxInMemGetChunks = verifiableProperties.getIntInRange(ROUTER_MAX_IN_MEM_GET_CHUNKS, 4, 1, Integer.MAX_VALUE / routerMaxPutChunkSizeBytes); - routerGetBlobOperationShareMemory = verifiableProperties.getBoolean(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY, false); routerGetEligibleReplicasByStateEnabled = verifiableProperties.getBoolean(ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED, false); routerPutUseDynamicSuccessTarget = verifiableProperties.getBoolean(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET, false); diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java deleted file mode 100644 index 66964ec7ea..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A byte buffer version of Receive to buffer the incoming request or response. - */ -public class BoundedByteBufferReceive implements BoundedReceive { - - private ByteBuffer buffer = null; - private ByteBuffer sizeBuffer; - private long sizeToRead; - private long sizeRead; - private final static Logger logger = LoggerFactory.getLogger(BoundedByteBufferReceive.class); - - public BoundedByteBufferReceive() { - sizeToRead = 0; - sizeRead = 0; - sizeBuffer = ByteBuffer.allocate(Long.BYTES); - } - - @Override - public boolean isReadComplete() { - return buffer != null && sizeRead >= sizeToRead; - } - - @Override - public long readFrom(ReadableByteChannel channel) throws IOException { - long bytesRead = 0; - if (buffer == null) { - bytesRead = channel.read(sizeBuffer); - if (bytesRead < 0) { - throw new EOFException(); - } - if (sizeBuffer.position() == sizeBuffer.capacity()) { - sizeBuffer.flip(); - sizeToRead = sizeBuffer.getLong(); - sizeRead += Long.BYTES; - buffer = ByteBuffer.allocate((int) sizeToRead - Long.BYTES); - sizeBuffer = null; - } - } - if (buffer != null && sizeRead < sizeToRead) { - long bytesReadFromChannel = channel.read(buffer); - if (bytesReadFromChannel < 0) { - throw new EOFException(); - } - sizeRead += bytesReadFromChannel; - bytesRead += bytesReadFromChannel; - if (sizeRead == sizeToRead) { - buffer.flip(); - } - } - logger.trace("size read from channel {}", sizeRead); - return bytesRead; - } - - @Override - public ByteBuffer getAndRelease() { - try { - return buffer; - } finally { - buffer = null; - } - } - - /** - * The total size in bytes that needs to receive from the channel - * It will be initialized only after header is read. - * @return the size of the data in bytes to receive after reading header, otherwise return 0 - */ - @Override - public long sizeRead() { - return sizeRead; - } -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java index 2d4c6d7a2c..74d91534ef 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java +++ b/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java @@ -13,13 +13,13 @@ */ package com.github.ambry.network; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +27,7 @@ /** * A netty {@link ByteBuf} version of Receive to buffer the incoming request or response. */ -public class BoundedNettyByteBufReceive implements BoundedReceive { +public class BoundedNettyByteBufReceive extends AbstractByteBufHolder { private ByteBuf buffer = null; private ByteBuf sizeBuffer = null; @@ -35,9 +35,16 @@ public class BoundedNettyByteBufReceive implements BoundedReceive { private long sizeRead = 0; private final static Logger logger = LoggerFactory.getLogger(BoundedNettyByteBufReceive.class); - @Override + public BoundedNettyByteBufReceive() { + } + + BoundedNettyByteBufReceive(ByteBuf buffer, long sizeToRead) { + this.buffer = Objects.requireNonNull(buffer); + this.sizeToRead = sizeToRead; + } + public boolean isReadComplete() { - return buffer != null && sizeRead >= sizeToRead; + return buffer != null && sizeRead >= sizeToRead; } /** @@ -56,7 +63,6 @@ private int readBytesFromReadableByteChannel(ReadableByteChannel channel, ByteBu return n; } - @Override public long readFrom(ReadableByteChannel channel) throws IOException { long bytesRead = 0; if (buffer == null) { @@ -99,32 +105,22 @@ public long readFrom(ReadableByteChannel channel) throws IOException { return bytesRead; } - /** - * Returns the payload as {@link ByteBuf}, at the same time release the current reference to this payload. - * It's not safe to call this function multiple times. - * @return - */ - @Override - public ByteBuf getAndRelease() { - if (buffer == null) { - return null; - } else { - try { - return buffer.retainedDuplicate(); - } finally { - buffer.release(); - buffer = null; - } - } - } - /** * The total size in bytes that needs to receive from the channel * It will be initialized only after header is read. * @return the size of the data in bytes to receive after reading header, otherwise return 0 */ - @Override public long sizeRead() { return sizeRead; } + + @Override + public ByteBuf content() { + return buffer; + } + + @Override + public BoundedNettyByteBufReceive replace(ByteBuf content) { + return new BoundedNettyByteBufReceive(content, sizeToRead); + } } diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java deleted file mode 100644 index 365657a954..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright 2019 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -/** - * This is an interface for {@link Receive} to use any kinds of data structure to buffer the - * incoming request or response from the network. - * @param The type of the buffer. It's either a {@link java.nio.ByteBuffer} or a {@link io.netty.buffer.ByteBuf}. - */ -public interface BoundedReceive extends Receive { - - /** - * Return the buffer and transfer the ownership of this buffer to the caller. It will release the underlying buffer - * and it's not safe to call this function twice. - * @return The byte buffer that contains the bytes from the network. - */ - T getAndRelease(); - - /** - * The size of read bytes from the network layer. - */ - long sizeRead(); -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java index 233c5b8b43..13c1d58fcd 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java +++ b/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java @@ -27,14 +27,14 @@ public class NetworkReceive { /** * The bytes received from the destination */ - private final BoundedReceive receivedBytes; + private final BoundedNettyByteBufReceive receivedBytes; /** * The start time of when the receive started */ private final long receiveStartTimeInMs; - public NetworkReceive(String connectionId, BoundedReceive receivedBytes, Time time) { + public NetworkReceive(String connectionId, BoundedNettyByteBufReceive receivedBytes, Time time) { this.connectionId = connectionId; this.receivedBytes = receivedBytes; this.receiveStartTimeInMs = time.milliseconds(); @@ -44,7 +44,7 @@ public String getConnectionId() { return connectionId; } - public BoundedReceive getReceivedBytes() { + public BoundedNettyByteBufReceive getReceivedBytes() { return receivedBytes; } diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java index 7156ce68e5..53b9e1f872 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java +++ b/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.network; +import io.netty.buffer.ByteBuf; import java.io.InputStream; @@ -33,7 +34,11 @@ public interface NetworkRequest { long getStartTimeInMs(); /** - * Release any resource this request is holding. + * Release any resource this request is holding. By default it returns false so this method can be compatible + * with {@link ByteBuf#release()} + * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated */ - default void release() {}; + default boolean release() { + return false; + } } diff --git a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java index b498c86690..4bdb18e9c1 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java +++ b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java @@ -14,8 +14,8 @@ package com.github.ambry.network; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; /** @@ -24,26 +24,33 @@ * was an error sending the request or a non-null ByteBuffer containing the successful response received for this * request. Also, this class contains {@link DataNodeId} to which the request is issued. */ -public class ResponseInfo { +public class ResponseInfo extends AbstractByteBufHolder { private final RequestInfo requestInfo; private final NetworkClientErrorCode error; private final DataNodeId dataNode; - private Object response; + private ByteBuf content; /** * Constructs a ResponseInfo with the given parameters. * @param requestInfo the {@link RequestInfo} associated with this response. * @param error the error encountered in sending this request, if there is any. - * @param response the response received for this request. + * @param content the response received for this request. */ - public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, Object response) { - this(requestInfo, error, response, requestInfo == null ? null : requestInfo.getReplicaId().getDataNodeId()); + public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, ByteBuf content) { + this(requestInfo, error, content, requestInfo == null ? null : requestInfo.getReplicaId().getDataNodeId()); } - public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, Object response, DataNodeId dataNode) { + /** + * Constructs a ResponseInfo with the given parameters. + * @param requestInfo the {@link RequestInfo} associated with this response. + * @param error the error encountered in sending this request, if there is any. + * @param content the response received for this request. + * @param dataNode the {@link DataNodeId} of this request. + */ + public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, ByteBuf content, DataNodeId dataNode) { this.requestInfo = requestInfo; this.error = error; - this.response = response; + this.content = content; this.dataNode = dataNode; } @@ -61,35 +68,6 @@ public NetworkClientErrorCode getError() { return error; } - /** - * @return the response received for this request. - */ - public Object getResponse() { - return response; - } - - /** - * Decrease the reference count of underlying response. - */ - public void release() { - if (response != null) { - ReferenceCountUtil.release(response); - response = null; - } - } - - /** - * Tries to call {@link ByteBuf#touch(Object)} if the specified message implements - * {@link ByteBuf}. If the specified message doesn't implement {@link ByteBuf}, - * this method does nothing. - * @param hint hint object. - */ - public void touch(Object hint) { - if (response != null) { - ReferenceCountUtil.touch(response, hint); - } - } - /** * @return the {@link DataNodeId} with which the response is associated. */ @@ -99,7 +77,17 @@ public DataNodeId getDataNode() { @Override public String toString() { - return "ResponseInfo{" + "requestInfo=" + requestInfo + ", error=" + error + ", response=" + response - + ", dataNode=" + dataNode + '}'; + return "ResponseInfo{requestInfo=" + requestInfo + ", error=" + error + ", response=" + content + ", dataNode=" + + dataNode + '}'; + } + + @Override + public ByteBuf content() { + return content; + } + + @Override + public ResponseInfo replace(ByteBuf content) { + return new ResponseInfo(requestInfo, error, content, dataNode); } } diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java index fdb0c11e0e..bae2404adc 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java @@ -13,45 +13,28 @@ */ package com.github.ambry.messageformat; -import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import java.nio.ByteBuffer; /** * Contains the blob stream along with some required info */ -public class BlobData { +public class BlobData extends AbstractByteBufHolder { private final BlobType blobType; private final long size; - private ByteBuf byteBuf; - private ByteBufferInputStream stream = null; + private ByteBuf content; /** * The blob data contains the stream and other required info * @param blobType {@link BlobType} of the blob * @param size The size of the blob content. - * @param byteBuf The content of this blob in a {@link ByteBuf}. + * @param content The content of this blob in a {@link ByteBuf}. */ - public BlobData(BlobType blobType, long size, ByteBuf byteBuf) { + public BlobData(BlobType blobType, long size, ByteBuf content) { this.blobType = blobType; this.size = size; - this.byteBuf = byteBuf; - } - - /** - * The blob data contains the stream and other required info - * @param blobType {@link BlobType} of the blob - * @param size The size of the blob content. - * @param stream The {@link ByteBufferInputStream} containing the blob content. - */ - @Deprecated - public BlobData(BlobType blobType, long size, ByteBufferInputStream stream) { - this.blobType = blobType; - this.size = size; - this.byteBuf = Unpooled.wrappedBuffer(stream.getByteBuffer()); - this.stream = stream; + this.content = content; } /** @@ -68,40 +51,13 @@ public long getSize() { return size; } - /** - * @return the {@link ByteBufferInputStream} containing the blob content. - */ - @Deprecated - public ByteBufferInputStream getStream() { - if (stream != null) { - return stream; - } - // The blob content is passed as a ByteBuf since the stream is nulle - if (byteBuf == null) { - return null; - } - ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes()); - byteBuf.readBytes(temp); - byteBuf.release(); - byteBuf = null; - temp.flip(); - stream = new ByteBufferInputStream(temp); - return stream; + @Override + public ByteBuf content() { + return content; } - /** - * Return the netty {@link ByteBuf} and then transfer the ownership to the caller. It's not safe - * to call this method more than once. - */ - public ByteBuf getAndRelease() { - if (byteBuf == null) { - return null; - } - try { - return byteBuf.retainedDuplicate(); - } finally { - byteBuf.release(); - byteBuf = null; - } + @Override + public BlobData replace(ByteBuf content) { + return new BlobData(blobType, size, content); } } diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java index e47d0df82c..8cec009099 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java @@ -78,7 +78,7 @@ public TransformationOutput transform(Message message) { // @todo, when enabling netty in ambry-server, release this ByteBuf. PutMessageFormatInputStream transformedStream = new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, - new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); + new ByteBufInputStream(blobData.content(), true), blobData.getSize(), blobData.getBlobType()); MessageInfo transformedMsgInfo = new MessageInfo(keyInStream, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java index 3ddc142c82..66e09d1cde 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java @@ -255,7 +255,7 @@ private void messageFormatPutRecordsTest(short blobVersion, BlobType blobType, s } else { Assert.assertEquals(null, blobAll.getBlobEncryptionKey()); } - ByteBuf byteBuf = blobAll.getBlobData().getAndRelease(); + ByteBuf byteBuf = blobAll.getBlobData().content(); try { Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); } finally { diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java index 0efb2e8d1e..a491d1e2ac 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java @@ -21,6 +21,7 @@ import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Crc32; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; @@ -33,7 +34,9 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import static com.github.ambry.account.Account.*; @@ -46,6 +49,18 @@ public class MessageFormatRecordTest { + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + //TODO Separate this mega test into smaller tests @Test public void deserializeTest() throws MessageFormatException, IOException { @@ -173,8 +188,9 @@ public void deserializeTest() throws MessageFormatException, IOException { BlobData blobData = MessageFormatRecord.deserializeBlob(new ByteBufferInputStream(sData)); Assert.assertEquals(blobData.getSize(), 2000); byte[] verify = new byte[2000]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals(verify, data.array()); + blobData.release(); // corrupt blob record V1 sData.flip(); @@ -640,8 +656,9 @@ private void testBlobRecordV2(int blobSize, BlobType blobType) throws IOExceptio BlobData blobData = getBlobRecordV2(blobSize, blobType, blobContent, entireBlob); Assert.assertEquals("Blob size mismatch", blobSize, blobData.getSize()); byte[] verify = new byte[blobSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("BlobContent mismatch", blobContent.array(), verify); + blobData.release(); // corrupt blob record V2 entireBlob.flip(); @@ -694,8 +711,9 @@ public void testBlobRecordWithMetadataContentV2() throws IOException, MessageFor Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); + blobData.release(); // deserialize and check for metadata contents metadataContent.rewind(); @@ -728,8 +746,9 @@ public void testBlobRecordWithMetadataContentV3() throws IOException, MessageFor BlobData blobData = getBlobRecordV2(metadataContentSize, BlobType.MetadataBlob, metadataContent, blob); Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); + blobData.release(); metadataContent.rewind(); CompositeBlobInfo compositeBlobInfo = deserializeMetadataContentV3(metadataContent, new MockIdFactory()); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java index af3fa188a1..b554d02fe3 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java @@ -23,6 +23,7 @@ import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.ByteBufferOutputStream; import com.github.ambry.utils.Crc32; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.TestUtils; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -36,6 +37,7 @@ import java.util.Random; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +49,18 @@ public class MessageFormatSendTest { private final String putFormat; private static short messageFormatHeaderVersionSaved; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + @BeforeClass public static void saveMessageFormatHeaderVersionToUse() { messageFormatHeaderVersionSaved = MessageFormatRecord.headerVersionToUse; @@ -475,8 +489,9 @@ private void doSendWriteCompositeMessagesTest(byte[][] blob, byte[][] userMetada Assert.assertEquals(BlobType.DataBlob, deserializedBlob.getBlobData().getBlobType()); Assert.assertEquals(blob[i].length, deserializedBlob.getBlobData().getSize()); byte[] readBlob = new byte[blob[i].length]; - deserializedBlob.getBlobData().getAndRelease().readBytes(readBlob); + deserializedBlob.getBlobData().content().readBytes(readBlob); Assert.assertArrayEquals(blob[i], readBlob); + deserializedBlob.getBlobData().release(); if (headerVersions[i] == MessageFormatRecord.Message_Header_Version_V1) { Assert.assertEquals(null, send.getMessageMetadataList().get(i)); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java index da672e8dfc..8fe8ed11e3 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java @@ -977,7 +977,7 @@ private void verifySievedTransformedMessage(MessageSievingInputStream sievedStre Assert.assertEquals(containerId, propsFromStream.getContainerId()); Assert.assertEquals(ByteBuffer.wrap(usermetadata), userMetadataFromStream); Assert.assertEquals(blobType, blobDataFromStream.getBlobType()); - ByteBuf byteBuf = blobDataFromStream.getAndRelease(); + ByteBuf byteBuf = blobDataFromStream.content(); try { Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); } finally { @@ -1086,7 +1086,7 @@ public TransformationOutput transform(Message message) { MessageInfo transformedMsgInfo; PutMessageFormatInputStream transformedStream = new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, - new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); + new ByteBufInputStream(blobData.content(), true), blobData.getSize(), blobData.getBlobType()); transformedMsgInfo = new MessageInfo(newKey, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git a/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java index e646c974a3..149967ed58 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java @@ -17,6 +17,8 @@ import com.github.ambry.network.LocalRequestResponseChannel.LocalChannelRequest; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -80,8 +82,8 @@ public List sendAndPoll(List requestInfos, Set responseQueue = getResponseQueue(localRequest.processorId); responseQueue.put(responseInfo); logger.debug("Added response for {}, size now {}", localRequest.processorId, responseQueue.size()); @@ -118,17 +119,16 @@ public void shutdown() { } /** - * Utility to extract a byte buffer from a {@link Send} object, skipping the size header. + * Utility to extract a {@link ByteBuf} from a {@link Send} object, skipping the size header. * @param payload the payload whose bytes we want. */ - static ByteBuffer byteBufferFromPayload(Send payload) throws IOException { + static ByteBuf byteBufFromPayload(Send payload) throws IOException { int bufferSize = (int) payload.sizeInBytes() - sizeByteArray.length; - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + ByteBuf buffer = Unpooled.buffer(bufferSize); // Skip the size header - long bytesWritten = payload.writeTo(new ByteBufferChannel(ByteBuffer.wrap(sizeByteArray))); - WritableByteChannel byteChannel = Channels.newChannel(new ByteBufferOutputStream(buffer)); + payload.writeTo(new ByteBufferChannel(ByteBuffer.wrap(sizeByteArray))); + WritableByteChannel byteChannel = Channels.newChannel(new ByteBufOutputStream(buffer)); payload.writeTo(byteChannel); - buffer.rewind(); return buffer; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java index fd7a59608a..7adbc4ced1 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java @@ -355,7 +355,9 @@ private void handleSelectorEvents(List responseInfoList) { connectionTracker.checkInConnection(connId); RequestMetadata requestMetadata = connectionIdToRequestInFlight.remove(connId); correlationIdInFlightToConnectionId.remove(requestMetadata.requestInfo.getRequest().getCorrelationId()); - responseInfoList.add(new ResponseInfo(requestMetadata.requestInfo, null, recv.getReceivedBytes().getAndRelease())); + // This would transfer the ownership of the content from BoundedNettyByteBufReceive to ResponseInfo. + // Don't use this BoundedNettyByteBufReceive anymore. + responseInfoList.add(new ResponseInfo(requestMetadata.requestInfo, null, recv.getReceivedBytes().content())); requestMetadata.onResponseReceive(); } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java index 72a473d517..5029d4192f 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java @@ -13,9 +13,10 @@ */ package com.github.ambry.network; +import com.github.ambry.utils.AbstractByteBufHolder; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.SystemTime; -import io.netty.util.ReferenceCountUtil; -import java.io.IOException; +import io.netty.buffer.ByteBuf; import java.io.InputStream; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; @@ -26,19 +27,19 @@ // The request at the network layer -class SocketServerRequest implements NetworkRequest { +class SocketServerRequest extends AbstractByteBufHolder implements NetworkRequest { private final int processor; private final String connectionId; private final InputStream input; private final long startTimeInMs; private static final Logger logger = LoggerFactory.getLogger(SocketServerRequest.class); - private Object buffer; + private ByteBuf content; - public SocketServerRequest(int processor, String connectionId, Object buffer, InputStream input) throws IOException { + public SocketServerRequest(int processor, String connectionId, ByteBuf content) { this.processor = processor; this.connectionId = connectionId; - this.buffer = buffer; - this.input = input; + this.content = content; + this.input = new NettyByteBufDataInputStream(content); this.startTimeInMs = SystemTime.getInstance().milliseconds(); logger.trace("Processor {} received request : {}", processor, connectionId); } @@ -53,14 +54,6 @@ public long getStartTimeInMs() { return startTimeInMs; } - @Override - public void release() { - if (buffer != null) { - ReferenceCountUtil.release(buffer); - buffer = null; - } - } - public int getProcessor() { return processor; } @@ -68,6 +61,16 @@ public int getProcessor() { public String getConnectionId() { return connectionId; } + + @Override + public ByteBuf content() { + return content; + } + + @Override + public SocketServerRequest replace(ByteBuf content) { + return new SocketServerRequest(getProcessor(), getConnectionId(), content); + } } // The response at the network layer @@ -114,7 +117,7 @@ public ServerNetworkResponseMetrics getMetrics() { } interface ResponseListener { - public void onResponse(int processorId); + void onResponse(int processorId); } /** diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java index 52185b19f0..5904c3572d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java @@ -20,6 +20,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketException; @@ -419,9 +420,8 @@ public void run() { List completedReceives = selector.completedReceives(); for (NetworkReceive networkReceive : completedReceives) { String connectionId = networkReceive.getConnectionId(); - Object buffer = networkReceive.getReceivedBytes().getAndRelease(); - SocketServerRequest req = new SocketServerRequest(id, connectionId, buffer, - Utils.createDataInputStreamFromBuffer(buffer, networkConfig.networkPutRequestShareMemory)); + ByteBuf buffer = networkReceive.getReceivedBytes().content(); + SocketServerRequest req = new SocketServerRequest(id, connectionId, buffer); channel.sendRequest(req); } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java index 5598e1a026..04ce475b15 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java @@ -16,7 +16,6 @@ import com.github.ambry.config.NetworkConfig; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.SelectionKey; @@ -76,9 +75,7 @@ public void setNetworkSend(NetworkSend networkSend) { } protected void initializeNetworkReceive() { - BoundedReceive boundedReceive = - config.networkUseNettyByteBuf ? new BoundedNettyByteBufReceive() : new BoundedByteBufferReceive(); - networkReceive = new NetworkReceive(getConnectionId(), boundedReceive, time); + networkReceive = new NetworkReceive(getConnectionId(), new BoundedNettyByteBufReceive(), time); } /** @@ -181,7 +178,7 @@ public void clearReceive() { protected void release() { if (networkReceive != null) { - ReferenceCountUtil.release(networkReceive.getReceivedBytes().getAndRelease()); + networkReceive.getReceivedBytes().release(); } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java b/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java deleted file mode 100644 index e50b3ccd42..0000000000 --- a/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import com.github.ambry.utils.ByteBufferInputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.util.Random; -import org.junit.Assert; -import org.junit.Test; - - -public class BoundedByteBufferReceiveTest { - - /** - * Test basic operation of {@link BoundedByteBufferReceive}. - * @throws Exception - */ - @Test - public void testBoundedByteBufferReceive() throws Exception { - int bufferSize = 2000; - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - buffer.putLong(bufferSize); - byte[] buf = new byte[bufferSize - Long.BYTES]; - new Random().nextBytes(buf); - buffer.put(buf); - buffer.flip(); - BoundedByteBufferReceive set = new BoundedByteBufferReceive(); - Assert.assertEquals("Wrong number of bytes read", bufferSize, - set.readFrom(Channels.newChannel(new ByteBufferInputStream(buffer)))); - buffer.clear(); - ByteBuffer payload = set.getAndRelease(); - for (int i = 8; i < bufferSize; i++) { - Assert.assertEquals(buffer.array()[i], payload.get()); - } - } -} diff --git a/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java b/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java index ad593f6fb9..a16ec62b19 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java @@ -41,7 +41,7 @@ public void testBoundedByteBufferReceive() throws Exception { Assert.assertEquals("Wrong number of bytes read", bufferSize, set.readFrom(Channels.newChannel(new ByteBufferInputStream(buffer)))); buffer.clear(); - ByteBuf payload = set.getAndRelease(); + ByteBuf payload = set.content(); for (int i = 8; i < bufferSize; i++) { Assert.assertEquals(buffer.array()[i], payload.readByte()); } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index 3120fb84c0..dac301c1da 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -19,13 +19,14 @@ import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.SSLConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Time; +import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -37,6 +38,7 @@ import org.conscrypt.Conscrypt; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -55,6 +57,17 @@ public class SSLSelectorTest { private Selector selector; private final File trustStoreFile; private final NetworkConfig networkConfig; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } @Parameterized.Parameters public static List data() { @@ -212,14 +225,14 @@ public void testNormalOperation() throws Exception { // handle any responses we may have gotten for (NetworkReceive receive : selector.completedReceives()) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); + ByteBuf payload = receive.getReceivedBytes().content(); String[] pieces = SelectorTest.asString(payload).split("&"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); assertEquals("Check the source", receive.getConnectionId(), pieces[0]); - assertEquals("Check that the receive has kindly been rewound", 0, payload.position()); assertTrue("Received connectionId is as expected ", connectionIds.contains(receive.getConnectionId())); assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1])); responseCount++; + receive.getReceivedBytes().release(); } // prepare new sends for the next round @@ -335,8 +348,12 @@ private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L); for (NetworkReceive receive : selector.completedReceives()) { if (receive.getConnectionId().equals(connectionId)) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); - return SelectorTest.asString(payload); + ByteBuf payload = receive.getReceivedBytes().content(); + try { + return SelectorTest.asString(payload); + } finally { + payload.release(); + } } } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java index 013adc206a..a03a4e30b8 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java @@ -16,7 +16,9 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -44,6 +46,17 @@ public class SelectorTest { private EchoServer server; private Selector selector; private int selectorExecutorPoolSize; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } @Parameterized.Parameters public static List data() { @@ -197,15 +210,15 @@ public void testNormalOperation() throws Exception { // handle any responses we may have gotten for (NetworkReceive receive : selector.completedReceives()) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); + ByteBuf payload = receive.getReceivedBytes().content(); String[] pieces = asString(payload).split("&"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); assertEquals("Check the source", receive.getConnectionId(), pieces[0]); - assertEquals("Check that the receive has kindly been rewound", 0, payload.position()); int index = Integer.parseInt(receive.getConnectionId().split("_")[1]); assertEquals("Check the request counter", responses[index], Integer.parseInt(pieces[1])); responses[index]++; // increment the expected counter responseCount++; + receive.getReceivedBytes().release(); } // prepare new sends for the next round @@ -247,8 +260,12 @@ private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L); for (NetworkReceive receive : selector.completedReceives()) { if (receive.getConnectionId() == connectionId) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); - return asString(payload); + ByteBuf payload = receive.getReceivedBytes().content(); + try { + return asString(payload); + } finally { + receive.getReceivedBytes().release(); + } } } } @@ -272,8 +289,11 @@ static NetworkSend createSend(String connectionId, String s) { return new NetworkSend(connectionId, new BoundedByteBufferSend(buf), null, SystemTime.getInstance()); } - static String asString(ByteBuffer payload) { - return new String(payload.array(), payload.arrayOffset()); + static String asString(ByteBuf payload) { + ByteBuffer buffer = ByteBuffer.allocate(payload.readableBytes()); + payload.readBytes(buffer); + buffer.flip(); + return new String(buffer.array(), buffer.arrayOffset()); } /** diff --git a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java index 1b25aa5df3..dd67d9f99c 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java @@ -25,7 +25,6 @@ import com.github.ambry.utils.Time; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -45,14 +44,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; /** * Test the {@link SocketNetworkClient} */ -@RunWith(Parameterized.class) public class SocketNetworkClientTest { private static final int CHECKOUT_TIMEOUT_MS = 1000; private static final int MAX_PORTS_PLAIN_TEXT = 3; @@ -72,14 +68,8 @@ public class SocketNetworkClientTest { private List localSslDataNodes; private MockClusterMap sslEnabledClusterMap; private MockClusterMap sslDisabledClusterMap; - private boolean usingNettyByteBuffer; private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); - @Parameterized.Parameters - public static List data() { - return Arrays.asList(new Object[][]{{false}, {true}}); - } - @Before public void before() { nettyByteBufLeakHelper.beforeTest(); @@ -106,18 +96,17 @@ public void testNetworkClientFactory() throws IOException { Assert.assertNotNull("SocketNetworkClient returned should be non-null", networkClientFactory.getNetworkClient()); } - public SocketNetworkClientTest(boolean usingNettyByteBuffer) throws IOException { - this.usingNettyByteBuffer = usingNettyByteBuffer; + public SocketNetworkClientTest() throws IOException { Properties props = new Properties(); props.setProperty(NetworkConfig.NETWORK_CLIENT_ENABLE_CONNECTION_REPLENISHMENT, "true"); - props.setProperty(NetworkConfig.NETWORK_USE_NETTY_BYTE_BUF, usingNettyByteBuffer ? "true" : "false"); VerifiableProperties vprops = new VerifiableProperties(props); NetworkConfig networkConfig = new NetworkConfig(vprops); selector = new MockSelector(networkConfig); time = new MockTime(); networkMetrics = new NetworkMetrics(new MetricRegistry()); - networkClient = new SocketNetworkClient(selector, networkConfig, networkMetrics, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, - CHECKOUT_TIMEOUT_MS, time); + networkClient = + new SocketNetworkClient(selector, networkConfig, networkMetrics, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, + CHECKOUT_TIMEOUT_MS, time); sslEnabledClusterMap = new MockClusterMap(true, 9, 3, 3, false); localSslDataNodes = sslEnabledClusterMap.getDataNodeIds() .stream() @@ -136,12 +125,6 @@ public SocketNetworkClientTest(boolean usingNettyByteBuffer) throws IOException replicaOnSslNode = sslEnabledClusterMap.getReplicaIds(dataNodeId).get(0); } - @After - public void cleanup() { - // force JVM to gc to trigger more intense resource leak detection. - System.gc(); - } - /** * Test {@link SocketNetworkClient#warmUpConnections(List, int, long, List)} */ @@ -195,12 +178,11 @@ public void testBasicSendAndPoll() { for (ResponseInfo responseInfo : responseInfoList) { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); responseCount++; responseInfo.release(); @@ -218,7 +200,7 @@ public void testBasicSendAndPoll() { * Tests a failure scenario where requests remain too long in the {@link SocketNetworkClient}'s pending requests queue. */ @Test - public void testConnectionUnavailable() throws InterruptedException { + public void testConnectionUnavailable() { List requestInfoList = new ArrayList<>(); List responseInfoList; requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(3), replicaOnSslNode)); @@ -240,7 +222,7 @@ public void testConnectionUnavailable() throws InterruptedException { requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNotNull("Should have encountered an error", error); Assert.assertEquals("Should have received a connection unavailable error", NetworkClientErrorCode.ConnectionUnavailable, error); @@ -274,7 +256,7 @@ public void testNetworkError() { requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNotNull("Should have encountered an error", error); Assert.assertEquals("Should have received a connection unavailable error", NetworkClientErrorCode.NetworkError, error); @@ -467,17 +449,16 @@ public void testRequestDropping() { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); if (send.getCorrelationId() == 1) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); } else { Assert.assertEquals("Expected connection unavailable on dropped request", NetworkClientErrorCode.ConnectionUnavailable, responseInfo.getError()); - Assert.assertNull("Should not receive a response", responseInfo.getResponse()); + Assert.assertNull("Should not receive a response", responseInfo.content()); } } responseInfoList.forEach(ResponseInfo::release); @@ -496,18 +477,17 @@ public void testRequestDropping() { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); if (send.getCorrelationId() != 4) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); responseInfo.release(); } else { Assert.assertEquals("Expected network error (from closed connection for dropped request)", NetworkClientErrorCode.NetworkError, responseInfo.getError()); - Assert.assertNull("Should not receive a response", responseInfo.getResponse()); + Assert.assertNull("Should not receive a response", responseInfo.content()); responseInfo.release(); } } @@ -730,67 +710,16 @@ public String toString() { /** * A mock implementation of {@link BoundedNettyByteBufReceive} that constructs a buffer with the passed in correlation - * id and returns that buffer as part of {@link #getAndRelease()}. + * id. */ class MockBoundedNettyByteBufReceive extends BoundedNettyByteBufReceive { - private ByteBuf buf; /** * Construct a MockBoundedByteBufferReceive with the given correlation id. * @param correlationId the correlation id associated with this object. */ public MockBoundedNettyByteBufReceive(int correlationId) { - buf = ByteBufAllocator.DEFAULT.heapBuffer(16); - buf.writeInt(correlationId); - } - - /** - * Return the buffer associated with this object. - * @return the buffer associated with this object. - */ - @Override - public ByteBuf getAndRelease() { - if (buf == null) { - return null; - } else { - try { - return buf.retainedDuplicate(); - } finally { - buf.release(); - buf = null; - } - } - } -} - -/** - * A mock implementation of {@link BoundedByteBufferReceive} that constructs a buffer with the passed in correlation - * id and returns that buffer as part of {@link #getAndRelease()}. - */ -class MockBoundedByteBufferReceive extends BoundedByteBufferReceive { - private ByteBuffer buf; - - /** - * Construct a MockBoundedByteBufferReceive with the given correlation id. - * @param correlationId the correlation id associated with this object. - */ - public MockBoundedByteBufferReceive(int correlationId) { - buf = ByteBuffer.allocate(16); - buf.putInt(correlationId); - buf.rewind(); - } - - /** - * Return the buffer associated with this object. - * @return the buffer associated with this object. - */ - @Override - public ByteBuffer getAndRelease() { - try { - return buf; - } finally { - buf = null; - } + super(ByteBufAllocator.DEFAULT.heapBuffer(16).writeInt(correlationId), (long) 16); } } @@ -849,7 +778,6 @@ class MockSelector extends Selector { private boolean wakeUpCalled = false; private int connectCallCount = 0; private boolean isOpen = true; - private boolean usingNettyByteBuf; /** * Create a MockSelector @@ -858,7 +786,6 @@ class MockSelector extends Selector { MockSelector(NetworkConfig networkConfig) throws IOException { super(new NetworkMetrics(new MetricRegistry()), new MockTime(), null, networkConfig); super.close(); - this.usingNettyByteBuf = networkConfig.networkUseNettyByteBuf; } /** @@ -938,10 +865,8 @@ public void poll(long timeoutMs, List sends) throws IOException { if (state == MockSelectorState.DisconnectOnSend) { disconnected.add(send.getConnectionId()); } else if (!closedConnections.contains(send.getConnectionId())) { - BoundedReceive boundedReceive = - usingNettyByteBuf ? new MockBoundedNettyByteBufReceive(mockSend.getCorrelationId()) - : new MockBoundedByteBufferReceive(mockSend.getCorrelationId()); - receives.add(new NetworkReceive(send.getConnectionId(), boundedReceive, new MockTime())); + receives.add(new NetworkReceive(send.getConnectionId(), + new MockBoundedNettyByteBufReceive(mockSend.getCorrelationId()), new MockTime())); } } } @@ -1030,7 +955,7 @@ public void close(String conn) { receives.removeIf((receive) -> { boolean r = conn.equals(receive.getConnectionId()); if (r) { - ReferenceCountUtil.release(receive.getReceivedBytes().getAndRelease()); + receive.getReceivedBytes().release(); } return r; }); diff --git a/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java b/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java index 1748f476f3..3bab1b37df 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java @@ -13,17 +13,31 @@ */ package com.github.ambry.network; -import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.Random; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class SocketRequestResponseChannelTest { + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } class ResponseListenerMock implements ResponseListener { public int call = 0; @@ -60,16 +74,18 @@ public void testSocketRequestResponseChannelTest() { SocketRequestResponseChannel channel = new SocketRequestResponseChannel(2, 10); Integer key = new Integer(5); String connectionId = "test_connectionId"; - ByteBuffer buffer = ByteBuffer.allocate(1000); - new Random().nextBytes(buffer.array()); - SocketServerRequest request = new SocketServerRequest(0, connectionId, buffer, new ByteBufferInputStream(buffer)); + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1000); + byte[] content = new byte[1000]; + new Random().nextBytes(content); + buffer.writeBytes(content); + SocketServerRequest request = new SocketServerRequest(0, connectionId, buffer); channel.sendRequest(request); request = (SocketServerRequest) channel.receiveRequest(); Assert.assertEquals(request.getProcessor(), 0); Assert.assertEquals(request.getConnectionId(), connectionId); InputStream stream = request.getInputStream(); for (int i = 0; i < 1000; i++) { - Assert.assertEquals((byte) stream.read(), buffer.array()[i]); + Assert.assertEquals((byte) stream.read(), content[i]); } request.release(); diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java index 105936ebbf..e7dcf96020 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java @@ -30,7 +30,6 @@ import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.store.TransformationOutput; import com.github.ambry.store.Transformer; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; @@ -157,7 +156,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo BlobProperties oldProperties = deserializeBlobProperties(inputStream); ByteBuffer userMetaData = deserializeUserMetadata(inputStream); BlobData blobData = deserializeBlob(inputStream); - ByteBuf blobDataBytes = blobData.getAndRelease(); + ByteBuf blobDataBytes = blobData.content(); long blobPropertiesSize = oldProperties.getBlobSize(); @@ -224,6 +223,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo } blobPropertiesSize = compositeBlobInfo.getTotalSize(); metadataContent.flip(); + blobDataBytes.release(); blobDataBytes = Unpooled.wrappedBuffer(metadataContent); blobData = new BlobData(blobData.getBlobType(), metadataContent.remaining(), blobDataBytes); } @@ -235,7 +235,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo oldProperties.isEncrypted(), null); // BlobIDTransformer only exists on ambry-server and we don't enable netty on ambry server yet. So And blobData.getAndRelease - // will return an Unpooled ByteBuf, it's not not to release it. + // will return an Unpooled ByteBuf, it's not required to release it. // @todo, when enabling netty in ambry-server, release this ByteBuf. PutMessageFormatInputStream putMessageFormatInputStream = new PutMessageFormatInputStream(newKey, blobEncryptionKey, newProperties, userMetaData, diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java index 843e1f7fc1..c6c733bf66 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java @@ -25,7 +25,6 @@ import com.github.ambry.commons.NettyInternalMetrics; import com.github.ambry.commons.SSLFactory; import com.github.ambry.config.NettyConfig; -import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.RestServerConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.notification.NotificationSystem; @@ -208,12 +207,7 @@ public RestServer(VerifiableProperties verifiableProperties, ClusterMap clusterM || nioServer == null) { throw new InstantiationException("Some of the server components were null"); } - NetworkConfig networkConfig = new NetworkConfig(verifiableProperties); - if (networkConfig.networkUseNettyByteBuf) { - nettyInternalMetrics = new NettyInternalMetrics(metricRegistry, new NettyConfig(verifiableProperties)); - } else { - nettyInternalMetrics = null; - } + nettyInternalMetrics = new NettyInternalMetrics(metricRegistry, new NettyConfig(verifiableProperties)); logger.trace("Instantiated RestServer"); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java index e9a1b633cb..12949df605 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java @@ -142,7 +142,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); DeleteResponse deleteResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - DeleteResponse::readFrom, DeleteResponse::getError, false); + DeleteResponse::readFrom, DeleteResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((DeleteRequest) routerRequestInfo.getRequest()).getCorrelationId(); DeleteOperation deleteOperation = correlationIdToDeleteOperation.remove(correlationId); diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 1dd29b3f08..143966805d 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -143,7 +143,6 @@ class GetBlobOperation extends GetOperation { firstChunk = new FirstGetChunk(); } - /** * Release all the {@link ByteBuf} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid * conflict with the release call in the chunk async callback. @@ -764,7 +763,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf if (!successfullyDeserialized) { BlobData blobData = MessageFormatRecord.deserializeBlob(payload); ByteBuffer encryptionKey = messageMetadata == null ? null : messageMetadata.getEncryptionKey(); - ByteBuf chunkBuf = blobData.getAndRelease(); + ByteBuf chunkBuf = blobData.content(); try { boolean launchedJob = maybeLaunchCryptoJob(chunkBuf, null, encryptionKey, chunkBlobId); @@ -1220,7 +1219,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf if (rawMode) { if (blobData != null) { // RawMode, release blob data. - blobData.getAndRelease().release(); + blobData.release(); } // Return the raw bytes from storage if (encryptionKey != null) { @@ -1291,7 +1290,7 @@ RouterErrorCode processServerError(ServerErrorCode errorCode) { */ private void handleMetadataBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) throws IOException, MessageFormatException { - ByteBuf serializedMetadataContent = blobData.getAndRelease(); + ByteBuf serializedMetadataContent = blobData.content(); try { compositeBlobInfo = MetadataContentSerDe.deserializeMetadataContentRecord(serializedMetadataContent.nioBuffer(), blobIdFactory); @@ -1376,8 +1375,8 @@ private void initializeDataChunks() { * @param encryptionKey encryption key for the blob. Could be null for non encrypted blob. */ private void handleSimpleBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) { - ByteBuf chunkBuf = blobData.getAndRelease(); try { + ByteBuf chunkBuf = blobData.content(); boolean rangeResolutionFailure = false; if (encryptionKey == null) { totalSize = blobData.getSize(); @@ -1401,7 +1400,7 @@ private void handleSimpleBlob(BlobData blobData, byte[] userMetadata, ByteBuffer } } } finally { - chunkBuf.release(); + blobData.release(); } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java index 24fd3d2737..e901652054 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java @@ -217,7 +217,7 @@ void handleResponse(ResponseInfo responseInfo) { serverError = response.getPartitionResponseInfoList().get(0).getErrorCode(); } return serverError; - }, routerConfig.routerGetBlobOperationShareMemory); + }); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); GetRequest getRequest = (GetRequest) routerRequestInfo.getRequest(); GetOperation getOperation = correlationIdToGetOperation.remove(getRequest.getCorrelationId()); diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java index 7d2084523a..19cd3a65ff 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java @@ -220,7 +220,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); PutResponse putResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - PutResponse::readFrom, PutResponse::getError, false); + PutResponse::readFrom, PutResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = routerRequestInfo.getRequest().getCorrelationId(); // Get the PutOperation that generated the request. diff --git a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java index fefcddfb15..cc5dcf730c 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java +++ b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java @@ -26,6 +26,7 @@ import com.github.ambry.network.ResponseInfo; import com.github.ambry.protocol.Response; import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -185,13 +186,13 @@ static void replaceOperationException(AtomicReference operationExcept */ static R extractResponseAndNotifyResponseHandler(ResponseHandler responseHandler, NonBlockingRouterMetrics routerMetrics, ResponseInfo responseInfo, Deserializer deserializer, - Function errorExtractor, boolean shareMemory) { + Function errorExtractor) { R response = null; ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); if (networkClientErrorCode == null) { try { - DataInputStream dis = Utils.createDataInputStreamFromBuffer(responseInfo.getResponse(), shareMemory); + DataInputStream dis = new NettyByteBufDataInputStream(responseInfo.content()); response = deserializer.readFrom(dis); responseHandler.onEvent(replicaId, errorExtractor.apply(response)); } catch (Exception e) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java index 11090e8c24..33fd6a00a7 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java @@ -155,7 +155,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); TtlUpdateResponse ttlUpdateResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - TtlUpdateResponse::readFrom, TtlUpdateResponse::getError, false); + TtlUpdateResponse::readFrom, TtlUpdateResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((TtlUpdateRequest) routerRequestInfo.getRequest()).getCorrelationId(); TtlUpdateOperation ttlUpdateOperation = correlationIdToTtlUpdateOperation.remove(correlationId); diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java index 292772182b..dcaa1ab8fd 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java @@ -34,6 +34,8 @@ import com.github.ambry.protocol.GetResponse; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.MockTime; +import com.github.ambry.utils.NettyByteBufDataInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; import java.io.IOException; @@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -104,6 +107,7 @@ public class GetBlobInfoOperationTest { private ReplicaId localReplica; private ReplicaId remoteReplica; private String localDcName; + private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); /** * Running for both {@link SimpleOperationTracker} and {@link AdaptiveOperationTracker}, with and without encryption @@ -168,6 +172,11 @@ networkClientFactory, new LoggingNotificationSystem(), mockClusterMap, kms, cryp } } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + @After public void after() { if (networkClient != null) { @@ -177,6 +186,7 @@ public void after() { if (cryptoJobHandler != null) { cryptoJobHandler.close(); } + nettyByteBufLeakHelper.afterTest(); } /** @@ -246,13 +256,13 @@ public void testPollAndResponseHandling() throws Exception { List responses = sendAndWaitForResponses(requestListToFill); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); - responseInfo.release(); if (op.isOperationComplete()) { break; } } + responses.forEach(ResponseInfo::release); if (testEncryption) { Assert.assertTrue("Latch should have been zeroed ", onPollLatch.await(500, TimeUnit.MILLISECONDS)); op.poll(requestRegistrationCallback); @@ -714,13 +724,13 @@ private void completeOp(GetBlobInfoOperation op) throws IOException { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); - responseInfo.release(); if (op.isOperationComplete()) { break; } } + responses.forEach(ResponseInfo::release); } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index 54be311bd9..4c71ba6755 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -24,6 +24,7 @@ import com.github.ambry.commons.ByteBufferAsyncWritableChannel; import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.LoggingNotificationSystem; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.CryptoServiceConfig; @@ -120,7 +121,6 @@ public class GetBlobOperationTest { private final RouterCallback routerCallback; private final String operationTrackerType; private final boolean testEncryption; - private final boolean networkUseNetty; private MockKeyManagementService kms = null; private MockCryptoService cryptoService = null; private CryptoJobHandler cryptoJobHandler = null; @@ -181,10 +181,9 @@ public void after() { */ @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false, false}, - {SimpleOperationTracker.class.getSimpleName(), false, true}, - {AdaptiveOperationTracker.class.getSimpleName(), false, false}, - {AdaptiveOperationTracker.class.getSimpleName(), true, false}}); + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, + {AdaptiveOperationTracker.class.getSimpleName(), false}, + {AdaptiveOperationTracker.class.getSimpleName(), true}}); } /** @@ -193,11 +192,9 @@ public static List data() { * @param operationTrackerType the type of {@link OperationTracker} to use. * @param testEncryption {@code true} if blobs need to be tested w/ encryption. {@code false} otherwise */ - public GetBlobOperationTest(String operationTrackerType, boolean testEncryption, boolean networkUseNetty) - throws Exception { + public GetBlobOperationTest(String operationTrackerType, boolean testEncryption) throws Exception { this.operationTrackerType = operationTrackerType; this.testEncryption = testEncryption; - this.networkUseNetty = networkUseNetty; // Defaults. Tests may override these and do new puts as appropriate. maxChunkSize = random.nextInt(1024 * 1024) + 1; // a blob size that is greater than the maxChunkSize and is not a multiple of it. Will result in a composite blob. @@ -287,7 +284,8 @@ private void doDirectPut(BlobType blobType, ByteBuffer blobContent) throws Excep new PutRequest(random.nextInt(), "clientId", blobId, blobProperties, userMetadataBuf.duplicate(), blobContent.duplicate(), blobContent.remaining(), blobType, blobEncryptionKey == null ? null : blobEncryptionKey.duplicate()); - server.send(request); + // Make sure we release the BoundedNettyByteBufReceive. + server.send(request).release(); } } @@ -407,7 +405,7 @@ public void testCompositeBlobRawMode() throws Exception { // extract chunk ids BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(new ByteArrayInputStream(payload.array()), blobIdFactory); - ByteBuf metadataBuffer = blobAll.getBlobData().getAndRelease(); + ByteBuf metadataBuffer = blobAll.getBlobData().content(); try { CompositeBlobInfo compositeBlobInfo = MetadataContentSerDe.deserializeMetadataContentRecord(metadataBuffer.nioBuffer(), blobIdFactory); @@ -574,7 +572,7 @@ public void testRequestTimeoutAndBlobNotFoundLocalTimeout() throws Exception { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); } @@ -629,7 +627,7 @@ public void testTimeoutAndBlobNotFoundInOriginDc() throws Exception { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); } @@ -1434,8 +1432,7 @@ private GetBlobOperation createOperationAndComplete(Callback responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { - DataInputStream dis = Utils.createDataInputStreamFromBuffer(responseInfo.getResponse(), - routerConfig.routerGetBlobOperationShareMemory); + DataInputStream dis = new NettyByteBufDataInputStream(responseInfo.content()); GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom(dis, mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); @@ -1623,7 +1620,6 @@ private Properties getDefaultNonBlockingRouterProperties(boolean excludeTimeout) properties.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); properties.setProperty("router.operation.tracker.terminate.on.not.found.enabled", "true"); properties.setProperty("router.get.blob.operation.share.memory", "true"); - properties.setProperty("network.use.netty.byte.buf", Boolean.toString(networkUseNetty)); return properties; } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java index 2e04319ce6..30a07dd6d7 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java @@ -14,29 +14,19 @@ package com.github.ambry.router; import com.codahale.metrics.MetricRegistry; -import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.config.NetworkConfig; -import com.github.ambry.config.VerifiableProperties; -import com.github.ambry.network.BoundedByteBufferReceive; import com.github.ambry.network.BoundedNettyByteBufReceive; import com.github.ambry.network.NetworkMetrics; import com.github.ambry.network.NetworkReceive; import com.github.ambry.network.NetworkSend; import com.github.ambry.network.PortType; import com.github.ambry.network.Selector; -import com.github.ambry.utils.ByteBufferChannel; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; import java.io.IOException; -import java.io.SequenceInputStream; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; @@ -129,24 +119,9 @@ public void poll(long timeoutMs, List sends) throws IOException { disconnected.add(send.getConnectionId()); } else { MockServer server = connIdToServer.get(send.getConnectionId()); - BoundedByteBufferReceive receive = server.send(send.getPayload()); + BoundedNettyByteBufReceive receive = server.send(send.getPayload()); if (receive != null) { - if (!config.networkUseNettyByteBuf) { - receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); - } else { - // Convert a BoundedByteBufferReceive to BoundedNettyByteBufReceive - BoundedNettyByteBufReceive boundedNettyByteBufReceive = new BoundedNettyByteBufReceive(); - ByteBuffer buffer = (ByteBuffer) receive.getAndRelease(); - ByteBuffer sizeBuffer = ByteBuffer.allocate(Long.BYTES); - sizeBuffer.putLong(buffer.remaining() + Long.BYTES); - sizeBuffer.flip(); - ReadableByteChannel channel = Channels.newChannel( - new SequenceInputStream(new ByteBufferInputStream(sizeBuffer), new ByteBufferInputStream(buffer))); - while (!boundedNettyByteBufReceive.isReadComplete()) { - boundedNettyByteBufReceive.readFrom(channel); - } - receives.add(new NetworkReceive(send.getConnectionId(), boundedNettyByteBufReceive, time)); - } + receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); } } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockServer.java b/ambry-router/src/test/java/com.github.ambry.router/MockServer.java index 42cc9f4906..047618f8b7 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockServer.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockServer.java @@ -16,7 +16,7 @@ import com.github.ambry.account.Account; import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.network.BoundedByteBufferReceive; +import com.github.ambry.network.BoundedNettyByteBufReceive; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.messageformat.BlobType; @@ -82,12 +82,12 @@ class MockServer { /** * Take in a request in the form of {@link Send} and return a response in the form of a - * {@link BoundedByteBufferReceive}. + * {@link BoundedNettyByteBufReceive}. * @param send the request. * @return the response. * @throws IOException if there was an error in interpreting the request. */ - public BoundedByteBufferReceive send(Send send) throws IOException { + public BoundedNettyByteBufReceive send(Send send) throws IOException { if (!shouldRespond) { return null; } @@ -116,9 +116,9 @@ public BoundedByteBufferReceive send(Send send) throws IOException { response.writeTo(channel); ByteBuffer payload = channel.getBuffer(); payload.flip(); - BoundedByteBufferReceive boundedByteBufferReceive = new BoundedByteBufferReceive(); - boundedByteBufferReceive.readFrom(Channels.newChannel(new ByteBufferInputStream(payload))); - return boundedByteBufferReceive; + BoundedNettyByteBufReceive receive = new BoundedNettyByteBufReceive(); + receive.readFrom(Channels.newChannel(new ByteBufferInputStream(payload))); + return receive; } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java b/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java index ff9f5bc34d..39a84b215e 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java @@ -39,11 +39,12 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.MockTime; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -68,6 +69,7 @@ import java.util.stream.LongStream; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -120,6 +122,7 @@ public class NonBlockingRouterTest { byte[] putUserMetadata; byte[] putContent; ReadableStreamChannel putChannel; + private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); /** * Running for both regular and encrypted blobs, and versions 2 and 3 of MetadataContent @@ -152,9 +155,15 @@ public NonBlockingRouterTest(boolean testEncryption, int metadataContentVersion) accountService = new InMemAccountService(false, true); } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + @After public void after() { Assert.assertEquals("Current operations count should be 0", 0, NonBlockingRouter.currentOperationsCount.get()); + nettyByteBufLeakHelper.afterTest(); } /** @@ -1083,8 +1092,8 @@ private void testNoResponseNoNotification(OperationHelper opHelper, List()), null, null // make 1st request of first chunk encounter Temporarily_Disabled mockServer.setServerErrorForAllRequests(ServerErrorCode.Temporarily_Disabled); ResponseInfo responseInfo = getResponseInfo(requestInfos.get(0)); - PutResponse putResponse = responseInfo.getError() == null ? PutResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse())) : null; + PutResponse putResponse = + responseInfo.getError() == null ? PutResponse.readFrom(new NettyByteBufDataInputStream(responseInfo.content())) + : null; op.handleResponse(responseInfo, putResponse); + responseInfo.release(); PutOperation.PutChunk putChunk = op.getPutChunks().get(0); SimpleOperationTracker operationTracker = (SimpleOperationTracker) putChunk.getOperationTrackerInUse(); Assert.assertEquals("Disabled count should be 1", 1, operationTracker.getDisabledCount()); @@ -221,9 +240,11 @@ null, new RouterCallback(new MockNetworkClient(), new ArrayList<>()), null, null // make 2nd request of first chunk encounter Replica_Unavailable mockServer.setServerErrorForAllRequests(ServerErrorCode.Replica_Unavailable); responseInfo = getResponseInfo(requestInfos.get(1)); - putResponse = responseInfo.getError() == null ? PutResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse())) : null; + putResponse = + responseInfo.getError() == null ? PutResponse.readFrom(new NettyByteBufDataInputStream(responseInfo.content())) + : null; op.handleResponse(responseInfo, putResponse); + responseInfo.release(); putChunk = op.getPutChunks().get(0); Assert.assertEquals("Failure count should be 1", 1, ((SimpleOperationTracker) putChunk.getOperationTrackerInUse()).getFailedCount()); @@ -327,7 +348,7 @@ private void resetCorrelationId(byte[] request) { */ private ResponseInfo getResponseInfo(RequestInfo requestInfo) throws IOException { NetworkReceive networkReceive = new NetworkReceive(null, mockServer.send(requestInfo.getRequest()), time); - return new ResponseInfo(requestInfo, null, networkReceive.getReceivedBytes().getAndRelease()); + return new ResponseInfo(requestInfo, null, networkReceive.getReceivedBytes().content()); } } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java index dc0d8023f1..395f7d638b 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java @@ -439,7 +439,7 @@ void getAndVerify(ConnectedChannel channel, int blobsCount) throws Exception { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); Assert.assertEquals(properties.get(i).getBlobSize(), blobData.getSize()); byte[] dataOutput = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(dataOutput); } finally { diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 6596899509..0d016da705 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -130,7 +130,7 @@ final class ServerTestUtil { static byte[] getBlobDataAndRelease(BlobData blobData) { byte[] actualBlobData = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(actualBlobData); } finally { diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java index 5607f9916b..f468821817 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java @@ -199,7 +199,7 @@ public void run() { try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); byte[] blobout = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(blobout); } finally { @@ -231,7 +231,7 @@ public void run() { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp.getInputStream(), new BlobIdFactory(clusterMap)); byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + ByteBuf buffer = blobAll.getBlobData().content(); try { buffer.readBytes(blobout); } finally { diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java index bd4ecbb39c..a9b1b1c241 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java @@ -135,17 +135,15 @@ public class AmbryServerRequestsTest { private final ReplicationConfig replicationConfig; private final ServerConfig serverConfig; private final ReplicaStatusDelegate mockDelegate = Mockito.mock(ReplicaStatusDelegate.class); - private final boolean putRequestShareMemory; private final boolean validateRequestOnStoreState; @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); + return Arrays.asList(new Object[][]{{false}, {true}}); } - public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRequestOnStoreState) + public AmbryServerRequestsTest(boolean validateRequestOnStoreState) throws IOException, ReplicationException, StoreException, InterruptedException, ReflectiveOperationException { - this.putRequestShareMemory = putRequestShareMemory; this.validateRequestOnStoreState = validateRequestOnStoreState; clusterMap = new MockClusterMap(); Properties properties = new Properties(); @@ -790,7 +788,7 @@ public void ttlUpdateTest() throws InterruptedException, IOException, MessageFor */ private Response sendRequestGetResponse(RequestOrResponse request, ServerErrorCode expectedServerErrorCode) throws InterruptedException, IOException { - NetworkRequest mockRequest = MockRequest.fromRequest(request, this.putRequestShareMemory); + NetworkRequest mockRequest = MockRequest.fromRequest(request); ambryRequests.handleRequests(mockRequest); assertEquals("Request accompanying response does not match original request", mockRequest, requestResponseChannel.lastOriginalRequest); @@ -1315,13 +1313,13 @@ private static class MockRequest implements NetworkRequest { * @return an instance of {@link MockRequest} that represents {@code request}. * @throws IOException */ - static MockRequest fromRequest(RequestOrResponse request, boolean shareMemory) throws IOException { + static MockRequest fromRequest(RequestOrResponse request) throws IOException { ByteBuffer buffer = ByteBuffer.allocate((int) request.sizeInBytes()); request.writeTo(new ByteBufferChannel(buffer)); buffer.flip(); // read length (to bring it to a state where AmbryRequests can handle it). buffer.getLong(); - return new MockRequest(shareMemory ? new ByteBufferDataInputStream(buffer) : new ByteBufferInputStream(buffer)); + return new MockRequest(new ByteBufferDataInputStream(buffer)); } /** diff --git a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java index 2b0028d0e5..e595e0dedb 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java @@ -29,7 +29,6 @@ import com.github.ambry.utils.CrcInputStream; import com.github.ambry.utils.Utils; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -458,7 +457,7 @@ private void verify(String dataDir) throws Exception { } if (isDeleted) { - ByteBuf byteBuf = output.getAndRelease(); + ByteBuf byteBuf = output.content(); try { if (!verifyZeroed(metadata.array()) || !verifyZeroed( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) output.getSize()], 0, @@ -478,7 +477,6 @@ private void verify(String dataDir) throws Exception { } } finally { byteBuf.release(); - byteBuf = null; } } else { unDeletedPuts++; @@ -735,7 +733,7 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr if (!caughtException) { if (isDeleted) { - ByteBuf byteBuf = blobData.getAndRelease(); + ByteBuf byteBuf = blobData.content(); try { asExpected = verifyZeroed(usermetadata.array()) && verifyZeroed( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize())); @@ -743,11 +741,10 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr asExpected = false; } finally { byteBuf.release(); - byteBuf = null; } } else { - ByteBuf byteBuf = blobData.getAndRelease(); - ByteBuf oldByteBuf = oldBlobData.getAndRelease(); + ByteBuf byteBuf = blobData.content(); + ByteBuf oldByteBuf = oldBlobData.content(); try { asExpected = Arrays.equals(usermetadata.array(), oldUsermetadata.array()) && Arrays.equals( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize()), @@ -758,8 +755,6 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr } finally { byteBuf.release(); oldByteBuf.release(); - byteBuf = null; - oldByteBuf = null; } } return asExpected; diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java index 712664ac2f..c8af61a3a7 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java @@ -416,7 +416,7 @@ private ServerResponse getRecordFromNode(DataNodeId dataNodeId, BlobId blobId, G ServerErrorCode errorCode = response.getFirst(); if (errorCode == ServerErrorCode.No_Error) { BlobAll blobAll = response.getSecond(); - ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + ByteBuf buffer = blobAll.getBlobData().content(); byte[] blobBytes = new byte[buffer.readableBytes()]; buffer.readBytes(blobBytes); buffer.release(); diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java index bee47864da..470a6a8b5d 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java @@ -55,6 +55,7 @@ import com.github.ambry.server.ServerErrorCode; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.tools.util.ToolUtils; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; @@ -316,7 +317,7 @@ public static void main(String[] args) throws Exception { serverAdminTool.getBlob(dataNodeId, blobId, config.getOption, clusterMap); if (bResponse.getFirst() == ServerErrorCode.No_Error) { LOGGER.info("Blob type of {} from {} is {}", blobId, dataNodeId, bResponse.getSecond().getBlobType()); - ByteBuf buffer = bResponse.getSecond().getAndRelease(); + ByteBuf buffer = bResponse.getSecond().content(); try { writeByteBufToFile(buffer, outputFileStream); } finally { @@ -656,7 +657,7 @@ public ServerErrorCode triggerCompaction(DataNodeId dataNodeId, PartitionId part new AdminRequest(AdminRequestOrResponseType.TriggerCompaction, partitionId, correlationId.incrementAndGet(), CLIENT_ID); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, adminRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -679,7 +680,7 @@ public ServerErrorCode controlRequest(DataNodeId dataNodeId, PartitionId partiti CLIENT_ID); RequestControlAdminRequest controlRequest = new RequestControlAdminRequest(toControl, enable, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -702,7 +703,7 @@ public ServerErrorCode controlReplication(DataNodeId dataNodeId, PartitionId par CLIENT_ID); ReplicationControlAdminRequest controlRequest = new ReplicationControlAdminRequest(origins, enable, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -728,7 +729,7 @@ private ServerErrorCode controlBlobStore(DataNodeId dataNodeId, PartitionId part BlobStoreControlAdminRequest controlRequest = new BlobStoreControlAdminRequest(numReplicasCaughtUpPerPartition, storeControlRequestType, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -756,7 +757,7 @@ public Pair isCaughtUp(DataNodeId dataNodeId, Partitio new CatchupStatusAdminRequest(acceptableLagInBytes, numReplicasCaughtUpPerPartition, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, catchupStatusRequest); CatchupStatusAdminResponse adminResponse = - CatchupStatusAdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + CatchupStatusAdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return new Pair<>(adminResponse.getError(), adminResponse.getError() == ServerErrorCode.No_Error && adminResponse.isCaughtUp()); @@ -784,7 +785,7 @@ private Pair getGetResponse(DataNodeId dataNodeId, GetRequest getRequest = new GetRequest(correlationId.incrementAndGet(), CLIENT_ID, flags, partitionRequestInfos, getOption); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, getRequest); - InputStream serverResponseStream = Utils.createDataInputStreamFromBuffer(response.getResponse()); + InputStream serverResponseStream = new NettyByteBufDataInputStream(response.content()); response.release(); GetResponse getResponse = GetResponse.readFrom(new DataInputStream(serverResponseStream), clusterMap); ServerErrorCode partitionErrorCode = getResponse.getPartitionResponseInfoList().get(0).getErrorCode(); diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java index e12bad1f8c..12c5aeeaf3 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java @@ -261,12 +261,11 @@ public void run() { long sizeRead = 0; byte[] outputBuffer = new byte[(int) blobData.getSize()]; ByteBufferOutputStream streamOut = new ByteBufferOutputStream(ByteBuffer.wrap(outputBuffer)); - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(streamOut, (int) blobData.getSize()); } finally { buffer.release(); - buffer = null; } long latencyPerBlob = SystemTime.getInstance().nanoseconds() - startTimeGetBlob; totalTimeTaken.addAndGet(latencyPerBlob); diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java b/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java new file mode 100644 index 0000000000..615462e59e --- /dev/null +++ b/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java @@ -0,0 +1,96 @@ +/** + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; + + +/** + * An abstract class that implements most of the {@link ByteBufHolder} interface so the subclass of this class + * only have to provide necessary implementation of a few methods. + * @param The subclass type. + */ +public abstract class AbstractByteBufHolder implements ByteBufHolder { + + @Override + public abstract ByteBuf content(); + + @Override + public abstract T replace(ByteBuf content); + + @Override + public T copy() { + return replace(content().copy()); + } + + @Override + public T duplicate() { + return replace(content().duplicate()); + } + + @Override + public T retainedDuplicate() { + return replace(content().retainedDuplicate()); + } + + @Override + public int refCnt() { + return content().refCnt(); + } + + @Override + public T retain() { + content().retain(); + return (T) this; + } + + @Override + public T retain(int increment) { + content().retain(increment); + return (T) this; + } + + @Override + public T touch() { + if (content() != null) { + content().touch(); + } + return (T) this; + } + + @Override + public T touch(Object hint) { + if (content() != null) { + content().touch(hint); + } + return (T) this; + } + + @Override + public boolean release() { + if (content() != null) { + return content().release(); + } + return false; + } + + @Override + public boolean release(int decrement) { + if (content() != null) { + content().release(decrement); + } + return false; + } +} diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java index 21fa821ee7..63b42da6bf 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java @@ -15,7 +15,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import java.io.BufferedReader; import java.io.DataInputStream; @@ -219,28 +218,6 @@ public static ByteBuffer readShortBuffer(DataInputStream input) throws IOExcepti return buffer; } - /** - * A helper function to return a {@link ByteBuffer} from given {@link ByteBufferDataInputStream} at the given size. - * The returned {@link ByteBuffer} will share the memory with the underlying {@link ByteBuffer} in {@link ByteBufferDataInputStream}. - * @param stream The {@link ByteBufferDataInputStream} to read {@link ByteBuffer} out. - * @param dataSize The size of {@link ByteBuffer}. - * @return The {@link ByteBuffer} - * @throws IOException Unexpected IO errors. - */ - private static ByteBuffer getByteBufferFromByteBufferDataInputStream(ByteBufferDataInputStream stream, int dataSize) - throws IOException { - ByteBuffer byteBuffer = stream.getBuffer(); - int startIndex = byteBuffer.position(); - int oldLimit = byteBuffer.limit(); - - byteBuffer.limit(startIndex + dataSize); - ByteBuffer dataBuffer = byteBuffer.slice(); - byteBuffer.limit(oldLimit); - // Change the byte buffer's position as if the data is fetched. - byteBuffer.position(startIndex + dataSize); - return dataBuffer; - } - /** * Create a {@link ByteBufferInputStream} from the {@link CrcInputStream} by sharing the underlying memory if the * crcStream is built upon a {@link ByteBufferDataInputStream}. @@ -288,10 +265,7 @@ public static ByteBuffer getByteBufferFromInputStream(InputStream stream, int da public static ByteBuffer readByteBufferFromCrcInputStream(CrcInputStream crcStream, int dataSize) throws IOException { ByteBuffer output; InputStream inputStream = crcStream.getUnderlyingInputStream(); - if (inputStream instanceof ByteBufferDataInputStream) { - output = getByteBufferFromByteBufferDataInputStream((ByteBufferDataInputStream) inputStream, dataSize); - crcStream.updateCrc(output.duplicate()); - } else if (inputStream instanceof NettyByteBufDataInputStream) { + if (inputStream instanceof NettyByteBufDataInputStream) { // getBuffer() doesn't increase the reference count on this ByteBuf. ByteBuf nettyByteBuf = ((NettyByteBufDataInputStream) inputStream).getBuffer(); // construct a java.nio.ByteBuffer to create a ByteBufferInputStream @@ -323,10 +297,6 @@ public static ByteBuf readNettyByteBufFromCrcInputStream(CrcInputStream crcStrea output = nettyByteBuf.retainedSlice(startIndex, dataSize); crcStream.updateCrc(output.nioBuffer()); nettyByteBuf.readerIndex(startIndex + dataSize); - } else if (inputStream instanceof ByteBufferDataInputStream) { - ByteBuffer buffer = getByteBufferFromByteBufferDataInputStream((ByteBufferDataInputStream) inputStream, dataSize); - crcStream.updateCrc(buffer.duplicate()); - output = Unpooled.wrappedBuffer(buffer); } else { ByteBuffer buffer = getByteBufferFromInputStream(crcStream, dataSize); output = Unpooled.wrappedBuffer(buffer); @@ -912,33 +882,6 @@ public static byte[] readBytesFromByteBuf(ByteBuf buffer, byte[] data, int offse return data; } - /** - * Create a {@link DataInputStream} from the given buffer, which has to be either a {@link ByteBuffer} or a {@link ByteBuf}. - * This is equivalent to {@link #createDataInputStreamFromBuffer(Object, boolean)}, where the {@code shareMemory} is false. - * @param buffer The buffer where we are going to create a {@link DataInputStream} from. - * @return {@link DataInputStream}. - */ - public static DataInputStream createDataInputStreamFromBuffer(Object buffer) { - return createDataInputStreamFromBuffer(buffer, false); - } - - /** - * Create a {@link DataInputStream} from the given buffer, which has to be either a {@link ByteBuffer} or a {@link ByteBuf}. - * @param buffer The buffer where we are going to create a {@link DataInputStream} from. - * @param shareMemory If true, the {@link DataInputStream} would share the memory with the given buffer. - * @return {@link DataInputStream}. - */ - public static DataInputStream createDataInputStreamFromBuffer(Object buffer, boolean shareMemory) { - if (shareMemory) { - return buffer instanceof ByteBuf ? new NettyByteBufDataInputStream((ByteBuf) buffer) - : new ByteBufferDataInputStream((ByteBuffer) buffer); - } else { - InputStream src = buffer instanceof ByteBuf ? new ByteBufInputStream((ByteBuf) buffer) - : new ByteBufferInputStream((ByteBuffer) buffer); - return new DataInputStream(src); - } - } - /** * Split the input string "data" using the delimiter and return as list of strings for the slices obtained. * This method will ignore empty segments. That is, a call like {@code splitString(",a1,,b2,c3,", ","}} will return diff --git a/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java b/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java index 64adfb52ad..48f1cd33bc 100644 --- a/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java +++ b/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java @@ -198,50 +198,6 @@ public void testReadBuffers() throws IOException { } } - @Test - public void testGetByteBufferInputStreamFromCrcStreamShareMemory() throws Exception { - int blobSize = 1000; - // The first 8 bytes are the size of blob, the next 1000 bytes are the blob content, the next 8 bytes are the crc - // value, and we do this twice. - int bufferSize = (Long.SIZE / Byte.SIZE + blobSize + Long.SIZE / Byte.SIZE) * 2; - byte[] firstRandomBytes = new byte[blobSize]; - byte[] secondRandomBytes = new byte[blobSize]; - new Random().nextBytes(firstRandomBytes); - new Random().nextBytes(secondRandomBytes); - - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - ByteBufferOutputStream bbos = new ByteBufferOutputStream(buffer); - - // Fill the buffer - byte[] arrayToFill = firstRandomBytes; - while (arrayToFill != null) { - CrcOutputStream crcStream = new CrcOutputStream(bbos); - DataOutputStream dos = new DataOutputStream(crcStream); - dos.writeLong((long) blobSize); - dos.write(arrayToFill); - buffer.putLong(crcStream.getValue()); - arrayToFill = (arrayToFill == firstRandomBytes) ? secondRandomBytes : null; - } - - buffer.flip(); - byte[] expectedArray = firstRandomBytes; - while (expectedArray != null) { - CrcInputStream cis = new CrcInputStream(new ByteBufferDataInputStream(buffer)); - DataInputStream dis = new DataInputStream(cis); - long dataSize = dis.readLong(); - assertEquals(dataSize, blobSize); - ByteBufferInputStream obtained = Utils.getByteBufferInputStreamFromCrcInputStream(cis, (int) dataSize); - // Make sure these two ByteBuffers actually share the underlying memory. - assertEquals(getByteArrayFromByteBuffer(buffer), getByteArrayFromByteBuffer(obtained.getByteBuffer())); - byte[] obtainedArray = new byte[blobSize]; - obtained.read(obtainedArray); - assertArrayEquals(obtainedArray, expectedArray); - long crcRead = buffer.getLong(); - assertEquals(crcRead, cis.getValue()); - expectedArray = (expectedArray == firstRandomBytes) ? secondRandomBytes : null; - } - } - @Test public void testGetByteBufferInputStreamFromCrcStreamShareMemoryWithNettyByteBuf() throws Exception { int blobSize = 1000;