Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default to use netty bytebuf in network layer and remove getAndRelease method #1375

Merged
merged 7 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class NetworkConfig {
public static final String SELECTOR_EXECUTOR_POOL_SIZE = "selector.executor.pool.size";
public static final String SELECTOR_MAX_KEY_TO_PROCESS = "selector.max.key.to.process";
public static final String SELECTOR_USE_DIRECT_BUFFERS = "selector.use.direct.buffers";
public static final String NETWORK_USE_NETTY_BYTE_BUF = "network.use.netty.byte.buf";
public static final String NETWORK_PUT_REQUEST_SHARE_MEMORY = "network.put.request.share.memory";

/**
* The number of io threads that the server uses for carrying out network requests
Expand Down Expand Up @@ -121,14 +119,6 @@ public class NetworkConfig {
@Default("false")
public final boolean selectorUseDirectBuffers;

@Config(NETWORK_USE_NETTY_BYTE_BUF)
@Default("false")
public final boolean networkUseNettyByteBuf;

@Config(NETWORK_PUT_REQUEST_SHARE_MEMORY)
@Default("false")
public final boolean networkPutRequestShareMemory;

public NetworkConfig(VerifiableProperties verifiableProperties) {
numIoThreads = verifiableProperties.getIntInRange(NUM_IO_THREADS, 8, 1, Integer.MAX_VALUE);
queuedMaxRequests = verifiableProperties.getIntInRange(QUEUED_MAX_REQUESTS, 500, 1, Integer.MAX_VALUE);
Expand All @@ -147,7 +137,5 @@ public NetworkConfig(VerifiableProperties verifiableProperties) {
selectorMaxKeyToProcess =
verifiableProperties.getIntInRange(SELECTOR_MAX_KEY_TO_PROCESS, -1, -1, Integer.MAX_VALUE);
selectorUseDirectBuffers = verifiableProperties.getBoolean(SELECTOR_USE_DIRECT_BUFFERS, false);
networkUseNettyByteBuf = verifiableProperties.getBoolean(NETWORK_USE_NETTY_BYTE_BUF, false);
networkPutRequestShareMemory = verifiableProperties.getBoolean(NETWORK_PUT_REQUEST_SHARE_MEMORY, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class RouterConfig {
"router.operation.tracker.histogram.cache.timeout.ms";
public static final String ROUTER_MAX_IN_MEM_PUT_CHUNKS = "router.max.in.mem.put.chunks";
public static final String ROUTER_MAX_IN_MEM_GET_CHUNKS = "router.max.in.mem.get.chunks";
public static final String ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY = "router.get.blob.operation.share.memory";
public static final String ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED =
"router.get.eligible.replicas.by.state.enabled";
public static final String ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET = "router.put.use.dynamic.success.target";
Expand Down Expand Up @@ -440,13 +439,6 @@ public class RouterConfig {
@Default("4")
public final int routerMaxInMemGetChunks;

/**
* If {@code true}, the blob data shares memory with networking buffer in GetBlobOperation
*/
@Config(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY)
@Default("false")
public final boolean routerGetBlobOperationShareMemory;

/**
* if {@code true}, operation tracker will get replicas in required states based on the type of operation. This helps
* dynamically manage replicas in cluster (i.e. add/remove/move replicas) without restarting frontends.
Expand Down Expand Up @@ -558,7 +550,6 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
Integer.MAX_VALUE / routerMaxPutChunkSizeBytes);
routerMaxInMemGetChunks = verifiableProperties.getIntInRange(ROUTER_MAX_IN_MEM_GET_CHUNKS, 4, 1,
Integer.MAX_VALUE / routerMaxPutChunkSizeBytes);
routerGetBlobOperationShareMemory = verifiableProperties.getBoolean(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY, false);
routerGetEligibleReplicasByStateEnabled =
verifiableProperties.getBoolean(ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED, false);
routerPutUseDynamicSuccessTarget = verifiableProperties.getBoolean(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET, false);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,38 @@
*/
package com.github.ambry.network;

import com.github.ambry.utils.AbstractByteBufHolder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A netty {@link ByteBuf} version of Receive to buffer the incoming request or response.
*/
public class BoundedNettyByteBufReceive implements BoundedReceive<ByteBuf> {
public class BoundedNettyByteBufReceive extends AbstractByteBufHolder<BoundedNettyByteBufReceive> {

private ByteBuf buffer = null;
private ByteBuf sizeBuffer = null;
private long sizeToRead = 0;
private long sizeRead = 0;
private final static Logger logger = LoggerFactory.getLogger(BoundedNettyByteBufReceive.class);

@Override
public BoundedNettyByteBufReceive() {
}

BoundedNettyByteBufReceive(ByteBuf buffer, long sizeToRead) {
this.buffer = Objects.requireNonNull(buffer);
this.sizeToRead = sizeToRead;
}

public boolean isReadComplete() {
return buffer != null && sizeRead >= sizeToRead;
return buffer != null && sizeRead >= sizeToRead;
}

/**
Expand All @@ -56,7 +63,6 @@ private int readBytesFromReadableByteChannel(ReadableByteChannel channel, ByteBu
return n;
}

@Override
public long readFrom(ReadableByteChannel channel) throws IOException {
long bytesRead = 0;
if (buffer == null) {
Expand Down Expand Up @@ -99,32 +105,26 @@ public long readFrom(ReadableByteChannel channel) throws IOException {
return bytesRead;
}

/**
* Returns the payload as {@link ByteBuf}, at the same time release the current reference to this payload.
* It's not safe to call this function multiple times.
* @return
*/
@Override
public ByteBuf getAndRelease() {
if (buffer == null) {
return null;
} else {
try {
return buffer.retainedDuplicate();
} finally {
buffer.release();
buffer = null;
}
}
}

/**
* The total size in bytes that needs to receive from the channel
* It will be initialized only after header is read.
* @return the size of the data in bytes to receive after reading header, otherwise return 0
*/
@Override
public long sizeRead() {
return sizeRead;
}

/**
* Returns the byte buffer received.
* @return
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
*/
public ByteBuf content() {
return buffer;
}

@Override
public BoundedNettyByteBufReceive replace(ByteBuf content) {
BoundedNettyByteBufReceive receive = new BoundedNettyByteBufReceive(content, sizeToRead);
return receive;
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public class NetworkReceive {
/**
* The bytes received from the destination
*/
private final BoundedReceive receivedBytes;
private final BoundedNettyByteBufReceive receivedBytes;

/**
* The start time of when the receive started
*/
private final long receiveStartTimeInMs;

public NetworkReceive(String connectionId, BoundedReceive receivedBytes, Time time) {
public NetworkReceive(String connectionId, BoundedNettyByteBufReceive receivedBytes, Time time) {
this.connectionId = connectionId;
this.receivedBytes = receivedBytes;
this.receiveStartTimeInMs = time.milliseconds();
Expand All @@ -44,7 +44,7 @@ public String getConnectionId() {
return connectionId;
}

public BoundedReceive getReceivedBytes() {
public BoundedNettyByteBufReceive getReceivedBytes() {
return receivedBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.network;

import io.netty.buffer.ByteBuf;
import java.io.InputStream;


Expand All @@ -33,7 +34,11 @@ public interface NetworkRequest {
long getStartTimeInMs();

/**
* Release any resource this request is holding.
* Release any resource this request is holding. By default it returns false so this method can be compatible
* with {@link ByteBuf#release()}
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
default void release() {};
default boolean release() {
return false;
}
}
Loading