[Backport 2.x] Graceful Decommission and Integ Tests #5060

Merged: 4 commits on Nov 3, 2022
CHANGELOG.md: 4 changes (4 additions & 0 deletions)
@@ -32,6 +32,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))

### Dependencies
- Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0
@@ -69,6 +70,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Integ Tests for Awareness Attribute Decommissioning ([#4715](https://github.com/opensearch-project/OpenSearch/pull/4715))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))

### Deprecated
### Removed
@@ -101,6 +104,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800))
- Fix bug in AwarenessAttributeDecommissionIT([4822](https://github.com/opensearch-project/OpenSearch/pull/4822))
- Fix for failing checkExtraction, checkLicense and checkNotice tasks for windows gradle check ([#4941](https://github.com/opensearch-project/OpenSearch/pull/4941))
- [BUG]: Allow decommission to support delay timeout ([#4930](https://github.com/opensearch-project/OpenSearch/pull/4930))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))


Large diffs are not rendered by default.

DecommissionRequest.java
@@ -14,6 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

@@ -28,8 +29,15 @@
*/
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;

private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// Holder for the no_delay parameter; when true, the node draining timeout is skipped.
private boolean noDelay = false;

public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
@@ -39,12 +47,16 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
}

/**
@@ -65,15 +77,46 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

public void setDelayTimeout(TimeValue delayTimeout) {
this.delayTimeout = delayTimeout;
}

public TimeValue getDelayTimeout() {
return this.delayTimeout;
}

public void setNoDelay(boolean noDelay) {
if (noDelay) {
this.delayTimeout = TimeValue.ZERO;
}
this.noDelay = noDelay;
}

public boolean isNoDelay() {
return noDelay;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (decommissionAttribute == null) {
validationException = addValidationError("decommission attribute is missing", validationException);
return validationException;
}
if (decommissionAttribute.attributeName() == null || Strings.isEmpty(decommissionAttribute.attributeName())) {
validationException = addValidationError("attribute name is missing", validationException);
}
if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) {
validationException = addValidationError("attribute value is missing", validationException);
}
// This validation should never fail since the delay timeout cannot be set externally,
// but it is kept as a safety check.
if (noDelay && delayTimeout.getSeconds() > 0) {
final String validationMessage = "Invalid decommission request. no_delay is true and delay_timeout is set to ["
+ delayTimeout.getSeconds()
+ "] seconds";
validationException = addValidationError(validationMessage, validationException);
}
return validationException;
}

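The request changes above add a draining delay: a 120-second default window, an explicit delay timeout, and a no_delay flag that zeroes the timeout. Below is a minimal sketch (not part of this PR's diff) of those semantics; the example class name, the import paths, and the DecommissionAttribute (name, value) constructor are assumptions taken from how they are used elsewhere in this change.

```java
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

public class DecommissionDelayExample {
    public static void main(String[] args) {
        // By default the request carries DEFAULT_NODE_DRAINING_TIMEOUT (120 seconds).
        DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone-a"));
        System.out.println(request.getDelayTimeout().getSeconds()); // 120

        // The drain window can be overridden explicitly.
        request.setDelayTimeout(TimeValue.timeValueSeconds(300));

        // no_delay skips draining: the setter also zeroes the delay timeout, which is why the
        // (noDelay && delayTimeout > 0) validation in DecommissionRequest should never trip.
        request.setNoDelay(true);
        System.out.println(request.isNoDelay());                    // true
        System.out.println(request.getDelayTimeout().getSeconds()); // 0
        System.out.println(request.validate());                     // null, i.e. the request is valid
    }
}
```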
DecommissionRequestBuilder.java
@@ -12,6 +12,7 @@
import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

/**
* Register decommission request builder
@@ -35,4 +36,14 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribute decommissionAttribute) {
request.setDecommissionAttribute(decommissionAttribute);
return this;
}

public DecommissionRequestBuilder setDelayTimeOut(TimeValue delayTimeOut) {
request.setDelayTimeout(delayTimeOut);
return this;
}

public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}
}
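A hypothetical sketch of how the new builder setters might be chained. Obtaining a DecommissionRequestBuilder instance is outside this diff, so it is passed in as a parameter here, and the import paths are assumptions.

```java
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequestBuilder;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

public class DecommissionBuilderUsage {

    // Graceful path: decommission zone "zone-a" with a 5-minute drain window.
    static DecommissionRequestBuilder graceful(DecommissionRequestBuilder builder) {
        return builder.setDecommissionedAttribute(new DecommissionAttribute("zone", "zone-a"))
            .setDelayTimeOut(TimeValue.timeValueMinutes(5));
    }

    // Immediate path: no_delay=true, which also zeroes the drain timeout on the underlying request.
    static DecommissionRequestBuilder immediate(DecommissionRequestBuilder builder) {
        return builder.setDecommissionedAttribute(new DecommissionAttribute("zone", "zone-a"))
            .setNoDelay(true);
    }
}
```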
@@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterState state) {
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener);
decommissionService.startDecommissionAction(request, listener);
}
}
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

/**
* Contains metadata about decommission attribute
@@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
// We don't expect INIT to be the new status, as it is registered only when the decommission action starts
switch (newStatus) {
case DRAINING:
validateStatus(Set.of(DecommissionStatus.INIT), newStatus);
break;
case IN_PROGRESS:
validateStatus(DecommissionStatus.INIT, newStatus);
validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus);
break;
case SUCCESSFUL:
validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus);
break;
default:
throw new IllegalArgumentException(
@@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
}

private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
if (status.equals(expected) == false) {
private void validateStatus(Set<DecommissionStatus> expectedStatuses, DecommissionStatus next) {
if (expectedStatuses.contains(status) == false) {
assert false : "can't move decommission status to ["
+ next
+ "]. current status: ["
+ status
+ "] (expected ["
+ expected
+ "] (allowed statuses ["
+ expectedStatuses
+ "])";
throw new IllegalStateException(
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expectedStatuses + "])"
);
}
}
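The switch above generalizes the status check from a single expected status to a set, so that IN_PROGRESS can now be reached either from the new DRAINING status or directly from INIT (the no_delay path). The standalone sketch below models just those transition rules; it is not OpenSearch code, and the real DecommissionStatus enum has additional values (for example a failure status) that are omitted here.

```java
import java.util.Map;
import java.util.Set;

public class DecommissionTransitions {
    enum Status { INIT, DRAINING, IN_PROGRESS, SUCCESSFUL }

    // next status -> statuses it may legally be reached from (mirrors validateNewStatus above)
    static final Map<Status, Set<Status>> ALLOWED_FROM = Map.of(
        Status.DRAINING, Set.of(Status.INIT),
        Status.IN_PROGRESS, Set.of(Status.DRAINING, Status.INIT),
        Status.SUCCESSFUL, Set.of(Status.IN_PROGRESS)
    );

    static void validate(Status current, Status next) {
        Set<Status> allowed = ALLOWED_FROM.get(next);
        if (allowed == null || allowed.contains(current) == false) {
            throw new IllegalStateException(
                "can't move decommission status to [" + next + "]. current status: [" + current + "]"
            );
        }
    }

    public static void main(String[] args) {
        validate(Status.INIT, Status.DRAINING);          // graceful path starts draining
        validate(Status.DRAINING, Status.IN_PROGRESS);   // draining finished
        validate(Status.INIT, Status.IN_PROGRESS);       // no_delay path skips DRAINING
        validate(Status.IN_PROGRESS, Status.SUCCESSFUL);
    }
}
```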
@@ -18,6 +18,10 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -32,14 +36,17 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
@@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}

private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
if (nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) {
logger.info("Node stats response received is null/empty.");
return;
}

Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
for (NodeStats nodeStats : responseNodes) {
// Record the number of open HTTP (server) connections per decommissioned node.
HttpStats httpStats = nodeStats.getHttp();
DiscoveryNode node = nodeStats.getNode();
nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen());
}
logger.info("Decommissioning nodes with active connections: [{}]", nodeActiveConnectionMap);
}

void getActiveRequestCountOnDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes) {
if (decommissionedNodes == null || decommissionedNodes.isEmpty()) {
return;
}
String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new);
if (nodes.length == 0) {
return;
}

final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes);
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());

transportService.sendRequest(
transportService.getLocalNode(),
NodesStatsAction.NAME,
nodesStatsRequest,
new TransportResponseHandler<NodesStatsResponse>() {
@Override
public void handleResponse(NodesStatsResponse response) {
logActiveConnections(response);
}

@Override
public void handleException(TransportException exp) {
logger.error("Failure occurred while dumping connection for decommission nodes - ", exp.unwrapCause());
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public NodesStatsResponse read(StreamInput in) throws IOException {
return new NodesStatsResponse(in);
}
}
);
}
}