Skip to content

Commit

Permalink
Account for node versions during allocation in ILM Shrink (#43300)
Browse files Browse the repository at this point in the history
This commit ensures that ILM's Shrink action will take node versions into
account when choosing which node to allocate to when shrinking an
index. Prior to this change, ILM could pick a node with a lower version
than some shards are already allocated to, which causes the new
allocation to fail as shards can't be relocated onto a node with a lower
version than they are already on.

As part of this, when making the decision about which node to allocate
to prior to Shrink, all shards in the index are considered, rather than
choosing a random shard to consider.

Further, the unit tests for the logic that chooses a node to allocate
shards to pre-shrink has been improved to validate the behavior in more
realistic and varied initial conditions.
  • Loading branch information
gwbrown authored Jun 24, 2019
1 parent 1034d23 commit b7e4410
Show file tree
Hide file tree
Showing 2 changed files with 300 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,76 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Allocates all shards in a single index to one node.
* For example, as preparation for shrinking that index.
*/
public class SetSingleNodeAllocateStep extends AsyncActionStep {
private static final Logger logger = LogManager.getLogger(SetSingleNodeAllocateStep.class);
public static final String NAME = "set-single-node-allocation";

private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Collections.singletonList(
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
// These allocation deciders were chosen because these are the conditions that can prevent
// allocation long-term, and that we can inspect in advance. Most other allocation deciders
// will either only delay relocation (e.g. ThrottlingAllocationDecider), or don't work very
// well when reallocating potentially many shards at once (e.g. DiskThresholdDecider)
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Arrays.asList(
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
new NodeVersionAllocationDecider()
));

public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
final RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null,
System.nanoTime());
List<String> validNodeIds = new ArrayList<>();
Optional<ShardRouting> anyShard = clusterState.getRoutingTable().allShards(indexMetaData.getIndex().getName()).stream().findAny();
if (anyShard.isPresent()) {
// Iterate through the nodes finding ones that are acceptable for the current allocation rules of the shard
for (RoutingNode node : clusterState.getRoutingNodes()) {
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS.canRemain(anyShard.get(), node, allocation)
.type() == Decision.Type.YES;
if (canRemainOnCurrentNode) {
DiscoveryNode discoveryNode = node.node();
validNodeIds.add(discoveryNode.getId());
final Map<ShardId, List<ShardRouting>> routingsByShardId = clusterState.getRoutingTable()
.allShards(indexMetaData.getIndex().getName())
.stream()
.collect(Collectors.groupingBy(ShardRouting::shardId));


if (routingsByShardId.isEmpty() == false) {
for (RoutingNode node : routingNodes) {
boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard
.allMatch(shardRoutings -> shardRoutings.stream() // Can we allocate at least one shard copy to this node?
.map(shardRouting -> ALLOCATION_DECIDERS.canAllocate(shardRouting, node, allocation).type())
.anyMatch(Decision.Type.YES::equals));
if (canAllocateOneCopyOfEachShard) {
validNodeIds.add(node.node().getId());
}
}
// Shuffle the list of nodes so the one we pick is random
Expand All @@ -70,6 +89,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState clusterState
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
} else {
// No nodes currently match the allocation rules so just wait until there is one that does
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink");
listener.onResponse(false);
}
} else {
Expand Down
Loading

0 comments on commit b7e4410

Please sign in to comment.