Cancel recoveries even if all shards assigned #46520

Merged (13 commits) on Oct 1, 2019
@@ -395,12 +395,10 @@ private void reroute(RoutingAllocation allocation) {
         assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
         assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
             "auto-expand replicas out of sync with number of nodes in the cluster";
 
-        // now allocate all the unassigned to available nodes
-        if (allocation.routingNodes().unassigned().size() > 0) {
-            removeDelayMarkers(allocation);
-            gatewayAllocator.allocateUnassigned(allocation);
-        }
+        removeDelayMarkers(allocation);
+        // try to allocate existing shard copies first
+        gatewayAllocator.allocateUnassigned(allocation);
 
         shardsAllocator.allocate(allocation);
         assert RoutingNodes.assertShardStats(allocation.routingNodes());
@@ -129,7 +129,10 @@ protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
         unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
 
         primaryShardAllocator.allocateUnassigned(allocation);
-        replicaShardAllocator.processExistingRecoveries(allocation);
+        if (allocation.routingNodes().hasInactiveShards()) {
+            // cancel existing recoveries if we have a better match
+            replicaShardAllocator.processExistingRecoveries(allocation);
+        }
         replicaShardAllocator.allocateUnassigned(allocation);
     }

@@ -324,6 +324,81 @@ public void testReplicaRecovery() throws Exception {
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs);
}

public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();

logger.info("--> create index on node: {}", nodeA);
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT)
        .getShards()[0].getStats().getStore().size();

> Contributor: shardSize is unused:
>
> Suggested change
> -ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT)
> +createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats().getStore().size();

logger.info("--> start node B");
// force a shard recovery from nodeA to nodeB
final String nodeB = internalCluster().startNode();
Settings nodeBDataPathSettings = internalCluster().dataPathSettings(nodeB);
> Contributor: nodeBDataPathSettings is unused:
>
> Suggested change
> -Settings nodeBDataPathSettings = internalCluster().dataPathSettings(nodeB);

logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB);
assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0)));
ensureGreen(INDEX_NAME);

logger.info("--> start node C");
final String nodeC = internalCluster().startNode();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
> Contributor: I'd normally recommend the shorthand
>
> Suggested change
> -assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
> +ensureStableCluster(3);
>
> but I don't think this is necessary:
>
>   • startNode() calls validateClusterFormed()
>   • anyway, it doesn't matter if node C takes a bit longer to join the cluster, because we have to wait for its recovery to start, which only happens after it's joined.
>
> Therefore I think we can drop this:
>
> Suggested change
> -assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());

// do a synced flush to generate a sync id
assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0));

// hold the peer recovery from completing phase 1 (the CLEAN_FILES step) after nodeB goes down
CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1);
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA);
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) {
try {
allowToCompletePhase1Latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
connection.sendRequest(requestId, action, request, options);
});

logger.info("--> restart node B");
internalCluster().restartNode(nodeB,
new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertBusy(() -> {
> Contributor: 😁 I was just about to note the missing wait here.
>
> I think it'd be neater to wait for node A to send its CLEAN_FILES action instead of using an assertBusy. You can use another CountDownLatch for this.
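
A minimal sketch of that suggestion (the cleanFilesRequestSent latch name is hypothetical, not code from this PR): the send behavior counts down a second latch before blocking, and onNodeStopped awaits it in place of the assertBusy:

    // set up alongside allowToCompletePhase1Latch, before restarting nodeB
    CountDownLatch cleanFilesRequestSent = new CountDownLatch(1);
    transportService.addSendBehavior((connection, requestId, action, request, options) -> {
        if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) {
            cleanFilesRequestSent.countDown(); // signal: recovery to nodeC has reached CLEAN_FILES
            try {
                allowToCompletePhase1Latch.await(); // then hold the recovery there
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
        connection.sendRequest(requestId, action, request, options);
    });

    // in onNodeStopped, instead of wrapping the assertions in assertBusy:
    cleanFilesRequestSent.await();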

// nodeB stopped; a peer recovery from nodeA to nodeC starts, and it will be cancelled once nodeB is started again.
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();

List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
List<RecoveryState> nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates);
> Contributor: I think we do not need to say anything about the recoveries on node A. These assertions are true, but not particularly important for this test.

assertThat(nodeARecoveryStates.size(), equalTo(1));
List<RecoveryState> nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
assertThat(nodeCRecoveryStates.size(), equalTo(1));

assertRecoveryState(nodeARecoveryStates.get(0), 0, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
true, Stage.DONE, null, nodeA);
validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex());

assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE,
false, nodeA, nodeC);
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
});

return super.onNodeStopped(nodeName);
}
});

// wait for peer recovering from nodeA to nodeB to be finished
> Contributor: It took me some time to work out why this works - I suggest this comment explaining it:
>
> Suggested change
> -// wait for peer recovering from nodeA to nodeB to be finished
> +// wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage and hence is not blocked

ensureGreen();
allowToCompletePhase1Latch.countDown();
transportService.clearAllRules();
}

public void testRerouteRecovery() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();