Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create executor pool to take over network read/write tasks. #1227

Merged
merged 4 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public class NetworkConfig {
@Default("false")
public final boolean networkClientEnableConnectionReplenishment;

/**
* The size of the pool if selector executor pool is employed. When size is 0, executor pool won't be used.
*/
@Config("selector.executor.pool.size")
@Default("4")
public final int selectorExecutorPoolSize;

public NetworkConfig(VerifiableProperties verifiableProperties) {

numIoThreads = verifiableProperties.getIntInRange("num.io.threads", 8, 1, Integer.MAX_VALUE);
Expand All @@ -88,5 +95,7 @@ public NetworkConfig(VerifiableProperties verifiableProperties) {
queuedMaxRequests = verifiableProperties.getIntInRange("queued.max.requests", 500, 1, Integer.MAX_VALUE);
networkClientEnableConnectionReplenishment =
verifiableProperties.getBoolean("network.client.enable.connection.replenishment", false);
selectorExecutorPoolSize =
verifiableProperties.getIntInRange("selector.executor.pool.size", 4, 0, Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public NetworkClientFactory(NetworkMetrics networkMetrics, NetworkConfig network
* @throws IOException if the {@link Selector} could not be instantiated.
*/
public NetworkClient getNetworkClient() throws IOException {
Selector selector = new Selector(networkMetrics, time, sslFactory);
Selector selector = new Selector(networkMetrics, time, sslFactory, networkConfig.selectorExecutorPoolSize);
return new NetworkClient(selector, networkConfig, networkMetrics, maxConnectionsPerPortPlainText,
maxConnectionsPerPortSsl, connectionCheckoutTimeoutMs, time);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ private int readFromSocketChannel() throws IOException {
private boolean flush(ByteBuffer buf) throws IOException {
int remaining = buf.remaining();
if (remaining > 0) {
long startNs = SystemTime.getInstance().nanoseconds();
int written = socketChannel.write(buf);
logger.trace("Flushed {} bytes in {} ns", written, SystemTime.getInstance().nanoseconds() - startNs);
return written >= remaining;
}
return true;
Expand Down Expand Up @@ -531,13 +533,12 @@ public int write(ByteBuffer src) throws IOException {
int written = 0;
while (src.remaining() != 0) {
netWriteBuffer.clear();
long startTimeMs = SystemTime.getInstance().milliseconds();
long startTimeNs = SystemTime.getInstance().nanoseconds();
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
long encryptionTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs;
logger.trace("SSL encryption time: {} ms for {} bytes", encryptionTimeMs, wrapResult.bytesConsumed());
long encryptionTimeNs = SystemTime.getInstance().nanoseconds() - startTimeNs;
logger.trace("SSL encryption time: {} ns for {} bytes", encryptionTimeNs, wrapResult.bytesConsumed());
if (wrapResult.bytesConsumed() > 0) {
metrics.sslEncryptionTimeInUsPerKB.update(
TimeUnit.MILLISECONDS.toMicros(encryptionTimeMs) * 1024 / wrapResult.bytesConsumed());
metrics.sslEncryptionTimeInUsPerKB.update(encryptionTimeNs / wrapResult.bytesConsumed());
cgtz marked this conversation as resolved.
Show resolved Hide resolved
}
netWriteBuffer.flip();
//handle ssl renegotiation
Expand Down
193 changes: 175 additions & 18 deletions ambry-network/src/main/java/com.github.ambry.network/Selector.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,11 +85,12 @@ public class Selector implements Selectable {
private final AtomicLong idGenerator;
private final AtomicLong numActiveConnections;
private final SSLFactory sslFactory;
private final ExecutorService executorPool;

/**
* Create a new selector
*/
public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws IOException {
public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory, int executorPoolSize) throws IOException {
this.nioSelector = java.nio.channels.Selector.open();
this.time = time;
this.keyMap = new HashMap<>();
Expand All @@ -99,6 +104,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) throws
idGenerator = new AtomicLong(0);
numActiveConnections = new AtomicLong(0);
unreadyConnections = new HashSet<>();
if (executorPoolSize > 0) {
executorPool = Executors.newFixedThreadPool(executorPoolSize);
} else {
executorPool = null;
}
metrics.registerSelectorActiveConnections(numActiveConnections);
metrics.registerSelectorUnreadyConnections(unreadyConnections);
}
Expand Down Expand Up @@ -291,13 +301,24 @@ public void poll(long timeoutMs) throws IOException {
* completed I/O.
*
* @param timeoutMs The amount of time to wait, in milliseconds. If negative, wait indefinitely.
* @param sends The list of new sends to begin
* @param sends The list of new sends to initiate.
*
* @throws IOException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
*/
@Override
public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
if (executorPool != null) {
pollWithExecutorPool(timeoutMs, sends);
} else {
pollOnMainThread(timeoutMs, sends);
}
}

/**
* Process read/write events on current thread.
*/
private void pollOnMainThread(long timeoutMs, List<NetworkSend> sends) throws IOException {
clear();

// register for write interest on any new sends
Expand Down Expand Up @@ -340,21 +361,26 @@ public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
}

if (key.isReadable() && transmission.ready()) {
read(key, transmission);
NetworkReceive networkReceive = read(key, transmission);
if (networkReceive == null) {
// Exception happened in read.
close(key);
} else if (networkReceive.getReceivedBytes().isReadComplete()) {
this.completedReceives.add(networkReceive);
}
} else if (key.isWritable() && transmission.ready()) {
write(key, transmission);
NetworkSend networkSend = write(key, transmission);
if (networkSend == null) {
// Exception happened in write.
close(key);
} else if (networkSend.getPayload().isSendComplete()) {
this.completedSends.add(networkSend);
}
} else if (!key.isValid()) {
close(key);
}
} catch (IOException e) {
String socketDescription = socketDescription(channel(key));
if (e instanceof EOFException || e instanceof ConnectException) {
metrics.selectorDisconnectedErrorCount.inc();
logger.error("Connection {} disconnected", socketDescription, e);
} else {
metrics.selectorIOErrorCount.inc();
logger.warn("Error in I/O with connection to {}", socketDescription, e);
}
handleReadWriteIOException(e, key);
close(key);
} catch (Exception e) {
metrics.selectorKeyOperationErrorCount.inc();
Expand All @@ -363,7 +389,113 @@ public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException {
}
}
checkUnreadyConnectionsStatus();
this.metrics.selectorIOCount.inc();
this.metrics.selectorIOCount.inc(readyKeys);
this.metrics.selectorIOTime.update(time.milliseconds() - endSelect);
}
disconnected.addAll(closedConnections);
closedConnections.clear();
}

/**
* Use {@link ExecutorService} to process read/write events.
*/
private void pollWithExecutorPool(long timeoutMs, List<NetworkSend> sends) throws IOException {
List<Future<NetworkSend>> completedSendsFutures = new ArrayList<>();
List<Future<NetworkReceive>> completedReceivesFutures = new ArrayList<>();
Set<SelectionKey> readWriteKeySet = new HashSet<>();
clear();

// register for write interest on any new sends
if (sends != null) {
for (NetworkSend networkSend : sends) {
send(networkSend);
}
}

// check ready keys
long startSelect = time.milliseconds();
int readyKeys = select(timeoutMs);
this.metrics.selectorSelectCount.inc();

if (readyKeys > 0) {
long endSelect = time.milliseconds();
this.metrics.selectorSelectTime.update(endSelect - startSelect);
Set<SelectionKey> keys = nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();

Transmission transmission = getTransmission(key);
try {
if (key.isConnectable()) {
transmission.finishConnect();
if (transmission.ready()) {
connected.add(transmission.getConnectionId());
metrics.selectorConnectionCreated.inc();
} else {
unreadyConnections.add(transmission.getConnectionId());
}
}

/* if channel is not ready, finish prepare */
if (transmission.isConnected() && !transmission.ready()) {
transmission.prepare();
continue;
}

if (key.isReadable() && transmission.ready()) {
completedReceivesFutures.add(executorPool.submit(() -> read(key, transmission)));
readWriteKeySet.add(key);
} else if (key.isWritable() && transmission.ready()) {
completedSendsFutures.add(executorPool.submit(() -> write(key, transmission)));
readWriteKeySet.add(key);
} else if (!key.isValid()) {
close(key);
}
} catch (IOException e) {
// handles IOException from transmission.finishConnect() and transmission.prepare()
handleReadWriteIOException(e, key);
close(key);
} catch (Exception e) {
close(key);
metrics.selectorKeyOperationErrorCount.inc();
logger.error("closing key on exception remote host {}", channel(key).socket().getRemoteSocketAddress(), e);
}
}
for (Future<NetworkReceive> future : completedReceivesFutures) {
try {
NetworkReceive networkReceive = future.get();
if (networkReceive != null) {
readWriteKeySet.remove(keyForId(networkReceive.getConnectionId()));
if (networkReceive.getReceivedBytes().isReadComplete()) {
this.completedReceives.add(networkReceive);
}
}
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof IOException)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not catch IOException and Exception separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy logic from write future get()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

logger.error("Hit unexpected exception on receive, ", e);
}
}
}
for (Future<NetworkSend> future : completedSendsFutures) {
try {
NetworkSend networkSend = future.get();
if (networkSend != null) {
readWriteKeySet.remove(keyForId(networkSend.getConnectionId()));
if (networkSend.getPayload().isSendComplete()) {
this.completedSends.add(networkSend);
}
}
} catch (ExecutionException | InterruptedException e) {
logger.error("Hit Unexpected exception on selector event processing, ", e);
}
}
for (SelectionKey keyWithError : readWriteKeySet) {
close(keyWithError);
}
checkUnreadyConnectionsStatus();
this.metrics.selectorIOCount.inc(readyKeys);
this.metrics.selectorIOTime.update(time.milliseconds() - endSelect);
}
disconnected.addAll(closedConnections);
Expand Down Expand Up @@ -435,7 +567,6 @@ public List<String> connected() {
* @param connectionId a unique ID for this connection.
* @param key the {@link SelectionKey} used to communicate socket events.
* @param hostname the remote hostname for the connection, used for SSL host verification.
* @param hostname the remote port for the connection, used for SSL host verification.
* @param portType used to select between a plaintext or SSL transmission.
* @param mode for SSL transmissions, whether to operate in client or server mode.
* @return either a {@link Transmission} or {@link SSLTransmission}.
Expand Down Expand Up @@ -544,18 +675,38 @@ private SelectionKey keyForId(String id) {
return this.keyMap.get(id);
}

/**
* Handle read/write IOException
*/
private void handleReadWriteIOException(IOException e, SelectionKey key) {
String socketDescription = socketDescription(channel(key));
if (e instanceof EOFException || e instanceof ConnectException) {
metrics.selectorDisconnectedErrorCount.inc();
logger.error("Connection {} disconnected", socketDescription, e);
} else {
metrics.selectorIOErrorCount.inc();
logger.warn("Error in I/O with connection to {}", socketDescription, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why here we use warn instead of error level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was warn before. It may cause by network issue and is not critical to us.

}
}

/**
* Process reads from ready sockets
* @return the {@link NetworkReceive} if no IOException during read().
*/
private void read(SelectionKey key, Transmission transmission) throws IOException {
private NetworkReceive read(SelectionKey key, Transmission transmission) {
long startTimeToReadInMs = time.milliseconds();
try {
boolean readComplete = transmission.read();
NetworkReceive networkReceive = transmission.getNetworkReceive();
if (readComplete) {
this.completedReceives.add(transmission.getNetworkReceive());
transmission.onReceiveComplete();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have this method return the NetworkReceive if readComplete and then use the future result to modify completedReceives from the main thread.

transmission.clearReceive();
}
return networkReceive;
} catch (IOException e) {
// We have key information if we log IOException here.
handleReadWriteIOException(e, key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's handle this from within the poll method since close modifies thread unsafe data structs

return null;
} finally {
long readTime = time.milliseconds() - startTimeToReadInMs;
logger.trace("SocketServer time spent on read per key {} = {}", transmission.getConnectionId(), readTime);
Expand All @@ -564,19 +715,25 @@ private void read(SelectionKey key, Transmission transmission) throws IOExceptio

/**
* Process writes to ready sockets
* @return the {@link NetworkSend} if no IOException during write().
*/
private void write(SelectionKey key, Transmission transmission) throws IOException {
private NetworkSend write(SelectionKey key, Transmission transmission) {
long startTimeToWriteInMs = time.milliseconds();
try {
boolean sendComplete = transmission.write();
NetworkSend networkSend = transmission.getNetworkSend();
if (sendComplete) {
logger.trace("Finished writing, registering for read on connection {}", transmission.getRemoteSocketAddress());
transmission.onSendComplete();
this.completedSends.add(transmission.getNetworkSend());
metrics.sendInFlight.dec();
transmission.clearSend();
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE | SelectionKey.OP_READ);
}
return networkSend;
} catch (IOException e) {
// We have key information if we log IOException here.
handleReadWriteIOException(e, key);
return null;
} finally {
long writeTime = time.milliseconds() - startTimeToWriteInMs;
logger.trace("SocketServer time spent on write per key {} = {}", transmission.getConnectionId(), writeTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SocketServer implements NetworkServer {
private final int sendBufferSize;
private final int recvBufferSize;
private final int maxRequestSize;
private final int selectorExecutorPoolSize;
private final ArrayList<Processor> processors;
private volatile ArrayList<Acceptor> acceptors;
private final SocketRequestResponseChannel requestResponseChannel;
Expand All @@ -72,6 +73,7 @@ public SocketServer(NetworkConfig config, SSLConfig sslConfig, MetricRegistry re
this.sendBufferSize = config.socketSendBufferBytes;
this.recvBufferSize = config.socketReceiveBufferBytes;
this.maxRequestSize = config.socketRequestMaxBytes;
this.selectorExecutorPoolSize = config.selectorExecutorPoolSize;
processors = new ArrayList<Processor>(numProcessorThreads);
requestResponseChannel = new SocketRequestResponseChannel(numProcessorThreads, maxQueuedRequests);
metrics = new ServerNetworkMetrics(requestResponseChannel, registry, processors);
Expand Down Expand Up @@ -149,7 +151,8 @@ private void validatePorts(ArrayList<Port> portList) {
public void start() throws IOException, InterruptedException {
logger.info("Starting {} processor threads", numProcessorThreads);
for (int i = 0; i < numProcessorThreads; i++) {
processors.add(i, new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory));
processors.add(i,
new Processor(i, maxRequestSize, requestResponseChannel, metrics, sslFactory, selectorExecutorPoolSize));
Utils.newThread("ambry-processor-" + port + " " + i, processors.get(i), false).start();
}

Expand Down Expand Up @@ -383,7 +386,6 @@ protected void accept(SelectionKey key, Processor processor) throws SocketExcept
* each of which has its own selectors
*/
class Processor extends AbstractServerThread {
private final int maxRequestSize;
private final SocketRequestResponseChannel channel;
private final int id;
private final Time time;
Expand All @@ -394,12 +396,11 @@ class Processor extends AbstractServerThread {
private static final long pollTimeoutMs = 300;

Processor(int id, int maxRequestSize, RequestResponseChannel channel, ServerNetworkMetrics metrics,
SSLFactory sslFactory) throws IOException {
this.maxRequestSize = maxRequestSize;
SSLFactory sslFactory, int selectorExecutorPoolSize) throws IOException {
this.channel = (SocketRequestResponseChannel) channel;
this.id = id;
this.time = SystemTime.getInstance();
selector = new Selector(metrics, time, sslFactory);
selector = new Selector(metrics, time, sslFactory, selectorExecutorPoolSize);
this.metrics = metrics;
}

Expand Down
Loading