Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cgtz committed Jun 20, 2019
1 parent 67b5e9c commit e48bddc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ public void testReplenishConnections() {

// replenish connections again
connectionTracker.replenishConnections(this::mockNewConnection);
totalConnectionsCount += 2;
assertCounts(totalConnectionsCount, availableCount);
newConnections = getNewlyEstablishedConnections();
newConnections.forEach(connectionTracker::checkInConnection);
totalConnectionsCount += 2;
availableCount += 2;
assertCounts(totalConnectionsCount, availableCount);

Expand All @@ -262,7 +263,7 @@ public void testReplenishConnections() {
availableCount -= 2;
assertCounts(totalConnectionsCount, availableCount);

// destroy one and return the otheer and then replenish
// destroy one and return the other and then replenish
connectionTracker.removeConnection(conn1);
connectionTracker.checkInConnection(conn2);
totalConnectionsCount -= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class NetworkClientTest {
private static final int CHECKOUT_TIMEOUT_MS = 1000;
private static final int MAX_PORTS_PLAIN_TEXT = 3;
private static final int MAX_PORTS_SSL = 4;
public static final int POLL_TIMEOUT_MS = 100;
public static final int TIME_FOR_WARM_UP_MS = 2000;

private final Time time;

Expand Down Expand Up @@ -128,7 +130,7 @@ public void testWarmUpConnectionsFailedAll() {
selector.setState(MockSelectorState.FailConnectionInitiationOnPoll);
List<ResponseInfo> responseInfos = new ArrayList<>();
Assert.assertEquals("Connection count is not expected", 0,
networkClient.warmUpConnections(localPlainTextDataNodes, 100, 2000, responseInfos));
networkClient.warmUpConnections(localPlainTextDataNodes, 100, TIME_FOR_WARM_UP_MS, responseInfos));
// verify that the connections to all local nodes get disconnected
Assert.assertEquals("Mismatch in timeout responses", localPlainTextDataNodes.size() * MAX_PORTS_PLAIN_TEXT,
responseInfos.size());
Expand All @@ -152,7 +154,7 @@ public void testBasicSendAndPoll() {
int responseCount = 0;

do {
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
for (ResponseInfo responseInfo : responseInfoList) {
MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest();
Expand All @@ -168,7 +170,7 @@ public void testBasicSendAndPoll() {
} while (requestCount > responseCount);
Assert.assertEquals("Should receive only as many responses as there were requests", requestCount, responseCount);

responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size());
}
Expand All @@ -185,7 +187,7 @@ public void testConnectionUnavailable() throws InterruptedException {
int requestCount = requestInfoList.size();
int responseCount = 0;

responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
// The first sendAndPoll() initiates the connections. So, after the selector poll, new connections
// would have been established, but no new responses or disconnects, so the NetworkClient should not have been
Expand All @@ -195,7 +197,7 @@ public void testConnectionUnavailable() throws InterruptedException {
time.sleep(CHECKOUT_TIMEOUT_MS + 1);

do {
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
for (ResponseInfo responseInfo : responseInfoList) {
NetworkClientErrorCode error = responseInfo.getError();
Expand All @@ -207,7 +209,7 @@ public void testConnectionUnavailable() throws InterruptedException {
responseCount++;
}
} while (requestCount > responseCount);
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size());
}
Expand All @@ -227,7 +229,7 @@ public void testNetworkError() {
// set beBad so that requests end up failing due to "network error".
selector.setState(MockSelectorState.DisconnectOnSend);
do {
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
for (ResponseInfo responseInfo : responseInfoList) {
NetworkClientErrorCode error = responseInfo.getError();
Expand All @@ -239,7 +241,7 @@ public void testNetworkError() {
responseCount++;
}
} while (requestCount > responseCount);
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
Assert.assertEquals("No responses are expected at this time", 0, responseInfoList.size());
selector.setState(MockSelectorState.Good);
Expand All @@ -253,7 +255,7 @@ public void testExceptionOnConnect() {
List<RequestInfo> requestInfoList = new ArrayList<>();
// test that IllegalStateException would be thrown if replica is not specified in RequestInfo
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(-1), null));
networkClient.sendAndPoll(requestInfoList, 100);
networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals("NetworkClientException should increase because replica is null in request", 1,
networkMetrics.networkClientException.getCount());
requestInfoList.clear();
Expand All @@ -262,7 +264,7 @@ public void testExceptionOnConnect() {
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode));
selector.setState(MockSelectorState.ThrowExceptionOnConnect);
try {
networkClient.sendAndPoll(requestInfoList, 100);
networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
} catch (Exception e) {
Assert.fail("If selector throws on connect, sendAndPoll() should not throw");
}
Expand All @@ -283,32 +285,32 @@ public void testConnectionInitializationFailures() {
selector.setState(MockSelectorState.IdlePoll);
Assert.assertEquals(0, selector.connectCallCount());
// this sendAndPoll() should initiate a connect().
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
// At this time a single connection would have been initiated for the above request.
Assert.assertEquals(1, selector.connectCallCount());
Assert.assertEquals(0, responseInfoList.size());
requestInfoList.clear();

// Subsequent calls to sendAndPoll() should not initiate any connections.
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(1, selector.connectCallCount());
Assert.assertEquals(0, responseInfoList.size());

// Another connection should get initialized if a new request comes in for the same destination.
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(1), replicaOnSslNode));
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(2, selector.connectCallCount());
Assert.assertEquals(0, responseInfoList.size());
requestInfoList.clear();

// Subsequent calls to sendAndPoll() should not initiate any more connections.
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(2, selector.connectCallCount());
Assert.assertEquals(0, responseInfoList.size());

// Once connect failure kicks in, the pending requests should be failed immediately.
selector.setState(MockSelectorState.FailConnectionInitiationOnPoll);
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(2, selector.connectCallCount());
Assert.assertEquals(2, responseInfoList.size());
Assert.assertEquals(NetworkClientErrorCode.NetworkError, responseInfoList.get(0).getError());
Expand All @@ -328,35 +330,36 @@ public void testConnectionReplenishment() {
i -> new RequestInfo(sslHost, sslPort, new MockSend(nextCorrelationId.getAndIncrement()), replicaOnSslNode))
.collect(Collectors.toList());
// 1 host x 1 port x 3 connections x 100%
AtomicInteger expectedConnectCalls = new AtomicInteger(3);
int warmUpPercentage = 100;
AtomicInteger expectedConnectCalls = new AtomicInteger(warmUpPercentage * 3 /100);
Runnable checkConnectCalls = () -> Assert.assertEquals(expectedConnectCalls.get(), selector.connectCallCount());

networkClient.warmUpConnections(Collections.singletonList(replicaOnSslNode.getDataNodeId()), 100, 2000,
new ArrayList<>());
networkClient.warmUpConnections(Collections.singletonList(replicaOnSslNode.getDataNodeId()), warmUpPercentage,
TIME_FOR_WARM_UP_MS, new ArrayList<>());
checkConnectCalls.run();

selector.setState(MockSelectorState.Good);
// this sendAndPoll() should use one of the pre-warmed connections
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), 100);
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), POLL_TIMEOUT_MS);
checkConnectCalls.run();
Assert.assertEquals(3, responseInfoList.size());

// this sendAndPoll() should disconnect two of the pre-warmed connections
selector.setState(MockSelectorState.DisconnectOnSend);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(2), 100);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(2), POLL_TIMEOUT_MS);
checkConnectCalls.run();
Assert.assertEquals(2, responseInfoList.size());

// the two connections lost in the previous sendAndPoll should be replenished
selector.setState(MockSelectorState.Good);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(1), 100);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(1), POLL_TIMEOUT_MS);
expectedConnectCalls.addAndGet(2);
checkConnectCalls.run();
Assert.assertEquals(1, responseInfoList.size());

// this call should use the existing connections in the pool
selector.setState(MockSelectorState.Good);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), 100);
responseInfoList = networkClient.sendAndPoll(requestGen.apply(3), POLL_TIMEOUT_MS);
checkConnectCalls.run();
Assert.assertEquals(3, responseInfoList.size());
}
Expand All @@ -379,13 +382,13 @@ public void testOutOfOrderConnectionEstablishment() {
List<RequestInfo> requestInfoList = new ArrayList<>();
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(2), replicaOnSslNode));
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(3), replicaOnSslNode));
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
requestInfoList.clear();
Assert.assertEquals(2, selector.connectCallCount());
Assert.assertEquals(0, responseInfoList.size());

// Invoke sendAndPoll() again, the Connection C1 will get disconnected
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
// Verify that no more connection is initiated in network client
Assert.assertEquals(2, selector.connectCallCount());
// There should be 2 responses, one is success from Request R1, another is from Connection C1 timeout.
Expand All @@ -400,7 +403,7 @@ public void testOutOfOrderConnectionEstablishment() {
responseInfoList.clear();

// Invoke sendAndPoll() again, Request R2 will get sent via Connection C2
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(2, selector.connectCallCount());
Assert.assertEquals(1, responseInfoList.size());
Assert.assertNull(responseInfoList.get(0).getError());
Expand All @@ -420,14 +423,14 @@ public void testPendingRequestTimeOutWithDisconnection() throws Exception {
List<RequestInfo> requestInfoList = new ArrayList<>();
selector.setState(MockSelectorState.IdlePoll);
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode));
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
List<ResponseInfo> responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.assertEquals(0, responseInfoList.size());
requestInfoList.clear();
// now make the selector return any attempted connections as disconnections.
selector.setState(MockSelectorState.FailConnectionInitiationOnPoll);
// increment the time so that the request times out in the next cycle.
time.sleep(2000);
responseInfoList = networkClient.sendAndPoll(requestInfoList, 100);
responseInfoList = networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
// the size of responseInfoList should be 2 because first response comes from dropping request in the queue that
// waits too long. (This response would be handled by corresponding manager, i.e PutManager, GetManager, etc); second
// response comes from underlying connection timeout in nioSelector (usually due to remote node is down). This response
Expand All @@ -451,7 +454,7 @@ public void testExceptionOnPoll() {
requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(4), replicaOnSslNode));
selector.setState(MockSelectorState.ThrowExceptionOnPoll);
try {
networkClient.sendAndPoll(requestInfoList, 100);
networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
} catch (Exception e) {
Assert.fail("If selector throws on poll, sendAndPoll() should not throw.");
}
Expand All @@ -476,7 +479,7 @@ public void testClose() {
List<RequestInfo> requestInfoList = new ArrayList<RequestInfo>();
networkClient.close();
try {
networkClient.sendAndPoll(requestInfoList, 100);
networkClient.sendAndPoll(requestInfoList, POLL_TIMEOUT_MS);
Assert.fail("Polling after close should throw");
} catch (IllegalStateException e) {
}
Expand All @@ -490,17 +493,17 @@ private void doTestWarmUpConnections(List<DataNodeId> localDataNodeIds, int maxP
Assert.assertEquals("Port type is not expected.", expectedPortType,
localDataNodeIds.get(0).getPortToConnectTo().getPortType());
Assert.assertEquals("Connection count is not expected", 0,
networkClient.warmUpConnections(localDataNodeIds, 0, 2000, new ArrayList<>()));
networkClient.warmUpConnections(localDataNodeIds, 0, TIME_FOR_WARM_UP_MS, new ArrayList<>()));
selector.setState(MockSelectorState.FailConnectionInitiationOnPoll);
Assert.assertEquals("Connection count is not expected", 0,
networkClient.warmUpConnections(localDataNodeIds, 100, 2000, new ArrayList<>()));
networkClient.warmUpConnections(localDataNodeIds, 100, TIME_FOR_WARM_UP_MS, new ArrayList<>()));
selector.setState(MockSelectorState.Good);
int halfConnections = 50 * maxPort / 100 * localDataNodeIds.size();
int allConnections = maxPort * localDataNodeIds.size();
Assert.assertEquals("Connection count is not expected", halfConnections,
networkClient.warmUpConnections(localDataNodeIds, 50, 2000, new ArrayList<>()));
networkClient.warmUpConnections(localDataNodeIds, 50, TIME_FOR_WARM_UP_MS, new ArrayList<>()));
Assert.assertEquals("Connection count is not expected", allConnections - halfConnections,
networkClient.warmUpConnections(localDataNodeIds, 100, 2000, new ArrayList<>()));
networkClient.warmUpConnections(localDataNodeIds, 100, TIME_FOR_WARM_UP_MS, new ArrayList<>()));
}
}

Expand Down

0 comments on commit e48bddc

Please sign in to comment.