-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create executor pool to take over network read/write tasks. #1227
Conversation
@@ -556,16 +590,20 @@ private void read(SelectionKey key, Transmission transmission) throws IOExceptio | |||
transmission.onReceiveComplete(); | |||
transmission.clearReceive(); | |||
} | |||
} catch (IOException e) { | |||
handleReadWriteIOException(e, key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's handle this from within the poll method since close
modifies thread unsafe data structs
@@ -556,16 +590,20 @@ private void read(SelectionKey key, Transmission transmission) throws IOExceptio | |||
transmission.onReceiveComplete(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's have this method return the NetworkReceive if readComplete
and then use the future result to modify completedReceives from the main thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments for now. Will keep reviewing when new code is added.
@@ -76,6 +76,13 @@ | |||
@Default("false") | |||
public final boolean networkClientEnableConnectionReplenishment; | |||
|
|||
/** | |||
* The size of the pool if selector executor pool. If the value is 0, executor pool won't be used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor, change to The size of the pool if selector executor pool is employed. When size is 0, executor pool won't be used
@@ -160,7 +160,9 @@ private int readFromSocketChannel() throws IOException { | |||
private boolean flush(ByteBuffer buf) throws IOException { | |||
int remaining = buf.remaining(); | |||
if (remaining > 0) { | |||
long start = SystemTime.getInstance().nanoseconds(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: consider using startTimeNs
as you did in another method.
ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java
Show resolved
Hide resolved
try { | ||
future.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
logger.error("Read/Write key executor exception {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: no need for curly brackets here
* | ||
* @throws IOException If a send is given for which we have no existing connection or for which there is | ||
* already an in-progress send | ||
*/ | ||
@Override | ||
public void poll(long timeoutMs, List<NetworkSend> sends) throws IOException { | ||
if (executorPool != null) { | ||
pollWithExecutorPool(timeoutMs, sends); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: move the code below line 314 into another method like pollOnMainThread()
and use an if-else to switch between them.
logger.error("Connection {} disconnected", socketDescription, e); | ||
} else { | ||
metrics.selectorIOErrorCount.inc(); | ||
logger.warn("Error in I/O with connection to {}", socketDescription, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why here we use warn
instead of error
level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was warn
before. It may cause by network issue and is not critical to us.
/** | ||
* Process read/write events on current thread. | ||
*/ | ||
public void pollOnMainThread(long timeoutMs, List<NetworkSend> sends) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be private
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
*/ | ||
private void read(SelectionKey key, Transmission transmission) throws IOException { | ||
private NetworkReceive read(SelectionKey key, Transmission transmission) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both read
and write
methods can remove throws IOException
since there is no exception thrown from these two methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
} | ||
} | ||
} catch (InterruptedException | ExecutionException e) { | ||
if (!(e.getCause() instanceof IOException)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not catch IOException
and Exception
separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy logic from write future get()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
@@ -307,6 +318,7 @@ private String blockingSSLConnect(int socketBufSize) throws IOException { | |||
String connectionId = | |||
selector.connect(new InetSocketAddress("localhost", server.port), socketBufSize, socketBufSize, PortType.SSL); | |||
while (!selector.connected().contains(connectionId)) { | |||
System.out.println("here"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
@@ -327,7 +339,7 @@ private void useCustomBufferSizeSelector(final Integer netReadBufSizeStart, fina | |||
selector.close(); | |||
NetworkMetrics metrics = new NetworkMetrics(new MetricRegistry()); | |||
Time time = SystemTime.getInstance(); | |||
selector = new Selector(metrics, time, clientSSLFactory) { | |||
selector = new Selector(metrics, time, clientSSLFactory, 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can pass in different poolSize
to this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once some minor comments are addressed.
Codecov Report
@@ Coverage Diff @@
## master #1227 +/- ##
=========================================
Coverage 83.62% 83.62%
Complexity 57 57
=========================================
Files 6 6
Lines 348 348
Branches 38 38
=========================================
Hits 291 291
Misses 45 45
Partials 12 12 Continue to review full report at Codecov.
|
SSL transmission latency was high in recent performance test, especially in PUT.
The reason is selector.poll() needs to iterate all events and do SSL encryption + network IO for each of them. This increased round trip time dramatically in the queuing system.
The change introduced an executor pool to take over event handling and selector.poll() blocks until events done.
In the performance test, 7MB blobs were sent at 20QPS, PUT latency P95 was decreased to 380ms from 1.4kms with this change.