Control concurrency and add retry action in decommission flow #4684

Closed
CHANGELOG.md (1 change: 1 addition, 0 deletions)
@@ -78,6 +78,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677))
- Control concurrency and add retry action in decommission flow ([#4684](https://github.com/opensearch-project/OpenSearch/pull/4684))

### Deprecated

DecommissionRequest.java

@@ -14,6 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

@@ -29,22 +30,34 @@
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

private DecommissionAttribute decommissionAttribute;
private boolean retryOnClusterManagerChange;
private TimeValue retryTimeout;

public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean retryOnClusterManagerChange, TimeValue retryTimeout) {
this.decommissionAttribute = decommissionAttribute;
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
this.retryTimeout = retryTimeout;
}

public DecommissionRequest(DecommissionAttribute decommissionAttribute, TimeValue retryTimeout) {
this(decommissionAttribute, false, retryTimeout);
}

public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
retryOnClusterManagerChange = in.readBoolean();
retryTimeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
out.writeBoolean(retryOnClusterManagerChange);
out.writeTimeValue(retryTimeout);
}

/**
Expand All @@ -58,13 +71,49 @@ public DecommissionRequest setDecommissionAttribute(DecommissionAttribute decomm
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return this request
*/
public DecommissionRequest setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
return this;
}

/**
* Sets the retry timeout for the request
*
* @param retryTimeout retry time out for the request
* @return this request
*/
public DecommissionRequest setRetryTimeout(TimeValue retryTimeout) {
this.retryTimeout = retryTimeout;
return this;
}

/**
* @return Returns the decommission attribute key-value
*/
public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

/**
* @return Returns whether decommission is retry eligible on cluster manager change
*/
public boolean retryOnClusterManagerChange() {
return this.retryOnClusterManagerChange;
}

/**
* @return retry timeout
*/
public TimeValue getRetryTimeout() {
return this.retryTimeout;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -79,6 +128,13 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", retryOnClusterManagerChange="
+ retryOnClusterManagerChange
+ ", retryTimeout="
+ retryTimeout
+ '}';
}
}
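
A quick usage sketch, not part of this diff (attribute name and value are placeholders; the classes are the ones imported at the top of this file):

    DecommissionAttribute zoneAttribute = new DecommissionAttribute("zone", "zone-a");
    // External callers build a retry-ineligible request with an overall retry budget, as the REST layer does.
    DecommissionRequest request = new DecommissionRequest(zoneAttribute, TimeValue.timeValueMinutes(5));
    // The internal retry path flips the flag and passes on whatever budget remains.
    request.setRetryOnClusterManagerChange(true).setRetryTimeout(TimeValue.timeValueMillis(10_000));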
DecommissionRequestBuilder.java

@@ -12,6 +12,7 @@
import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

/**
* Register decommission request builder
@@ -35,4 +36,26 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribu
request.setDecommissionAttribute(decommissionAttribute);
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return current object
*/
public DecommissionRequestBuilder setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
request.setRetryOnClusterManagerChange(retryOnClusterManagerChange);
return this;
}

/**
* Sets the retry timeout for the decommission request
*
* @param retryTimeout retry time out for the request
* @return current object
*/
public DecommissionRequestBuilder setRetryTimeout(TimeValue retryTimeout) {
request.setRetryTimeout(retryTimeout);
return this;
}
}
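
A chaining sketch for transport-client callers, with assumed wiring that is not part of this diff (a builder constructor taking the OpenSearchClient, the DecommissionAction instance, and a fresh request):

    DecommissionResponse response = new DecommissionRequestBuilder(client, DecommissionAction.INSTANCE, new DecommissionRequest())
        .setDecommissionedAttribute(new DecommissionAttribute("zone", "zone-a"))
        .setRetryOnClusterManagerChange(false)
        .setRetryTimeout(TimeValue.timeValueMinutes(5))
        .get();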
TransportDecommissionAction.java

@@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener);
decommissionService.startDecommissionAction(request, listener);
}
}
DecommissionController.java

@@ -18,6 +18,9 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -72,6 +75,66 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* This method sends a transport call to retry decommission action, given that -
* 1. The request is not timed out
* 2. And executed when there was a cluster manager change
*
* @param decommissionRequest decommission request object
* @param startTime start time of previous request
* @param listener callback for the retry action
*/
public void retryDecommissionAction(
DecommissionRequest decommissionRequest,
long startTime,
ActionListener<DecommissionResponse> listener
) {

Review comment (Collaborator): Should we rename this to tryDecommissionOnNewClusterManager, since that is the only place it is used for now?

Reply (Author): The controller is meant to hold generic methods that the service layer can use; this method just attempts a retry, hence the name. The cluster manager switch is one use case where we call it. Let me know if you think a rename makes sense and I can rename it if required.

final long remainingTimeoutMS = decommissionRequest.getRetryTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
if (remainingTimeoutMS <= 0) {
logger.debug(
"timed out before retrying [{}] for attribute [{}] after cluster manager change",
DecommissionAction.NAME,
decommissionRequest.getDecommissionAttribute()
);
listener.onFailure(
new OpenSearchTimeoutException(
"timed out before retrying [{}] for attribute [{}] after cluster manager change",
DecommissionAction.NAME,
decommissionRequest.getDecommissionAttribute()
)
);
return;
}
decommissionRequest.setRetryOnClusterManagerChange(true);
decommissionRequest.setRetryTimeout(TimeValue.timeValueMillis(remainingTimeoutMS));

Review comment (Collaborator): This will not be used on the new cluster manager; the retry timeout is checked only at the time of cluster manager abdication and nowhere else. So more often than not (66% of the time in a three-cluster-manager setup) this parameter is never used within the scope of this PR. What are the cons of removing it altogether?

Reply (Author): For an unstable cluster with multiple cluster manager switches, retrying without a timeout could leave us stuck in an endless retry loop and exhaust a transport thread. This timeout bounds the retry action. I agree we may not hit it on a stable cluster, but it lets us reject, later on, work that an unstable cluster would not be able to execute anyway.

transportService.sendRequest(
transportService.getLocalNode(),
DecommissionAction.NAME,
decommissionRequest,
new TransportResponseHandler<DecommissionResponse>() {
@Override
public void handleResponse(DecommissionResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public DecommissionResponse read(StreamInput in) throws IOException {
return new DecommissionResponse(in);
}
}
);
}
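
To make the timeout bookkeeping concrete, a small illustrative calculation (the numbers are made up):

    // Original retry budget of 5 minutes, of which 4 minutes 50 seconds have already elapsed since startTime:
    long remainingTimeoutMS = TimeValue.timeValueMinutes(5).millis() - TimeValue.timeValueSeconds(290).millis();
    // remainingTimeoutMS == 10_000, so the retry is re-sent to the local node with a 10-second budget;
    // a zero or negative remainder would instead fail the listener with OpenSearchTimeoutException.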

/**
* Transport call to add nodes to voting config exclusion
*

DecommissionService.java

@@ -13,12 +13,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.AllocationService;
@@ -67,6 +67,7 @@ public class DecommissionService {
private final TransportService transportService;
private final ThreadPool threadPool;
private final DecommissionController decommissionController;
private final long startTime;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;

@@ -83,6 +84,7 @@ public DecommissionService(
this.transportService = transportService;
this.threadPool = threadPool;
this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool);
this.startTime = threadPool.relativeTimeInMillis();
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);

@@ -113,13 +115,14 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
* Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT}
* Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration
*
* @param decommissionAttribute register decommission attribute in the metadata request
* @param decommissionRequest request for decommission action
* @param listener register decommission listener
*/
public void startDecommissionAction(
final DecommissionAttribute decommissionAttribute,
final DecommissionRequest decommissionRequest,
final ActionListener<DecommissionResponse> listener
) {
final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute();
// register the metadata with status as INIT as first step
clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
@@ -129,6 +132,7 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
return ClusterState.builder(currentState)
@@ -157,15 +161,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
decommissionAttributeMetadata.decommissionAttribute(),
decommissionAttributeMetadata.status()
);
decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener);
assert decommissionAttributeMetadata.decommissionAttribute().equals(decommissionRequest.getDecommissionAttribute());
decommissionClusterManagerNodes(decommissionRequest, listener);
}
});
}

private synchronized void decommissionClusterManagerNodes(
final DecommissionAttribute decommissionAttribute,
final DecommissionRequest decommissionRequest,
ActionListener<DecommissionResponse> listener
) {
final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute();
ClusterState state = clusterService.getClusterApplierService().state();
// since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further
// join the cluster
@@ -211,18 +217,22 @@ public void onResponse(Void unused) {
failDecommissionedNodes(clusterService.getClusterApplierService().state());
}
} else {
// explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager
// this will ensures that request is retried until cluster manager times out
logger.info(
"local node is not eligible to process the request, "
+ "throwing NotClusterManagerException to attempt a retry on an eligible node"
);
listener.onFailure(
new NotClusterManagerException(
"node ["
+ transportService.getLocalNode().toString()
+ "] not eligible to execute decommission request. Will retry until timeout."
)
// since the local node is no longer cluster manager which could've happened due to leader abdication,
// hence retrying the decommission action until it times out
logger.info("local node is not eligible to process the request, " + "retrying the transport action until it times out");
decommissionController.retryDecommissionAction(
decommissionRequest,
startTime,
ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(
() -> new ParameterizedMessage(
"failed to retry decommission action for attribute [{}]",
decommissionRequest.getDecommissionAttribute()
),
t
);
delegatedListener.onFailure(t);
})
);
}
}
@@ -465,6 +475,21 @@ private static void ensureEligibleRequest(
}
}

private static void ensureEligibleRetry(
DecommissionRequest decommissionRequest,
DecommissionAttributeMetadata decommissionAttributeMetadata
) {
if (decommissionAttributeMetadata != null) {
if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT)
&& decommissionRequest.retryOnClusterManagerChange() == false) {
throw new DecommissioningFailedException(
decommissionRequest.getDecommissionAttribute(),
"concurrent request received to decommission attribute"
);
}
}
}
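
In effect, the guard distinguishes a genuinely concurrent request from the controller's own retry. A rough sketch (conceptual only, since the method is private; it assumes the single-argument metadata constructor registers the attribute with status INIT, as the registration step above does):

    DecommissionAttributeMetadata inFlight = new DecommissionAttributeMetadata(zoneAttribute); // assumed status INIT
    DecommissionRequest duplicate = new DecommissionRequest(zoneAttribute, TimeValue.timeValueMinutes(5));
    DecommissionRequest internalRetry = new DecommissionRequest(zoneAttribute, true, TimeValue.timeValueMinutes(5));
    // ensureEligibleRetry(duplicate, inFlight)      -> throws DecommissioningFailedException ("concurrent request received ...")
    // ensureEligibleRetry(internalRetry, inFlight)  -> returns normally, so the retry can proceed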

private ActionListener<DecommissionStatus> statusUpdateListener() {
return new ActionListener<>() {
@Override

RestDecommissionAction.java

@@ -12,6 +12,7 @@
import org.opensearch.client.Requests;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
@@ -29,6 +30,8 @@
*/
public class RestDecommissionAction extends BaseRestHandler {

private static final TimeValue DEFAULT_RETRY_TIMEOUT = TimeValue.timeValueMinutes(5L);

@Override
public List<Route> routes() {
return singletonList(new Route(PUT, "/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}"));
@@ -49,6 +52,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException {
DecommissionRequest decommissionRequest = Requests.decommissionRequest();
String attributeName = request.param("awareness_attribute_name");
String attributeValue = request.param("awareness_attribute_value");
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue));
// for REST request, we will set the retry flag to false. User won't have the option to execute retry on REST
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue))
.setRetryOnClusterManagerChange(false)
.setRetryTimeout(
TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_RETRY_TIMEOUT, getClass().getSimpleName() + ".timeout")
);
}
}
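
With these defaults, a call such as PUT /_cluster/decommission/awareness/zone/zone-a?timeout=2m (attribute name and value illustrative) produces a request that is never retry-eligible from REST but carries a 2-minute retry budget; omitting the timeout parameter falls back to the 5-minute DEFAULT_RETRY_TIMEOUT.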