Skip to content

Commit

Permalink
improve allocation decision to allow to explain why a decision has be…
Browse files Browse the repository at this point in the history
…en made

building the infra to support explaining why an allocation decision has been made, for example, why a shard is not allocated on a specific node
  • Loading branch information
kimchy committed Oct 16, 2012
1 parent c350dcd commit c2073c3
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -77,7 +78,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
lastNode = 0;
}

if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) {
continue;
Expand All @@ -96,7 +98,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) {
if (allocation.deciders().canAllocate(shard, routingNode, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
if (decision.type() == Decision.Type.YES) {
changed = true;
routingNode.add(shard);
it.remove();
Expand Down Expand Up @@ -142,7 +145,8 @@ public boolean rebalance(RoutingAllocation allocation) {
continue;
}

if (allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation);
if (decision.type() == Decision.Type.YES) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
Expand Down Expand Up @@ -179,7 +183,8 @@ public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingA
if (nodeToCheck.nodeId().equals(node.nodeId())) {
continue;
}
if (allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation);
if (decision.type() == Decision.Type.YES) {
nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
nodeToCheck.nodeId(), shardRouting.currentNodeId(),
shardRouting.primary(), INITIALIZING, shardRouting.version() + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
Expand Down Expand Up @@ -160,8 +161,9 @@ public void execute(RoutingAllocation allocation) throws ElasticSearchException
}

RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed");
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
// go over and remove it from the unassigned
for (Iterator<MutableShardRouting> it = allocation.routingNodes().unassigned().iterator(); it.hasNext(); ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
Expand Down Expand Up @@ -158,11 +158,11 @@ public void execute(RoutingAllocation allocation) throws ElasticSearchException
}

RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id());
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (!decision.allowed()) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed");
Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed, reason: " + decision);
}
if (!decision.allocate()) {
if (decision.type() == Decision.Type.THROTTLE) {
// its being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,6 @@
*/
public abstract class AllocationDecider extends AbstractComponent {

public static enum Decision {
YES {
@Override
public boolean allocate() {
return true;
}

@Override
public boolean allowed() {
return true;
}
},
NO {
@Override
public boolean allocate() {
return false;
}

@Override
public boolean allowed() {
return false;
}
},
THROTTLE {
@Override
public boolean allocate() {
return false;
}

@Override
public boolean allowed() {
return true;
}
};

/**
* It can be allocated *now* on a node. Note, it might be {@link #allowed()} to be allocated
* on a node, yet, allocate will be <tt>false</tt> since its being throttled for example.
*/
public abstract boolean allocate();

/**
* Is allocation allowed on a node. Note, this does not mean that we should allocate *now*,
* though, in extreme cases, we might "force" allocation.
*/
public abstract boolean allowed();
}

protected AllocationDecider(Settings settings) {
super(settings);
}
Expand All @@ -87,7 +39,7 @@ public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocat
}

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
return Decision.ALWAYS;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,16 @@ public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocat

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
// first, check if its in the ignored, if so, return NO
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
// now, go over the registered allocations
for (AllocationDecider allocation1 : allocations) {
Decision decision = allocation1.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) {
return Decision.NO;
} else if (decision == Decision.THROTTLE) {
ret = Decision.THROTTLE;
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
// the assumption is that a decider that returns the static instance Decision#ALWAYS
// does not really implements canAllocate
if (decision != Decision.ALWAYS) {
ret.add(decision);
}
}
return ret;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import com.google.common.collect.Lists;

import java.util.List;

/**
*/
public abstract class Decision {

public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
public static final Decision NO = new Single(Type.NO);
public static final Decision THROTTLE = new Single(Type.THROTTLE);

public static Decision single(Type type, String explanation, Object... explanationParams) {
return new Single(type, explanation, explanationParams);
}

public static enum Type {
YES,
NO,
THROTTLE
}

public abstract Type type();

public static class Single extends Decision {
private final Type type;
private final String explanation;
private final Object[] explanationParams;

public Single(Type type) {
this(type, null, (Object[]) null);
}

public Single(Type type, String explanation, Object... explanationParams) {
this.type = type;
this.explanation = explanation;
this.explanationParams = explanationParams;
}

public Type type() {
return this.type;
}

@Override
public String toString() {
if (explanation == null) {
return type + "()";
}
return type + "(" + String.format(explanation, explanationParams) + ")";
}
}

public static class Multi extends Decision {

private final List<Decision> decisions = Lists.newArrayList();

public Multi add(Decision decision) {
decisions.add(decision);
return this;
}

@Override
public Type type() {
Type ret = Type.YES;
for (int i = 0; i < decisions.size(); i++) {
Type type = decisions.get(i).type();
if (type == Type.NO) {
return type;
} else if (type == Type.THROTTLE) {
ret = type;
}
}
return ret;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Decision decision : decisions) {
sb.append("[").append(decision.toString()).append("]");
}
return sb.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -119,7 +119,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
continue;
}
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
Expand Down Expand Up @@ -153,7 +154,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) {
if (allocation.deciders().canAllocate(shard, node, allocation).type() == Decision.Type.NO) {
continue;
}

Expand Down Expand Up @@ -236,7 +237,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
}

if (lastNodeMatched != null) {
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) {
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation).type() == Decision.Type.THROTTLE) {
if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -195,10 +195,10 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
AllocationDecider.Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision == AllocationDecider.Decision.THROTTLE) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);
} else if (decision == AllocationDecider.Decision.NO) {
} else if (decision.type() == Decision.Type.NO) {
noNodes.add(discoNode);
} else {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -258,7 +258,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
}
// if we can't allocate it on a node, ignore it, for example, this handles
// cases for only allocating a replica after a primary
if (allocation.deciders().canAllocate(shard, node, allocation).allocate()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.YES) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
Expand Down Expand Up @@ -292,7 +293,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (allocation.deciders().canAllocate(shard, node, allocation) == AllocationDecider.Decision.NO) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.NO) {
continue;
}

Expand Down Expand Up @@ -328,7 +330,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

if (lastNodeMatched != null) {
// we only check on THROTTLE since we checked before before on NO
if (allocation.deciders().canAllocate(shard, lastNodeMatched, allocation) == AllocationDecider.Decision.THROTTLE) {
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
Expand Down

0 comments on commit c2073c3

Please sign in to comment.