From 767803eaa1af2ea655da719c77f518673c6e8133 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 13 Dec 2022 20:09:17 +0530 Subject: [PATCH 01/37] Control concurrency and handle retries Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 34 +++++++++++- .../put/DecommissionRequestBuilder.java | 11 ++++ .../decommission/DecommissionController.java | 53 +++++++++++++++++++ .../decommission/DecommissionService.java | 48 ++++++++++++----- .../admin/cluster/RestDecommissionAction.java | 3 +- 5 files changed, 134 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 7ec2cea769069..0e8e1126fc0e6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -32,6 +32,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest() { + @Override + public void handleResponse(DecommissionResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public DecommissionResponse read(StreamInput in) throws IOException { + return new DecommissionResponse(in); + } + } + ); + } /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index f36d7b3e06da9..a47173fe29efb 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -75,6 +75,7 @@ public class DecommissionService { private final TransportService transportService; private final ThreadPool threadPool; private final DecommissionController decommissionController; + private final long startTime; private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; private volatile int maxVotingConfigExclusions; @@ -92,6 +93,7 @@ public DecommissionService( this.transportService = transportService; this.threadPool = threadPool; this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); + this.startTime = threadPool.relativeTimeInMillis(); this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); @@ -128,7 +130,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionRequest decommission request Object + * @param decommissionRequest request for decommission action * @param listener register decommission listener */ public void startDecommissionAction( @@ -147,6 +149,7 @@ public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); @@ -241,18 +244,22 @@ public void onNewClusterState(ClusterState state) { drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) + // since the local node is no longer cluster manager which could've happened due to leader abdication, + // hence retrying the decommission action until it times out + logger.info("local node is not eligible to process the request, retrying the transport action until it times out"); + decommissionController.retryDecommissionAction( + decommissionRequest, + startTime, + ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug( + () -> new ParameterizedMessage( + "failed to retry decommission action for attribute [{}]", + decommissionRequest.getDecommissionAttribute() + ), + t + ); + delegatedListener.onFailure(t); + }) ); } } @@ -503,6 +510,21 @@ private static void ensureEligibleRequest( } } + private static void ensureEligibleRetry( + DecommissionRequest decommissionRequest, + DecommissionAttributeMetadata decommissionAttributeMetadata + ) { + if (decommissionAttributeMetadata != null) { + if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) + && decommissionRequest.retryOnClusterManagerChange() == false) { + throw new DecommissioningFailedException( + decommissionRequest.getDecommissionAttribute(), + "concurrent request received to decommission attribute" + ); + } + } + } + private ActionListener statusUpdateListener() { return new ActionListener<>() { @Override diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index c041974165eb6..f4fdb6a3aa3fc 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -58,6 +58,7 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT); decommissionRequest.setDelayTimeout(delayTimeout); } - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); + return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) + .setRetryOnClusterManagerChange(false); } } From 743ca013849c73dcf94782ce0688d409028bc9b6 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 13 Dec 2022 20:11:10 +0530 Subject: [PATCH 02/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 19 ++++++++++++------- .../decommission/DecommissionController.java | 10 ++++++++-- .../decommission/DecommissionService.java | 5 +++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 0e8e1126fc0e6..9627dfae09ba2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -148,12 +148,17 @@ public ActionRequestValidationException validate() { @Override public String toString() { - return "DecommissionRequest{" + - "decommissionAttribute=" + decommissionAttribute + - ", retryOnClusterManagerChange=" + retryOnClusterManagerChange + - ", delayTimeout=" + delayTimeout + - ", noDelay=" + noDelay + - ", clusterManagerNodeTimeout=" + clusterManagerNodeTimeout + - '}'; + return "DecommissionRequest{" + + "decommissionAttribute=" + + decommissionAttribute + + ", retryOnClusterManagerChange=" + + retryOnClusterManagerChange + + ", delayTimeout=" + + delayTimeout + + ", noDelay=" + + noDelay + + ", clusterManagerNodeTimeout=" + + clusterManagerNodeTimeout + + '}'; } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 8c9af47b4c852..2d7074e368083 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -91,9 +91,14 @@ public void retryDecommissionAction( long startTime, ActionListener listener ) { - final long remainingTimeoutMS = decommissionRequest.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime); + final long remainingTimeoutMS = decommissionRequest.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis() + - startTime); if (remainingTimeoutMS <= 0) { - String errorMsg = "cluster manager node timed out before retrying [" + DecommissionAction.NAME + "] for attribute [" + decommissionRequest.getDecommissionAttribute() + "] after cluster manager change"; + String errorMsg = "cluster manager node timed out before retrying [" + + DecommissionAction.NAME + + "] for attribute [" + + decommissionRequest.getDecommissionAttribute() + + "] after cluster manager change"; logger.debug(errorMsg); listener.onFailure(new OpenSearchTimeoutException(errorMsg)); return; @@ -127,6 +132,7 @@ public DecommissionResponse read(StreamInput in) throws IOException { } ); } + /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index a47173fe29efb..6870fb617a576 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -20,7 +20,6 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; -import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -246,7 +245,9 @@ public void onNewClusterState(ClusterState state) { } else { // since the local node is no longer cluster manager which could've happened due to leader abdication, // hence retrying the decommission action until it times out - logger.info("local node is not eligible to process the request, retrying the transport action until it times out"); + logger.info( + "local node is not eligible to process the request, retrying the transport action until it times out" + ); decommissionController.retryDecommissionAction( decommissionRequest, startTime, From dbb404e8b43bdccccce23698a7d0c3f0dc394cba Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 13 Dec 2022 20:16:19 +0530 Subject: [PATCH 03/37] Add changelog Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b29a0526a6ffc..4788950514883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) +- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542)) ### Deprecated From ee90987ef516d5c21e684aac00bae34961a9ee6f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 14 Dec 2022 12:49:30 +0530 Subject: [PATCH 04/37] Add request timeout param Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 37 ++++++++++++++++--- .../put/DecommissionRequestBuilder.java | 11 ++++++ .../decommission/DecommissionController.java | 5 +-- .../admin/cluster/RestDecommissionAction.java | 4 +- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 9627dfae09ba2..93350d93d9550 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -29,11 +29,12 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest { + public static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(2L); public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); private DecommissionAttribute decommissionAttribute; private boolean retryOnClusterManagerChange; - + private TimeValue timeout; private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT; // holder for no_delay param. To avoid draining time timeout. @@ -42,12 +43,17 @@ public class DecommissionRequest extends ClusterManagerNodeRequest Date: Fri, 16 Dec 2022 12:21:52 +0530 Subject: [PATCH 05/37] Changes Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 15 ++++++--------- .../cluster/decommission/DecommissionService.java | 3 +++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 93350d93d9550..b06753c353173 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -21,8 +21,6 @@ import static org.opensearch.action.ValidateActions.addValidationError; /** - * Register decommission request. - *

* Registers a decommission request with decommission attribute and timeout * * @opensearch.internal @@ -33,8 +31,8 @@ public class DecommissionRequest extends ClusterManagerNodeRequest Date: Fri, 16 Dec 2022 12:41:28 +0530 Subject: [PATCH 06/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 457ba5064766d..fcd6a00e2529d 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -259,8 +259,12 @@ public void onNewClusterState(ClusterState state) { ), t ); - // since the request failed to retry, we will attempt to mark it failed which will also ensure cleaning up the exclusion set for the to-be-decommissioned nodes - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); + // since the request failed to retry, we will attempt to mark it failed which will also ensure cleaning + // up the exclusion set for the to-be-decommissioned nodes + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.FAILED, + statusUpdateListener() + ); delegatedListener.onFailure(t); }) ); From 4f5e73c023f0abc12d8c026545bc72e7cda7caa2 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 16 Dec 2022 12:48:07 +0530 Subject: [PATCH 07/37] Fix Signed-off-by: Rishab Nahata --- .../decommission/awareness/put/DecommissionRequest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index b06753c353173..c75931ab2a5a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -46,8 +46,8 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean this.timeout = timeout; } - public DecommissionRequest(DecommissionAttribute decommissionAttribute, TimeValue timeout) { - this(decommissionAttribute, false, timeout); + public DecommissionRequest(DecommissionAttribute decommissionAttribute) { + this.decommissionAttribute = decommissionAttribute; } public DecommissionRequest(StreamInput in) throws IOException { From 378e3d26dcf49166b34b39ca3310ff73395a9273 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 27 Dec 2022 18:36:58 +0530 Subject: [PATCH 08/37] Refactor Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 28 +++++++++---------- .../put/DecommissionRequestBuilder.java | 8 +++--- .../decommission/DecommissionController.java | 2 +- .../decommission/DecommissionService.java | 2 +- .../admin/cluster/RestDecommissionAction.java | 6 ++-- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index c75931ab2a5a5..58d2d1561ec33 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -27,12 +27,12 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest { - public static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(2L); + public static final TimeValue DEFAULT_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(2L); public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); private DecommissionAttribute decommissionAttribute; - private boolean retryOnClusterManagerChange = false; - private TimeValue timeout = TIMEOUT; + private boolean retryOnClusterManagerSwitch = false; + private TimeValue timeout = DEFAULT_REQUEST_TIMEOUT; private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT; // holder for no_delay param. To avoid draining time timeout. @@ -40,9 +40,9 @@ public class DecommissionRequest extends ClusterManagerNodeRequest Date: Tue, 27 Dec 2022 18:46:20 +0530 Subject: [PATCH 09/37] Refactor Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 37b3a514dcebb..a6b6d7502e292 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -147,8 +147,7 @@ public ClusterState execute(ClusterState currentState) { validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away - ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); - ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata); + ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); @@ -465,9 +464,10 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, - DecommissionAttribute requestedDecommissionAttribute + DecommissionRequest decommissionRequest ) { String msg = null; + DecommissionAttribute requestedDecommissionAttribute = decommissionRequest.getDecommissionAttribute(); if (decommissionAttributeMetadata != null) { // check if the same attribute is registered and handle it accordingly if (decommissionAttributeMetadata.decommissionAttribute().equals(requestedDecommissionAttribute)) { @@ -515,11 +515,12 @@ private static void ensureEligibleRequest( if (msg != null) { throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); } + ensureEligibleRetry(decommissionAttributeMetadata, decommissionRequest); } private static void ensureEligibleRetry( - DecommissionRequest decommissionRequest, - DecommissionAttributeMetadata decommissionAttributeMetadata + DecommissionAttributeMetadata decommissionAttributeMetadata, + DecommissionRequest decommissionRequest ) { if (decommissionAttributeMetadata != null) { // we just need to check for INIT status as for other transient statuses we already handle it separately From e4107d540d96c4c061ae7d2adec773f389d4a76c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 10:55:27 +0530 Subject: [PATCH 10/37] Add test for request Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 6 +++--- .../awareness/put/DecommissionRequestTests.java | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 58d2d1561ec33..117112b1d6b3a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -27,7 +27,7 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest { - public static final TimeValue DEFAULT_REQUEST_TIMEOUT = TimeValue.timeValueMinutes(2L); + public static final TimeValue DEFAULT_REQUEST_TIMEOUT = TimeValue.timeValueSeconds(120); public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); private DecommissionAttribute decommissionAttribute; @@ -164,8 +164,8 @@ public ActionRequestValidationException validate() { + "] Seconds"; validationException = addValidationError(validationMessage, validationException); } - if (timeout.getMillis() < 0) { - validationException = addValidationError("request timeout is negative", validationException); + if (timeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { + validationException = addValidationError("request timeout should be at least 2 minutes", validationException); } return validationException; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index 8cd407b3aecf2..8cec236381d4a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -83,5 +83,16 @@ public void testValidation() { assertNotNull(e); assertTrue(e.getMessage().contains("Invalid decommission request")); } + { + String attributeName = "zone"; + String attributeValue = "test"; + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setTimeout(TimeValue.timeValueMinutes(1)); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("request timeout should be at least 2 minutes")); + } } } From 3caed7b962b61694aedcd7148a5e35be576ca0ce Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 13:30:43 +0530 Subject: [PATCH 11/37] Add tests for controller Signed-off-by: Rishab Nahata --- .../DecommissionControllerTests.java | 123 ++++++++++-------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index cf92130095e12..03e7f824046d5 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -16,6 +16,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -37,6 +40,7 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportService; import java.util.Arrays; @@ -118,6 +122,21 @@ public void setTransportServiceAndDefaultClusterState() { new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) ); // registers action + new TransportDecommissionAction( + transportService, + clusterService, + new DecommissionService( + nodeSettingsBuilder.build(), + clusterSettings, + clusterService, + transportService, + threadPool, + allocationService + ), + threadPool, + new ActionFilters(emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY))); //registers action + transportService.start(); transportService.acceptIncomingRequests(); decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); @@ -129,6 +148,56 @@ public void shutdownThreadPoolAndClusterService() { threadPool.shutdown(); } + public void testRetryDecommissionActionTimedOut() throws InterruptedException { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + long startTime = threadPool.relativeTimeInMillis() - 125000; + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference exceptionReference = new AtomicReference<>(); + decommissionController.retryDecommissionAction(decommissionRequest, startTime, new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("onResponse shouldn't have been called"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(OpenSearchTimeoutException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("node timed out before retrying")); + } + + public void testRetryDecommissionAction() throws InterruptedException { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + long startTime = threadPool.relativeTimeInMillis(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference exceptionReference = new AtomicReference<>(); + decommissionController.retryDecommissionAction(decommissionRequest, startTime, new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("onResponse shouldn't have been called"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(RemoteTransportException.class)); + MatcherAssert.assertThat(exceptionReference.get().getCause(), instanceOf(DecommissioningFailedException.class)); + } + public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); Set nodesToBeRemoved = new HashSet<>(); @@ -265,60 +334,6 @@ public void onFailure(Exception e) { assertEquals(decommissionAttributeMetadata.status(), newStatus); } - private static class AdjustConfigurationForExclusions implements ClusterStateObserver.Listener { - - final CountDownLatch doneLatch; - - AdjustConfigurationForExclusions(CountDownLatch latch) { - this.doneLatch = latch; - } - - @Override - public void onNewClusterState(ClusterState state) { - clusterService.getClusterManagerService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - assertThat(currentState, sameInstance(state)); - final Set votingNodeIds = new HashSet<>(); - currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); - currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId())); - final CoordinationMetadata.VotingConfiguration votingConfiguration = new CoordinationMetadata.VotingConfiguration( - votingNodeIds - ); - return builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .coordinationMetadata( - CoordinationMetadata.builder(currentState.coordinationMetadata()) - .lastAcceptedConfiguration(votingConfiguration) - .lastCommittedConfiguration(votingConfiguration) - .build() - ) - ).build(); - } - - @Override - public void onFailure(String source, Exception e) { - throw new AssertionError("unexpected failure", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - doneLatch.countDown(); - } - }); - } - - @Override - public void onClusterServiceClose() { - throw new AssertionError("unexpected close"); - } - - @Override - public void onTimeout(TimeValue timeout) { - throw new AssertionError("unexpected timeout"); - } - } - private ClusterState addNodes(ClusterState clusterState, String zone, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); From 8e4ab4dcc8796586fa9e39393779bb6b1d736c58 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 14:34:50 +0530 Subject: [PATCH 12/37] Test for retry Signed-off-by: Rishab Nahata --- .../decommission/DecommissionService.java | 22 ++--------- .../DecommissionServiceTests.java | 39 +++++++++++++++++++ 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index ee0e0dc8ee091..0e4d0add6716d 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -477,8 +477,11 @@ private static void ensureEligibleRequest( // check if the same attribute is registered and handle it accordingly if (decommissionAttributeMetadata.decommissionAttribute().equals(requestedDecommissionAttribute)) { switch (decommissionAttributeMetadata.status()) { - // for INIT and FAILED - we are good to process it again + // for INIT - check if it is eligible internal retry case INIT: + msg = (decommissionRequest.retryOnClusterManagerSwitch() == false) ? "concurrent request received to decommission attribute" : null; + break; + // for FAILED - we are good to process it again case FAILED: break; case DRAINING: @@ -520,23 +523,6 @@ private static void ensureEligibleRequest( if (msg != null) { throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); } - ensureEligibleRetry(decommissionAttributeMetadata, decommissionRequest); - } - - private static void ensureEligibleRetry( - DecommissionAttributeMetadata decommissionAttributeMetadata, - DecommissionRequest decommissionRequest - ) { - if (decommissionAttributeMetadata != null) { - // we just need to check for INIT status as for other transient statuses we already handle it separately - if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) - && decommissionRequest.retryOnClusterManagerSwitch() == false) { - throw new DecommissioningFailedException( - decommissionRequest.getDecommissionAttribute(), - "concurrent request received to decommission attribute" - ); - } - } } private ActionListener statusUpdateListener() { diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 81fbd7c0e332b..69331db39799f 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.decommission; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -37,6 +38,7 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportService; import java.util.Collections; @@ -49,6 +51,9 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -195,6 +200,40 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testExternalDecommissionRetryNotAllowed() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("concurrent request received to decommission attribute")); + } + @SuppressWarnings("unchecked") public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); From a5c4e08cfb4910359d59786246c13bbd8b33d504 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 16:16:20 +0530 Subject: [PATCH 13/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 4 +++- .../rest/action/admin/cluster/RestDecommissionAction.java | 4 +++- .../cluster/decommission/DecommissionControllerTests.java | 6 ++---- .../cluster/decommission/DecommissionServiceTests.java | 6 ++++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 0e4d0add6716d..b17b36b5abb58 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -479,7 +479,9 @@ private static void ensureEligibleRequest( switch (decommissionAttributeMetadata.status()) { // for INIT - check if it is eligible internal retry case INIT: - msg = (decommissionRequest.retryOnClusterManagerSwitch() == false) ? "concurrent request received to decommission attribute" : null; + msg = (decommissionRequest.retryOnClusterManagerSwitch() == false) + ? "concurrent request received to decommission attribute" + : null; break; // for FAILED - we are good to process it again case FAILED: diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 5005d610b5818..798fe300173f6 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -61,6 +61,8 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) .setRetryOnClusterManagerSwitch(false) - .setTimeout(TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_REQUEST_TIMEOUT, getClass().getSimpleName() + ".timeout")); + .setTimeout( + TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_REQUEST_TIMEOUT, getClass().getSimpleName() + ".timeout") + ); } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 03e7f824046d5..01e6cc773ea91 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -22,8 +22,6 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateObserver; -import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; @@ -59,7 +57,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.sameInstance; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -135,7 +132,8 @@ public void setTransportServiceAndDefaultClusterState() { ), threadPool, new ActionFilters(emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY))); //registers action + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) + ); // registers action transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 69331db39799f..15d59c05edd6b 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -38,7 +38,6 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportService; import java.util.Collections; @@ -231,7 +230,10 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); - MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("concurrent request received to decommission attribute")); + MatcherAssert.assertThat( + exceptionReference.get().getMessage(), + containsString("concurrent request received to decommission attribute") + ); } @SuppressWarnings("unchecked") From 51743ceba8be3600a71f0e046e1f2e2b64bb408c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 17:55:07 +0530 Subject: [PATCH 14/37] Move check at rest layer Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 3 --- .../admin/cluster/RestDecommissionAction.java | 14 +++++++++----- .../awareness/put/DecommissionRequestTests.java | 11 ----------- .../admin/cluster/RestDecommissionActionTests.java | 11 +++++++++++ 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 117112b1d6b3a..19a4575ab680a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -164,9 +164,6 @@ public ActionRequestValidationException validate() { + "] Seconds"; validationException = addValidationError(validationMessage, validationException); } - if (timeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { - validationException = addValidationError("request timeout should be at least 2 minutes", validationException); - } return validationException; } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 798fe300173f6..5f8d475cd9601 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -59,10 +59,14 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT); decommissionRequest.setDelayTimeout(delayTimeout); } - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) - .setRetryOnClusterManagerSwitch(false) - .setTimeout( - TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_REQUEST_TIMEOUT, getClass().getSimpleName() + ".timeout") - ); + + if (request.hasParam("timeout")) { + TimeValue requestTimeout = request.paramAsTime("timeout", DEFAULT_REQUEST_TIMEOUT); + if (requestTimeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { + throw new IllegalArgumentException(String.format("default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT)); + } + decommissionRequest.setTimeout(requestTimeout); + } + return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)).setRetryOnClusterManagerSwitch(false); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index 8cec236381d4a..8cd407b3aecf2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -83,16 +83,5 @@ public void testValidation() { assertNotNull(e); assertTrue(e.getMessage().contains("Invalid decommission request")); } - { - String attributeName = "zone"; - String attributeValue = "test"; - DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - - final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); - request.setTimeout(TimeValue.timeValueMinutes(1)); - ActionRequestValidationException e = request.validate(); - assertNotNull(e); - assertTrue(e.getMessage().contains("request timeout should be at least 2 minutes")); - } } } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java index b5f61f751b19f..a373212d186a0 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java @@ -87,6 +87,17 @@ public void testCreateRequestWithDelayTimeout() throws IOException { assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); } + public void testCreateRequestWithInvalidTimeout() throws IOException { + Map params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + params.put("timeout", "1m"); + + RestRequest request = buildRestRequest(params); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> action.createRequest(request)); + assertEquals("default request timeout has to be at least [2m]", e.getMessage()); + } + private FakeRestRequest buildRestRequest(Map params) { return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) .withPath("/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}") From b63baf7d2c9a3a6b34cac844bab75b3a0d8f5172 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 18:02:14 +0530 Subject: [PATCH 15/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../rest/action/admin/cluster/RestDecommissionAction.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 5f8d475cd9601..562cc1e58fd24 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -63,10 +63,13 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { if (request.hasParam("timeout")) { TimeValue requestTimeout = request.paramAsTime("timeout", DEFAULT_REQUEST_TIMEOUT); if (requestTimeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { - throw new IllegalArgumentException(String.format("default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT)); + throw new IllegalArgumentException( + String.format("default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT) + ); } decommissionRequest.setTimeout(requestTimeout); } - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)).setRetryOnClusterManagerSwitch(false); + return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) + .setRetryOnClusterManagerSwitch(false); } } From b8b1434ae9f11ba6646a6393717703ae0f01f171 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 18:20:59 +0530 Subject: [PATCH 16/37] Minor fix Signed-off-by: Rishab Nahata --- .../rest/action/admin/cluster/RestDecommissionAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 562cc1e58fd24..5de1bb0b3e532 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import java.util.Locale; import static java.util.Collections.singletonList; import static org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest.DEFAULT_REQUEST_TIMEOUT; @@ -64,7 +65,7 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { TimeValue requestTimeout = request.paramAsTime("timeout", DEFAULT_REQUEST_TIMEOUT); if (requestTimeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { throw new IllegalArgumentException( - String.format("default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT) + String.format(Locale.ROOT, "default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT) ); } decommissionRequest.setTimeout(requestTimeout); From ad6207f43fdadbc8ee2a3ec96976e3632d216e69 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 18:50:55 +0530 Subject: [PATCH 17/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 6 ++++-- .../cluster/decommission/DecommissionController.java | 10 +++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 19a4575ab680a..9dbc65aea9d10 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -88,19 +88,21 @@ public DecommissionAttribute getDecommissionAttribute() { return this.decommissionAttribute; } - public void setDelayTimeout(TimeValue delayTimeout) { + public DecommissionRequest setDelayTimeout(TimeValue delayTimeout) { this.delayTimeout = delayTimeout; + return this; } public TimeValue getDelayTimeout() { return this.delayTimeout; } - public void setNoDelay(boolean noDelay) { + public DecommissionRequest setNoDelay(boolean noDelay) { if (noDelay) { this.delayTimeout = TimeValue.ZERO; } this.noDelay = noDelay; + return this; } public boolean isNoDelay() { diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 5e111df8cf8e5..d328ec5012f6c 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -102,12 +102,16 @@ public void retryDecommissionAction( listener.onFailure(new OpenSearchTimeoutException(errorMsg)); return; } - decommissionRequest.setRetryOnClusterManagerSwitch(true); - decommissionRequest.setTimeout(TimeValue.timeValueMillis(remainingTimeoutMS)); + DecommissionRequest newRequest = new DecommissionRequest().setDecommissionAttribute(decommissionRequest.getDecommissionAttribute()) + .setRetryOnClusterManagerSwitch(true) + .setTimeout(TimeValue.timeValueMillis(remainingTimeoutMS)) + .setDelayTimeout(decommissionRequest.getDelayTimeout()) + .setNoDelay(decommissionRequest.isNoDelay()); + transportService.sendRequest( transportService.getLocalNode(), DecommissionAction.NAME, - decommissionRequest, + newRequest, new TransportResponseHandler() { @Override public void handleResponse(DecommissionResponse response) { From 48015015895a0c4e241b7c056a68e1dffdbbcf5c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 5 Jan 2023 18:58:21 +0530 Subject: [PATCH 18/37] Fix Signed-off-by: Rishab Nahata --- .../decommission/DecommissionService.java | 21 +------------------ 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index b17b36b5abb58..42217ffc374d2 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -247,26 +247,7 @@ public void onNewClusterState(ClusterState state) { logger.info( "local node is not eligible to process the request, retrying the transport action until it times out" ); - decommissionController.retryDecommissionAction( - decommissionRequest, - startTime, - ActionListener.delegateResponse(listener, (delegatedListener, t) -> { - logger.debug( - () -> new ParameterizedMessage( - "failed to retry decommission action for attribute [{}]", - decommissionRequest.getDecommissionAttribute() - ), - t - ); - // since the request failed to retry, we will attempt to mark it failed which will also ensure cleaning - // up the exclusion set for the to-be-decommissioned nodes - decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.FAILED, - statusUpdateListener() - ); - delegatedListener.onFailure(t); - }) - ); + decommissionController.retryDecommissionAction(decommissionRequest, startTime, listener); } } From eb5c3939f741c5f074356e0664304b9adf2cc35d Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 01:04:14 +0530 Subject: [PATCH 19/37] Remove retry flag and use original flag Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 52 +++++----------- .../put/DecommissionRequestBuilder.java | 19 ++---- .../decommission/DecommissionController.java | 59 ------------------- .../decommission/DecommissionService.java | 21 +++++-- .../admin/cluster/RestDecommissionAction.java | 3 +- .../DecommissionControllerTests.java | 50 ---------------- 6 files changed, 35 insertions(+), 169 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 9dbc65aea9d10..5b4df14d0d347 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -31,8 +31,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest() { - @Override - public void handleResponse(DecommissionResponse response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public DecommissionResponse read(StreamInput in) throws IOException { - return new DecommissionResponse(in); - } - } - ); - } - /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 42217ffc374d2..5017c85467e6a 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -20,6 +20,7 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -146,6 +147,9 @@ public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata == null) { + decommissionRequest.originalRequest(true); + } // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); @@ -242,12 +246,19 @@ public void onNewClusterState(ClusterState state) { drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { - // since the local node is no longer cluster manager which could've happened due to leader abdication, - // hence retrying the decommission action until it times out + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader + // this will ensures that request is retried until cluster manager times out logger.info( - "local node is not eligible to process the request, retrying the transport action until it times out" + "local node is not eligible to process the request, " + + "throwing NotClusterManagerException to attempt a retry on an eligible node" + ); + listener.onFailure( + new NotClusterManagerException( + "node [" + + transportService.getLocalNode().toString() + + "] not eligible to execute decommission request. Will retry until timeout." + ) ); - decommissionController.retryDecommissionAction(decommissionRequest, startTime, listener); } } @@ -460,7 +471,7 @@ private static void ensureEligibleRequest( switch (decommissionAttributeMetadata.status()) { // for INIT - check if it is eligible internal retry case INIT: - msg = (decommissionRequest.retryOnClusterManagerSwitch() == false) + msg = (decommissionRequest.originalRequest() == false) ? "concurrent request received to decommission attribute" : null; break; diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 5de1bb0b3e532..ed0507b30ebcd 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -68,9 +68,8 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { String.format(Locale.ROOT, "default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT) ); } - decommissionRequest.setTimeout(requestTimeout); } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) - .setRetryOnClusterManagerSwitch(false); + .originalRequest(false); } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 01e6cc773ea91..63b8c31fc0e98 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -146,56 +146,6 @@ public void shutdownThreadPoolAndClusterService() { threadPool.shutdown(); } - public void testRetryDecommissionActionTimedOut() throws InterruptedException { - DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); - DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); - long startTime = threadPool.relativeTimeInMillis() - 125000; - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference exceptionReference = new AtomicReference<>(); - decommissionController.retryDecommissionAction(decommissionRequest, startTime, new ActionListener() { - @Override - public void onResponse(DecommissionResponse decommissionResponse) { - fail("onResponse shouldn't have been called"); - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - exceptionReference.set(e); - countDownLatch.countDown(); - } - }); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); - MatcherAssert.assertThat(exceptionReference.get(), instanceOf(OpenSearchTimeoutException.class)); - MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("node timed out before retrying")); - } - - public void testRetryDecommissionAction() throws InterruptedException { - DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); - DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); - long startTime = threadPool.relativeTimeInMillis(); - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicReference exceptionReference = new AtomicReference<>(); - decommissionController.retryDecommissionAction(decommissionRequest, startTime, new ActionListener() { - @Override - public void onResponse(DecommissionResponse decommissionResponse) { - fail("onResponse shouldn't have been called"); - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - exceptionReference.set(e); - countDownLatch.countDown(); - } - }); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); - MatcherAssert.assertThat(exceptionReference.get(), instanceOf(RemoteTransportException.class)); - MatcherAssert.assertThat(exceptionReference.get().getCause(), instanceOf(DecommissioningFailedException.class)); - } - public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); Set nodesToBeRemoved = new HashSet<>(); From ed6bffb5f8ca51744e8a8e3d4276402fe5d666fe Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 01:05:39 +0530 Subject: [PATCH 20/37] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionController.java | 3 --- .../cluster/decommission/DecommissionControllerTests.java | 3 --- 2 files changed, 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 00ba1f90beee5..1ff2fb52175c7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -12,9 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; -import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; -import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 63b8c31fc0e98..ba87248b9627d 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -16,8 +16,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; -import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; @@ -38,7 +36,6 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportService; import java.util.Arrays; From c68cdc7689a0de2a61c9b348ce2cdc6c7c2f441e Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 01:10:08 +0530 Subject: [PATCH 21/37] Refactor code Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 1 - .../admin/cluster/RestDecommissionAction.java | 10 ---------- .../DecommissionControllerTests.java | 17 ----------------- .../cluster/RestDecommissionActionTests.java | 11 ----------- 4 files changed, 39 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 5b4df14d0d347..8c8eef7f09ec6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -27,7 +27,6 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest { - public static final TimeValue DEFAULT_REQUEST_TIMEOUT = TimeValue.timeValueSeconds(120); public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); private DecommissionAttribute decommissionAttribute; diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index ed0507b30ebcd..8de0d71aea2c4 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -19,10 +19,8 @@ import java.io.IOException; import java.util.List; -import java.util.Locale; import static java.util.Collections.singletonList; -import static org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest.DEFAULT_REQUEST_TIMEOUT; import static org.opensearch.rest.RestRequest.Method.PUT; /** @@ -61,14 +59,6 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { decommissionRequest.setDelayTimeout(delayTimeout); } - if (request.hasParam("timeout")) { - TimeValue requestTimeout = request.paramAsTime("timeout", DEFAULT_REQUEST_TIMEOUT); - if (requestTimeout.getMillis() < DEFAULT_REQUEST_TIMEOUT.getMillis()) { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "default request timeout has to be at least [%s]", DEFAULT_REQUEST_TIMEOUT) - ); - } - } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) .originalRequest(false); } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index ba87248b9627d..dbf70be2ccf42 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -16,7 +16,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -116,22 +115,6 @@ public void setTransportServiceAndDefaultClusterState() { new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) ); // registers action - new TransportDecommissionAction( - transportService, - clusterService, - new DecommissionService( - nodeSettingsBuilder.build(), - clusterSettings, - clusterService, - transportService, - threadPool, - allocationService - ), - threadPool, - new ActionFilters(emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) - ); // registers action - transportService.start(); transportService.acceptIncomingRequests(); decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java index a373212d186a0..b5f61f751b19f 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java @@ -87,17 +87,6 @@ public void testCreateRequestWithDelayTimeout() throws IOException { assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); } - public void testCreateRequestWithInvalidTimeout() throws IOException { - Map params = new HashMap<>(); - params.put("awareness_attribute_name", "zone"); - params.put("awareness_attribute_value", "zone-1"); - params.put("timeout", "1m"); - - RestRequest request = buildRestRequest(params); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> action.createRequest(request)); - assertEquals("default request timeout has to be at least [2m]", e.getMessage()); - } - private FakeRestRequest buildRestRequest(Map params) { return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) .withPath("/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}") From 1368ff45ef4533ca2ed18efe3034767dae60dcac Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 01:12:54 +0530 Subject: [PATCH 22/37] Clean up Signed-off-by: Rishab Nahata --- .../opensearch/cluster/decommission/DecommissionService.java | 2 -- .../rest/action/admin/cluster/RestDecommissionAction.java | 1 - 2 files changed, 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5017c85467e6a..a60e9f779463e 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -75,7 +75,6 @@ public class DecommissionService { private final TransportService transportService; private final ThreadPool threadPool; private final DecommissionController decommissionController; - private final long startTime; private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; private volatile int maxVotingConfigExclusions; @@ -93,7 +92,6 @@ public DecommissionService( this.transportService = transportService; this.threadPool = threadPool; this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); - this.startTime = threadPool.relativeTimeInMillis(); this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 8de0d71aea2c4..37329ad9b307f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -58,7 +58,6 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT); decommissionRequest.setDelayTimeout(delayTimeout); } - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) .originalRequest(false); } From 8ddd128a7e9f7c80bd1fa97a60107c095f85c992 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 01:19:59 +0530 Subject: [PATCH 23/37] Empty-Commit Signed-off-by: Rishab Nahata From aa5c4b492e28efff2a553850c7e094ebf61bc3be Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 02:43:06 +0530 Subject: [PATCH 24/37] Empty-Commit Signed-off-by: Rishab Nahata From 3993de9600c276ecc75e80b1f8af8378bda32623 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 03:13:11 +0530 Subject: [PATCH 25/37] Empty-Commit Signed-off-by: Rishab Nahata From afe5f461a1a4ea374ec4c00bcf6e4634a0099ead Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 11:16:37 +0530 Subject: [PATCH 26/37] Cleanup Signed-off-by: Rishab Nahata --- .../cluster/decommission/awareness/put/DecommissionRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 8c8eef7f09ec6..67027c87849da 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -38,7 +38,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest Date: Tue, 10 Jan 2023 11:32:46 +0530 Subject: [PATCH 27/37] Fix Signed-off-by: Rishab Nahata --- .../opensearch/cluster/decommission/DecommissionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index a60e9f779463e..ef3effd680956 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -145,7 +145,7 @@ public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); - if (decommissionAttributeMetadata == null) { + if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { decommissionRequest.originalRequest(true); } // check that request is eligible to proceed and attribute is weighed away From 0e8b76c9cfd3b6dfc8fb645c9c916fa8a0a92898 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 14:44:50 +0530 Subject: [PATCH 28/37] Throw exception in line Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 14 +++++--------- .../decommission/DecommissionServiceTests.java | 2 +- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index ef3effd680956..db39ae323fac5 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -470,9 +470,9 @@ private static void ensureEligibleRequest( // for INIT - check if it is eligible internal retry case INIT: msg = (decommissionRequest.originalRequest() == false) - ? "concurrent request received to decommission attribute" + ? "same request is already in status [INIT]" : null; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); // for FAILED - we are good to process it again case FAILED: break; @@ -480,7 +480,7 @@ private static void ensureEligibleRequest( case IN_PROGRESS: case SUCCESSFUL: msg = "same request is already in status [" + decommissionAttributeMetadata.status() + "]"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); default: throw new IllegalStateException( "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" @@ -493,7 +493,7 @@ private static void ensureEligibleRequest( msg = "one awareness attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] already successfully decommissioned, recommission before triggering another decommission"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); case DRAINING: case IN_PROGRESS: case INIT: @@ -501,7 +501,7 @@ private static void ensureEligibleRequest( msg = "there's an inflight decommission request for attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] is in progress, cannot process this request"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); case FAILED: break; default: @@ -511,10 +511,6 @@ private static void ensureEligibleRequest( } } } - - if (msg != null) { - throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); - } } private ActionListener statusUpdateListener() { diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 5883f092d22ec..a9d159a7cc149 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -232,7 +232,7 @@ public void onFailure(Exception e) { MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); MatcherAssert.assertThat( exceptionReference.get().getMessage(), - containsString("concurrent request received to decommission attribute") + containsString("same request is already in status [INIT]") ); } From 7395dffd6c84b45d6a8f0d0b614d7c958e5b3890 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 14:49:12 +0530 Subject: [PATCH 29/37] Fixes Signed-off-by: Rishab Nahata --- .../decommission/awareness/put/DecommissionRequest.java | 5 ----- .../rest/action/admin/cluster/RestDecommissionAction.java | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 67027c87849da..a9dc5bb27614e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -38,11 +38,6 @@ public class DecommissionRequest extends ClusterManagerNodeRequest Date: Tue, 10 Jan 2023 17:07:10 +0530 Subject: [PATCH 30/37] Add IT for concurrency Signed-off-by: Rishab Nahata --- .../AwarenessAttributeDecommissionIT.java | 111 ++++++++++++++++++ .../decommission/DecommissionService.java | 4 +- .../DecommissionServiceTests.java | 5 +- 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 07580f17a67bc..e2eb08bd0969c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -44,6 +44,8 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; @@ -59,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static org.opensearch.test.NodeRoles.onlyRole; @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + public void testConcurrentDecommissionAction() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + AtomicInteger numRequestAcknowledged = new AtomicInteger(); + AtomicInteger numRequestUnAcknowledged = new AtomicInteger(); + AtomicInteger numRequestFailed = new AtomicInteger(); + int concurrentRuns = randomIntBetween(5, 10); + TestThreadPool testThreadPool = null; + logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a'); + try { + testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering decommission action"); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + try { + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest) + .get(); + if (decommissionResponse.isAcknowledged()) { + numRequestAcknowledged.incrementAndGet(); + } else { + numRequestUnAcknowledged.incrementAndGet(); + } + } catch (Exception e) { + numRequestFailed.incrementAndGet(); + } + countDownLatch.countDown(); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get()); + assertEquals(concurrentRuns - 1, numRequestFailed.get()); + assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get()); + } + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { final CountDownLatch doneLatch; diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index db39ae323fac5..de2b4b5b0d07f 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -469,9 +469,7 @@ private static void ensureEligibleRequest( switch (decommissionAttributeMetadata.status()) { // for INIT - check if it is eligible internal retry case INIT: - msg = (decommissionRequest.originalRequest() == false) - ? "same request is already in status [INIT]" - : null; + msg = (decommissionRequest.originalRequest() == false) ? "same request is already in status [INIT]" : null; throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); // for FAILED - we are good to process it again case FAILED: diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index a9d159a7cc149..ac7e1451dda5a 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -230,10 +230,7 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); - MatcherAssert.assertThat( - exceptionReference.get().getMessage(), - containsString("same request is already in status [INIT]") - ); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("same request is already in status [INIT]")); } @SuppressWarnings("unchecked") From c46992cc6e858b121379eab31d53098a6797fcc0 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 19:44:55 +0530 Subject: [PATCH 31/37] Add request id to decommission request Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 23 +++++------ .../put/DecommissionRequestBuilder.java | 8 ++-- .../put/TransportDecommissionAction.java | 4 ++ .../DecommissionAttributeMetadata.java | 38 +++++++++++++++---- .../decommission/DecommissionController.java | 3 +- .../decommission/DecommissionHelper.java | 5 ++- .../decommission/DecommissionService.java | 22 +++++++---- .../coordination/JoinTaskExecutorTests.java | 12 ++++-- .../cluster/coordination/NodeJoinTests.java | 3 +- .../DecommissionControllerTests.java | 3 +- .../decommission/DecommissionHelperTests.java | 11 ++++-- .../DecommissionServiceTests.java | 15 +++++--- ...onAttributeMetadataSerializationTests.java | 11 ++++-- .../DecommissionAttributeMetadataTests.java | 2 +- ...missionAttributeMetadataXContentTests.java | 2 +- .../routing/WeightedRoutingServiceTests.java | 6 ++- 16 files changed, 113 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index a9dc5bb27614e..be9ee7cd47e88 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -30,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest private final DecommissionAttribute decommissionAttribute; private DecommissionStatus status; + private String requestID; public static final String attributeType = "awareness"; /** @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable * @param decommissionAttribute attribute details * @param status current status of the attribute decommission */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) { + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) { this.decommissionAttribute = decommissionAttribute; this.status = status; + this.requestID = requestId; } /** - * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} + * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id * * @param decommissionAttribute attribute details */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) { - this(decommissionAttribute, DecommissionStatus.INIT); + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) { + this(decommissionAttribute, DecommissionStatus.INIT, requestID); } /** @@ -77,6 +79,15 @@ public DecommissionStatus status() { return this.status; } + /** + * Returns the request id of the decommission + * + * @return request id + */ + public String requestID() { + return this.requestID; + } + /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with @@ -128,12 +139,13 @@ public boolean equals(Object o) { DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o; if (!status.equals(that.status)) return false; + if (!requestID.equals(that.requestID)) return false; return decommissionAttribute.equals(that.decommissionAttribute); } @Override public int hashCode() { - return Objects.hash(attributeType, decommissionAttribute, status); + return Objects.hash(attributeType, decommissionAttribute, status, requestID); } /** @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() { public DecommissionAttributeMetadata(StreamInput in) throws IOException { this.decommissionAttribute = new DecommissionAttribute(in); this.status = DecommissionStatus.fromString(in.readString()); + this.requestID = in.readString(); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -165,12 +178,14 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOException public void writeTo(StreamOutput out) throws IOException { decommissionAttribute.writeTo(out); out.writeString(status.status()); + out.writeString(requestID); } public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException { XContentParser.Token token; DecommissionAttribute decommissionAttribute = null; DecommissionStatus status = null; + String requestID = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) ); } status = DecommissionStatus.fromString(parser.text()); + } else if ("requestID".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { + throw new OpenSearchParseException( + "failed to parse status of decommissioning, expected string but found unknown type" + ); + } + requestID = parser.text(); } else { throw new OpenSearchParseException( "unknown field found [{}], failed to parse the decommission attribute", @@ -218,7 +240,7 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) } } } - return new DecommissionAttributeMetadata(decommissionAttribute, status); + return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID); } /** @@ -226,7 +248,7 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) */ @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(decommissionAttribute, status, attributeType, builder, params); + toXContent(decommissionAttribute, status, requestID, attributeType, builder, params); return builder; } @@ -245,6 +267,7 @@ public EnumSet context() { public static void toXContent( DecommissionAttribute decommissionAttribute, DecommissionStatus status, + String requestID, String attributeType, XContentBuilder builder, ToXContent.Params params @@ -253,6 +276,7 @@ public static void toXContent( builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue()); builder.endObject(); builder.field("status", status.status()); + builder.field("requestID", requestID); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 1ff2fb52175c7..79a91ee85f016 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.validateNewStatus(decommissionStatus); decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionStatus + decommissionStatus, + decommissionAttributeMetadata.requestID() ); ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 8305bda545998..83589a2f55685 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -32,9 +32,10 @@ public class DecommissionHelper { static ClusterState registerDecommissionAttributeInClusterState( ClusterState currentState, - DecommissionAttribute decommissionAttribute + DecommissionAttribute decommissionAttribute, + String requestID ) { - DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, requestID); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index de2b4b5b0d07f..5a656b108658c 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -145,14 +145,15 @@ public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); - if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { - decommissionRequest.originalRequest(true); - } // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState( + currentState, + decommissionAttribute, + decommissionRequest.id() + ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream() .map(DiscoveryNode::getId) @@ -191,6 +192,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); + assert decommissionAttributeMetadata.requestID().equals(decommissionRequest.id()); assert newState.getVotingConfigExclusions() .stream() .map(CoordinationMetadata.VotingConfigExclusion::getNodeId) @@ -461,7 +463,7 @@ private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, DecommissionRequest decommissionRequest ) { - String msg = null; + String msg; DecommissionAttribute requestedDecommissionAttribute = decommissionRequest.getDecommissionAttribute(); if (decommissionAttributeMetadata != null) { // check if the same attribute is registered and handle it accordingly @@ -469,9 +471,13 @@ private static void ensureEligibleRequest( switch (decommissionAttributeMetadata.status()) { // for INIT - check if it is eligible internal retry case INIT: - msg = (decommissionRequest.originalRequest() == false) ? "same request is already in status [INIT]" : null; - throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); - // for FAILED - we are good to process it again + if (decommissionRequest.id().equals(decommissionAttributeMetadata.requestID()) == false) { + throw new DecommissioningFailedException( + requestedDecommissionAttribute, + "same request is already in status [INIT]" + ); + } + // for FAILED - we are good to process it again case FAILED: break; case DRAINING: diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index d7253e6f57b38..f6401558221b0 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -240,7 +240,8 @@ public void testPreventJoinClusterWithDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1")); @@ -257,7 +258,8 @@ public void testJoinClusterWithDifferentDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); @@ -277,7 +279,8 @@ public void testJoinFailedForDecommissionedNode() throws Exception { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState clusterManagerClusterState = ClusterState.builder(ClusterName.DEFAULT) .nodes( @@ -315,7 +318,8 @@ public void testJoinClusterWithDecommissionFailed() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.FAILED + DecommissionStatus.FAILED, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 18a7b892a424c..ec8d5bcf1c687 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -847,7 +847,8 @@ private static ClusterState initialStateWithDecommissionedAttribute( ) { DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); return ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index dbf70be2ccf42..9736355629fd9 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -234,7 +234,8 @@ private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone-1"), - currentStatus + currentStatus, + randomAlphaOfLength(10) ); ClusterState state = clusterService.state(); Metadata metadata = state.metadata(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java index ab2d8218ec97d..94833e15f55d0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java @@ -38,7 +38,11 @@ public class DecommissionHelperTests extends OpenSearchTestCase { public void testRegisterAndDeleteDecommissionAttributeInClusterState() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone2"); - ClusterState updatedState = registerDecommissionAttributeInClusterState(initialClusterState, decommissionAttribute); + ClusterState updatedState = registerDecommissionAttributeInClusterState( + initialClusterState, + decommissionAttribute, + randomAlphaOfLength(10) + ); assertEquals(decommissionAttribute, updatedState.metadata().decommissionAttributeMetadata().decommissionAttribute()); updatedState = deleteDecommissionAttributeInClusterState(updatedState); assertNull(updatedState.metadata().decommissionAttributeMetadata()); @@ -79,13 +83,14 @@ public void testNodeCommissioned() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertFalse(nodeCommissioned(node1, metadata)); DecommissionStatus commissionStatus = randomFrom(DecommissionStatus.FAILED, DecommissionStatus.INIT); - decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus, randomAlphaOfLength(10)); metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertTrue(nodeCommissioned(node1, metadata)); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index ac7e1451dda5a..8daee126b0fd0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -204,7 +204,8 @@ public void testExternalDecommissionRetryNotAllowed() throws InterruptedExceptio DecommissionStatus oldStatus = DecommissionStatus.INIT; DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone_1"), - oldStatus + oldStatus, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( @@ -239,7 +240,8 @@ public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessf DecommissionStatus oldStatus = randomFrom(DecommissionStatus.SUCCESSFUL, DecommissionStatus.IN_PROGRESS, DecommissionStatus.INIT); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone_1"), - oldStatus + oldStatus, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( @@ -287,7 +289,8 @@ public void testScheduleNodesDecommissionOnTimeout() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.DRAINING + DecommissionStatus.DRAINING, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); @@ -308,7 +311,8 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.INIT + DecommissionStatus.INIT, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); @@ -327,7 +331,8 @@ public void testRecommissionAction() throws InterruptedException { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java index 60b3a03848830..4686526971146 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java @@ -33,7 +33,7 @@ protected Metadata.Custom createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override @@ -57,7 +57,11 @@ protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { if (randomBoolean()) { attributeValue = randomAlphaOfLength(6); } - return new DecommissionAttributeMetadata(new DecommissionAttribute(attributeName, attributeValue), decommissionStatus); + return new DecommissionAttributeMetadata( + new DecommissionAttribute(attributeName, attributeValue), + decommissionStatus, + randomAlphaOfLength(10) + ); } @Override @@ -77,7 +81,8 @@ protected Metadata.Custom doParseInstance(XContentParser parser) throws IOExcept assertEquals(XContentParser.Token.END_OBJECT, parser.currentToken()); return new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionAttributeMetadata.status() + decommissionAttributeMetadata.status(), + randomAlphaOfLength(10) ); } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java index 746d4565b0db3..98cecd8439413 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java @@ -24,7 +24,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java index 030946f4510a1..a83914cec23c0 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java @@ -23,7 +23,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 1f892b993d4d6..65fc1b902f9a4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -184,7 +184,11 @@ private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map Date: Tue, 10 Jan 2023 19:50:31 +0530 Subject: [PATCH 32/37] Fix Signed-off-by: Rishab Nahata --- .../org/opensearch/cluster/decommission/DecommissionService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5a656b108658c..806dbbfb821ca 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -477,6 +477,7 @@ private static void ensureEligibleRequest( "same request is already in status [INIT]" ); } + break; // for FAILED - we are good to process it again case FAILED: break; From 7ecbe6e97a45cdb45718c9e1379cac251e1fd5b9 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 20:05:29 +0530 Subject: [PATCH 33/37] Fix Signed-off-by: Rishab Nahata --- .../opensearch/cluster/decommission/DecommissionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 806dbbfb821ca..b0a5497570686 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -478,7 +478,7 @@ private static void ensureEligibleRequest( ); } break; - // for FAILED - we are good to process it again + // for FAILED - we are good to process it again case FAILED: break; case DRAINING: From 88ec79d2e0115e872ce18b538d7143dbb2d6f7c3 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 20:24:28 +0530 Subject: [PATCH 34/37] Fix Signed-off-by: Rishab Nahata --- .../put/TransportDecommissionAction.java | 4 --- .../decommission/DecommissionService.java | 4 +++ .../DecommissionServiceTests.java | 34 +++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java index 6764fc23aa22b..6f4e3cf82d2ce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.decommission.DecommissionService; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.threadpool.ThreadPool; @@ -77,9 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener listener) throws Exception { logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString()); - if (request.id() == null) { - request.setID(UUIDs.base64UUID()); - } decommissionService.startDecommissionAction(request, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index b0a5497570686..e7905bfbe2381 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -28,6 +28,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -144,6 +145,9 @@ public void startDecommissionAction( public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); + if (decommissionRequest.id() == null) { + decommissionRequest.setID(UUIDs.base64UUID()); + } DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 8daee126b0fd0..2ea6fc9eff4e3 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -273,6 +273,40 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testDecommissioningFailedWhenAnotherRequestForSameAttributeIsExecuted() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus, + randomAlphaOfLength(10) + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertTrue(exceptionReference.get() instanceof DecommissioningFailedException); + assertThat(exceptionReference.get().getMessage(), Matchers.endsWith("same request is already in status [INIT]")); + } + public void testScheduleNodesDecommissionOnTimeout() { TransportService mockTransportService = Mockito.mock(TransportService.class); ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); From 2b40e12351e8ab2f54351b44ace995330e72bd71 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 22:03:25 +0530 Subject: [PATCH 35/37] Resolve comments Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 16 ++++++++-------- .../put/DecommissionRequestBuilder.java | 8 ++++---- .../decommission/DecommissionService.java | 11 ++++++----- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index be9ee7cd47e88..44f047137eece 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -30,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest decommissionedNodes = filterNodesWithDecommissionAttribute( state, decommissionRequest.getDecommissionAttribute(), @@ -475,7 +476,7 @@ private static void ensureEligibleRequest( switch (decommissionAttributeMetadata.status()) { // for INIT - check if it is eligible internal retry case INIT: - if (decommissionRequest.id().equals(decommissionAttributeMetadata.requestID()) == false) { + if (decommissionRequest.requestID().equals(decommissionAttributeMetadata.requestID()) == false) { throw new DecommissioningFailedException( requestedDecommissionAttribute, "same request is already in status [INIT]" From a6d0c3b5337d29b1c4b1c8158ec5097438bbd004 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 22:08:33 +0530 Subject: [PATCH 36/37] Fix test Signed-off-by: Rishab Nahata --- .../DecommissionAttributeMetadataSerializationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java index 4686526971146..284a0fe77edfb 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java @@ -82,7 +82,7 @@ protected Metadata.Custom doParseInstance(XContentParser parser) throws IOExcept return new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status(), - randomAlphaOfLength(10) + decommissionAttributeMetadata.requestID() ); } } From 927d0cc0592c72ecc0fb5bde65cd6f46b525499b Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 10 Jan 2023 22:58:43 +0530 Subject: [PATCH 37/37] Test fix Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionServiceTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 2ea6fc9eff4e3..51cd0e6eb23ed 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -343,10 +343,11 @@ public void testScheduleNodesDecommissionOnTimeout() { public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + String requestID = randomAlphaOfLength(10); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, DecommissionStatus.INIT, - randomAlphaOfLength(10) + requestID ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); @@ -354,6 +355,7 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionRequest request = new DecommissionRequest(decommissionAttribute); request.setNoDelay(true); + request.setRequestID(requestID); setState(clusterService, state); decommissionService.drainNodesWithDecommissionedAttribute(request);