Skip to content

Commit

Permalink
Weighted round-robin scheduling policy for shard coordination traffic… (
Browse files Browse the repository at this point in the history
opensearch-project#4241)

* Weighted round-robin scheduling policy for shard coordination traffic routing

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
Signed-off-by: Vishal Sarda <vsarda@amazon.com>
  • Loading branch information
anshu1106 authored and Vishalks committed Sep 28, 2022
1 parent d2db9ca commit fce3950
Show file tree
Hide file tree
Showing 13 changed files with 1,254 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Weighted round-robin scheduling policy for shard coordination traffic ([#4241](https://github.com/opensearch-project/OpenSearch/pull/4241))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -191,6 +192,7 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -274,6 +276,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DataStreamMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(WeightedRoutingMetadata.TYPE),
WeightedRoutingMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,14 @@ public IndexGraveyard indexGraveyard() {
return custom(IndexGraveyard.TYPE);
}

/**
* *
* @return The weighted routing metadata for search requests
*/
public WeightedRoutingMetadata weightedRoutingMetadata() {
return custom(WeightedRoutingMetadata.TYPE);
}

public <T extends Custom> T custom(String type) {
return (T) customs.get(type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractNamedDiffable;
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/**
* Contains metadata for weighted routing
*
* @opensearch.internal
*/
public class WeightedRoutingMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
private static final Logger logger = LogManager.getLogger(WeightedRoutingMetadata.class);
public static final String TYPE = "weighted_shard_routing";
public static final String AWARENESS = "awareness";
private WeightedRouting weightedRouting;

public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

public WeightedRoutingMetadata setWeightedRouting(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
return this;
}

public WeightedRoutingMetadata(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
}
}

public WeightedRoutingMetadata(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
}

@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.API_AND_GATEWAY;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_2_4_0;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
}

public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Metadata.Custom.class, TYPE, in);
}

public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Double attrValue;
String attributeName = null;
Map<String, Double> weights = new HashMap<>();
WeightedRouting weightedRouting = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
String awarenessField = null;

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
awarenessField = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata [{}], expected " + "object",
awarenessField
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
attributeName = parser.currentName();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata [{}], expected" + " object",
attributeName
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
attrValue = Double.parseDouble(parser.text());
weights.put(attrKey, attrValue);
} else {
throw new OpenSearchParseException(
"failed to parse weighted routing metadata attribute " + "[{}], unknown type",
attributeName
);
}
}
}
}
}
weightedRouting = new WeightedRouting(attributeName, weights);
return new WeightedRoutingMetadata(weightedRouting);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingMetadata that = (WeightedRoutingMetadata) o;
return weightedRouting.equals(that.weightedRouting);
}

@Override
public int hashCode() {
return weightedRouting.hashCode();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(weightedRouting, builder);
return builder;
}

public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException {
builder.startObject(AWARENESS);
builder.startObject(weightedRouting.attributeName());
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap();
private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = emptyMap();
private final Object shardsByAttributeMutex = new Object();
private final Object shardsByWeightMutex = new Object();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap();
private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap();

/**
* The initializing list, including ones that are initializing on a target node because of relocation.
Expand Down Expand Up @@ -233,6 +236,10 @@ public List<ShardRouting> assignedShards() {
return this.assignedShards;
}

public Map<WeightedRoutingKey, List<ShardRouting>> getActiveShardsByWeight() {
return activeShardsByWeight;
}

public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(shards));
}
Expand Down Expand Up @@ -292,6 +299,73 @@ public ShardIterator activeInitializingShardsRankedIt(
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted
* round-robin scheduling policy.
*
* @param weightedRouting entity
* @param nodes discovered nodes in the cluster
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
final int seed = shuffler.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns a list containing shard routings ordered using weighted round-robin scheduling.
*/
private List<ShardRouting> shardsOrderedByWeight(
List<ShardRouting> shards,
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight
) {
WeightedRoundRobin<ShardRouting> weightedRoundRobin = new WeightedRoundRobin<>(
calculateShardWeight(shards, weightedRouting, nodes, defaultWeight)
);
List<WeightedRoundRobin.Entity<ShardRouting>> shardsOrderedbyWeight = weightedRoundRobin.orderEntities();
List<ShardRouting> orderedShardRouting = new ArrayList<>(activeShards.size());
if (shardsOrderedbyWeight != null) {
for (WeightedRoundRobin.Entity<ShardRouting> shardRouting : shardsOrderedbyWeight) {
orderedShardRouting.add(shardRouting.getTarget());
}
}
return orderedShardRouting;
}

/**
* Returns a list containing shard routing and associated weight. This function iterates through all the shards and
* uses weighted routing to find weight for the corresponding shard. This is fed to weighted round-robin scheduling
* to order shards by weight.
*/
private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight(
List<ShardRouting> shards,
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight
) {
List<WeightedRoundRobin.Entity<ShardRouting>> shardsWithWeights = new ArrayList<>();
for (ShardRouting shard : shards) {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node != null) {
String attVal = node.getAttributes().get(weightedRouting.attributeName());
// If weight for a zone is not defined, considering it as 1 by default
Double weight = weightedRouting.weights().getOrDefault(attVal, defaultWeight);
shardsWithWeights.add(new WeightedRoundRobin.Entity<>(weight, shard));
}
}
return shardsWithWeights;
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
Expand Down Expand Up @@ -698,6 +772,66 @@ public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) {
return count;
}

/**
* Key for WeightedRouting Shard Iterator
*
* @opensearch.internal
*/
public static class WeightedRoutingKey {
private final WeightedRouting weightedRouting;

public WeightedRoutingKey(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingKey key = (WeightedRoutingKey) o;
if (!weightedRouting.equals(key.weightedRouting)) return false;
return true;
}

@Override
public int hashCode() {
int result = weightedRouting.hashCode();
return result;
}
}

/**
* *
* Gets active shard routing from memory if available, else calculates and put it in memory.
*/
private List<ShardRouting> getActiveShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting);
List<ShardRouting> shardRoutings = activeShardsByWeight.get(key);
if (shardRoutings == null) {
synchronized (shardsByWeightMutex) {
shardRoutings = shardsOrderedByWeight(activeShards, weightedRouting, nodes, defaultWeight);
activeShardsByWeight = new MapBuilder().put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}

/**
* *
* Gets initializing shard routing from memory if available, else calculates and put it in memory.
*/
private List<ShardRouting> getInitializingShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) {
WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting);
List<ShardRouting> shardRoutings = initializingShardsByWeight.get(key);
if (shardRoutings == null) {
synchronized (shardsByWeightMutex) {
shardRoutings = shardsOrderedByWeight(activeShards, weightedRouting, nodes, defaultWeight);
initializingShardsByWeight = new MapBuilder().put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}

/**
* Builder of an index shard routing table.
*
Expand Down
Loading

0 comments on commit fce3950

Please sign in to comment.