Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail weight update when decommission ongoing and fail decommission when attribute not weighed away #4839

Merged
merged 8 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
### Deprecated
### Removed
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
Expand All @@ -37,6 +41,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -102,6 +107,17 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx

ensureStableCluster(6);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
Expand Down Expand Up @@ -162,4 +178,57 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
// as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception {
    // Awareness attribute "zone" with forced values so shards spread across a, b and c.
    Settings awarenessSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
        .build();
    // Bring up 3 cluster-manager-eligible nodes and 3 data nodes.
    internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(awarenessSettings).build());
    internalCluster().startDataOnlyNodes(3, Settings.builder().put(awarenessSettings).build());
    ensureStableCluster(6);
    ClusterHealthResponse clusterHealth = client().admin()
        .cluster()
        .prepareHealth()
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForGreenStatus()
        .setWaitForNodes(Integer.toString(6))
        .execute()
        .actionGet();
    assertFalse(clusterHealth.isTimedOut());

    DecommissionAttribute zoneToDecommission = new DecommissionAttribute("zone", "c");
    DecommissionRequest request = new DecommissionRequest(zoneToDecommission);
    // With no weights configured at all, the decommission request must be rejected.
    assertBusy(() -> {
        DecommissioningFailedException failure = expectThrows(
            DecommissioningFailedException.class,
            () -> client().execute(DecommissionAction.INSTANCE, request).actionGet()
        );
        assertTrue(
            failure.getMessage()
                .contains("no weights are set to the attribute. Please set appropriate weights before triggering decommission action")
        );
    });

    logger.info("--> setting shard routing weights for weighted round robin");
    Map<String, Double> zoneWeights = Map.of("a", 1.0, "b", 1.0, "c", 1.0);
    WeightedRouting routingWeights = new WeightedRouting("zone", zoneWeights);

    ClusterPutWeightedRoutingResponse putWeightsResponse = client().admin()
        .cluster()
        .prepareWeightedRouting()
        .setWeightedRouting(routingWeights)
        .get();
    assertTrue(putWeightsResponse.isAcknowledged());

    // Zone "c" carries weight 1.0 (not weighed away), so decommission must still be rejected.
    assertBusy(() -> {
        DecommissioningFailedException failure = expectThrows(
            DecommissioningFailedException.class,
            () -> client().execute(DecommissionAction.INSTANCE, request).actionGet()
        );
        assertTrue(failure.getMessage().contains("weight for decommissioned attribute is expected to be [0.0] but found [1.0]"));
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -129,6 +131,8 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
// ensure attribute is weighed away
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -413,6 +417,27 @@ private static void validateAwarenessAttribute(
}
}

private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) {
String msg = null;
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName())) {
Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue());
if (attributeValueWeight == null || attributeValueWeight != 0.0) {
Copy link
Collaborator

@Bukhtawar Bukhtawar Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this value float or double, should we make specific comparisons. For instance Double.valueOf(0.0)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is Double. using .equals()

msg = "weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]";
}
} else {
msg = "no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]";
}
} else {
msg = "no weights are set to the attribute. Please set appropriate weights before triggering decommission action";
}
if (msg != null) {
throw new DecommissioningFailedException(decommissionAttribute, msg);
}
}
imRishN marked this conversation as resolved.
Show resolved Hide resolved

private static void ensureEligibleRequest(
DecommissionAttributeMetadata decommissionAttributeMetadata,
DecommissionAttribute requestedDecommissionAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
Expand Down Expand Up @@ -68,6 +70,8 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoDecommissionActionOngoing(currentState);
imRishN marked this conversation as resolved.
Show resolved Hide resolved
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -154,4 +158,15 @@ public void verifyAwarenessAttribute(String attributeName) {
throw validationException;
}
}

public void ensureNoDecommissionActionOngoing(ClusterState state) {
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if status is SUCCESSFUL?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only when Decommission has failed, we can allow the weight update can go through. Not for any other status

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't SUCCESSFUL a final state? The error message of this if condition says: a decommission action is ongoing with status...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A successful decommission state means the transport execution has completed. The nodes are decommissioned in this state (not able to start pre-voting or join). Hence, DecommissionAction being ongoing means a zone is decommissioned and the cluster rejects pings from the decommissioned nodes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If decommission status is FAILED can certain nodes be out of service or decommissioned while others not as in partially failed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if the decommission is FAILED, joins or prevoting is not blocked on those nodes and hence if some nodes are out of service (which can happen), the weights request can go through

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then the zone isn't fully equipped to take traffic right like 90% nodes are out of service and we place weights 10:1:1 when 10 is for the zone only operating at 10% capacity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No

throw new IllegalStateException(
"a decommission action is ongoing with status ["
+ decommissionAttributeMetadata.status().status()
+ "], cannot update weight during this state"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -169,6 +172,56 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
    // Weigh away zone_3 (weight 0.0) but try to decommission zone_1, which still carries weight 1.0.
    Map<String, Double> zoneWeights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0);
    setWeightedRoutingWeights(zoneWeights);
    final CountDownLatch latch = new CountDownLatch(1);
    DecommissionAttribute attribute = new DecommissionAttribute("zone", "zone_1");
    ActionListener<DecommissionResponse> decommissionListener = new ActionListener<DecommissionResponse>() {
        @Override
        public void onResponse(DecommissionResponse decommissionResponse) {
            fail("on response shouldn't have been called");
        }

        @Override
        public void onFailure(Exception e) {
            // Must fail: the requested attribute value was never weighed away.
            assertTrue(e instanceof DecommissioningFailedException);
            assertThat(
                e.getMessage(),
                Matchers.containsString("weight for decommissioned attribute is expected to be [0.0] but found [1.0]")
            );
            latch.countDown();
        }
    };
    decommissionService.startDecommissionAction(attribute, decommissionListener);
    assertTrue(latch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_2() throws InterruptedException {
    // No weights are set at all before triggering decommission.
    final CountDownLatch latch = new CountDownLatch(1);
    DecommissionAttribute attribute = new DecommissionAttribute("zone", "zone_1");
    ActionListener<DecommissionResponse> decommissionListener = new ActionListener<DecommissionResponse>() {
        @Override
        public void onResponse(DecommissionResponse decommissionResponse) {
            fail("on response shouldn't have been called");
        }

        @Override
        public void onFailure(Exception e) {
            // Must fail: weighted routing metadata is absent from the cluster state.
            assertTrue(e instanceof DecommissioningFailedException);
            assertThat(
                e.getMessage(),
                Matchers.containsString(
                    "no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
                )
            );
            latch.countDown();
        }
    };
    decommissionService.startDecommissionAction(attribute, decommissionListener);
    assertTrue(latch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -286,6 +339,17 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

/**
 * Publishes the given per-zone weights into the cluster state held by {@code clusterService}
 * as {@code WeightedRoutingMetadata} for the "zone" attribute.
 */
private void setWeightedRoutingWeights(Map<String, Double> weights) {
    ClusterState currentState = clusterService.state();
    WeightedRoutingMetadata routingMetadata = new WeightedRoutingMetadata(new WeightedRouting("zone", weights));
    Metadata updatedMetadata = Metadata.builder(currentState.metadata())
        .putCustom(WeightedRoutingMetadata.TYPE, routingMetadata)
        .build();
    ClusterState updatedState = ClusterState.builder(currentState).metadata(updatedMetadata).build();
    ClusterServiceUtils.setState(clusterService, ClusterState.builder(updatedState));
}

private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) {
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes());
org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));
Expand Down
Loading