Skip to content

Commit

Permalink
Add DecommissionService and helper to execute awareness attribute dec…
Browse files Browse the repository at this point in the history
…ommissioning (opensearch-project#4084)

* Add Executor to decommission node attribute
* Decommission service implementation with cluster metadata
* Master abdication changes to decommission local awareness leader
* Update join validator changes to validate decommissioned node join request

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
imRishN authored and pranikum committed Sep 25, 2022
1 parent e77e260 commit 5274f07
Show file tree
Hide file tree
Showing 12 changed files with 614 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))


### Deprecated

Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,18 @@ private enum OpenSearchExceptionHandle {
org.opensearch.index.shard.PrimaryShardClosedException::new,
162,
V_3_0_0
),
DECOMMISSIONING_FAILED_EXCEPTION(
org.opensearch.cluster.decommission.DecommissioningFailedException.class,
org.opensearch.cluster.decommission.DecommissioningFailedException::new,
163,
V_3_0_0
),
NODE_DECOMMISSIONED_EXCEPTION(
org.opensearch.cluster.decommission.NodeDecommissionedException.class,
org.opensearch.cluster.decommission.NodeDecommissionedException::new,
164,
V_3_0_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.action.index.NodeMappingRefreshAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.metadata.ComponentTemplateMetadata;
import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata;
import org.opensearch.cluster.metadata.DataStreamMetadata;
Expand Down Expand Up @@ -193,6 +194,12 @@ public static List<Entry> getNamedWriteables() {
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
registerMetadataCustom(
entries,
DecommissionAttributeMetadata.TYPE,
DecommissionAttributeMetadata::new,
DecommissionAttributeMetadata::readDiffFrom
);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -283,6 +290,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
WeightedRoutingMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(DecommissionAttributeMetadata.TYPE),
DecommissionAttributeMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -358,6 +362,7 @@ public boolean runOnlyOnClusterManager() {

/**
* a task indicates that the current node should become master
*
* @deprecated As of 2.0, because supporting inclusive language, replaced by {@link #newBecomeClusterManagerTask()}
*/
@Deprecated
Expand All @@ -384,8 +389,9 @@ public static Task newFinishElectionTask() {
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
* will not be created with a newer version of opensearch as well as that all indices are newer or equal to the minimum index
* compatibility version.
* @see Version#minimumIndexCompatibilityVersion()
*
* @throws IllegalStateException if any index is incompatible with the given version
* @see Version#minimumIndexCompatibilityVersion()
*/
public static void ensureIndexCompatibility(final Version nodeVersion, Metadata metadata) {
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
Expand Down Expand Up @@ -415,14 +421,18 @@ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata
}
}

/** ensures that the joining node has a version that's compatible with all current nodes*/
/**
* ensures that the joining node has a version that's compatible with all current nodes
*/
public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
final Version minNodeVersion = currentNodes.getMinNodeVersion();
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
}

/** ensures that the joining node has a version that's compatible with a given version range */
/**
* ensures that the joining node has a version that's compatible with a given version range
*/
public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
Expand Down Expand Up @@ -466,13 +476,34 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version
}
}

public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
// We will let the node join the cluster if the current status is in FAILED state
if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue())
&& status.equals(DecommissionStatus.FAILED) == false) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
decommissionAttribute.toString(),
status.status()
);
}
}
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
validators.add((node, state) -> {
ensureNodesCompatibility(node.getVersion(), state.getNodes());
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
ensureNodeCommissioned(node, state.getMetadata());
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.decommission;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* This exception is thrown if the node is decommissioned by @{@link DecommissionService}
* and this nodes needs to be removed from the cluster
*
* @opensearch.internal
*/
public class NodeDecommissionedException extends OpenSearchException {

public NodeDecommissionedException(String msg, Object... args) {
super(msg, args);
}

public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Decommission lifecycle classes
*/
package org.opensearch.cluster.decommission;
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.CoordinationStateRejectedException;
import org.opensearch.cluster.coordination.NoClusterManagerBlockService;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IllegalShardRoutingStateException;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -860,6 +862,8 @@ public void testIds() {
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, ReplicationFailedException.class);
ids.put(162, PrimaryShardClosedException.class);
ids.put(163, DecommissioningFailedException.class);
ids.put(164, NodeDecommissionedException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand All @@ -48,7 +53,9 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

import static org.hamcrest.Matchers.is;
import static org.opensearch.test.VersionUtils.allVersions;
Expand Down Expand Up @@ -216,4 +223,67 @@ public void testIsBecomeClusterManagerTask() {
JoinTaskExecutor.Task joinTaskOfClusterManager = JoinTaskExecutor.newBecomeClusterManagerTask();
assertThat(joinTaskOfClusterManager.isBecomeClusterManagerTask(), is(true));
}

public void testJoinClusterWithNoDecommission() {
Settings.builder().build();
Metadata.Builder metaBuilder = Metadata.builder();
Metadata metadata = metaBuilder.build();
DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-2"));
JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata);
}

public void testPreventJoinClusterWithDecommission() {
Settings.builder().build();
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1");
DecommissionStatus decommissionStatus = randomFrom(
DecommissionStatus.INIT,
DecommissionStatus.IN_PROGRESS,
DecommissionStatus.SUCCESSFUL
);
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
decommissionStatus
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();
DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1"));
expectThrows(NodeDecommissionedException.class, () -> JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata));
}

public void testJoinClusterWithDifferentDecommission() {
Settings.builder().build();
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1");
DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values());
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
decommissionStatus
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();

DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-2"));
JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata);
}

public void testJoinClusterWithDecommissionFailed() {
Settings.builder().build();
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1");
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
DecommissionStatus.FAILED
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();

DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1"));
JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata);
}

private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
return new DiscoveryNode(
randomAlphaOfLength(10),
randomAlphaOfLength(10),
buildNewFakeTransportAddress(),
attributes,
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
Version.CURRENT
);
}
}
Loading

0 comments on commit 5274f07

Please sign in to comment.