From 850bb2e9759e85b9628bdc377a49ef2e174ec06f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 3 Nov 2022 22:44:39 +0530 Subject: [PATCH] Integ Tests for Awareness Attribute Decommissioning (#4715) * Add integ test for awareness attribute decommissioning Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../AwarenessAttributeDecommissionIT.java | 634 +++++++++++++++++- .../awareness/put/DecommissionRequest.java | 4 + .../decommission/DecommissionService.java | 2 +- 4 files changed, 634 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a05e2cb692004..3a9a4f9fc218c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661)) - Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) - Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677)) +- Integ Tests for Awareness Attribute Decommissioning ([#4715](https://github.com/opensearch-project/OpenSearch/pull/4715)) - Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) - Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691)) - Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 14ec041b7464b..067b127a667b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -8,9 +8,12 @@ package org.opensearch.cluster.coordination; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; import org.junit.After; +import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; @@ -23,9 +26,13 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionService; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.decommission.DecommissioningFailedException; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.node.DiscoveryNode; import 
org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.WeightedRouting; @@ -33,16 +40,26 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.discovery.Discovery; import org.opensearch.plugins.Plugin; +import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.opensearch.test.NodeRoles.onlyRole; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; @@ -61,6 +78,461 @@ public void cleanup() throws Exception { assertNoTimeout(client().admin().cluster().prepareHealth().get()); } + public void testDecommissionFailedWhenNotZoneAware() throws Exception { + Settings commonSettings = Settings.builder().build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("invalid awareness attribute requested for decommissioning")); + }); + } + + public void testDecommissionFailedWhenNotForceZoneAware() throws Exception { + Settings commonSettings = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + // Start 3 cluster manager eligible nodes + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> starting data node each on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", 
"b").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + assertBusy(() -> { + DecommissioningFailedException ex = expectThrows( + DecommissioningFailedException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("doesn't have the decommissioning attribute")); + }); + } + + public void testNodesRemovedAfterZoneDecommission_ClusterManagerNotInToBeDecommissionedZone() throws Exception { + assertNodesRemovedAfterZoneDecommission(false); + } + + public void testNodesRemovedAfterZoneDecommission_ClusterManagerInToBeDecommissionedZone() throws Exception { + assertNodesRemovedAfterZoneDecommission(true); + } + + public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", 'a'); + DecommissionAttribute decommissionAttribute = new 
DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + String currentClusterManager = internalCluster().getClusterManagerName(); + String decommissionedNode = randomFrom(clusterManagerNodes.get(0), dataNodes.get(0)); + + ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, decommissionedNode); + DecommissionAttributeMetadata metadata = decommissionedNodeClusterService.state() + .metadata() + .custom(DecommissionAttributeMetadata.TYPE); + // The decommissioned node will not have the status SUCCESS because it was kicked out of the cluster + // before it could receive any further state updates + // This also verifies that metadata status updates were received by this node until it was kicked out by the leader + assertEquals(metadata.decommissionAttribute(), decommissionAttribute); + assertNotNull(metadata.status()); + assertEquals(metadata.status(), DecommissionStatus.IN_PROGRESS); + + // assert the node has the decommissioned attribute + assertEquals(decommissionedNodeClusterService.localNode().getAttributes().get("zone"), "a"); + + // assert exception on decommissioned node + Logger clusterLogger = LogManager.getLogger(JoinHelper.class); + MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); + mockLogAppender.addExpectation( + new MockLogAppender.PatternSeenEventExpectation( + "test", + JoinHelper.class.getCanonicalName(), + Level.INFO, + "local node is decommissioned \\[.*]\\. 
Will not be able to join the cluster" + ) + ); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation("test", JoinHelper.class.getCanonicalName(), Level.INFO, "failed to join") { + @Override + public boolean innerMatch(LogEvent event) { + return event.getThrown() != null + && event.getThrown().getClass() == RemoteTransportException.class + && event.getThrown().getCause() != null + && event.getThrown().getCause().getClass() == NodeDecommissionedException.class; + } + } + ); + TransportService clusterManagerTransportService = internalCluster().getInstance( + TransportService.class, + internalCluster().getClusterManagerName() + ); + MockTransportService decommissionedNodeTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + decommissionedNode + ); + final CountDownLatch countDownLatch = new CountDownLatch(2); + decommissionedNodeTransportService.addSendBehavior( + clusterManagerTransportService, + (connection, requestId, action, request, options) -> { + if (action.equals(JoinHelper.JOIN_ACTION_NAME)) { + countDownLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + } + ); + decommissionedNodeTransportService.addConnectBehavior(clusterManagerTransportService, Transport::openConnection); + countDownLatch.await(); + mockLogAppender.assertAllExpectationsMatched(); + + // decommissioned node should have Coordinator#localNodeCommissioned = false + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode); + assertFalse(coordinator.localNodeCommissioned()); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(currentClusterManager).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // Will wait for all events to complete + client(currentClusterManager).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueSeconds(121)); + } + + private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterManagerDecommission) throws Exception { + int dataNodeCountPerAZ = 4; + List zones = new ArrayList<>(Arrays.asList("a", "b", "c")); + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + Map clusterManagerNameToZone = new HashMap<>(); + clusterManagerNameToZone.put(clusterManagerNodes.get(0), "a"); + clusterManagerNameToZone.put(clusterManagerNodes.get(1), "b"); + 
clusterManagerNameToZone.put(clusterManagerNodes.get(2), "c"); + + logger.info("--> starting 4 data nodes each on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + dataNodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + ensureStableCluster(15); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(15)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + String originalClusterManager = internalCluster().getClusterManagerName(); + String originalClusterManagerZone = clusterManagerNameToZone.get(originalClusterManager); + logger.info("--> original cluster manager - name {}, zone {}", originalClusterManager, originalClusterManagerZone); + + String zoneToDecommission = originalClusterManagerZone; + + if (originalClusterManagerDecommission == false) { + // decommission one zone where active cluster manager is not present + List tempZones = new ArrayList<>(zones); + tempZones.remove(originalClusterManagerZone); + zoneToDecommission = randomFrom(tempZones); + } + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = new HashMap<>(Map.of("a", 1.0, "b", 1.0, "c", 1.0)); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + weights.put(zoneToDecommission, 0.0); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", zoneToDecommission); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", zoneToDecommission); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // assert that number of nodes should be 10 ( 2 cluster manager nodes + 8 data nodes ) + assertEquals(clusterState.nodes().getNodes().size(), 10); + assertEquals(clusterState.nodes().getDataNodes().size(), 8); + assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2); + + Iterator discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + while (discoveryNodeIterator.hasNext()) { + // assert no node has decommissioned attribute + DiscoveryNode node = discoveryNodeIterator.next(); + assertNotEquals(node.getAttributes().get("zone"), zoneToDecommission); + + // assert no node is decommissioned from Coordinator#localNodeCommissioned + Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, node.getName()); + assertTrue(coordinator.localNodeCommissioned()); + } + + // assert 
that decommission status is successful + GetDecommissionStateResponse response = client().execute( + GetDecommissionStateAction.INSTANCE, + new GetDecommissionStateRequest(decommissionAttribute.attributeName()) + ).get(); + assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); + assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + + // assert that no node present in Voting Config Exclusion + assertEquals(clusterState.metadata().coordinationMetadata().getVotingConfigExclusions().size(), 0); + + String currentClusterManager = internalCluster().getClusterManagerName(); + assertNotNull(currentClusterManager); + if (originalClusterManagerDecommission) { + // assert that cluster manager switched during the test + assertNotEquals(originalClusterManager, currentClusterManager); + } else { + // assert that cluster manager didn't switch during test + assertEquals(originalClusterManager, currentClusterManager); + } + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(currentClusterManager).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(15, TimeValue.timeValueMinutes(2)); + } + + public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + logger.info("--> starting 1 nodes each on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + String node_in_c = internalCluster().startDataOnlyNode(Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + 
.prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", 'a'); + DecommissionRequest decommissionRequest = new DecommissionRequest(new DecommissionAttribute("zone", "a")); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + DecommissionRequest newDecommissionRequest = new DecommissionRequest(new DecommissionAttribute("zone", "b")); + assertBusy( + () -> expectThrows( + DecommissioningFailedException.class, + () -> client(node_in_c).execute(DecommissionAction.INSTANCE, newDecommissionRequest).actionGet() + ) + ); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(node_in_c).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } + public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") @@ -126,10 +598,6 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); assertTrue(decommissionResponse.isAcknowledged()); - logger.info("--> Received decommissioning nodes in zone {}", 'c'); - // Keep some delay for scheduler to invoke decommission flow - Thread.sleep(500); - // Will wait for all events to complete client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); @@ -178,7 +646,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), DecommissionStatus.IN_PROGRESS ); - logger.info("--> Verified the decommissioned node Has in progress state."); + logger.info("--> Verified the decommissioned node has in_progress state."); // Will wait for all events to complete client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); @@ -193,7 +661,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) // as by then all nodes should have joined the cluster - ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + ensureStableCluster(6, TimeValue.timeValueSeconds(121)); } public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception { @@ -249,4 +717,158 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception assertTrue(ex.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")); }); } + + public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { + Settings 
commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a") + .build(); + // Start 3 cluster manager eligible nodes + internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + // start 3 data nodes + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + ensureStableCluster(6); + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(6)) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + logger.info("--> setting shard routing weights"); + Map weights = Map.of("a", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + // prepare request to attempt to decommission zone 'a' + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + + // since there is just one zone in the cluster, initiating decommission for that zone would add all the nodes to the + // voting config exclusion list, but the leader would not be able to abdicate because no other cluster manager eligible + // node is left to declare itself cluster manager, and hence the decommission request should eventually fail. 
+ // In this case, to ensure the decommission request doesn't leave any mutating change behind, we verify + // that no voting config exclusion remains set on the cluster and that the decommission state is marked as FAILED + Logger clusterLogger = LogManager.getLogger(DecommissionService.class); + MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test", + DecommissionService.class.getCanonicalName(), + Level.ERROR, + "failure in removing to-be-decommissioned cluster manager eligible nodes" + ) + ); + + assertBusy(() -> { + OpenSearchTimeoutException ex = expectThrows( + OpenSearchTimeoutException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() + ); + assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions")); + }); + + ClusterService leaderClusterService = internalCluster().getInstance( + ClusterService.class, + internalCluster().getClusterManagerName() + ); + ClusterStateObserver clusterStateObserver = new ClusterStateObserver( + leaderClusterService, + null, + logger, + client(internalCluster().getClusterManagerName()).threadPool().getThreadContext() + ); + CountDownLatch expectedStateLatch = new CountDownLatch(1); + + ClusterState currentState = internalCluster().clusterService().state(); + if (currentState.getVotingConfigExclusions().isEmpty()) { + logger.info("exclusion already cleared"); + expectedStateLatch.countDown(); + } else { + clusterStateObserver.waitForNextChange(new WaitForClearVotingConfigExclusion(expectedStateLatch)); + } + // if the below assertion passes, we are sure the exclusion is cleared + assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); + + expectedStateLatch = new CountDownLatch(1); + currentState = internalCluster().clusterService().state(); + DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { + logger.info("decommission status has already been marked FAILED"); + expectedStateLatch.countDown(); + } else { + clusterStateObserver.waitForNextChange(new WaitForFailedDecommissionState(expectedStateLatch)); + } + + // if the below assertion passes, we are sure the current decommission status is marked FAILED + assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); + mockLogAppender.assertAllExpectationsMatched(); + + // ensure all nodes are part of the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } + + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { + + final CountDownLatch doneLatch; + + WaitForFailedDecommissionState(CountDownLatch latch) { + this.doneLatch = latch; + } + + @Override + public void onNewClusterState(ClusterState state) { + DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) { + doneLatch.countDown(); + } + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } + + private static class WaitForClearVotingConfigExclusion implements ClusterStateObserver.Listener { + + final 
CountDownLatch doneLatch; + + WaitForClearVotingConfigExclusion(CountDownLatch latch) { + this.doneLatch = latch; + } + + @Override + public void onNewClusterState(ClusterState state) { + if (state.getVotingConfigExclusions().isEmpty()) { + doneLatch.countDown(); + } + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index ae96c8ddb2fde..7ec2cea769069 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -99,6 +99,10 @@ public boolean isNoDelay() { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; + if (decommissionAttribute == null) { + validationException = addValidationError("decommission attribute is missing", validationException); + return validationException; + } if (decommissionAttribute.attributeName() == null || Strings.isEmpty(decommissionAttribute.attributeName())) { validationException = addValidationError("attribute name is missing", validationException); } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index f284eb476a755..85030a1e902db 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -238,7 +238,7 @@ public void onResponse(Void unused) { public void onFailure(Exception e) { listener.onFailure(e); // attempting to mark the status as FAILED - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); + clearVotingConfigExclusionAndUpdateStatus(false, false); } };
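A minimal sketch of the behavior added by the new null check in DecommissionRequest#validate(), illustrative only and not part of the patch; it assumes the no-argument DecommissionRequest constructor used by the REST layer and standard test assertions:

    // Hypothetical usage: a request built without a decommission attribute now fails validation
    // with a clear error instead of throwing a NullPointerException from validate().
    DecommissionRequest request = new DecommissionRequest(); // no attribute set (assumed no-arg constructor)
    ActionRequestValidationException validationException = request.validate();
    assertNotNull(validationException);
    assertTrue(validationException.getMessage().contains("decommission attribute is missing"));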