Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre conditions check before updating weighted routing metadata #4955

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -70,6 +71,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
Expand Down Expand Up @@ -1618,6 +1620,12 @@ private enum OpenSearchExceptionHandle {
SnapshotInUseDeletionException::new,
166,
UNKNOWN_VERSION_ADDED
),
UNSUPPORTED_WEIGHTED_ROUTING_STATE_EXCEPTION(
UnsupportedWeightedRoutingStateException.class,
UnsupportedWeightedRoutingStateException::new,
167,
V_2_5_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -127,26 +126,17 @@ public ActionRequestValidationException validate() {
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
Double.parseDouble(value.toString());
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
if (countValueWithZeroWeights > 1) {
validationException = addValidationError(
(String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)),
validationException
);
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

/**
* Thrown when failing to update the routing weight due to an unsupported state. See {@link WeightedRoutingService} for more details.
*
* @opensearch.internal
*/
public class UnsupportedWeightedRoutingStateException extends OpenSearchException {
public UnsupportedWeightedRoutingStateException(StreamInput in) throws IOException {
super(in);
}

public UnsupportedWeightedRoutingStateException(String msg, Object... args) {
super(msg, args);
}

@Override
public RestStatus status() {
return RestStatus.CONFLICT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -32,10 +33,16 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING;

/**
* * Service responsible for updating cluster state metadata with weighted routing weights
Expand All @@ -45,6 +52,8 @@ public class WeightedRoutingService {
private final ClusterService clusterService;
private final ThreadPool threadPool;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;
private static final Double DECOMMISSIONED_AWARENESS_VALUE_WEIGHT = 0.0;

@Inject
public WeightedRoutingService(
Expand All @@ -60,6 +69,11 @@ public WeightedRoutingService(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
this::setAwarenessAttributes
);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
this::setForcedAwarenessAttributes
);
}

public void registerWeightedRoutingMetadata(
Expand All @@ -70,8 +84,10 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
// verify that request object has weights for all discovered and forced awareness values
ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(currentState, request);
// verify weights will not be updated for a decommissioned attribute
ensureDecommissionedAttributeHasZeroWeight(currentState, request);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -148,6 +164,18 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

private void setForcedAwarenessAttributes(Settings forceSettings) {
Map<String, List<String>> forcedAwarenessAttributes = new HashMap<>();
Map<String, Settings> forceGroups = forceSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : forceGroups.entrySet()) {
List<String> aValues = entry.getValue().getAsList("values");
if (aValues.size() > 0) {
forcedAwarenessAttributes.put(entry.getKey(), aValues);
}
}
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
}

public void verifyAwarenessAttribute(String attributeName) {
if (getAwarenessAttributes().contains(attributeName) == false) {
ActionRequestValidationException validationException = null;
Expand All @@ -159,13 +187,62 @@ public void verifyAwarenessAttribute(String attributeName) {
}
}

public void ensureNoOngoingDecommissionAction(ClusterState state) {
private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) {
String attributeName = request.getWeightedRouting().attributeName();
Set<String> discoveredAwarenessValues = new HashSet<>();
state.nodes().forEach(node -> {
if (node.getAttributes().containsKey(attributeName)) {
discoveredAwarenessValues.add(node.getAttributes().get(attributeName));
}
});
Set<String> allAwarenessValues;
if (forcedAwarenessAttributes.get(attributeName) == null) {
allAwarenessValues = new HashSet<>();
} else {
allAwarenessValues = new HashSet<>(forcedAwarenessAttributes.get(attributeName));
}
allAwarenessValues.addAll(discoveredAwarenessValues);
allAwarenessValues.forEach(awarenessValue -> {
if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) {
throw new UnsupportedWeightedRoutingStateException(
"weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute."
);
}
});
}

private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) {
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
throw new IllegalStateException(
"a decommission action is ongoing with status ["
+ decommissionAttributeMetadata.status().status()
+ "], cannot update weight during this state"
if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) {
// here either there's no decommission action is ongoing or it is in failed state. In this case, we will allow weight update
return;
}
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
WeightedRouting weightedRouting = request.getWeightedRouting();
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) {
// this is unexpected when a different attribute is requested for decommission and weight update is on another attribute
throw new UnsupportedWeightedRoutingStateException(
"decommission action ongoing for attribute [{}], cannot update weight for [{}]",
decommissionAttribute.attributeName(),
weightedRouting.attributeName()
);
}
if (weightedRouting.weights().containsKey(decommissionAttribute.attributeValue()) == false) {
// weight of an attribute undergoing decommission must be specified
throw new UnsupportedWeightedRoutingStateException(
"weight for [{}] is not specified. Please specify its weight to [{}] as it is under decommission action",
decommissionAttribute.attributeValue(),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
);
}
if (Objects.equals(
weightedRouting.weights().get(decommissionAttribute.attributeValue()),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
) == false) {
throw new UnsupportedWeightedRoutingStateException(
"weight for [{}] must be set to [{}] as it is under decommission action",
decommissionAttribute.attributeValue(),
DECOMMISSIONED_AWARENESS_VALUE_WEIGHT
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.ParsingException;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -864,6 +865,7 @@ public void testIds() {
ids.put(164, NodeDecommissionedException.class);
ids.put(165, ClusterManagerThrottlingException.class);
ids.put(166, SnapshotInUseDeletionException.class);
ids.put(167, UnsupportedWeightedRoutingStateException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ public void testValidate_ValuesAreProper() {
assertNull(actionRequestValidationException);
}

public void testValidate_TwoZonesWithZeroWeight() {
String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON);
ActionRequestValidationException actionRequestValidationException = request.validate();
assertNotNull(actionRequestValidationException);
assertTrue(actionRequestValidationException.getMessage().contains("More than one [2] value has weight set as " + "0"));
}

public void testValidate_MissingWeights() {
String reqString = "{}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,38 @@ public void testVerifyAwarenessAttribute_ValidAttributeName() {
}
}

public void testAddWeightedRoutingFailsWhenWeightsNotSetForAllDiscoveredZones() throws InterruptedException {
ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder(
client,
ClusterAddWeightedRoutingAction.INSTANCE
);
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_C", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
request.setWeightedRouting(weightedRouting);
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
exceptionReference.set(e);
countDownLatch.countDown();
}
};
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue());
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class));
MatcherAssert.assertThat(
exceptionReference.get().getMessage(),
containsString("weight for [zone_B] is not set and it is part of forced awareness value or a node has this attribute.")
);
}

public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0);
DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL);
Expand Down Expand Up @@ -327,8 +359,11 @@ public void onFailure(Exception e) {
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue());
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(IllegalStateException.class));
MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("a decommission action is ongoing with status"));
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class));
MatcherAssert.assertThat(
exceptionReference.get().getMessage(),
containsString("weight for [zone_A] must be set to [0.0] as it is under decommission action")
);
}

public void testAddWeightedRoutingPassesWhenDecommissionFailed() throws InterruptedException {
Expand Down Expand Up @@ -362,4 +397,36 @@ public void onFailure(Exception e) {}
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testAddWeightedRoutingPassesWhenWeightOfDecommissionedAttributeStillZero() throws InterruptedException {
imRishN marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Double> weights = Map.of("zone_A", 0.0, "zone_B", 1.0, "zone_C", 1.0);
DecommissionStatus status = DecommissionStatus.SUCCESSFUL;
ClusterState state = clusterService.state();
state = setWeightedRoutingWeights(state, weights);
state = setDecommissionAttribute(state, status);
ClusterState.Builder builder = ClusterState.builder(state);
ClusterServiceUtils.setState(clusterService, builder);

ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder(
client,
ClusterAddWeightedRoutingAction.INSTANCE
);
Map<String, Double> updatedWeights = Map.of("zone_A", 0.0, "zone_B", 2.0, "zone_C", 1.0);
WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", updatedWeights);
request.setWeightedRouting(updatedWeightedRouting);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
assertTrue(clusterStateUpdateResponse.isAcknowledged());
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {}
};
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting());
}
}