Skip to content

Commit

Permalink
Backport Graceful Decommission and Integ Tests (#5060)
Browse files Browse the repository at this point in the history
* Fail weight update when decommission ongoing and fail decommission when attribute not weighed away (#4839)

* Add changes for graceful node decommission (#4586)

* Add delay timeout for decommission request (#4931)

* Integ Tests for Awareness Attribute Decommissioning (#4715)

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
imRishN committed Nov 3, 2022
1 parent 43561e9 commit 903679d
Show file tree
Hide file tree
Showing 17 changed files with 1,314 additions and 43 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))

### Dependencies
- Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0
Expand Down Expand Up @@ -69,6 +70,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Integ Tests for Awareness Attribute Decommissioning ([#4715](https://github.com/opensearch-project/OpenSearch/pull/4715))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))

### Deprecated
### Removed
Expand Down Expand Up @@ -101,6 +104,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix decommission status update to non leader nodes ([#4800](https://github.com/opensearch-project/OpenSearch/pull/4800))
- Fix bug in AwarenessAttributeDecommissionIT ([#4822](https://github.com/opensearch-project/OpenSearch/pull/4822))
- Fix for failing checkExtraction, checkLicense and checkNotice tasks for windows gradle check ([#4941](https://github.com/opensearch-project/OpenSearch/pull/4941))
- [BUG]: Allow decommission to support delay timeout ([#4930](https://github.com/opensearch-project/OpenSearch/pull/4930))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

Expand All @@ -28,8 +29,15 @@
*/
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

/** Initial value of {@code delayTimeout} for every new request: 120 seconds. */
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

// Attribute this request asks to decommission; validate() rejects the request when it is null.
private DecommissionAttribute decommissionAttribute;

// Delay timeout carried with the request; starts at DEFAULT_NODE_DRAINING_TIMEOUT.
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// Holder for the no_delay param. When set to true through setNoDelay, delayTimeout is also
// reset to zero so that no draining timeout applies.
private boolean noDelay = false;

// No-arg constructor: every field keeps the default declared above.
public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
Expand All @@ -39,12 +47,16 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
/**
 * Reads a request from the wire.
 * <p>
 * Fields are consumed in the same order {@code writeTo} emits them: the parent request's
 * fields, the decommission attribute, the delay timeout and finally the no_delay flag.
 *
 * @param in stream to read the request from
 * @throws IOException if reading from the stream fails
 */
public DecommissionRequest(StreamInput in) throws IOException {
    super(in);
    this.decommissionAttribute = new DecommissionAttribute(in);
    delayTimeout = in.readTimeValue();
    noDelay = in.readBoolean();
}

/**
 * Serializes this request: the parent request's fields first, followed by the decommission
 * attribute, the delay timeout and the no_delay flag.
 *
 * @param out stream to write the request to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    this.decommissionAttribute.writeTo(out);
    out.writeTimeValue(this.delayTimeout);
    out.writeBoolean(this.noDelay);
}

/**
Expand All @@ -65,15 +77,46 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

/**
 * Replaces the delay timeout carried by this request.
 *
 * @param delayTimeout the new delay timeout
 */
public void setDelayTimeout(final TimeValue delayTimeout) {
    this.delayTimeout = delayTimeout;
}

/**
 * @return the delay timeout currently set on this request
 */
public TimeValue getDelayTimeout() {
    return delayTimeout;
}

/**
 * Sets the no_delay flag.
 * <p>
 * Turning the flag on also resets the delay timeout to {@link TimeValue#ZERO}; turning it
 * off leaves the delay timeout untouched.
 *
 * @param noDelay whether the request should skip the delay
 */
public void setNoDelay(boolean noDelay) {
    this.noDelay = noDelay;
    if (noDelay) {
        delayTimeout = TimeValue.ZERO;
    }
}

/**
 * @return {@code true} when the no_delay flag is set on this request
 */
public boolean isNoDelay() {
    return this.noDelay;
}

/**
 * Validates the request before it is executed.
 * <p>
 * A missing decommission attribute short-circuits validation, since the remaining attribute
 * checks need it. Otherwise errors for a missing attribute name, a missing attribute value
 * and an inconsistent no_delay / delay_timeout combination are accumulated.
 *
 * @return the accumulated validation errors, or {@code null} when the request is valid
 */
@Override
public ActionRequestValidationException validate() {
    ActionRequestValidationException validationException = null;
    if (decommissionAttribute == null) {
        validationException = addValidationError("decommission attribute is missing", validationException);
        return validationException;
    }
    if (decommissionAttribute.attributeName() == null || Strings.isEmpty(decommissionAttribute.attributeName())) {
        validationException = addValidationError("attribute name is missing", validationException);
    }
    if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) {
        validationException = addValidationError("attribute value is missing", validationException);
    }
    // This validation should not fail since we are not allowing delay timeout to be set externally.
    // Still keeping it for double check.
    if (noDelay && delayTimeout.getSeconds() > 0) {
        // The opening bracket was missing, which rendered as "... is set to 120] Seconds".
        final String validationMessage = "Invalid decommission request. no_delay is true and delay_timeout is set to ["
            + delayTimeout.getSeconds()
            + "] Seconds";
        validationException = addValidationError(validationMessage, validationException);
    }
    return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

/**
* Register decommission request builder
Expand All @@ -35,4 +36,14 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribu
request.setDecommissionAttribute(decommissionAttribute);
return this;
}

/**
 * Sets the delay timeout on the request being built.
 *
 * @param delayTimeOut delay timeout to forward to the underlying request
 * @return this builder, for chaining
 */
public DecommissionRequestBuilder setDelayTimeOut(final TimeValue delayTimeOut) {
    this.request.setDelayTimeout(delayTimeOut);
    return this;
}

/**
 * Sets the no_delay flag on the request being built.
 *
 * @param noDelay value to forward to the underlying request
 * @return this builder, for chaining
 */
public DecommissionRequestBuilder setNoDelay(final boolean noDelay) {
    this.request.setNoDelay(noDelay);
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
/**
 * Logs the attribute being decommissioned and hands the request to the decommission service.
 * <p>
 * The whole request is passed on, not just its attribute, so the service also receives the
 * delay timeout and no_delay settings it carries.
 *
 * @param request  the decommission request to execute
 * @param state    the current cluster state (not used here)
 * @param listener notified with the outcome of the decommission action
 */
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
    throws Exception {
    // The logger renders the argument itself; no explicit toString() is needed.
    logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute());
    // Single invocation: the earlier attribute-only call was superseded by the request-based one.
    decommissionService.startDecommissionAction(request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

/**
* Contains metadata about decommission attribute
Expand Down Expand Up @@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
// We don't expect that INIT will be new status, as it is registered only when starting the decommission action
switch (newStatus) {
case DRAINING:
validateStatus(Set.of(DecommissionStatus.INIT), newStatus);
break;
case IN_PROGRESS:
validateStatus(DecommissionStatus.INIT, newStatus);
validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus);
break;
case SUCCESSFUL:
validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus);
break;
default:
throw new IllegalArgumentException(
Expand All @@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
}

private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
if (status.equals(expected) == false) {
private void validateStatus(Set<DecommissionStatus> expectedStatuses, DecommissionStatus next) {
if (expectedStatuses.contains(status) == false) {
assert false : "can't move decommission status to ["
+ next
+ "]. current status: ["
+ status
+ "] (expected ["
+ expected
+ "] (allowed statuses ["
+ expectedStatuses
+ "])";
throw new IllegalStateException(
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expectedStatuses + "])"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand All @@ -32,14 +36,17 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

/**
 * Logs, per node id, the open HTTP server connection count reported in a node stats response.
 * <p>
 * This is a best-effort diagnostic: a null response (or null node list) is logged and ignored,
 * and nodes that report no HTTP stats are left out of the map instead of failing the whole log.
 *
 * @param nodesStatsResponse node stats response to summarise; may be {@code null}
 */
private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
    if (nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) {
        logger.info("Node stats response received is null/empty.");
        return;
    }

    Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
    List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
    for (NodeStats nodeStats : responseNodes) {
        HttpStats httpStats = nodeStats.getHttp();
        if (httpStats == null) {
            // No HTTP stats for this node; skip it so one missing entry cannot abort the log.
            continue;
        }
        nodeActiveConnectionMap.put(nodeStats.getNode().getId(), httpStats.getServerOpen());
    }
    logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap);
}

/**
 * Requests HTTP stats for the given nodes and logs their open connection counts.
 * <p>
 * The request is sent to the local node under the nodes-stats action, restricted to the HTTP
 * metric. The response is only logged; a failure is logged as an error and otherwise ignored.
 * Nothing is sent when the node set is null or empty.
 *
 * @param decommissionedNodes nodes whose HTTP stats should be fetched; may be {@code null}
 */
void getActiveRequestCountOnDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes) {
    if (decommissionedNodes == null || decommissionedNodes.isEmpty()) {
        return;
    }
    final String[] nodeIds = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new);
    if (nodeIds.length == 0) {
        return;
    }

    // Start from an empty metric set so that only HTTP stats are requested.
    final NodesStatsRequest httpStatsRequest = new NodesStatsRequest(nodeIds);
    httpStatsRequest.clear();
    httpStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());

    final TransportResponseHandler<NodesStatsResponse> responseHandler = new TransportResponseHandler<NodesStatsResponse>() {
        @Override
        public NodesStatsResponse read(StreamInput in) throws IOException {
            return new NodesStatsResponse(in);
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }

        @Override
        public void handleResponse(NodesStatsResponse response) {
            logActiveConnections(response);
        }

        @Override
        public void handleException(TransportException exp) {
            logger.error("Failure occurred while dumping connection for decommission nodes - ", exp.unwrapCause());
        }
    };

    transportService.sendRequest(transportService.getLocalNode(), NodesStatsAction.NAME, httpStatsRequest, responseHandler);
}
}
Loading

0 comments on commit 903679d

Please sign in to comment.