Skip to content

Commit

Permalink
Cancelling a peer recovery on the source can leak a primary permit (#…
Browse files Browse the repository at this point in the history
…30318)

The code in `RecoverySourceHandler` runs under a `CancellableThreads` instance in order to allow long-running operations to be interrupted when the recovery is cancelled. Sadly, if this happens at just the wrong moment while acquiring a permit from the primary, that permit can be leaked and never be freed.

Note that this is slightly better than it sounds - we only cancel recoveries on the source side if the primary shard itself is closed.

Relates to #30316
  • Loading branch information
bleskes committed May 2, 2018
1 parent 62f2918 commit af45b4d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand All @@ -44,6 +43,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
Expand All @@ -67,6 +68,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -142,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);

try (Closeable ignored = shard.acquireTranslogRetentionLock()) {
final long startingSeqNo;
Expand Down Expand Up @@ -196,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId());
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
Expand Down Expand Up @@ -227,17 +229,41 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
}

private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
/**
 * Runs {@code runnable} inside {@code cancellableThreads.execute(...)} after requesting a primary
 * operation permit from {@code primary}.
 *
 * NOTE(review): this is a diff hunk shown without +/- markers. The statements that still refer to
 * {@code shard}, {@code PlainActionFuture} and {@code onAcquired.actionGet()} appear to be the
 * removed side of the change; the {@code primary} / {@code CompletableFuture} statements appear to
 * be the added side. Confirm against the actual file.
 *
 * @param runnable           the work to run once the permit has been obtained
 * @param reason             passed through to {@code acquirePrimaryOperationPermit}
 * @param primary            the shard the permit is requested from and whose primary mode is checked
 * @param cancellableThreads the instance whose {@code execute} wraps the whole operation
 * @param logger             used for the trace message emitted by the completion callback
 */
static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason,
IndexShard primary, CancellableThreads cancellableThreads, Logger logger) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = onAcquired.actionGet()) {
// the future is the hand-over point between the permit listener below and the waiting code
CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
// complete() returns false when the future had already been completed; in that case the
// permit cannot be handed over through the future, so it is released right here
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
// forward the acquisition failure to whoever is (or will be) reading the future
permit.completeExceptionally(e);
}
};
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = FutureUtils.get(permit)) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (shard.isPrimaryMode() == false) {
throw new IndexShardRelocatedException(shard.shardId());
if (primary.isPrimaryMode() == false) {
throw new IndexShardRelocatedException(primary.shardId());
}
runnable.run();
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
// whenComplete fires immediately if the future is already complete, otherwise when the listener
// completes it later; either way a permit that was delivered gets closed.
// NOTE(review): on the normal path the try-with-resources above has already closed the same
// Releasable, so this appears to rely on close() tolerating a repeated call - confirm.
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}
});
}
Expand Down Expand Up @@ -489,11 +515,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync");
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -439,6 +440,30 @@ long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
assertFalse(phase2Called.get());
}

/**
 * Races a cancellation of the {@link CancellableThreads} against
 * {@code RecoverySourceHandler.runUnderPrimaryPermit} and verifies that any permit handed out by
 * the mocked shard is eventually released, regardless of which side wins the race.
 */
public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
    final IndexShard primary = mock(IndexShard.class);
    final CancellableThreads threads = new CancellableThreads();
    // starts out true: no permit is outstanding until the mocked shard hands one out
    final AtomicBoolean permitReleased = new AtomicBoolean(true);

    when(primary.isPrimaryMode()).thenReturn(true);
    doAnswer(invocation -> {
        // a permit is now outstanding; closing the Releasable given to the listener returns it
        permitReleased.set(false);
        final ActionListener<Releasable> listener = (ActionListener<Releasable>) invocation.getArguments()[0];
        listener.onResponse(() -> permitReleased.set(true));
        return null;
    }).when(primary).acquirePrimaryOperationPermit(any(), anyString(), anyObject());

    final Thread canceller = new Thread(() -> threads.cancel("test"));
    canceller.start();
    try {
        RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", primary, threads, logger);
    } catch (CancellableThreads.ExecutionCancelledException e) {
        // expected whenever the cancellation wins the race
    }
    canceller.join();

    // The interrupt may land while the permit is still being acquired, in which case the release
    // happens later; poll with assertBusy instead of asserting once.
    assertBusy(() -> assertTrue(permitReleased.get()));
}

private Store newStore(Path path) throws IOException {
return newStore(path, true);
}
Expand Down

0 comments on commit af45b4d

Please sign in to comment.