Skip to content

Commit

Permalink
ReplicationTracker.markAllocationIdAsInSync may hang if allocation is…
Browse files Browse the repository at this point in the history
… cancelled (#30316)

At the end of recovery, we mark the recovering shard as "in sync" on the primary. From this point on 
the primary will treat any replication failure on it as critical and will reach out to the master to fail the 
shard. To do so, we wait for the local checkpoint of the recovered shard to be above the global 
checkpoint (in order to maintain global checkpoint invariant).

If the master decides to cancel the allocation of the recovering shard while we wait, the method can 
currently hang and fail to return. It will also ignore the interrupts that are triggered by the cancelled 
recovery due to the primary closing. 

Note that this is crucial as this method is called while holding a primary permit. Since the method 
never comes back, the permit is never released. The unreleased permit will then block any primary 
relocation *and* while the primary is trying to relocate all indexing will be blocked for 30m as it 
waits to acquire the missing permit.
  • Loading branch information
bleskes committed May 2, 2018
1 parent d502578 commit a65fe9d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,11 @@ private boolean invariant() {
"shard copy " + entry.getKey() + " is in-sync but not tracked";
}

// all pending in sync shards are tracked
for (String aId : pendingInSync) {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

return true;
}

Expand Down Expand Up @@ -521,6 +526,9 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
}
}
if (removedEntries) {
pendingInSync.removeIf(aId -> checkpoints.containsKey(aId) == false);
}
} else {
for (String initializingId : initializingAllocationIds) {
if (shardAllocationId.equals(initializingId) == false) {
Expand Down Expand Up @@ -549,6 +557,8 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
replicationGroup = calculateReplicationGroup();
if (primaryMode && removedEntries) {
updateGlobalCheckpointOnPrimary();
// notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked.
notifyAllWaiters();
}
}
assert invariant();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final ReplicationTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
final long clusterStateVersion = randomNonNegativeLong();
tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
Expand Down Expand Up @@ -336,13 +337,22 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId())));
}

tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
if (randomBoolean()) {
// normal path, shard catches up
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
} else {
// master changes its mind and cancels the allocation
tracker.updateFromMaster(clusterStateVersion + 1, Collections.singleton(inSyncAllocationId.getId()),
routingTable(emptySet(), inSyncAllocationId), emptySet());
barrier.await();
assertTrue(complete.get());
assertNull(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()));
}
assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId()));

thread.join();
}

Expand Down

0 comments on commit a65fe9d

Please sign in to comment.