Cancel recoveries even if all shards assigned (#46520)
We cancel ongoing peer recoveries if a node joins the cluster with a completely
up-to-date copy of a shard, because we can use such a copy to recover a replica
instantly. However, today we only look for recoveries to cancel while there are
unassigned shards in the cluster. This means that we do not contemplate the
cancellation of the last few recoveries since recovering shards are not
unassigned.  It might take much longer for these recoveries to complete than
would be necessary if they were cancelled.

This commit fixes this by checking for cancellable recoveries even if all
shards are assigned.
howardhuanghua authored and DaveCTurner committed Oct 1, 2019
1 parent 6c6b4bf commit af930a7
Showing 3 changed files with 89 additions and 7 deletions.
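In outline, the fix moves the recovery-cancellation check out of the "only while shards are unassigned" branch and gates it on whether any shard is still recovering. The following is a minimal sketch of that control-flow change; it is a toy model with hypothetical names (Shard, allocateFromExistingCopies, cancelRecoveriesWithBetterCopy, balance), not the actual Elasticsearch classes.

import java.util.List;

// Toy model of one reroute round; everything here is an illustrative stand-in.
final class RerouteSketch {
    interface Shard {
        boolean unassigned();  // no copy allocated yet
        boolean recovering();  // allocated but still copying data from the primary
    }

    // Before the fix: the gateway-allocation step, which includes cancelling an
    // ongoing recovery when a node holds a fully up-to-date copy, only ran while
    // some shard was still unassigned.
    static void rerouteBefore(List<Shard> shards) {
        if (shards.stream().anyMatch(Shard::unassigned)) {
            cancelRecoveriesWithBetterCopy(shards); // never reached once all shards are assigned
            allocateFromExistingCopies(shards);
        }
        balance(shards);
    }

    // After the fix: cancellation is gated on inactive (still-recovering) shards,
    // so the last few recoveries can still be replaced by an up-to-date copy.
    static void rerouteAfter(List<Shard> shards) {
        if (shards.stream().anyMatch(Shard::recovering)) {
            cancelRecoveriesWithBetterCopy(shards);
        }
        allocateFromExistingCopies(shards);
        balance(shards);
    }

    static void cancelRecoveriesWithBetterCopy(List<Shard> shards) { /* elided */ }
    static void allocateFromExistingCopies(List<Shard> shards) { /* elided */ }
    static void balance(List<Shard> shards) { /* elided */ }
}

The actual diff below expresses the same gating via allocation.routingNodes().hasInactiveShards().
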
@@ -395,12 +395,10 @@ private void reroute(RoutingAllocation allocation) {
         assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
         assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
             "auto-expand replicas out of sync with number of nodes in the cluster";
 
-        // now allocate all the unassigned to available nodes
-        if (allocation.routingNodes().unassigned().size() > 0) {
-            removeDelayMarkers(allocation);
-            gatewayAllocator.allocateUnassigned(allocation);
-        }
+        removeDelayMarkers(allocation);
+        // try to allocate existing shard copies first
+        gatewayAllocator.allocateUnassigned(allocation);
 
         shardsAllocator.allocate(allocation);
         assert RoutingNodes.assertShardStats(allocation.routingNodes());
@@ -129,7 +129,10 @@ protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
         unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
 
         primaryShardAllocator.allocateUnassigned(allocation);
-        replicaShardAllocator.processExistingRecoveries(allocation);
+        if (allocation.routingNodes().hasInactiveShards()) {
+            // cancel existing recoveries if we have a better match
+            replicaShardAllocator.processExistingRecoveries(allocation);
+        }
         replicaShardAllocator.allocateUnassigned(allocation);
     }
 
@@ -46,6 +46,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@@ -324,6 +325,86 @@ public void testReplicaRecovery() throws Exception {
         assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs);
     }
 
+    public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception {
+        logger.info("--> start node A");
+        final String nodeA = internalCluster().startNode();
+
+        logger.info("--> create index on node: {}", nodeA);
+        createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT)
+            .getShards()[0].getStats().getStore().size();
+
+        logger.info("--> start node B");
+        // force a shard recovery from nodeA to nodeB
+        final String nodeB = internalCluster().startNode();
+
+        logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB);
+        assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME)
+            .setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0)));
+        ensureGreen(INDEX_NAME);
+
+        logger.info("--> start node C");
+        final String nodeC = internalCluster().startNode();
+
+        // do sync flush to gen sync id
+        assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0));
+
+        // hold peer recovery on phase 2 after nodeB down
+        CountDownLatch phase1ReadyBlocked = new CountDownLatch(1);
+        CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1);
+        MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA);
+        transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) {
+                phase1ReadyBlocked.countDown();
+                try {
+                    allowToCompletePhase1Latch.await();
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        logger.info("--> restart node B");
+        internalCluster().restartNode(nodeB,
+            new InternalTestCluster.RestartCallback() {
+                @Override
+                public Settings onNodeStopped(String nodeName) throws Exception {
+                    phase1ReadyBlocked.await();
+                    // nodeB stopped, peer recovery from nodeA to nodeC, it will be cancelled after nodeB get started.
+                    RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
+
+                    List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
+                    List<RecoveryState> nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
+                    assertThat(nodeCRecoveryStates.size(), equalTo(1));
+
+                    assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE,
+                        false, nodeA, nodeC);
+                    validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
+
+                    return super.onNodeStopped(nodeName);
+                }
+            });
+
+        // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked
+        ensureGreen();
+        allowToCompletePhase1Latch.countDown();
+        transportService.clearAllRules();
+
+        // make sure nodeA has primary and nodeB has replica
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
+        assertThat(startedShards.size(), equalTo(2));
+        for (ShardRouting shardRouting : startedShards) {
+            if (shardRouting.primary()) {
+                assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeA));
+            } else {
+                assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeB));
+            }
+        }
+    }
+
     public void testRerouteRecovery() throws Exception {
         logger.info("--> start node A");
         final String nodeA = internalCluster().startNode();
