Skip to content

Commit

Permalink
Pre conditions check before updating weighted routing metadata (opens…
Browse files Browse the repository at this point in the history
…earch-project#4955)

* Pre conditions check to allow weight updates for non decommissioned attribute

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Dec 13, 2022
1 parent eedddab commit 6121b65
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bumps `protobuf-java` from 3.21.7 to 3.21.9 ([#5319](https://github.com/opensearch-project/OpenSearch/pull/5319))
### Changed
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))

### Deprecated
### Removed
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_3_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
Expand Down Expand Up @@ -1633,6 +1635,12 @@ private enum OpenSearchExceptionHandle {
SnapshotInUseDeletionException::new,
166,
UNKNOWN_VERSION_ADDED
),
UNSUPPORTED_WEIGHTED_ROUTING_STATE_EXCEPTION(
UnsupportedWeightedRoutingStateException.class,
UnsupportedWeightedRoutingStateException::new,
167,
V_2_5_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -127,26 +126,17 @@ public ActionRequestValidationException validate() {
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
Double.parseDouble(value.toString());
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
if (countValueWithZeroWeights > 1) {
validationException = addValidationError(
(String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)),
validationException
);
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

/**
 * Signals that a weighted routing update was rejected because the cluster is in a state that does not
 * support the requested weights. See {@link WeightedRoutingService} for the checks that raise it.
 *
 * @opensearch.internal
 */
public class UnsupportedWeightedRoutingStateException extends OpenSearchException {

    /**
     * Creates the exception from a stream by delegating to the {@link OpenSearchException} stream constructor.
     *
     * @param in the stream input handed to the superclass
     * @throws IOException if the superclass constructor fails to read from {@code in}
     */
    public UnsupportedWeightedRoutingStateException(StreamInput in) throws IOException {
        super(in);
    }

    /**
     * Creates the exception with a message and optional message arguments, both passed unchanged to
     * {@link OpenSearchException}.
     *
     * @param msg  the message (or message pattern) describing the unsupported state
     * @param args arguments forwarded to the superclass together with {@code msg}
     */
    public UnsupportedWeightedRoutingStateException(String msg, Object... args) {
        super(msg, args);
    }

    /**
     * @return {@link RestStatus#CONFLICT}, the status reported for this exception
     */
    @Override
    public RestStatus status() {
        return RestStatus.CONFLICT;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -32,10 +33,16 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING;

/**
* Service responsible for updating cluster state metadata with weighted routing weights
Expand All @@ -45,6 +52,8 @@ public class WeightedRoutingService {
private final ClusterService clusterService;
private final ThreadPool threadPool;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;
private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0;

@Inject
public WeightedRoutingService(
Expand All @@ -60,6 +69,11 @@ public WeightedRoutingService(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
this::setAwarenessAttributes
);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
this::setForcedAwarenessAttributes
);
}

public void registerWeightedRoutingMetadata(
Expand All @@ -70,8 +84,10 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
// verify that request object has weights for all discovered and forced awareness values
ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(currentState, request);
// verify weights will not be updated for a decommissioned attribute
ensureDecommissionedAttributeHasZeroWeight(currentState, request);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -148,6 +164,18 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

/**
 * Rebuilds the forced awareness lookup from the given force-group settings and publishes it to
 * {@code forcedAwarenessAttributes}.
 * <p>
 * Each settings group contributes one entry keyed by the group name, holding the list stored under
 * its {@code "values"} key. Groups whose list is empty are left out.
 *
 * @param forceSettings the settings whose groups describe the forced awareness values
 */
private void setForcedAwarenessAttributes(Settings forceSettings) {
    final Map<String, List<String>> valuesByAttribute = new HashMap<>();
    forceSettings.getAsGroups().forEach((attribute, groupSettings) -> {
        final List<String> forcedValues = groupSettings.getAsList("values");
        if (forcedValues.isEmpty() == false) {
            valuesByAttribute.put(attribute, forcedValues);
        }
    });
    // single assignment so readers always observe a fully built map
    this.forcedAwarenessAttributes = valuesByAttribute;
}

public void verifyAwarenessAttribute(String attributeName) {
if (getAwarenessAttributes().contains(attributeName) == false) {
ActionRequestValidationException validationException = null;
Expand All @@ -159,13 +187,62 @@ public void verifyAwarenessAttribute(String attributeName) {
}
}

public void ensureNoOngoingDecommissionAction(ClusterState state) {
/**
 * Verifies that the request carries a weight for every value of the requested awareness attribute
 * that is either configured as a forced awareness value or present as an attribute on a node in
 * the given cluster state.
 *
 * @param state   the cluster state whose nodes are inspected for awareness attribute values
 * @param request the weighted routing request being validated
 * @throws UnsupportedWeightedRoutingStateException if a forced or discovered awareness value has no weight in the request
 */
private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) {
    String attributeName = request.getWeightedRouting().attributeName();
    Set<String> discoveredAwarenessValues = new HashSet<>();
    state.nodes().forEach(node -> {
        if (node.getAttributes().containsKey(attributeName)) {
            discoveredAwarenessValues.add(node.getAttributes().get(attributeName));
        }
    });
    // forcedAwarenessAttributes is volatile and replaced by the settings update consumer; look the
    // attribute up exactly once so the null check and the copy operate on the same list.
    List<String> forcedAwarenessValues = forcedAwarenessAttributes.get(attributeName);
    Set<String> allAwarenessValues;
    if (forcedAwarenessValues == null) {
        allAwarenessValues = new HashSet<>();
    } else {
        allAwarenessValues = new HashSet<>(forcedAwarenessValues);
    }
    allAwarenessValues.addAll(discoveredAwarenessValues);
    allAwarenessValues.forEach(awarenessValue -> {
        if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) {
            throw new UnsupportedWeightedRoutingStateException(
                "weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute."
            );
        }
    });
}

private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) {
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
throw new IllegalStateException(
"a decommission action is ongoing with status ["
+ decommissionAttributeMetadata.status().status()
+ "], cannot update weight during this state"
if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) {
// here either there's no decommission action is ongoing or it is in failed state. In this case, we will allow weight update
return;
}
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
WeightedRouting weightedRouting = request.getWeightedRouting();
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) {
// this is unexpected when a different attribute is requested for decommission and weight update is on another attribute
throw new UnsupportedWeightedRoutingStateException(
"decommission action ongoing for attribute [{}], cannot update weight for [{}]",
decommissionAttribute.attributeName(),
weightedRouting.attributeName()
);
}
if (weightedRouting.weights().containsKey(decommissionAttribute.attributeValue()) == false) {
// weight of an attribute undergoing decommission must be specified
throw new UnsupportedWeightedRoutingStateException(
"weight for [{}] is not specified. Please specify its weight to [{}] as it is under decommission action",
decommissionAttribute.attributeValue(),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
);
}
if (Objects.equals(
weightedRouting.weights().get(decommissionAttribute.attributeValue()),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
) == false) {
throw new UnsupportedWeightedRoutingStateException(
"weight for [{}] must be set to [{}] as it is under decommission action",
decommissionAttribute.attributeValue(),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.ParsingException;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -868,6 +869,7 @@ public void testIds() {
ids.put(164, NodeDecommissionedException.class);
ids.put(165, ClusterManagerThrottlingException.class);
ids.put(166, SnapshotInUseDeletionException.class);
ids.put(167, UnsupportedWeightedRoutingStateException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ public void testValidate_ValuesAreProper() {
assertNull(actionRequestValidationException);
}

/**
 * Builds a request for the {@code zone} attribute in which two of the three values carry weight 0
 * and expects {@code validate()} to report that more than one value has a zero weight.
 */
public void testValidate_TwoZonesWithZeroWeight() {
    final ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
    // two zero-weight entries (us-east-1c, us-east-1b) and one non-zero entry
    final String weightsJson = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}";
    request.setWeightedRouting(new BytesArray(weightsJson), XContentType.JSON);

    final ActionRequestValidationException validationFailure = request.validate();

    assertNotNull(validationFailure);
    final String failureMessage = validationFailure.getMessage();
    assertTrue(failureMessage.contains("More than one [2] value has weight set as " + "0"));
}

public void testValidate_MissingWeights() {
String reqString = "{}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,38 @@ public void testVerifyAwarenessAttribute_ValidAttributeName() {
}
}

/**
 * Submits weights for {@code zone_A} and {@code zone_C} only and expects the update to fail with an
 * {@link UnsupportedWeightedRoutingStateException} naming {@code zone_B} as the value without a weight.
 */
public void testAddWeightedRoutingFailsWhenWeightsNotSetForAllDiscoveredZones() throws InterruptedException {
    // zone_B is deliberately left out of the requested weights
    final Map<String, Double> partialWeights = Map.of("zone_A", 1.0, "zone_C", 1.0);
    final ClusterPutWeightedRoutingRequestBuilder requestBuilder = new ClusterPutWeightedRoutingRequestBuilder(
        client,
        ClusterAddWeightedRoutingAction.INSTANCE
    );
    requestBuilder.setWeightedRouting(new WeightedRouting("zone", partialWeights));

    final AtomicReference<Exception> failure = new AtomicReference<>();
    final CountDownLatch completed = new CountDownLatch(1);
    final ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
        @Override
        public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
            completed.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            // remember the failure before releasing the waiting test thread
            failure.set(e);
            completed.countDown();
        }
    };

    weightedRoutingService.registerWeightedRoutingMetadata(requestBuilder.request(), listener);
    assertTrue(completed.await(30, TimeUnit.SECONDS));

    final Exception reported = failure.get();
    MatcherAssert.assertThat("Expected onFailure to be called", reported, notNullValue());
    MatcherAssert.assertThat(reported, instanceOf(UnsupportedWeightedRoutingStateException.class));
    MatcherAssert.assertThat(
        reported.getMessage(),
        containsString("weight for [zone_B] is not set and it is part of forced awareness value or a node has this attribute.")
    );
}

public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0);
DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL);
Expand Down Expand Up @@ -327,8 +359,11 @@ public void onFailure(Exception e) {
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue());
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(IllegalStateException.class));
MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("a decommission action is ongoing with status"));
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class));
MatcherAssert.assertThat(
exceptionReference.get().getMessage(),
containsString("weight for [zone_A] must be set to [0.0] as it is under decommission action")
);
}

public void testAddWeightedRoutingPassesWhenDecommissionFailed() throws InterruptedException {
Expand Down Expand Up @@ -362,4 +397,36 @@ public void onFailure(Exception e) {}
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

/**
 * With a decommission in {@link DecommissionStatus#SUCCESSFUL} state, updates the weights of the other
 * zones while keeping {@code zone_A} at weight 0.0 and expects the update to be acknowledged and stored
 * in the cluster state metadata.
 */
public void testAddWeightedRoutingPassesWhenWeightOfDecommissionedAttributeStillZero() throws InterruptedException {
    Map<String, Double> weights = Map.of("zone_A", 0.0, "zone_B", 1.0, "zone_C", 1.0);
    DecommissionStatus status = DecommissionStatus.SUCCESSFUL;
    ClusterState state = clusterService.state();
    state = setWeightedRoutingWeights(state, weights);
    state = setDecommissionAttribute(state, status);
    ClusterState.Builder builder = ClusterState.builder(state);
    ClusterServiceUtils.setState(clusterService, builder);

    ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder(
        client,
        ClusterAddWeightedRoutingAction.INSTANCE
    );
    Map<String, Double> updatedWeights = Map.of("zone_A", 0.0, "zone_B", 2.0, "zone_C", 1.0);
    WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", updatedWeights);
    request.setWeightedRouting(updatedWeightedRouting);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final AtomicReference<ClusterStateUpdateResponse> responseReference = new AtomicReference<>();
    final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
    ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
        @Override
        public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
            // only record the outcome here; assertions run in the test body after the latch is released
            responseReference.set(clusterStateUpdateResponse);
            countDownLatch.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            // release the latch on failure too, so the test fails right away with the cause
            // instead of waiting for the 30 second timeout
            exceptionReference.set(e);
            countDownLatch.countDown();
        }
    };
    weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
    assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
    assertTrue("Expected onResponse to be called but onFailure received: " + exceptionReference.get(), exceptionReference.get() == null);
    MatcherAssert.assertThat("Expected onResponse to be called", responseReference.get(), notNullValue());
    assertTrue(responseReference.get().isAcknowledged());
    assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting());
}
}

0 comments on commit 6121b65

Please sign in to comment.