Skip to content

Commit

Permalink
Add basic recovery prioritization to GatewayAllocator
Browse files Browse the repository at this point in the history
This commit adds logic to prefer shards with higher priority
or from newer indices to be allocated first if they are unallocated post API.

This commit allows users to set `index.priority` to a non-negative integer to
prioritize index recovery for certain indices. This setting is dynamically updateable
and defaults to `1`. If two indices have the same priority, this change takes the creation
date into account to prioritize shards from newer indices, which is important in the time-based
indices use case.

Closes #11787
  • Loading branch information
s1monw committed Jul 7, 2015
1 parent a917819 commit 4723805
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public static State fromString(String state) {
public static final String SETTING_VERSION_UPGRADED = "index.version.upgraded";
public static final String SETTING_VERSION_MINIMUM_COMPATIBLE = "index.version.minimum_compatible";
public static final String SETTING_CREATION_DATE = "index.creation_date";
public static final String SETTING_PRIORITY = "index.priority";
public static final String SETTING_UUID = "index.uuid";
public static final String SETTING_DATA_PATH = "index.data_path";
public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -566,6 +567,10 @@ public void addAll(Collection<MutableShardRouting> mutableShardRoutings) {
}
}

/**
 * Sorts the {@code unassigned} shards using the supplied comparator by
 * delegating to {@code CollectionUtil.timSort}.
 *
 * @param comparator determines the order in which the unassigned shards are arranged
 */
public void sort(Comparator<ShardRouting> comparator) {
CollectionUtil.timSort(unassigned, comparator);
}

/**
 * Returns the number of entries currently held in {@code unassigned}.
 *
 * @return the value of {@code unassigned.size()}
 */
public int size() {
return unassigned.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,16 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();

MetaData metaData = routingNodes.metaData();
final MetaData metaData = routingNodes.metaData();
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
unassigned.sort(new PriorityComparator() {

@Override
protected Settings getIndexSettings(String index) {
IndexMetaData indexMetaData = metaData.index(index);
return indexMetaData.getSettings();
}
}); // sort for priority ordering
// First, handle primaries, they must find a place to be allocated on here
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
Expand Down Expand Up @@ -370,7 +379,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
}

// Now, handle replicas, try to assign them to nodes that are similar to the one the primary was allocated on
unassignedIterator = routingNodes.unassigned().iterator();
unassignedIterator = unassigned.iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
if (shard.primary()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway.local;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;

import java.util.Comparator;

/**
 * Orders {@link ShardRouting} instances so that shards belonging to the more important
 * index sort first. Two shards of the same index always compare as equal. For shards of
 * different indices the following keys are consulted in turn, each in descending order:
 * <ol>
 *   <li>the index priority ({@code index.priority}), higher values first;</li>
 *   <li>the index creation date ({@code index.creation_date}), newer indices first, which
 *       suits time-based indices where recent data matters most;</li>
 *   <li>the index name in reverse lexicographic order, which helps when the date is baked
 *       into the name, e.g. {@code logstash-2015.05.03}.</li>
 * </ol>
 */
abstract class PriorityComparator implements Comparator<ShardRouting> {

    @Override
    public final int compare(ShardRouting o1, ShardRouting o2) {
        final String firstIndex = o1.index();
        final String secondIndex = o2.index();
        if (firstIndex.equals(secondIndex)) {
            // Shards of one index are never ranked against each other.
            return 0;
        }

        final Settings firstSettings = getIndexSettings(firstIndex);
        final Settings secondSettings = getIndexSettings(secondIndex);

        // Operands are deliberately swapped in every comparison below to get descending order.
        final int byPriority = Integer.compare(priority(secondSettings), priority(firstSettings));
        if (byPriority != 0) {
            return byPriority;
        }

        final int byCreationDate = Long.compare(timeCreated(secondSettings), timeCreated(firstSettings));
        if (byCreationDate != 0) {
            return byCreationDate;
        }

        return secondIndex.compareTo(firstIndex);
    }

    /** Reads {@code index.priority} from the given settings, falling back to {@code 1} when unset. */
    private int priority(Settings settings) {
        return settings.getAsInt(IndexMetaData.SETTING_PRIORITY, 1);
    }

    /** Reads {@code index.creation_date} from the given settings, falling back to {@code -1} when unset. */
    private long timeCreated(Settings settings) {
        return settings.getAsLong(IndexMetaData.SETTING_CREATION_DATE, -1L);
    }

    /**
     * Supplies the settings of the index with the given name.
     *
     * @param index the index name taken from the shard routing being compared
     * @return the settings from which priority and creation date are read
     */
    protected abstract Settings getIndexSettings(String index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public IndexDynamicSettingsModule() {
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_BLOCKS_WRITE);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_BLOCKS_METADATA);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_PRIORITY, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME);
indexDynamicSettings.addDynamicSetting(LocalGatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class PriorityComparatorTests extends ElasticsearchTestCase {

    /**
     * Creates a random number of indices (most with explicit priority and creation date,
     * some relying on defaults), adds randomly generated unassigned shards for them, sorts
     * the shards with a {@link PriorityComparator} and then checks every adjacent pair for
     * descending priority, then descending creation date, then descending index name.
     */
    public void testPriorityComparatorSort() {
        RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards();
        final int numIndices = randomIntBetween(3, 99);
        final IndexMeta[] indices = new IndexMeta[numIndices];
        final Map<String, IndexMeta> metaByName = new HashMap<>();

        for (int i = 0; i < indices.length; i++) {
            final String indexName = "idx_2015_04_" + String.format(Locale.ROOT, "%02d", i);
            final IndexMeta meta;
            if (frequently()) {
                meta = new IndexMeta(indexName, randomIntBetween(1, 1000), randomIntBetween(1, 10000));
            } else {
                // sometimes just use defaults
                meta = new IndexMeta(indexName);
            }
            indices[i] = meta;
            metaByName.put(meta.name, meta);
        }

        final int numShards = randomIntBetween(10, 100);
        for (int i = 0; i < numShards; i++) {
            final IndexMeta owner = randomFrom(indices);
            shards.add(new MutableShardRouting(new ImmutableShardRouting(owner.name, randomIntBetween(1, 5), null, null, null,
                    randomBoolean(), ShardRoutingState.UNASSIGNED, randomIntBetween(0, 100),
                    new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))));
        }

        shards.sort(new PriorityComparator() {
            @Override
            protected Settings getIndexSettings(String index) {
                return metaByName.get(index).settings;
            }
        });

        // Walk the sorted shards pairwise; each pair must respect the comparator's key order.
        ShardRouting previous = null;
        for (ShardRouting current : shards) {
            if (previous != null) {
                final IndexMeta prevMeta = metaByName.get(previous.getIndex());
                final IndexMeta currentMeta = metaByName.get(current.getIndex());
                if (prevMeta.priority != currentMeta.priority) {
                    assertTrue("priority mismatch, expected:" + currentMeta.priority + " after " + prevMeta.priority,
                            prevMeta.priority > currentMeta.priority);
                } else if (prevMeta.creationDate != currentMeta.creationDate) {
                    assertTrue("creationDate mismatch, expected:" + currentMeta.creationDate + " after " + prevMeta.creationDate,
                            prevMeta.creationDate > currentMeta.creationDate);
                } else if (prevMeta.name.equals(currentMeta.name) == false) {
                    final int nameOrder = prevMeta.name.compareTo(currentMeta.name);
                    assertTrue("indexName mismatch, expected:" + currentMeta.name + " after " + prevMeta.name + " " + nameOrder,
                            nameOrder > 0);
                }
            }
            previous = current;
        }
    }

    /**
     * Minimal stand-in for index metadata: keeps the expected priority and creation date
     * alongside the {@link Settings} handed to the comparator.
     */
    private static class IndexMeta {
        final String name;
        final int priority;
        final long creationDate;
        final Settings settings;

        /** Index with empty settings; the expected values mirror the comparator's fallbacks. */
        private IndexMeta(String name) { // default
            this.name = name;
            this.priority = 1;
            this.creationDate = -1;
            this.settings = ImmutableSettings.EMPTY;
        }

        /** Index whose settings explicitly carry the given priority and creation date. */
        private IndexMeta(String name, int priority, long creationDate) {
            this.name = name;
            this.priority = priority;
            this.creationDate = creationDate;
            this.settings = ImmutableSettings.builder()
                    .put(IndexMetaData.SETTING_CREATION_DATE, creationDate)
                    .put(IndexMetaData.SETTING_PRIORITY, priority)
                    .build();
        }
    }
}
10 changes: 10 additions & 0 deletions src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -199,4 +200,13 @@ public void testFailShard() throws Exception {
assertTrue(exists);
assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardIndexLocations), equalTo(false));
}

/**
 * Creates an index with {@code index.priority} set, then changes the value through the
 * update-settings API and checks that the index's settings service reports each value.
 */
public void testUpdatePriority() {
    final String indexName = "test";
    final int initialPriority = 200;
    final int updatedPriority = 400;

    assertAcked(client().admin().indices().prepareCreate(indexName)
            .setSettings(IndexMetaData.SETTING_PRIORITY, initialPriority));
    IndexSettingsService indexSettingsService = getInstanceFromNode(IndicesService.class).indexService(indexName).settingsService();
    assertEquals(initialPriority, indexSettingsService.getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());

    // Apply the dynamic update and re-read the setting from the same service instance.
    client().admin().indices().prepareUpdateSettings(indexName)
            .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_PRIORITY, updatedPriority).build())
            .get();
    assertEquals(updatedPriority, indexSettingsService.getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());
}
}

0 comments on commit 4723805

Please sign in to comment.