From 5d4e99a81012b526d5484b3b0fc0d629a05939f1 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 20 Oct 2022 19:07:14 +0530 Subject: [PATCH 1/4] Fail weight update when decommission ongoing and fail decommission when attribute not weighed away (#4839) * Add checks for decommission before setting weights Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../AwarenessAttributeDecommissionIT.java | 69 +++++++++++++++ .../decommission/DecommissionService.java | 28 ++++++ .../routing/WeightedRoutingService.java | 15 ++++ .../DecommissionServiceTests.java | 64 ++++++++++++++ .../routing/WeightedRoutingServiceTests.java | 86 +++++++++++++++++++ 6 files changed, 263 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1929bfc5aa1c2..5fd446928c2a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) - Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261)) - Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) +- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839)) ### Deprecated ### Removed diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index a2270d63ba6fa..2dc964e3e8845 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -20,11 +20,15 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.decommission.DecommissioningFailedException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; @@ -37,6 +41,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import static org.opensearch.test.NodeRoles.onlyRole; @@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx ensureStableCluster(6); + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new
WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + logger.info("--> starting decommissioning nodes in zone {}", 'c'); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); @@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx // as by then all nodes should have joined the cluster ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + + public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue( + ex.getMessage() + .contains("no weights are set to the attribute. 
Please set appropriate weights before triggering decommission action") ); }); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")); + }); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index b2c8bfbc0cdc8..e6639ae058066 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -20,7 +20,9 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + // ensure attribute is weighed away + ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); return ClusterState.builder(currentState) @@ -413,6 +417,30 @@ private static void validateAwarenessAttribute( } } + private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) { + WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata(); + if (weightedRoutingMetadata == null) { + throw new DecommissioningFailedException( + decommissionAttribute, + "no weights are set to the attribute. 
Please set appropriate weights before triggering decommission action" + ); + } + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) { + throw new DecommissioningFailedException( + decommissionAttribute, + "no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]" + ); + } + Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue()); + if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) { + throw new DecommissioningFailedException( + decommissionAttribute, + "weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]" + ); + } + } + private static void ensureEligibleRequest( DecommissionAttributeMetadata decommissionAttributeMetadata, DecommissionAttribute requestedDecommissionAttribute diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 54f2c81aea384..6acb4a1e832cb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -19,6 +19,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata( clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { + // verify currently no decommission action is ongoing + ensureNoOngoingDecommissionAction(currentState); Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); @@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) { throw validationException; } } + + public void ensureNoOngoingDecommissionAction(ClusterState state) { + DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) { + throw new IllegalStateException( + "a decommission action is ongoing with status [" + + decommissionAttributeMetadata.status().status() + + "], cannot update weight during this state" + ); + } + } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 7dee51b7713f9..c4cf6c7cc6641 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -22,14 +22,17 @@ import org.opensearch.cluster.ClusterState; import 
org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; @@ -169,6 +172,56 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException { + Map<String, Double> weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0); + setWeightedRoutingWeights(weights); + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat( + e.getMessage(), + Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]") + ); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_1"); + ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() { + @Override + public void onResponse(DecommissionResponse decommissionResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat( + e.getMessage(), + Matchers.containsString( + "no weights are set to the attribute. 
Please set appropriate weights before triggering decommission action" + ) + ); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + @SuppressWarnings("unchecked") public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -286,6 +339,17 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + private void setWeightedRoutingWeights(Map<String, Double> weights) { + ClusterState clusterState = clusterService.state(); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + ClusterServiceUtils.setState(clusterService, builder); + } + private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone)))); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index fc5d46ef84c79..91b8703cacf5c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; import org.opensearch.Version; @@ -21,6 +22,9 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -43,6 +47,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; public class WeightedRoutingServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -173,6 +182,15 @@ private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map<String, Double> weights) { + private ClusterState setDecommissionAttribute(ClusterState clusterState, DecommissionStatus status) { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone_A"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, status); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + return clusterState; + } + Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); ClusterState state = clusterService.state(); @@ -276,4 +294,72 @@ public void testVerifyAwarenessAttribute_ValidAttributeName() { fail("verify awareness attribute should not fail"); } } + + public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException
{ + Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + state = setDecommissionAttribute(state, status); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference<Exception> exceptionReference = new AtomicReference<>(); + ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(IllegalStateException.class)); + MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("a decommission action is ongoing with status")); + } + + public void testAddWeightedRoutingPassesWhenDecommissionFailed() throws InterruptedException { + Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + DecommissionStatus status = DecommissionStatus.FAILED; + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + state = setDecommissionAttribute(state, status); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference<Exception> exceptionReference = new AtomicReference<>(); + ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } } From e07842df9c138f7aff5baef6ec5417d382abc078 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Tue, 25 Oct 2022 18:19:25 +0530 Subject: [PATCH 2/4] Add changes for graceful node decommission (#4586) * Add changes for graceful node decommission Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> Signed-off-by: Rishab Nahata
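A minimal sketch of how a caller might exercise the graceful and immediate paths added in this patch, using only the request API shown in the diff below (variable names are illustrative; client() is the usual test-cluster client handle from the integration tests):

    // Graceful decommission (default): nodes drain for DEFAULT_NODE_DRAINING_TIMEOUT (120s)
    DecommissionAttribute attr = new DecommissionAttribute("zone", "c");
    DecommissionRequest graceful = new DecommissionRequest(attr);
    assert graceful.getDelayTimeout().equals(DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT);

    // Immediate decommission: no_delay forces the delay timeout to TimeValue.ZERO,
    // so the DRAINING phase is skipped and nodes are failed right away
    DecommissionRequest immediate = new DecommissionRequest(attr);
    immediate.setNoDelay(true);
    client().execute(DecommissionAction.INSTANCE, immediate).actionGet();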
--- CHANGELOG.md | 1 + .../AwarenessAttributeDecommissionIT.java | 30 +++++-- .../awareness/put/DecommissionRequest.java | 31 +++++++ .../put/TransportDecommissionAction.java | 2 +- .../DecommissionAttributeMetadata.java | 18 ++-- .../decommission/DecommissionController.java | 64 +++++++++++++++ .../decommission/DecommissionService.java | 82 +++++++++++++++++-- .../decommission/DecommissionStatus.java | 7 ++ .../admin/cluster/RestDecommissionAction.java | 5 ++ .../put/DecommissionRequestTests.java | 14 ++++ .../coordination/JoinTaskExecutorTests.java | 6 +- .../DecommissionControllerTests.java | 63 ++++++++++---- .../DecommissionServiceTests.java | 64 +++++++++++++-- .../cluster/RestDecommissionActionTests.java | 31 +++++++ 14 files changed, 374 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fd446928c2a4..8a307a2acba02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565)) - Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805)) - Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932)) +- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586)) ### Dependencies - Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 2dc964e3e8845..14ec041b7464b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -120,31 +120,44 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx logger.info("--> starting decommissioning nodes in zone {}", 'c'); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + // Set the timeout to 0 to do an immediate decommission DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); assertTrue(decommissionResponse.isAcknowledged()); + logger.info("--> Received decommission response for zone {}", 'c'); + // Allow some delay for the scheduler to invoke the decommission flow + Thread.sleep(500); + // Will wait for all events to complete client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + logger.info("--> Received LANGUID event"); + // assert that decommission status is successful - GetDecommissionStateResponse response = client().execute( + GetDecommissionStateResponse response = client(clusterManagerNodes.get(0)).execute( GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest(decommissionAttribute.attributeName()) ).get(); assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); - assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); +
assertEquals(DecommissionStatus.SUCCESSFUL, response.getDecommissionStatus()); + logger.info("--> Decommission status is successful"); ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); assertEquals(4, clusterState.nodes().getSize()); + logger.info("--> Got cluster state with 4 nodes."); // assert status on nodes that are part of cluster currently Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + DiscoveryNode clusterManagerNodeAfterDecommission = null; while (discoveryNodeIterator.hasNext()) { // assert no node has decommissioned attribute DiscoveryNode node = discoveryNodeIterator.next(); assertNotEquals(node.getAttributes().get("zone"), "c"); - + if (node.isClusterManagerNode()) { + clusterManagerNodeAfterDecommission = node; + } // assert all the nodes have status as SUCCESSFUL ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName()); assertEquals( @@ -152,6 +165,8 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx DecommissionStatus.SUCCESSFUL ); } + assertNotNull("Cluster Manager not found after decommission", clusterManagerNodeAfterDecommission); + logger.info("--> Cluster Manager node found after decommission"); // assert status on decommissioned node // Here we will verify that until it got kicked out, it received appropriate status updates @@ -163,16 +178,18 @@ decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), DecommissionStatus.IN_PROGRESS ); + logger.info("--> Verified the decommissioned node has IN_PROGRESS state."); // Will wait for all events to complete - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); - + client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + logger.info("--> Got LANGUID event"); // Recommissioning the zone back to gracefully succeed the test once above tests succeeds - DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute( + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodeAfterDecommission.getName()).execute( DeleteDecommissionStateAction.INSTANCE, new DeleteDecommissionStateRequest() ).get(); assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + logger.info("--> Deleted decommission state."); // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) // as by then all nodes should have joined the cluster @@ -201,6 +218,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); assertBusy(() -> { DecommissioningFailedException ex = expectThrows( DecommissioningFailedException.class, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 79a6688dc6049..e2fb353b6c749 100644 ---
a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -14,6 +14,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; import java.io.IOException; @@ -28,8 +29,15 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> { + public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); + private DecommissionAttribute decommissionAttribute; + private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT; + + // holder for the no_delay param; when set, the node-draining timeout is skipped + private boolean noDelay = false; + public DecommissionRequest() {} public DecommissionRequest(DecommissionAttribute decommissionAttribute) { @@ -39,12 +47,14 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) { public DecommissionRequest(StreamInput in) throws IOException { super(in); decommissionAttribute = new DecommissionAttribute(in); + this.delayTimeout = in.readTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); decommissionAttribute.writeTo(out); + out.writeTimeValue(delayTimeout); } /** @@ -65,6 +75,19 @@ public DecommissionAttribute getDecommissionAttribute() { return this.decommissionAttribute; } + public TimeValue getDelayTimeout() { + return this.delayTimeout; + } + + public void setNoDelay(boolean noDelay) { + this.delayTimeout = TimeValue.ZERO; + this.noDelay = noDelay; + } + + public boolean isNoDelay() { + return noDelay; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -74,6 +97,14 @@ public ActionRequestValidationException validate() { if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) { validationException = addValidationError("attribute value is missing", validationException); } + // This validation should not fail since we are not allowing delay timeout to be set externally. + // Still keeping it as a double check. + if (noDelay && delayTimeout.getSeconds() > 0) { + final String validationMessage = "Invalid decommission request. 
no_delay is true and delay_timeout is set to [" + delayTimeout.getSeconds() + "] seconds"; + validationException = addValidationError(validationMessage, validationException); + } return validationException; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java index 3a067d2f110b9..6f4e3cf82d2ce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener) throws Exception { logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString()); - decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener); + decommissionService.startDecommissionAction(request, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index 7f605d0710a38..395d733b8f1b1 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.Objects; +import java.util.Set; /** * Contains metadata about decommission attribute @@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) { } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { + case DRAINING: + validateStatus(Set.of(DecommissionStatus.INIT), newStatus); + break; case IN_PROGRESS: - validateStatus(DecommissionStatus.INIT, newStatus); + validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus); break; case SUCCESSFUL: - validateStatus(DecommissionStatus.IN_PROGRESS, newStatus); + validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus); break; default: throw new IllegalArgumentException( @@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) { } } - private void validateStatus(DecommissionStatus expected, DecommissionStatus next) { - if (status.equals(expected) == false) { + private void validateStatus(Set<DecommissionStatus> expectedStatuses, DecommissionStatus next) { + if (expectedStatuses.contains(status) == false) { assert false : "can't move decommission status to [" + next + "]. current status: [" + status - + "] (expected [" - + expected + + "] (allowed statuses [" + + expectedStatuses + "])"; throw new IllegalStateException( - "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" + "can't move decommission status to [" + next + "]. 
current status: [" + status + "] (expected [" + expectedStatuses + "])" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index b58d99a9d59db..ffb20a05b3ef7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -18,6 +18,10 @@ import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -32,6 +36,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.http.HttpStats; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; @@ -39,7 +44,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } + + private void logActiveConnections(NodesStatsResponse nodesStatsResponse) { + if (nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) { + logger.info("Received null/empty node stats response."); + return; + } + + Map<String, Long> nodeActiveConnectionMap = new HashMap<>(); + List<NodeStats> responseNodes = nodesStatsResponse.getNodes(); + for (int i = 0; i < responseNodes.size(); i++) { + HttpStats httpStats = responseNodes.get(i).getHttp(); + DiscoveryNode node = responseNodes.get(i).getNode(); + nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen()); + } + logger.info("Active connections on nodes being decommissioned: [{}]", nodeActiveConnectionMap); + } + + void getActiveRequestCountOnDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes) { + if (decommissionedNodes == null || decommissionedNodes.isEmpty()) { + return; + } + String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new); + if (nodes.length == 0) { + return; + } + + final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes); + nodesStatsRequest.clear(); + nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName()); + + transportService.sendRequest( + transportService.getLocalNode(), + NodesStatsAction.NAME, + nodesStatsRequest, + new TransportResponseHandler<NodesStatsResponse>() { + @Override + public void handleResponse(NodesStatsResponse response) { + logActiveConnections(response); + } + + @Override + public void handleException(TransportException exp) { + logger.error("Failure occurred while logging active connections for decommissioned nodes - ", exp.unwrapCause()); + } + + @Override + public String executor() { + 
return ThreadPool.Names.SAME; + } + + @Override + public NodesStatsResponse read(StreamInput in) throws IOException { + return new NodesStatsResponse(in); + } + } + ); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index e6639ae058066..f284eb476a755 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -15,6 +15,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -115,13 +116,14 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionAttribute register decommission attribute in the metadata request + * @param decommissionRequest decommission request object + * @param listener register decommission listener */ public void startDecommissionAction( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, final ActionListener<DecommissionResponse> listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -161,15 +163,16 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener); + decommissionClusterManagerNodes(decommissionRequest, listener); } }); } private synchronized void decommissionClusterManagerNodes( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, ActionListener<DecommissionResponse> listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); ClusterState state = clusterService.getClusterApplierService().state(); // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further // join the cluster @@ -212,7 +215,7 @@ public void onResponse(Void unused) { // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission // nodes can be part of Voting Config listener.onResponse(new DecommissionResponse(true)); - failDecommissionedNodes(clusterService.getClusterApplierService().state()); + drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager @@ -309,17 +312,74 @@ public void onFailure(Exception
e) { } } - private void failDecommissionedNodes(ClusterState state) { - // this method ensures no matter what, we always exit from this function after clearing the voting config exclusion + void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { + ClusterState state = clusterService.getClusterApplierService().state(); + Set<DiscoveryNode> decommissionedNodes = filterNodesWithDecommissionAttribute( + state, + decommissionRequest.getDecommissionAttribute(), + false + ); + + if (decommissionRequest.isNoDelay()) { + // Fail the decommissioned nodes immediately + failDecommissionedNodes(decommissionedNodes, decommissionRequest.getDecommissionAttribute()); + } else { + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.DRAINING, new ActionListener<>() { + @Override + public void onResponse(DecommissionStatus status) { + logger.info("updated the decommission status to [{}]", status); + // schedule the decommission of the drained nodes once the draining timeout elapses + scheduleNodesDecommissionOnTimeout(decommissionedNodes, decommissionRequest.getDelayTimeout()); + } + + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage( + "failed to update decommission status for attribute [{}] to [{}]", + decommissionRequest.getDecommissionAttribute().toString(), + DecommissionStatus.DRAINING + ), + e + ); + // since we are not able to update the status, we will clear the voting config exclusion we have set earlier + clearVotingConfigExclusionAndUpdateStatus(false, false); + } + }); + } + } + + void scheduleNodesDecommissionOnTimeout(Set<DiscoveryNode> decommissionedNodes, TimeValue timeoutForNodeDraining) { + ClusterState state = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata == null) { + return; + } + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DRAINING) + : "Unexpected status encountered while decommissioning nodes."; + + // This method ensures no matter what, we always exit from this function after clearing the voting config exclusion DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + + // Wait for the draining timeout to elapse. Log the active connections before decommissioning the nodes. + transportService.getThreadPool().schedule(() -> { + // Log active connections. + decommissionController.getActiveRequestCountOnDecommissionedNodes(decommissionedNodes); + // Fail the decommissioned nodes + failDecommissionedNodes(decommissionedNodes, decommissionAttribute); + }, timeoutForNodeDraining, ThreadPool.Names.GENERIC); + } + + private void failDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes, DecommissionAttribute decommissionAttribute) { + + // Nodes have been weighed away and drained. Let's move the decommission status to IN_PROGRESS. 
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.IN_PROGRESS, new ActionListener<>() { @Override public void onResponse(DecommissionStatus status) { logger.info("updated the decommission status to [{}]", status); // execute nodes decommissioning decommissionController.removeDecommissionedNodes( - filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false), + decommissionedNodes, "nodes-decommissioned", TimeValue.timeValueSeconds(120L), new ActionListener<Void>() { @@ -454,6 +514,7 @@ private static void ensureEligibleRequest( case INIT: case FAILED: break; + case DRAINING: case IN_PROGRESS: case SUCCESSFUL: msg = "same request is already in status [" + decommissionAttributeMetadata.status() + "]"; @@ -471,6 +532,7 @@ private static void ensureEligibleRequest( + decommissionAttributeMetadata.decommissionAttribute().toString() + "] already successfully decommissioned, recommission before triggering another decommission"; break; + case DRAINING: case IN_PROGRESS: case INIT: // it means the decommission has been initiated or is inflight. In that case, will fail new request @@ -582,7 +644,9 @@ public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata met DecommissionStatus status = decommissionAttributeMetadata.status(); if (decommissionAttribute != null && status != null) { if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) - && (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) { + && (status.equals(DecommissionStatus.IN_PROGRESS) + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java index af88b0d0f5902..4ca8c3cc4286e 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java @@ -16,6 +16,11 @@ public enum DecommissionStatus { * Decommission process is initiated, and to-be-decommissioned leader is excluded from voting config */ INIT("init"), + /** + * Decommission process is initiated, and the zone is being drained. 
+ */ + DRAINING("draining"), + /** * Decommission process has started, decommissioned nodes should be removed */ @@ -56,6 +61,8 @@ public static DecommissionStatus fromString(String status) { } if (status.equals(INIT.status())) { return INIT; + } else if (status.equals(DRAINING.status())) { + return DRAINING; } else if (status.equals(IN_PROGRESS.status())) { return IN_PROGRESS; } else if (status.equals(SUCCESSFUL.status())) { diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 979bf05f537b7..5f1d1ba48c88b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -49,6 +49,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { DecommissionRequest decommissionRequest = Requests.decommissionRequest(); String attributeName = request.param("awareness_attribute_name"); String attributeValue = request.param("awareness_attribute_value"); + // Check if we have no delay set. + boolean noDelay = request.paramAsBoolean("no_delay", false); + if (noDelay) { + decommissionRequest.setNoDelay(noDelay); + } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index c189b5702dea0..112609b0cf8ec 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -10,6 +10,7 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -25,6 +26,7 @@ public void testSerialization() throws IOException { final DecommissionRequest deserialized = copyWriteable(originalRequest, writableRegistry(), DecommissionRequest::new); assertEquals(deserialized.getDecommissionAttribute(), originalRequest.getDecommissionAttribute()); + assertEquals(deserialized.getDelayTimeout(), originalRequest.getDelayTimeout()); } public void testValidation() { @@ -54,8 +56,20 @@ public void testValidation() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setNoDelay(true); ActionRequestValidationException e = request.validate(); assertNull(e); + assertEquals(TimeValue.ZERO, request.getDelayTimeout()); + } + { + String attributeName = "zone"; + String attributeValue = "test"; + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + ActionRequestValidationException e = request.validate(); + assertNull(e); + assertEquals(DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT, request.getDelayTimeout()); } } } diff --git 
a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 5a0da5830ebf2..9bb0084b20817 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -308,7 +308,11 @@ public void testJoinClusterWithNoDecommission() { public void testPreventJoinClusterWithDecommission() { Settings.builder().build(); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); - DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL); + DecommissionStatus decommissionStatus = randomFrom( + DecommissionStatus.IN_PROGRESS, + DecommissionStatus.SUCCESSFUL, + DecommissionStatus.DRAINING + ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, decommissionStatus diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 8b5343184dabd..ba74b29081c26 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -235,10 +235,45 @@ public void onFailure(Exception e) { } public void testSuccessfulDecommissionStatusMetadataUpdate() throws InterruptedException { + Map<DecommissionStatus, Set<DecommissionStatus>> decommissionStateTransitionMap = Map.of( + DecommissionStatus.INIT, + Set.of(DecommissionStatus.DRAINING, DecommissionStatus.IN_PROGRESS), + DecommissionStatus.DRAINING, + Set.of(DecommissionStatus.IN_PROGRESS), + DecommissionStatus.IN_PROGRESS, + Set.of(DecommissionStatus.SUCCESSFUL) + ); + + for (Map.Entry<DecommissionStatus, Set<DecommissionStatus>> entry : decommissionStateTransitionMap.entrySet()) { + for (DecommissionStatus val : entry.getValue()) { + verifyDecommissionStatusTransition(entry.getKey(), val); + } + } + } + + public void testSuccessfulDecommissionStatusMetadataUpdateForFailedState() throws InterruptedException { + Map<DecommissionStatus, Set<DecommissionStatus>> decommissionStateTransitionMap = Map.of( + DecommissionStatus.INIT, + Set.of(DecommissionStatus.FAILED), + DecommissionStatus.DRAINING, + Set.of(DecommissionStatus.FAILED), + DecommissionStatus.IN_PROGRESS, + Set.of(DecommissionStatus.FAILED) + ); + + for (Map.Entry<DecommissionStatus, Set<DecommissionStatus>> entry : decommissionStateTransitionMap.entrySet()) { + for (DecommissionStatus val : entry.getValue()) { + verifyDecommissionStatusTransition(entry.getKey(), val); + } + } + } + + private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus, DecommissionStatus newStatus) + throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone-1"), - DecommissionStatus.IN_PROGRESS + currentStatus ); ClusterState state = clusterService.state(); Metadata metadata = state.metadata(); @@ -247,25 +282,23 @@ state = ClusterState.builder(state).metadata(mdBuilder).build(); setState(clusterService, state); - decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.SUCCESSFUL, - new ActionListener<DecommissionStatus>() { - @Override - public void onResponse(DecommissionStatus status) { - 
assertEquals(DecommissionStatus.SUCCESSFUL, status); - countDownLatch.countDown(); - } + decommissionController.updateMetadataWithDecommissionStatus(newStatus, new ActionListener<DecommissionStatus>() { + @Override + public void onResponse(DecommissionStatus status) { + assertEquals(newStatus, status); + countDownLatch.countDown(); + } - @Override - public void onFailure(Exception e) { - fail("decommission status update failed"); - } + @Override + public void onFailure(Exception e) { + fail("decommission status update failed"); + countDownLatch.countDown(); } - ); + }); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); - assertEquals(decommissionAttributeMetadata.status(), DecommissionStatus.SUCCESSFUL); + assertEquals(decommissionAttributeMetadata.status(), newStatus); } private static class AdjustConfigurationForExclusions implements ClusterStateObserver.Listener { diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index c4cf6c7cc6641..3f39d67dee765 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterName; @@ -32,6 +33,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransport; @@ -141,7 +143,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -168,7 +170,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -193,7 +195,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -218,7 +220,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); +
decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -255,10 +257,62 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_2"), listener); + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_2")); + decommissionService.startDecommissionAction(request, listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testScheduleNodesDecommissionOnTimeout() { + TransportService mockTransportService = Mockito.mock(TransportService.class); + ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); + Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class)); + Mockito.when(mockTransportService.getThreadPool()).thenReturn(mockThreadPool); + DecommissionService decommissionService = new DecommissionService( + Settings.EMPTY, + clusterSettings, + clusterService, + mockTransportService, + threadPool, + allocationService + ); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.DRAINING + ); + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + DiscoveryNode decommissionedNode1 = Mockito.mock(DiscoveryNode.class); + DiscoveryNode decommissionedNode2 = Mockito.mock(DiscoveryNode.class); + + setState(clusterService, state); + decommissionService.scheduleNodesDecommissionOnTimeout( + Set.of(decommissionedNode1, decommissionedNode2), + DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT + ); + + Mockito.verify(mockThreadPool).schedule(Mockito.any(Runnable.class), Mockito.any(TimeValue.class), Mockito.anyString()); + } + + public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.INIT + ); + + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setNoDelay(true); + + setState(clusterService, state); + decommissionService.drainNodesWithDecommissionedAttribute(request); + + } + public void testClearClusterDecommissionState() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java index b724de0bd5cc6..bbb21ff8f816c 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java 
@@ -32,12 +32,43 @@ public void testCreateRequest() throws IOException { Map params = new HashMap<>(); params.put("awareness_attribute_name", "zone"); params.put("awareness_attribute_value", "zone-1"); + params.put("draining_timeout", "60s"); RestRequest deprecatedRequest = buildRestRequest(params); DecommissionRequest request = action.createRequest(deprecatedRequest); assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), 120); + assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); + } + + public void testCreateRequestWithDefaultTimeout() throws IOException { + Map params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + + RestRequest deprecatedRequest = buildRestRequest(params); + + DecommissionRequest request = action.createRequest(deprecatedRequest); + assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); + assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT.getSeconds()); + assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); + } + + public void testCreateRequestWithNoDelay() throws IOException { + Map params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + params.put("no_delay", "true"); + + RestRequest deprecatedRequest = buildRestRequest(params); + + DecommissionRequest request = action.createRequest(deprecatedRequest); + assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); + assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), 0); assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); } From da1a1f7ae8831d272c3b334d485d10553e9a6435 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Wed, 26 Oct 2022 21:46:56 +0530 Subject: [PATCH 3/4] Add delay timeout for decommission request (#4931) * Add delay timeout for decommission request Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../awareness/put/DecommissionRequest.java | 10 +++++++++- .../awareness/put/DecommissionRequestBuilder.java | 11 +++++++++++ .../admin/cluster/RestDecommissionAction.java | 8 ++++++-- .../awareness/put/DecommissionRequestTests.java | 12 ++++++++++++ .../cluster/RestDecommissionActionTests.java | 15 +++++++++++++++ 6 files changed, 54 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a307a2acba02..2c626d706312b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -103,6 +103,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) - Fix bug in AwarenessAttributeDecommissionIT([4822](https://github.com/opensearch-project/OpenSearch/pull/4822)) - Fix for failing checkExtraction, checkLicense and checkNotice tasks for windows gradle check ([#4941](https://github.com/opensearch-project/OpenSearch/pull/4941)) +- [BUG]: Allow decommission to support delay timeout 
([#4930](https://github.com/opensearch-project/OpenSearch/pull/4930)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index e2fb353b6c749..ae96c8ddb2fde 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -48,6 +48,7 @@ public DecommissionRequest(StreamInput in) throws IOException { super(in); decommissionAttribute = new DecommissionAttribute(in); this.delayTimeout = in.readTimeValue(); + this.noDelay = in.readBoolean(); } @Override @@ -55,6 +56,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); decommissionAttribute.writeTo(out); out.writeTimeValue(delayTimeout); + out.writeBoolean(noDelay); } /** @@ -75,12 +77,18 @@ public DecommissionAttribute getDecommissionAttribute() { return this.decommissionAttribute; } + public void setDelayTimeout(TimeValue delayTimeout) { + this.delayTimeout = delayTimeout; + } + public TimeValue getDelayTimeout() { return this.delayTimeout; } public void setNoDelay(boolean noDelay) { - this.delayTimeout = TimeValue.ZERO; + if (noDelay) { + this.delayTimeout = TimeValue.ZERO; + } this.noDelay = noDelay; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java index 47af3b952c895..1c7a03fa10e76 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestBuilder.java @@ -12,6 +12,7 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder; import org.opensearch.client.OpenSearchClient; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; /** * Register decommission request builder @@ -35,4 +36,14 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribu request.setDecommissionAttribute(decommissionAttribute); return this; } + + public DecommissionRequestBuilder setDelayTimeOut(TimeValue delayTimeOut) { + request.setDelayTimeout(delayTimeOut); + return this; + } + + public DecommissionRequestBuilder setNoDelay(boolean noDelay) { + request.setNoDelay(noDelay); + return this; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 5f1d1ba48c88b..c041974165eb6 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -12,6 +12,7 @@ import org.opensearch.client.Requests; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; import
org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestToXContentListener; @@ -51,8 +52,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { String attributeValue = request.param("awareness_attribute_value"); // Check if we have no delay set. boolean noDelay = request.paramAsBoolean("no_delay", false); - if (noDelay) { - decommissionRequest.setNoDelay(noDelay); + decommissionRequest.setNoDelay(noDelay); + + if (request.hasParam("delay_timeout")) { + TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT); + decommissionRequest.setDelayTimeout(delayTimeout); } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index 112609b0cf8ec..8cd407b3aecf2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -71,5 +71,17 @@ public void testValidation() { assertNull(e); assertEquals(DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT, request.getDelayTimeout()); } + { + String attributeName = "zone"; + String attributeValue = "test"; + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setNoDelay(true); + request.setDelayTimeout(TimeValue.timeValueSeconds(30)); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("Invalid decommission request")); + } } } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java index bbb21ff8f816c..b5f61f751b19f 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java @@ -72,6 +72,21 @@ public void testCreateRequestWithNoDelay() throws IOException { assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); } + public void testCreateRequestWithDelayTimeout() throws IOException { + Map params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + params.put("delay_timeout", "300s"); + + RestRequest deprecatedRequest = buildRestRequest(params); + + DecommissionRequest request = action.createRequest(deprecatedRequest); + assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); + assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), 300); + assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); + } + private FakeRestRequest buildRestRequest(Map params) { return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) 
.withPath("/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}") From 02edc4f160e42726675b340f8635dd64e07c25b9 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 3 Nov 2022 22:44:39 +0530 Subject: [PATCH 4/4] Integ Tests for Awareness Attribute Decommissioning (#4715) * Add integ test for awareness attribute decommissioning Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../AwarenessAttributeDecommissionIT.java | 634 +++++++++++++++++- .../awareness/put/DecommissionRequest.java | 4 + .../decommission/DecommissionService.java | 2 +- 4 files changed, 634 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c626d706312b..7645adfe23733 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) - Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261)) - Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) +- Integ Tests for Awareness Attribute Decommissioning ([#4715](https://github.com/opensearch-project/OpenSearch/pull/4715)) - Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 14ec041b7464b..067b127a667b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -8,9 +8,12 @@ package org.opensearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; import org.junit.After; +import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; @@ -23,9 +26,13 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionService; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.decommission.DecommissioningFailedException; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import 
org.opensearch.cluster.routing.WeightedRouting; @@ -33,16 +40,26 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.discovery.Discovery; import org.opensearch.plugins.Plugin; +import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.opensearch.test.NodeRoles.onlyRole; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; @@ -61,6 +78,461 @@ public void cleanup() throws Exception { assertNoTimeout(client().admin().cluster().prepareHealth().get()); } + public void testDecommissionFailedWhenNotZoneAware() throws Exception { + Settings commonSettings = Settings.builder().build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("invalid awareness attribute requested for decommissioning")); + }); + } + + public void testDecommissionFailedWhenNotForceZoneAware() throws Exception { + Settings commonSettings = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + // Start 3 cluster manager eligible nodes + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> starting data node each on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + 
internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("doesn't have the decommissioning attribute")); + }); + } + + public void testNodesRemovedAfterZoneDecommission_ClusterManagerNotInToBeDecommissionedZone() throws Exception { + assertNodesRemovedAfterZoneDecommission(false); + } + + public void testNodesRemovedAfterZoneDecommission_ClusterManagerInToBeDecommissionedZone() throws Exception { + assertNodesRemovedAfterZoneDecommission(true); + } + + public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List<String> clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List<String> dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", 'a'); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); +
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + String currentClusterManager = internalCluster().getClusterManagerName(); + String decommissionedNode = randomFrom(clusterManagerNodes.get(0), dataNodes.get(0)); + + ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, decommissionedNode); + DecommissionAttributeMetadata metadata = decommissionedNodeClusterService.state() + .metadata() + .custom(DecommissionAttributeMetadata.TYPE); + // The decommissioned node would not have its status as SUCCESSFUL, since it was kicked out later + // and stopped receiving any further state updates + // This also helps to verify that metadata status updates were received by this node until it got kicked out by the leader + assertEquals(metadata.decommissionAttribute(), decommissionAttribute); + assertNotNull(metadata.status()); + assertEquals(metadata.status(), DecommissionStatus.IN_PROGRESS); + + // assert the node has the decommissioned attribute + assertEquals(decommissionedNodeClusterService.localNode().getAttributes().get("zone"), "a"); + + // assert exception on decommissioned node + Logger clusterLogger = LogManager.getLogger(JoinHelper.class); + MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); + mockLogAppender.addExpectation( + new MockLogAppender.PatternSeenEventExpectation( + "test", + JoinHelper.class.getCanonicalName(), + Level.INFO, + "local node is decommissioned \\[.*]\\.
Will not be able to join the cluster" + ) + ); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("test", JoinHelper.class.getCanonicalName(), Level.INFO, "failed to join") { + @Override + public boolean innerMatch(LogEvent event) { + return event.getThrown() != null + && event.getThrown().getClass() == RemoteTransportException.class + && event.getThrown().getCause() != null + && event.getThrown().getCause().getClass() == NodeDecommissionedException.class; + } + } + ); + TransportService clusterManagerTransportService = internalCluster().getInstance( + TransportService.class, + internalCluster().getClusterManagerName() + ); + MockTransportService decommissionedNodeTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + decommissionedNode + ); + final CountDownLatch countDownLatch = new CountDownLatch(2); + decommissionedNodeTransportService.addSendBehavior( + clusterManagerTransportService, + (connection, requestId, action, request, options) -> { + if (action.equals(JoinHelper.JOIN_ACTION_NAME)) { + countDownLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + } + ); + decommissionedNodeTransportService.addConnectBehavior(clusterManagerTransportService, Transport::openConnection); + countDownLatch.await(); + mockLogAppender.assertAllExpectationsMatched(); + + // decommissioned node should have Coordinator#localNodeCommissioned = false + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode); + assertFalse(coordinator.localNodeCommissioned()); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(currentClusterManager).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // Will wait for all events to complete + client(currentClusterManager).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueSeconds(121)); + } + + private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterManagerDecommission) throws Exception { + int dataNodeCountPerAZ = 4; + List zones = new ArrayList<>(Arrays.asList("a", "b", "c")); + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + Map clusterManagerNameToZone = new HashMap<>(); + clusterManagerNameToZone.put(clusterManagerNodes.get(0), "a"); + clusterManagerNameToZone.put(clusterManagerNodes.get(1), "b"); + 
clusterManagerNameToZone.put(clusterManagerNodes.get(2), "c"); + + logger.info("--> starting 4 data nodes each on zones 'a' & 'b' & 'c'"); + List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + ensureStableCluster(15); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(15)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + String originalClusterManager = internalCluster().getClusterManagerName(); + String originalClusterManagerZone = clusterManagerNameToZone.get(originalClusterManager); + logger.info("--> original cluster manager - name {}, zone {}", originalClusterManager, originalClusterManagerZone); + + String zoneToDecommission = originalClusterManagerZone; + + if (originalClusterManagerDecommission == false) { + // decommission one zone where the active cluster manager is not present + List<String> tempZones = new ArrayList<>(zones); + tempZones.remove(originalClusterManagerZone); + zoneToDecommission = randomFrom(tempZones); + } + + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = new HashMap<>(Map.of("a", 1.0, "b", 1.0, "c", 1.0)); + weights.put(zoneToDecommission, 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", zoneToDecommission); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", zoneToDecommission); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // assert that the number of nodes is 10 (2 cluster manager nodes + 8 data nodes) + assertEquals(clusterState.nodes().getNodes().size(), 10); + assertEquals(clusterState.nodes().getDataNodes().size(), 8); + assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2); + + Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + while (discoveryNodeIterator.hasNext()) { + // assert no node has the decommissioned attribute + DiscoveryNode node = discoveryNodeIterator.next(); + assertNotEquals(node.getAttributes().get("zone"), zoneToDecommission); + + // assert no node is decommissioned from Coordinator#localNodeCommissioned + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, node.getName()); + assertTrue(coordinator.localNodeCommissioned()); + } + + // assert
that decommission status is successful + GetDecommissionStateResponse response = client().execute( + GetDecommissionStateAction.INSTANCE, + new GetDecommissionStateRequest(decommissionAttribute.attributeName()) + ).get(); + assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); + assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + + // assert that no node is present in the voting config exclusions + assertEquals(clusterState.metadata().coordinationMetadata().getVotingConfigExclusions().size(), 0); + + String currentClusterManager = internalCluster().getClusterManagerName(); + assertNotNull(currentClusterManager); + if (originalClusterManagerDecommission) { + // assert that the cluster manager switched during the test + assertNotEquals(originalClusterManager, currentClusterManager); + } else { + // assert that the cluster manager didn't switch during the test + assertEquals(originalClusterManager, currentClusterManager); + } + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommission the zone back so the test can wind down gracefully once the above assertions succeed + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(currentClusterManager).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(15, TimeValue.timeValueMinutes(2)); + } + + public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> starting 1 data node each on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + String node_in_c = internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() +
.prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", 'a'); + DecommissionRequest decommissionRequest = new DecommissionRequest(new DecommissionAttribute("zone", "a")); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + DecommissionRequest newDecommissionRequest = new DecommissionRequest(new DecommissionAttribute("zone", "b")); + assertBusy( + () -> expectThrows( + DecommissioningFailedException.class, + () -> client(node_in_c).execute(DecommissionAction.INSTANCE, newDecommissionRequest).actionGet() + ) + ); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(node_in_c).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } + public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") @@ -126,10 +598,6 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); assertTrue(decommissionResponse.isAcknowledged()); - logger.info("--> Received decommissioning nodes in zone {}", 'c'); - // Keep some delay for scheduler to invoke decommission flow - Thread.sleep(500); - // Will wait for all events to complete client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); @@ -178,7 +646,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), DecommissionStatus.IN_PROGRESS ); - logger.info("--> Verified the decommissioned node Has in progress state."); + logger.info("--> Verified the decommissioned node has in_progress state."); // Will wait for all events to complete client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); @@ -193,7 +661,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) // as by then all nodes should have joined the cluster - ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + ensureStableCluster(6, TimeValue.timeValueSeconds(121)); } public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception { @@ -249,4 +717,158 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")); }); } + + public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { + Settings 
commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a") + .build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights"); + Map<String, Double> weights = Map.of("a", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + // prepare request to attempt to decommission zone 'a' + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + + // since there is just one zone present in the cluster, on initiating decommission for that zone, + // all the nodes would be added to the voting config exclusion list, but those nodes won't be able to + // abdicate themselves as there is no other leader-eligible node that could declare itself cluster manager, + // hence the leader won't be abdicated and the decommission request should eventually fail.
+ // And in this case, to ensure the decommission request doesn't leave a mutating change in the cluster, we ensure + // that no exclusion is set on the cluster and the decommission state is marked as FAILED + Logger clusterLogger = LogManager.getLogger(DecommissionService.class); + MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test", + DecommissionService.class.getCanonicalName(), + Level.ERROR, + "failure in removing to-be-decommissioned cluster manager eligible nodes" + ) + ); + + assertBusy(() -> { + OpenSearchTimeoutException ex = expectThrows( + OpenSearchTimeoutException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions")); + }); + + ClusterService leaderClusterService = internalCluster().getInstance( + ClusterService.class, + internalCluster().getClusterManagerName() + ); + ClusterStateObserver clusterStateObserver = new ClusterStateObserver( + leaderClusterService, + null, + logger, + client(internalCluster().getClusterManagerName()).threadPool().getThreadContext() + ); + CountDownLatch expectedStateLatch = new CountDownLatch(1); + + ClusterState currentState = internalCluster().clusterService().state(); + if (currentState.getVotingConfigExclusions().isEmpty()) { + logger.info("exclusion already cleared"); + expectedStateLatch.countDown(); + } else { + clusterStateObserver.waitForNextChange(new WaitForClearVotingConfigExclusion(expectedStateLatch)); + } + // if the below assertion passes, we are sure the exclusion is cleared + assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); + + expectedStateLatch = new CountDownLatch(1); + currentState = internalCluster().clusterService().state(); + DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { + logger.info("decommission status has already been marked FAILED"); + expectedStateLatch.countDown(); + } else { + clusterStateObserver.waitForNextChange(new WaitForFailedDecommissionState(expectedStateLatch)); + } + + // if the below assertion passes, we are sure the current decommission status is marked FAILED + assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); + mockLogAppender.assertAllExpectationsMatched(); + + // ensure all nodes are part of the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } + + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { + + final CountDownLatch doneLatch; + + WaitForFailedDecommissionState(CountDownLatch latch) { + this.doneLatch = latch; + } + + @Override + public void onNewClusterState(ClusterState state) { + DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { + doneLatch.countDown(); + } + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } + + private static class WaitForClearVotingConfigExclusion implements ClusterStateObserver.Listener { + + final
CountDownLatch doneLatch; + + WaitForClearVotingConfigExclusion(CountDownLatch latch) { + this.doneLatch = latch; + } + + @Override + public void onNewClusterState(ClusterState state) { + if (state.getVotingConfigExclusions().isEmpty()) { + doneLatch.countDown(); + } + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index ae96c8ddb2fde..7ec2cea769069 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -99,6 +99,10 @@ public boolean isNoDelay() { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; + if (decommissionAttribute == null) { + validationException = addValidationError("decommission attribute is missing", validationException); + return validationException; + } if (decommissionAttribute.attributeName() == null || Strings.isEmpty(decommissionAttribute.attributeName())) { validationException = addValidationError("attribute name is missing", validationException); } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index f284eb476a755..85030a1e902db 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -238,7 +238,7 @@ public void onResponse(Void unused) { public void onFailure(Exception e) { listener.onFailure(e); // attempting to mark the status as FAILED - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); + clearVotingConfigExclusionAndUpdateStatus(false, false); } };
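
Taken together, the patches above thread the draining delay through the request object (DecommissionRequest), the builder (DecommissionRequestBuilder), and the REST layer (RestDecommissionAction). As a rough illustration of how the resulting client-side surface composes, here is a minimal sketch; it assumes an initialized Client named `client` (for instance, `client()` inside an integration test such as AwarenessAttributeDecommissionIT) and uses only methods introduced or exercised in these patches:

    // Decommission zone "c" with an explicit draining timeout
    // (the REST equivalent is ?delay_timeout=300s).
    DecommissionRequest timedRequest = new DecommissionRequest(new DecommissionAttribute("zone", "c"));
    timedRequest.setDelayTimeout(TimeValue.timeValueSeconds(300));

    // Or skip draining entirely (REST equivalent: ?no_delay=true);
    // setNoDelay(true) forces the delay timeout to TimeValue.ZERO.
    DecommissionRequest immediateRequest = new DecommissionRequest(new DecommissionAttribute("zone", "c"));
    immediateRequest.setNoDelay(true);

    // Combining no_delay with an explicit delay_timeout is rejected by
    // DecommissionRequest#validate() with "Invalid decommission request",
    // as exercised in DecommissionRequestTests above.
    DecommissionResponse response = client.execute(DecommissionAction.INSTANCE, timedRequest).actionGet();
    assert response.isAcknowledged();

Note that, per the weight checks introduced in the first patch, either variant still fails with a DecommissioningFailedException unless the target zone has first been weighed away (weight 0.0) via the weighted routing API.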