diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index a717b34eede37..e62f1ed040124 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,7 +48,6 @@
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@@ -412,21 +411,10 @@ public void updateShardState(final ShardRouting newRouting,
             if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
                 assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
-                if (newRouting.primary()) {
-                    final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
-                    final Engine engine = getEngine();
-                    if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
-                        // there was no primary context hand-off in < 6.0.0, need to manually activate the shard
-                        replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpoint());
-                    }
-                    if (currentRouting.isRelocationTarget() == true && recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
-                        // Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
-                        // operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq#
-                        // (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
-                        // to create a lucene commit point to an empty translog file.
-                        engine.flush(false, true);
-                    }
-                }
+                assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
+                    recoveryState.getSourceNode().getVersion().before(Version.V_6_0_0_alpha1) ||
+                    replicationTracker.isPrimaryMode() :
+                    "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

                 changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
             } else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
@@ -442,7 +430,22 @@
         final CountDownLatch shardStateUpdated = new CountDownLatch(1);

         if (newRouting.primary()) {
-            if (newPrimaryTerm != primaryTerm) {
+            if (newPrimaryTerm == primaryTerm) {
+                if (currentRouting.initializing() && newRouting.active()) {
+                    if (currentRouting.isRelocationTarget() == false) {
+                        // the master started a recovering primary, activate primary mode.
+                        replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                    } else if (recoveryState.getSourceNode().getVersion().before(Version.V_6_0_0_alpha1)) {
+                        // there was no primary context hand-off in < 6.0.0, need to manually activate the shard
+                        replicationTracker.activatePrimaryMode(getLocalCheckpoint());
+                        // Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
+                        // operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with
+                        // seq# (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
+                        // to create a lucene commit point to an empty translog file.
+                        getEngine().flush(false, true);
+                    }
+                }
+            } else {
                 assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
                 /* Note that due to cluster state batching an initializing primary shard can be failed and re-assigned
                  * in one state causing its term to be incremented. Note that if both current shard state and new
@@ -539,6 +542,11 @@ public void onFailure(Exception e) {
             }
             // set this last, once we finished updating all internal state.
             this.shardRouting = newRouting;
+
+            assert this.shardRouting.primary() == false ||
+                this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
+                this.replicationTracker.isPrimaryMode()
+                : "a started primary must be in primary mode " + this.shardRouting;
             shardStateUpdated.countDown();
         }
         if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 018548be9629f..b05b1e5cc5ca0 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -363,7 +363,7 @@ public void testSeqNoCollision() throws Exception {
             logger.info("--> Promote replica2 as the primary");
             shards.promoteReplicaToPrimary(replica2);
             logger.info("--> Recover replica3 from replica2");
-            recoverReplica(replica3, replica2);
+            recoverReplica(replica3, replica2, true);
             try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
                 assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                 assertThat(snapshot.next(), equalTo(op2));
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index bb6a007455433..6cf90ab2c3580 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -225,6 +225,7 @@ public void testPersistenceStateMetadataPersistence() throws Exception {
     }

     public void testFailShard() throws Exception {
+        allowShardFailures();
         IndexShard shard = newStartedShard();
         final ShardPath shardPath = shard.shardPath();
         assertNotNull(shardPath);
@@ -308,7 +309,8 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc
     }

     public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
         final int operations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier barrier = new CyclicBarrier(1 + operations);

@@ -352,20 +354,10 @@ public void onFailure(Exception e) {
         barrier.await();
         latch.await();

-        // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
-            Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
+
         final int delayedOperations = scaledRandomIntBetween(1, 64);
         final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -427,8 +419,9 @@ public void onFailure(Exception e) {
      * 1) Internal state (ala ReplicationTracker) has been updated
      * 2) Primary term is set to the new term
      */
-    public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
-        final IndexShard indexShard = newStartedShard(false);
+    public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());
         final long promotedTerm = indexShard.getPrimaryTerm() + 1;
         final CyclicBarrier barrier = new CyclicBarrier(2);
         final AtomicBoolean stop = new AtomicBoolean();
@@ -447,18 +440,10 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierEx
         });
         thread.start();

-        final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
-            ShardRoutingState.STARTED, replicaRouting.allocationId());
-
-
-        final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
-        final IndexShardRoutingTable routingTable =
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
         barrier.await();
-        // promote the replica
-        indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
-            Collections.emptySet());
+        final ShardRouting replicaRouting = indexShard.routingEntry();
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

         stop.set(true);
         thread.join();
@@ -467,7 +452,8 @@ public void testPublishingOrderOnPromotion() throws IOExc

     public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
-        final IndexShard indexShard = newStartedShard(false);
+        final IndexShard indexShard = newShard(false);
+        recoveryEmptyReplica(indexShard, randomBoolean());

         // most of the time this is large enough that most of the time there will be at least one gap
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -478,17 +464,8 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
-        final ShardRouting primaryRouting =
-                newShardRouting(
-                        replicaRouting.shardId(),
-                        replicaRouting.currentNodeId(),
-                        null,
-                        true,
-                        ShardRoutingState.STARTED,
-                        replicaRouting.allocationId());
-        indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
-            0L, Collections.singleton(primaryRouting.allocationId().getId()),
-            new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
+        promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
+            new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

         /*
          * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -505,7 +482,7 @@ public void onResponse(Releasable releasable) {

                     @Override
                     public void onFailure(Exception e) {
-                        throw new RuntimeException(e);
+                        throw new AssertionError(e);
                     }
                 },
                 ThreadPool.Names.GENERIC, "");
@@ -845,7 +822,7 @@ public void testGlobalCheckpointSync() throws IOException {
         // add a replica
         recoverShardFromStore(primaryShard);
         final IndexShard replicaShard = newShard(shardId, false);
-        recoverReplica(replicaShard, primaryShard);
+        recoverReplica(replicaShard, primaryShard, true);
         final int maxSeqNo = randomIntBetween(0, 128);
         for (int i = 0; i <= maxSeqNo; i++) {
             EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
@@ -1624,7 +1601,7 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException {
         IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
         final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
         updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
-        recoverReplica(primaryTarget, primarySource);
+        recoverReplica(primaryTarget, primarySource, true);

         // check that local checkpoint of new primary is properly tracked after primary relocation
         assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
@@ -2080,7 +2057,7 @@ public long indexTranslogOperations(List operations, int tot
                 assertFalse(replica.isSyncNeeded());
                 return localCheckpoint;
             }
-        }, true);
+        }, true, true);

         closeShards(primary, replica);
     }
@@ -2187,7 +2164,7 @@ public long indexTranslogOperations(List operations, int tot
                 assertTrue(replica.isActive());
                 return localCheckpoint;
             }
-        }, false);
+        }, false, true);

         closeShards(primary, replica);
     }
@@ -2239,7 +2216,7 @@ public void finalizeRecovery(long globalCheckpoint) throws IOException {
                 super.finalizeRecovery(globalCheckpoint);
                 assertListenerCalled.accept(replica);
             }
-        }, false);
+        }, false, true);

         closeShards(primary, replica);
     }
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 35bbc497838f2..5c6b000f7e519 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -357,6 +357,14 @@ public void updateShardState(ShardRouting shardRouting,
                 assertTrue("an active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
                     shardRouting.active());
             }
+            if (this.shardRouting.primary()) {
+                assertTrue("a primary shard can't be demoted", shardRouting.primary());
+            } else if (shardRouting.primary()) {
+                // note: it's ok for a replica in post recovery to be started and promoted at once
+                // this can happen when the primary failed after we sent the start shard message
+                assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
+                    shardRouting.active());
+            }
             this.shardRouting = shardRouting;
             if (shardRouting.primary()) {
                 term = newPrimaryTerm;
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 3b50fa649150c..4b1419375e6e5 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -43,7 +43,7 @@ public void testGetStartingSeqNo() throws Exception {
         try {
             // Empty store
             {
-                recoveryEmptyReplica(replica);
+                recoveryEmptyReplica(replica, true);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
                 assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
                 recoveryTarget.decRef();
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 537409f35d175..f9896d3116dd7 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -261,7 +261,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
         }
         IndexShard replicaShard = newShard(primaryShard.shardId(), false);
         updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
-        recoverReplica(replicaShard, primaryShard);
+        recoverReplica(replicaShard, primaryShard, true);
         List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
         long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
         assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index a3171c7f14161..cfa2e49929949 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -265,7 +265,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
                 RecoverySource.PeerRecoverySource.INSTANCE);

             final IndexShard newReplica =
-                newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
+                newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
             replicas.add(newReplica);
             updateAllocationIDsOnPrimary();
             return newReplica;
@@ -341,8 +341,11 @@ public void recoverReplica(
             IndexShard replica,
             BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
             boolean markAsRecovering) throws IOException {
-            ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(),
-                routingTable(Function.identity()));
+            final IndexShardRoutingTable routingTable = routingTable(Function.identity());
+            final Set<String> inSyncIds = activeIds();
+            ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
+                routingTable);
+            ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
         }

         public synchronized DiscoveryNode getPrimaryNode() {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 00b73d94d67e2..889de5d274f34 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -92,8 +92,10 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;

 import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
@@ -108,6 +110,14 @@ public abstract class IndexShardTestCase extends ESTestCase {

     public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {};

+    private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true);
+
+    private static final Consumer<IndexShard.ShardFailure> DEFAULT_SHARD_FAILURE_HANDLER = failure -> {
+        if (failOnShardFailures.get()) {
+            throw new AssertionError(failure.reason, failure.cause);
+        }
+    };
+
     protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
         @Override
         public void onRecoveryDone(RecoveryState state) {
@@ -128,6 +138,7 @@ public void setUp() throws Exception {
         super.setUp();
         threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings());
         primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards
+        failOnShardFailures.set(true);
     }

     @Override
@@ -139,6 +150,15 @@ public void tearDown() throws Exception {
         }
     }

+    /**
+     * By default, tests will fail if any shard created by this class fails. Tests that cause failures by design
+     * can call this method to ignore those failures.
+     *
+     */
+    protected void allowShardFailures() {
+        failOnShardFailures.set(false);
+    }
+
     public Settings threadPoolSettings() {
         return Settings.EMPTY;
     }
@@ -270,7 +290,7 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,

     /**
      * creates a new initializing shard.
-     * @param routing              shard routing to use
+     * @param routing                       shard routing to use
      * @param shardPath                     path to use for shard data
      * @param indexMetaData                 indexMetaData for the shard, including any mapping
      * @param indexSearcherWrapper          an optional wrapper to be used during searchers
@@ -302,6 +322,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe
                     engineFactory, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer,
                     Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
                     breakerService);
+            indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;
         } finally {
             if (success == false) {
@@ -358,7 +379,7 @@ protected IndexShard newStartedShard(boolean primary) throws IOException {
         if (primary) {
             recoverShardFromStore(shard);
         } else {
-            recoveryEmptyReplica(shard);
+            recoveryEmptyReplica(shard, true);
         }
         return shard;
     }
@@ -399,11 +420,11 @@ public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRoutin
             inSyncIds, newRoutingTable, Collections.emptySet());
     }

-    protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
+    protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) throws IOException {
         IndexShard primary = null;
         try {
             primary = newStartedShard(true);
-            recoverReplica(replica, primary);
+            recoverReplica(replica, primary, startReplica);
         } finally {
             closeShards(primary);
         }
@@ -415,42 +436,48 @@ protected DiscoveryNode getFakeDiscoNode(String id) {
     }

     /** recovers a replica from the given primary **/
-    protected void recoverReplica(IndexShard replica, IndexShard primary) throws IOException {
+    protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
         recoverReplica(replica, primary,
             (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {
             }),
-            true);
+            true, startReplica);
     }

     /** recovers a replica from the given primary **/
     protected void recoverReplica(final IndexShard replica,
                                   final IndexShard primary,
                                   final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                  final boolean markAsRecovering) throws IOException {
+                                  final boolean markAsRecovering, final boolean markAsStarted) throws IOException {
         IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
         newRoutingTable.addShard(primary.routingEntry());
         if (replica.routingEntry().isRelocationTarget() == false) {
             newRoutingTable.addShard(replica.routingEntry());
         }
-        recoverReplica(replica, primary, targetSupplier, markAsRecovering,
-            Collections.singleton(primary.routingEntry().allocationId().getId()),
-            newRoutingTable.build());
+        final Set<String> inSyncIds = Collections.singleton(primary.routingEntry().allocationId().getId());
+        final IndexShardRoutingTable routingTable = newRoutingTable.build();
+        recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable);
+        if (markAsStarted) {
+            startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
+        }
     }

    /**
     * Recovers a replica from the given primary, allowing the user to supply a custom recovery target. A typical usage of a custom recovery
     * target is to assert things in the various stages of recovery.
+     *
+     * Note: this method keeps the shard in {@link IndexShardState#POST_RECOVERY} and doesn't start it.
+     *
     * @param replica                the recovery target shard
     * @param primary                the recovery source shard
     * @param targetSupplier         supplies an instance of {@link RecoveryTarget}
     * @param markAsRecovering       set to {@code false} if the replica is already marked as recovering
     */
-    protected final void recoverReplica(final IndexShard replica,
-                                        final IndexShard primary,
-                                        final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
-                                        final boolean markAsRecovering,
-                                        final Set<String> inSyncIds,
-                                        final IndexShardRoutingTable routingTable) throws IOException {
+    protected final void recoverUnstartedReplica(final IndexShard replica,
+                                                 final IndexShard primary,
+                                                 final BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
+                                                 final boolean markAsRecovering,
+                                                 final Set<String> inSyncIds,
+                                                 final IndexShardRoutingTable routingTable) throws IOException {
         final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
         if (markAsRecovering) {
@@ -478,11 +505,15 @@ protected final void recoverReplica,
             request,
             (int) ByteSizeUnit.MB.toBytes(1),
             Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), pNode.getName()).build());
-        final ShardRouting initializingReplicaRouting = replica.routingEntry();
         primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
             inSyncIds, routingTable, Collections.emptySet());
         recovery.recoverToTarget();
         recoveryTarget.markAsDone();
+    }
+
+    protected void startReplicaAfterRecovery(IndexShard replica, IndexShard primary, Set<String> inSyncIds,
+                                             IndexShardRoutingTable routingTable) throws IOException {
+        ShardRouting initializingReplicaRouting = replica.routingEntry();
         IndexShardRoutingTable newRoutingTable =
             initializingReplicaRouting.isRelocationTarget() ?
                 new IndexShardRoutingTable.Builder(routingTable)
@@ -502,6 +533,31 @@ protected final void recoverReplica(final IndexShard replica,
             currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
     }

+    /**
+     * promotes a replica to primary, incrementing its term and starting it if needed
+     */
+    protected void promoteReplica(IndexShard replica, Set<String> inSyncIds, IndexShardRoutingTable routingTable) throws IOException {
+        assertThat(inSyncIds, contains(replica.routingEntry().allocationId().getId()));
+        final ShardRouting routingEntry = newShardRouting(
+            replica.routingEntry().shardId(),
+            replica.routingEntry().currentNodeId(),
+            null,
+            true,
+            ShardRoutingState.STARTED,
+            replica.routingEntry().allocationId());
+
+        final IndexShardRoutingTable newRoutingTable = new IndexShardRoutingTable.Builder(routingTable)
+            .removeShard(replica.routingEntry())
+            .addShard(routingEntry)
+            .build();
+        replica.updateShardState(routingEntry, replica.getPrimaryTerm() + 1,
+            (is, listener) ->
+                listener.onResponse(new PrimaryReplicaSyncer.ResyncTask(1, "type", "action", "desc", null, Collections.emptyMap())),
+            currentClusterStateVersion.incrementAndGet(),
+            inSyncIds, newRoutingTable, Collections.emptySet());
+    }
+
     private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
         Store.MetadataSnapshot result;
         try {
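
Usage sketch (illustrative, not part of the patch): with the helpers above, a test extending IndexShardTestCase can keep a replica unstarted after recovery (it stays in POST_RECOVERY) and then start and promote it in a single updateShardState call via promoteReplica. The test name, assertions, and try/finally cleanup below are assumptions for illustration; the helper signatures are the ones introduced in this diff.

    // Illustrative only: recover a replica without starting it, then promote it;
    // promoteReplica bumps the primary term and moves the routing entry to STARTED.
    public void testPromoteUnstartedReplica() throws Exception {
        final IndexShard replica = newShard(false);
        try {
            recoveryEmptyReplica(replica, false); // startReplica = false: skip startReplicaAfterRecovery
            final ShardRouting replicaRouting = replica.routingEntry();
            promoteReplica(replica, Collections.singleton(replicaRouting.allocationId().getId()),
                new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());
            assertTrue(replica.routingEntry().primary());
        } finally {
            closeShards(replica);
        }
    }

This mirrors the pattern the patch itself applies in testPrimaryPromotionDelaysOperations, where recoveryEmptyReplica(indexShard, randomBoolean()) exercises both the started and unstarted promotion paths.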