diff --git a/CHANGELOG.md b/CHANGELOG.md index 37f9fc821a5c0..21d8041fc2a87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) +- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 07580f17a67bc..e2eb08bd0969c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -44,6 +44,8 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; @@ -59,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static 
org.opensearch.test.NodeRoles.onlyRole; @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + public void testConcurrentDecommissionAction() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 0.0, 
"b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + AtomicInteger numRequestAcknowledged = new AtomicInteger(); + AtomicInteger numRequestUnAcknowledged = new AtomicInteger(); + AtomicInteger numRequestFailed = new AtomicInteger(); + int concurrentRuns = randomIntBetween(5, 10); + TestThreadPool testThreadPool = null; + logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a'); + try { + testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName()); + List operationThreads = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns); + for (int i = 0; i < concurrentRuns; i++) { + Runnable thread = () -> { + logger.info("Triggering decommission action"); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + try { + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest) + .get(); + if (decommissionResponse.isAcknowledged()) { + numRequestAcknowledged.incrementAndGet(); + } else { + numRequestUnAcknowledged.incrementAndGet(); + } + } catch (Exception e) { + numRequestFailed.incrementAndGet(); + } + countDownLatch.countDown(); + }; + operationThreads.add(thread); + } + TestThreadPool finalTestThreadPool = testThreadPool; + operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable)); + countDownLatch.await(); + } finally { + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + assertEquals(concurrentRuns, 
numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get()); + assertEquals(concurrentRuns - 1, numRequestFailed.get()); + assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get()); + } + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { final CountDownLatch doneLatch; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 7ec2cea769069..44f047137eece 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -21,8 +21,6 @@ import static org.opensearch.action.ValidateActions.addValidationError; /** - * Register decommission request. - *

* Registers a decommission request with decommission attribute and timeout * * @opensearch.internal @@ -32,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest * @param decommissionAttribute attribute details * @param status current status of the attribute decommission */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) { + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) { this.decommissionAttribute = decommissionAttribute; this.status = status; + this.requestID = requestId; } /** - * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} + * Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id * * @param decommissionAttribute attribute details */ - public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) { - this(decommissionAttribute, DecommissionStatus.INIT); + public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) { + this(decommissionAttribute, DecommissionStatus.INIT, requestID); } /** @@ -77,6 +79,15 @@ public DecommissionStatus status() { return this.status; } + /** + * Returns the request id of the decommission + * + * @return request id + */ + public String requestID() { + return this.requestID; + } + /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with @@ -128,12 +139,13 @@ public boolean equals(Object o) { DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o; if (!status.equals(that.status)) return false; + if (!Objects.equals(requestID, that.requestID)) return false; return decommissionAttribute.equals(that.decommissionAttribute); } @Override public int hashCode() { - return Objects.hash(attributeType, decommissionAttribute, status); + return
Objects.hash(attributeType, decommissionAttribute, status, requestID); } /** @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() { public DecommissionAttributeMetadata(StreamInput in) throws IOException { this.decommissionAttribute = new DecommissionAttribute(in); this.status = DecommissionStatus.fromString(in.readString()); + this.requestID = in.readString(); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -165,12 +178,14 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOException public void writeTo(StreamOutput out) throws IOException { decommissionAttribute.writeTo(out); out.writeString(status.status()); + out.writeString(requestID); } public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException { XContentParser.Token token; DecommissionAttribute decommissionAttribute = null; DecommissionStatus status = null; + String requestID = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) ); } status = DecommissionStatus.fromString(parser.text()); + } else if ("requestID".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { + throw new OpenSearchParseException( + "failed to parse requestID of decommissioning, expected string but found unknown type" + ); + } + requestID = parser.text(); } else { throw new OpenSearchParseException( "unknown field found [{}], failed to parse the decommission attribute", @@ -218,7 +240,7 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) } } } - return new DecommissionAttributeMetadata(decommissionAttribute, status); + return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID); } /** @@ -226,7 +248,7 @@ public static
DecommissionAttributeMetadata fromXContent(XContentParser parser) */ @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(decommissionAttribute, status, attributeType, builder, params); + toXContent(decommissionAttribute, status, requestID, attributeType, builder, params); return builder; } @@ -245,6 +267,7 @@ public EnumSet context() { public static void toXContent( DecommissionAttribute decommissionAttribute, DecommissionStatus status, + String requestID, String attributeType, XContentBuilder builder, ToXContent.Params params @@ -253,6 +276,7 @@ public static void toXContent( builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue()); builder.endObject(); builder.field("status", status.status()); + builder.field("requestID", requestID); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 1ff2fb52175c7..79a91ee85f016 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.validateNewStatus(decommissionStatus); decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionStatus + decommissionStatus, + decommissionAttributeMetadata.requestID() ); ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 
8305bda545998..83589a2f55685 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -32,9 +32,10 @@ public class DecommissionHelper { static ClusterState registerDecommissionAttributeInClusterState( ClusterState currentState, - DecommissionAttribute decommissionAttribute + DecommissionAttribute decommissionAttribute, + String requestID ) { - DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, requestID); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 4139ad8d36ed0..2e27898dd413c 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -28,6 +28,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; +import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -128,7 +129,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param 
decommissionRequest decommission request Object + * @param decommissionRequest request for decommission action * @param listener register decommission listener */ public void startDecommissionAction( @@ -144,12 +145,19 @@ public void startDecommissionAction( public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); + if (decommissionRequest.requestID() == null) { + decommissionRequest.setRequestID(UUIDs.randomBase64UUID()); + } DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away - ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState( + currentState, + decommissionAttribute, + decommissionRequest.requestID() + ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream() .map(DiscoveryNode::getId) @@ -188,6 +196,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); + assert 
decommissionAttributeMetadata.requestID().equals(decommissionRequest.requestID()); assert newState.getVotingConfigExclusions() .stream() .map(CoordinationMetadata.VotingConfigExclusion::getNodeId) @@ -294,6 +303,7 @@ public void onTimeout(TimeValue timeout) { // the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); + assert state.metadata().decommissionAttributeMetadata().requestID().equals(decommissionRequest.requestID()); Set decommissionedNodes = filterNodesWithDecommissionAttribute( state, decommissionRequest.getDecommissionAttribute(), @@ -456,22 +466,31 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, - DecommissionAttribute requestedDecommissionAttribute + DecommissionRequest decommissionRequest ) { - String msg = null; + String msg; + DecommissionAttribute requestedDecommissionAttribute = decommissionRequest.getDecommissionAttribute(); if (decommissionAttributeMetadata != null) { // check if the same attribute is registered and handle it accordingly if (decommissionAttributeMetadata.decommissionAttribute().equals(requestedDecommissionAttribute)) { switch (decommissionAttributeMetadata.status()) { - // for INIT and FAILED - we are good to process it again + // for INIT - check if it is eligible internal retry case INIT: + if (decommissionRequest.requestID().equals(decommissionAttributeMetadata.requestID()) == false) { + throw new DecommissioningFailedException( + requestedDecommissionAttribute, + "same request is already in status [INIT]" + ); + } + break; + // for FAILED - we are good to process it again case FAILED: break; case DRAINING: case IN_PROGRESS: case SUCCESSFUL: msg = "same request is already in status [" + decommissionAttributeMetadata.status() + "]"; - break; + throw new 
DecommissioningFailedException(requestedDecommissionAttribute, msg); default: throw new IllegalStateException( "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" @@ -484,7 +503,7 @@ private static void ensureEligibleRequest( msg = "one awareness attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] already successfully decommissioned, recommission before triggering another decommission"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); case DRAINING: case IN_PROGRESS: case INIT: @@ -492,7 +511,7 @@ private static void ensureEligibleRequest( msg = "there's an inflight decommission request for attribute [" + decommissionAttributeMetadata.decommissionAttribute().toString() + "] is in progress, cannot process this request"; - break; + throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); case FAILED: break; default: @@ -502,10 +521,6 @@ private static void ensureEligibleRequest( } } } - - if (msg != null) { - throw new DecommissioningFailedException(requestedDecommissionAttribute, msg); - } } private ActionListener statusUpdateListener() { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index d7253e6f57b38..f6401558221b0 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -240,7 +240,8 @@ public void testPreventJoinClusterWithDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); DiscoveryNode 
discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1")); @@ -257,7 +258,8 @@ public void testJoinClusterWithDifferentDecommission() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); @@ -277,7 +279,8 @@ public void testJoinFailedForDecommissionedNode() throws Exception { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState clusterManagerClusterState = ClusterState.builder(ClusterName.DEFAULT) .nodes( @@ -315,7 +318,8 @@ public void testJoinClusterWithDecommissionFailed() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.FAILED + DecommissionStatus.FAILED, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 18a7b892a424c..ec8d5bcf1c687 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -847,7 +847,8 @@ private static ClusterState initialStateWithDecommissionedAttribute( ) { DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( 
decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); return ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index cf92130095e12..9736355629fd9 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -19,8 +19,6 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateObserver; -import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; @@ -55,7 +53,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.sameInstance; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -237,7 +234,8 @@ private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone-1"), - currentStatus + currentStatus, + randomAlphaOfLength(10) ); ClusterState state = clusterService.state(); Metadata 
metadata = state.metadata(); @@ -265,60 +263,6 @@ public void onFailure(Exception e) { assertEquals(decommissionAttributeMetadata.status(), newStatus); } - private static class AdjustConfigurationForExclusions implements ClusterStateObserver.Listener { - - final CountDownLatch doneLatch; - - AdjustConfigurationForExclusions(CountDownLatch latch) { - this.doneLatch = latch; - } - - @Override - public void onNewClusterState(ClusterState state) { - clusterService.getClusterManagerService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - assertThat(currentState, sameInstance(state)); - final Set votingNodeIds = new HashSet<>(); - currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); - currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId())); - final CoordinationMetadata.VotingConfiguration votingConfiguration = new CoordinationMetadata.VotingConfiguration( - votingNodeIds - ); - return builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .coordinationMetadata( - CoordinationMetadata.builder(currentState.coordinationMetadata()) - .lastAcceptedConfiguration(votingConfiguration) - .lastCommittedConfiguration(votingConfiguration) - .build() - ) - ).build(); - } - - @Override - public void onFailure(String source, Exception e) { - throw new AssertionError("unexpected failure", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - doneLatch.countDown(); - } - }); - } - - @Override - public void onClusterServiceClose() { - throw new AssertionError("unexpected close"); - } - - @Override - public void onTimeout(TimeValue timeout) { - throw new AssertionError("unexpected timeout"); - } - } - private ClusterState addNodes(ClusterState clusterState, String zone, String... 
nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java index ab2d8218ec97d..94833e15f55d0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java @@ -38,7 +38,11 @@ public class DecommissionHelperTests extends OpenSearchTestCase { public void testRegisterAndDeleteDecommissionAttributeInClusterState() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone2"); - ClusterState updatedState = registerDecommissionAttributeInClusterState(initialClusterState, decommissionAttribute); + ClusterState updatedState = registerDecommissionAttributeInClusterState( + initialClusterState, + decommissionAttribute, + randomAlphaOfLength(10) + ); assertEquals(decommissionAttribute, updatedState.metadata().decommissionAttributeMetadata().decommissionAttribute()); updatedState = deleteDecommissionAttributeInClusterState(updatedState); assertNull(updatedState.metadata().decommissionAttributeMetadata()); @@ -79,13 +83,14 @@ public void testNodeCommissioned() { ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - decommissionStatus + decommissionStatus, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertFalse(nodeCommissioned(node1, metadata)); DecommissionStatus commissionStatus = randomFrom(DecommissionStatus.FAILED, DecommissionStatus.INIT); - 
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus, randomAlphaOfLength(10)); metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); assertTrue(nodeCommissioned(node2, metadata)); assertTrue(nodeCommissioned(node1, metadata)); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index a942c62bd05eb..51cd0e6eb23ed 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.decommission; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -49,6 +50,9 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.cluster.ClusterState.builder; import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; import static org.opensearch.test.ClusterServiceUtils.createClusterService; @@ -195,13 +199,49 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testExternalDecommissionRetryNotAllowed() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + 
oldStatus, + randomAlphaOfLength(10) + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("same request is already in status [INIT]")); + } + @SuppressWarnings("unchecked") public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionStatus oldStatus = randomFrom(DecommissionStatus.SUCCESSFUL, DecommissionStatus.IN_PROGRESS, DecommissionStatus.INIT); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone_1"), - oldStatus + oldStatus, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( @@ -233,6 +273,40 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void 
testDecommissioningFailedWhenAnotherRequestForSameAttributeIsExecuted() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = DecommissionStatus.INIT; + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus, + randomAlphaOfLength(10) + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1")); + decommissionService.startDecommissionAction(request, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertTrue(exceptionReference.get() instanceof DecommissioningFailedException); + assertThat(exceptionReference.get().getMessage(), Matchers.endsWith("same request is already in status [INIT]")); + } + public void testScheduleNodesDecommissionOnTimeout() { TransportService mockTransportService = Mockito.mock(TransportService.class); ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); @@ -249,7 +323,8 @@ public void testScheduleNodesDecommissionOnTimeout() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.DRAINING + 
DecommissionStatus.DRAINING, + randomAlphaOfLength(10) ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); @@ -268,9 +343,11 @@ public void testScheduleNodesDecommissionOnTimeout() { public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + String requestID = randomAlphaOfLength(10); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.INIT + DecommissionStatus.INIT, + requestID ); Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); @@ -278,6 +355,7 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { DecommissionRequest request = new DecommissionRequest(decommissionAttribute); request.setNoDelay(true); + request.setRequestID(requestID); setState(clusterService, state); decommissionService.drainNodesWithDecommissionedAttribute(request); @@ -289,7 +367,8 @@ public void testRecommissionAction() throws InterruptedException { DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, - DecommissionStatus.SUCCESSFUL + DecommissionStatus.SUCCESSFUL, + randomAlphaOfLength(10) ); final ClusterState.Builder builder = builder(clusterService.state()); setState( diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java index 60b3a03848830..284a0fe77edfb 100644 --- 
a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java @@ -33,7 +33,7 @@ protected Metadata.Custom createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override @@ -57,7 +57,11 @@ protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { if (randomBoolean()) { attributeValue = randomAlphaOfLength(6); } - return new DecommissionAttributeMetadata(new DecommissionAttribute(attributeName, attributeValue), decommissionStatus); + return new DecommissionAttributeMetadata( + new DecommissionAttribute(attributeName, attributeValue), + decommissionStatus, + randomAlphaOfLength(10) + ); } @Override @@ -77,7 +81,8 @@ protected Metadata.Custom doParseInstance(XContentParser parser) throws IOExcept assertEquals(XContentParser.Token.END_OBJECT, parser.currentToken()); return new DecommissionAttributeMetadata( decommissionAttributeMetadata.decommissionAttribute(), - decommissionAttributeMetadata.status() + decommissionAttributeMetadata.status(), + decommissionAttributeMetadata.requestID() ); } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java index 746d4565b0db3..98cecd8439413 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java @@ -24,7 +24,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java index 030946f4510a1..a83914cec23c0 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java @@ -23,7 +23,7 @@ protected DecommissionAttributeMetadata createTestInstance() { String attributeValue = randomAlphaOfLength(6); DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); - return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus, randomAlphaOfLength(10)); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 1f892b993d4d6..65fc1b902f9a4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -184,7 +184,11 @@ private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map