Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PUT api to update shard routing weights #4272

Merged
merged 63 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
162cbeb
Weighted round-robin scheduling policy for shard coordination traffic…
Aug 17, 2022
8513c4f
Add caching layer for wrr shard routing and moved wrr routing call to…
Aug 26, 2022
4cdbee3
Integrate ARS with weighted round robin,
Aug 27, 2022
7a49e5c
Remove ARS and add tests for zone with undefined weight
Sep 1, 2022
03c4d23
Add changelog for the commit
Sep 1, 2022
7c64537
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 1, 2022
154a3b9
Fix java doc, add test of WeightedRoundRobinRouting metadata
Sep 1, 2022
3afb4e3
Fix minor change related to shuffling wrr shard routings
Sep 1, 2022
d9e2b08
Add PUT api for updating weighted round robin shard routing weights
Aug 22, 2022
c3b847a
Add more validations for request body for put wrr weights
Aug 29, 2022
9cc2cd5
Update thread pool executor and do code refactoring
Sep 1, 2022
f07a9ad
Fix missing java doc build failure
Sep 1, 2022
e12d247
Throw ActionRequestValidationException on invalid awareness attribute
Sep 2, 2022
30e17bc
Merge branch 'main' into feature/wrr-shard-routing-core
anshu1106 Sep 2, 2022
3aa6fbb
Add default weight 1 for zones with undefined weight
Sep 7, 2022
9ae063c
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 7, 2022
1112366
Inject WRRShardsCache on node start
Sep 12, 2022
cd3e16c
Remove extra new lines
Sep 12, 2022
0c7dbd9
Fix import
Sep 12, 2022
438fe9e
Add size for shard routing cache
Sep 12, 2022
694be4d
Invalidate shard routing cache on close
Sep 12, 2022
9236b8d
Refactor code
Sep 13, 2022
93e7587
Add test for Weighted routing iterator and some refactoring changes
Sep 13, 2022
a9e30d1
Merge remote-tracking branch 'origin/main' into feature/wrr-shard-rou…
Sep 13, 2022
17dc58c
Update metadata minimal supported version
Sep 13, 2022
ed0cc1b
Add cluster setting for default weight
Sep 14, 2022
5a5cb11
Fix tests due to the change
Sep 14, 2022
1c33964
Fix cache concurrency issue
Sep 14, 2022
f8638f2
Spotless check fix
Sep 14, 2022
4016d27
Fix weighted round robin logic case when there is an entity with weig…
Sep 15, 2022
c98606d
Changes weight data type to double
Sep 15, 2022
1bd83f3
Fix test
Sep 15, 2022
8a72dc0
Empty commit
Sep 15, 2022
bff61b9
Empty commit
Sep 15, 2022
9102616
Fix spotless check
Sep 15, 2022
a291634
Create in-memory cache for shard routings
Sep 16, 2022
bd6c9e7
Fix put operation for weighted shard routings
Sep 16, 2022
d9368ab
Add tests for shard routing in-memory store
Sep 17, 2022
a3f0290
Merge branch 'main' into feature/wrr-shard-routing-core
Sep 17, 2022
01239bf
Add java docs and some code refactoring
Sep 17, 2022
dbbb263
Add null check for discovery nodes and single mutex for weighted shar…
Sep 19, 2022
785437b
Merge branch 'feature/wrr-shard-routing-core' into feature/wrr-put-api
Sep 19, 2022
f06a039
Refactor code
Sep 19, 2022
0a4b829
Fix tests
Sep 19, 2022
5f451d4
Merge remote-tracking branch 'origin/main' into feature/wrr-put-api
Sep 19, 2022
4f349e9
Refactor code
Sep 19, 2022
e59cff2
Remove unwanted changes
Sep 19, 2022
dc79fec
Remove unwanted changes for Requests.java
Sep 19, 2022
7671d63
Refactore code
Sep 19, 2022
454387d
Add change log
Sep 19, 2022
027d73a
Fix test failure
Sep 19, 2022
19047e6
Make WeightedRoutingService not to imoplement ClusterStateApplier
Sep 20, 2022
663017f
Update metadata call
Sep 20, 2022
70790e9
Add tests for WeightedRoutingService
Sep 21, 2022
ba87256
Add string formatter
Sep 22, 2022
4057441
Fix locale in string formatter
Sep 22, 2022
83797f4
Refactor code based on review comments
Sep 23, 2022
006600c
Refactor code
Sep 26, 2022
0240ff0
Refactor code
Sep 26, 2022
7763067
Address review comments
Sep 26, 2022
3a0b0ff
Merge branch 'main' into feature/wrr-put-api
anshu1106 Sep 26, 2022
703d918
Address review comments
Sep 26, 2022
f3cf04d
Add explicit count in request validation error message
Sep 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards",
"remote_store.restore", };
"remote_store.restore",
"cluster.put_weighted_routing", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"cluster.put_weighted_routing": {
"documentation": {
"url": "http://TBA",
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
"description": "TBA"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/{attribute}/weights",
"methods": [
"PUT"
],
"parts": {
"attribute": {
"type": "string",
"description": "Awareness attribute name"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportPutWeightedRoutingAction;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
Expand Down Expand Up @@ -296,6 +298,7 @@
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction;
import org.opensearch.rest.action.admin.cluster.RestClusterSearchShardsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterStateAction;
Expand Down Expand Up @@ -563,6 +566,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterPutWeightedRoutingAction.INSTANCE, TransportPutWeightedRoutingAction.class);
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -745,6 +749,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestClusterPutWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.ActionType;

/**
* Action to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public final class ClusterPutWeightedRoutingAction extends ActionType<ClusterPutWeightedRoutingResponse> {

public static final ClusterPutWeightedRoutingAction INSTANCE = new ClusterPutWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/put";

private ClusterPutWeightedRoutingAction() {
super(NAME, ClusterPutWeightedRoutingResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchGenerationException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterPutWeightedRoutingRequest> {
private static final Logger logger = LogManager.getLogger(ClusterPutWeightedRoutingRequest.class);

private WeightedRouting weightedRouting;
private String attributeName;

public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

public ClusterPutWeightedRoutingRequest setWeightedRouting(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
return this;
}

public void attributeName(String attributeName) {
this.attributeName = attributeName;
}

public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
weightedRouting = new WeightedRouting(in);
}

public ClusterPutWeightedRoutingRequest() {}

public void setWeightedRouting(Map<String, String> source) {
try {
if (source.isEmpty()) {
throw new OpenSearchParseException(("Empty request body"));
}
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.map(source);
setWeightedRouting(BytesReference.bytes(builder), builder.contentType());
} catch (IOException e) {
throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e);
}
}

public void setWeightedRouting(BytesReference source, XContentType contentType) {
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
) {
String attrValue = null;
Map<String, Double> weights = new HashMap<>();
Double attrWeight = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrValue = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrWeight = Double.parseDouble(parser.text());
weights.put(attrValue, attrWeight);
} else {
throw new OpenSearchParseException(
"failed to parse weighted routing request attribute [{}], " + "unknown type",
attrWeight
);
}
}
this.weightedRouting = new WeightedRouting(this.attributeName, weights);
} catch (IOException e) {
logger.error("error while parsing put for weighted routing request object", e);
}
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (weightedRouting == null) {
validationException = addValidationError("Weighted routing request object is null", validationException);
}
if (weightedRouting.attributeName() == null || weightedRouting.attributeName().isEmpty()) {
validationException = addValidationError("Attribute name is missing", validationException);
}
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
if (countValueWithZeroWeights > 1) {
validationException = addValidationError(("More than one value has weight set as 0 "), validationException);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets also add the count explicitly

Copy link
Contributor Author

@anshu1106 anshu1106 Sep 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

}
return validationException;
}

/**
* @param source weights definition from request body
* @return this request
*/
public ClusterPutWeightedRoutingRequest source(Map<String, String> source) {
setWeightedRouting(source);
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
weightedRouting.writeTo(out);
}

@Override
public String toString() {
return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.routing.WeightedRouting;

/**
* Request builder to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingRequestBuilder extends ClusterManagerNodeOperationRequestBuilder<
ClusterPutWeightedRoutingRequest,
ClusterPutWeightedRoutingResponse,
ClusterPutWeightedRoutingRequestBuilder> {
public ClusterPutWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterPutWeightedRoutingAction action) {
super(client, action, new ClusterPutWeightedRoutingRequest());
}

public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRouting weightedRouting) {
request.setWeightedRouting(weightedRouting);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.xcontent.ToXContentObject;

import java.io.IOException;

/**
* Response from updating weights for weighted round-robin search routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingResponse extends AcknowledgedResponse implements ToXContentObject {
public ClusterPutWeightedRoutingResponse(boolean acknowledged) {
super(acknowledged);
}

public ClusterPutWeightedRoutingResponse(StreamInput in) throws IOException {
super(in);
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the point extending?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed to implement ToXContentObject since AcknowledgedResponse takes care of serialization/deserialization

Loading