Skip to content

Commit

Permalink
Do not cancel recovery for copy on broken node (#48265)
Browse files Browse the repository at this point in the history
This change fixes a poisonous situation where an ongoing recovery was
canceled because a better copy was found on a node that the cluster had
previously tried allocating the shard to but failed. The solution is to
keep track of the set of nodes that an allocation was failed on so that
we can avoid canceling the current recovery for a copy on failed nodes.

Closes #47974
  • Loading branch information
dnhatn committed Nov 9, 2019
1 parent 5e4501e commit 9a42e71
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
}
}
Expand Down Expand Up @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus);
allocationStatus, currInfo.getFailedNodeIds());
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -39,8 +40,11 @@
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

/**
* Holds additional information as to why the shard is in unassigned state.
Expand Down Expand Up @@ -214,6 +218,7 @@ public String value() {
private final String message;
private final Exception failure;
private final int failedAllocations;
private final Set<String> failedNodeIds;
private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard

/**
Expand All @@ -224,7 +229,7 @@ public String value() {
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, Collections.emptySet());
}

/**
Expand All @@ -235,9 +240,11 @@ public UnassignedInfo(Reason reason, String message) {
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
* @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.
* @param lastAllocationStatus the result of the last allocation attempt for this shard
* @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations,
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) {
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus,
Set<String> failedNodeIds) {
this.reason = Objects.requireNonNull(reason);
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
Expand All @@ -246,6 +253,7 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti
this.failure = failure;
this.failedAllocations = failedAllocations;
this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus);
this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds);
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) :
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
Expand All @@ -263,6 +271,11 @@ public UnassignedInfo(StreamInput in) throws IOException {
this.failure = in.readException();
this.failedAllocations = in.readVInt();
this.lastAllocationStatus = AllocationStatus.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString));
} else {
this.failedNodeIds = Collections.emptySet();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -280,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeException(failure);
out.writeVInt(failedAllocations);
lastAllocationStatus.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeCollection(failedNodeIds, StreamOutput::writeString);
}
}

/**
Expand Down Expand Up @@ -354,6 +370,19 @@ public AllocationStatus getLastAllocationStatus() {
return lastAllocationStatus;
}

/**
* A set of nodeIds that failed to complete allocations for this shard. {@link org.elasticsearch.gateway.ReplicaShardAllocator}
* uses this set to avoid repeatedly canceling ongoing recoveries for copies on those nodes although they can perform noop recoveries.
* This set will be discarded when a shard moves to started. And if a shard is failed while started (i.e., from started to unassigned),
* the currently assigned node won't be added to this set.
*
* @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation)
* @see org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List)
*/
public Set<String> getFailedNodeIds() {
return failedNodeIds;
}

/**
* Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings.
* Only relevant if shard is effectively delayed (see {@link #isDelayed()})
Expand Down Expand Up @@ -410,6 +439,9 @@ public String shortSummary() {
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
if (failedNodeIds.isEmpty() == false) {
sb.append(", failed_nodes[").append(failedNodeIds).append("]");
}
sb.append(", delayed=").append(delayed);
String details = getDetails();

Expand All @@ -433,6 +465,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
if (failedNodeIds.isEmpty() == false) {
builder.field("failed_nodes", failedNodeIds);
}
builder.field("delayed", delayed);
String details = getDetails();
if (details != null) {
Expand Down Expand Up @@ -466,13 +501,16 @@ public boolean equals(Object o) {
if (reason != that.reason) {
return false;
}
if (message != null ? !message.equals(that.message) : that.message != null) {
if (Objects.equals(message, that.message) == false) {
return false;
}
if (lastAllocationStatus != that.lastAllocationStatus) {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
if (Objects.equals(failure, that.failure) == false) {
return false;
}
return failedNodeIds.equals(that.failedNodeIds);
}

@Override
Expand All @@ -484,6 +522,7 @@ public int hashCode() {
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + lastAllocationStatus.hashCode();
result = 31 * result + failedNodeIds.hashCode();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -193,10 +195,18 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
shardToFail.shardId(), shardToFail, failedShard);
}
int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0;
final Set<String> failedNodeIds;
if (failedShard.unassignedInfo() != null) {
failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1);
failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds());
failedNodeIds.add(failedShard.currentNodeId());
} else {
failedNodeIds = Collections.emptySet();
}
String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, failedNodeIds);
if (failedShardEntry.markAsStale()) {
allocation.removeAllocationId(failedShard);
}
Expand Down Expand Up @@ -288,8 +298,8 @@ private void removeDelayMarkers(RoutingAllocation allocation) {
if (newComputedLeftDelayNanos == 0) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()),
shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(),
unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes());
}
}
}
Expand All @@ -307,7 +317,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(),
unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes());
}
}

Expand Down Expand Up @@ -416,7 +426,8 @@ private void disassociateDeadNodes(RoutingAllocation allocation) {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT,
Collections.emptySet());
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO,
unassignedInfo.getFailedNodeIds()),
shardRouting.recoverySource(), allocation.changes());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

/**
Expand Down Expand Up @@ -139,7 +140,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
", " + shardRouting.unassignedInfo().getMessage();
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage,
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false,
shardRouting.unassignedInfo().getLastAllocationStatus());
shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet());
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -95,7 +97,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
continue;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false);
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
Expand All @@ -106,11 +108,13 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f
// we found a better match that can perform noop recovery, cancel the existing allocation.
logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
currentNode, nodeWithHighestMatch);
final Set<String> failedNodeIds =
shard.unassignedInfo() == null ? Collections.emptySet() : shard.unassignedInfo().getFailedNodeIds();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+
nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT);
UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo,
metaData.getIndexSafe(shard.index()), allocation.changes()));
Expand Down Expand Up @@ -186,7 +190,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
return AllocateUnassignedDecision.NOT_TAKEN;
}

MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain);
MatchingNodes matchingNodes = findMatchingNodes(
unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain);
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";

List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions);
Expand Down Expand Up @@ -297,14 +302,18 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore
return nodeFilesStore.storeFilesMetaData();
}

private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNodes,
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
boolean explain) {
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
if (noMatchFailedNodes && shard.unassignedInfo() != null &&
shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
// we don't have any files at all, it is an empty index
if (storeFilesMetaData.isEmpty()) {
Expand Down
Loading

0 comments on commit 9a42e71

Please sign in to comment.