Skip to content

Commit

Permalink
Call to wrr api
Browse files Browse the repository at this point in the history
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
pranikum committed Sep 25, 2022
1 parent 18ef223 commit 49916f8
Show file tree
Hide file tree
Showing 2 changed files with 380 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ private void setWeightForDecommissionedZone(List<String> zones) {
new TransportResponseHandler<ClusterPutWRRWeightsResponse>() {
@Override
public void handleResponse(ClusterPutWRRWeightsResponse response) {
logger.info("Weights were set successfully set.");
logger.info("Weights are successfully set.");
}

@Override
public void handleException(TransportException exp) {
// Logging a warn message on failure. Should we do Retry? If weights are not set should we fail?
// Logging warn message on failure. Should we do Retry? If weights are not set should we fail?
logger.warn("Exception occurred while setting weights.Exception Messages - [{}]",
exp.unwrapCause().getMessage());
}
Expand All @@ -347,35 +347,41 @@ public void checkHttpStatsForDecommissionedNodes(

if (timeoutForNodeDecommission.getSeconds() > 0) {
// Wait for timeout to happen. Log the active connection before decommissioning of nodes.
scheduleDecommissionNodesRequestCheck(decommissionedNodes, timeoutForNodeDecommission);
scheduleDecommissionNodesRequestCheck(
decommissionedNodes,
reason,
timeout,
listener,
timeoutForNodeDecommission);
} else {
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, listener);
}
}

private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
boolean hasActiveConnections = false;
Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
for (int i=0; i < responseNodes.size(); i++) {
HttpStats httpStats = responseNodes.get(i).getHttp();
DiscoveryNode node = responseNodes.get(i).getNode();
if (httpStats != null && httpStats.getServerOpen() != 0) {
hasActiveConnections = true;
}
nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen());
}
logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap);
}

private void scheduleDecommissionNodesRequestCheck(
Set<DiscoveryNode> decommissionedNodes,
String reason,
TimeValue timeout,
ActionListener<Void> nodesRemovedListener,
TimeValue timeoutForNodeDecommission) {
transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
// Check for active connections.
getRequestCountOnDecommissionNodes(decommissionedNodes);
getActiveRequestCountOnDecommissionNodes(decommissionedNodes);
removeDecommissionedNodes(decommissionedNodes, reason, timeout, nodesRemovedListener);
}

@Override
Expand All @@ -385,7 +391,7 @@ public String toString() {
}, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME);
}

private void getRequestCountOnDecommissionNodes(Set<DiscoveryNode> decommissionedNodes) {
private void getActiveRequestCountOnDecommissionNodes(Set<DiscoveryNode> decommissionedNodes) {
if(decommissionedNodes == null || decommissionedNodes.isEmpty()) {
return;
}
Expand Down
Loading

0 comments on commit 49916f8

Please sign in to comment.