From becbc4cb1e74c547e6dd8cb37d7f15abfb9239c8 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 3 Jul 2023 10:24:25 -0700 Subject: [PATCH] Fix bug where ReplicationListeners would not complete on target cancellation. Signed-off-by: Marc Handalian --- .../ICUCollationKeywordFieldMapperIT.java | 6 + .../replication/SegmentReplicationBaseIT.java | 18 +- .../SegmentReplicationSuiteIT.java | 130 ++++++++++++++ .../index/engine/NRTReplicationEngine.java | 3 +- .../opensearch/index/shard/IndexShard.java | 8 +- .../org/opensearch/index/store/Store.java | 13 +- .../recovery/RecoverySourceHandler.java | 35 ++-- .../SegmentReplicationSourceHandler.java | 1 + .../SegmentReplicationSourceService.java | 4 +- .../replication/SegmentReplicationState.java | 11 +- .../replication/SegmentReplicationTarget.java | 169 ++++++++---------- .../SegmentReplicationTargetService.java | 80 +++++---- .../UnrecoverableReplicationException.java | 20 +++ .../common/ReplicationCollection.java | 50 +++++- .../SegmentReplicationIndexShardTests.java | 105 ++++++++++- .../SegmentReplicationTargetServiceTests.java | 2 - 16 files changed, 478 insertions(+), 177 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/UnrecoverableReplicationException.java diff --git a/plugins/analysis-icu/src/internalClusterTest/java/org/opensearch/index/mapper/ICUCollationKeywordFieldMapperIT.java b/plugins/analysis-icu/src/internalClusterTest/java/org/opensearch/index/mapper/ICUCollationKeywordFieldMapperIT.java index 4f596034bfece..95682e6f0c251 100644 --- a/plugins/analysis-icu/src/internalClusterTest/java/org/opensearch/index/mapper/ICUCollationKeywordFieldMapperIT.java +++ b/plugins/analysis-icu/src/internalClusterTest/java/org/opensearch/index/mapper/ICUCollationKeywordFieldMapperIT.java @@ -110,6 +110,12 @@ public void testBasicUsage() throws Exception { assertOrderedSearchHits(response, "2", "1"); } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + public void testMultipleValues() throws Exception { String index = "foo"; diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 52fe85b51cebd..7e16669fb57c7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -22,9 +22,11 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; @@ -186,15 +188,25 @@ protected void verifyStoreContent() throws Exception { } private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { - return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); + return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), 
routing.shardId(), indexName); + } + + protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional id = indexService.shardIds().stream() + .filter(sid -> sid == shardId.id()) + .findFirst(); + return indexService.getShard(id.get()); } protected IndexShard getIndexShard(String node, String indexName) { final Index index = resolveIndex(indexName); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); + final Optional id = indexService.shardIds().stream().findFirst(); + return indexService.getShard(id.get()); } protected Releasable blockReplication(List nodes, CountDownLatch latch) { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java new file mode 100644 index 0000000000000..46aeafbd5b451 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.discovery.AbstractDisruptionTestCase; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) +public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + // set up remote store repo, will randomly enable it in this suite. 
+ Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + createIndex(INDEX_NAME); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + @Override + public Settings indexSettings() { + final Settings.Builder builder = Settings.builder() + .put(super.indexSettings()) + // reset shard & replica count to random values set by OpenSearchIntegTestCase. + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + + //TODO: Randomly enable remote store on these tests. +// if (randomBoolean()) { +// builder +// .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) +// .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME); +// } + return builder.build(); + } + + public void testBasicReplication() throws Exception { + final int docCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(); + ensureGreen(INDEX_NAME); + verifyStoreContent(); + } + + public void testDropReplicaDuringCopy() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().startClusterManagerOnlyNodes(1); + + final int docCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + + internalCluster().restartRandomDataNode(); + + ensureYellow(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); + internalCluster().startDataOnlyNode(); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testDeleteIndexWhileReplicating() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int docCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); + } + + public void testCancelWhileOngoingRecovery() throws Exception { + internalCluster().startNode(); + final int docCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + // start a node to trigger a recovery + internalCluster().startNode(); + internalCluster().fullRestart(); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 50b5fbb8596a6..cb576286583c2 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ 
b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -129,8 +129,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException { store.cleanupAndPreserveLatestCommitPoint( "On reader closed", getLatestSegmentInfos(), - getLastCommittedSegmentInfos(), - false + getLastCommittedSegmentInfos() ); } catch (IOException e) { // Log but do not rethrow - we can try cleaning up again after next replication cycle. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 01c0a12d463ea..528f4f2985aad 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -816,7 +816,13 @@ public void relocated( assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated"; - performSegRep.run(); + try { + logger.info("Starting force sync from relocated"); + performSegRep.run(); + } catch (Exception e) { + logger.error("Hmm", e); + } + logger.info("Finished force sync from relocated -- continue handoff"); /* * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 90832b4c77756..ff9cee7115af5 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -799,7 +799,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { - this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true); + this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo()); } /** @@ -816,22 +816,20 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info * @param reason the reason for this cleanup operation logged for each deleted file * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos - * @param deleteTempFiles Does this clean up delete temporary replication files * * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint( String reason, SegmentInfos infos, - SegmentInfos lastCommittedSegmentInfos, - boolean deleteTempFiles + SegmentInfos lastCommittedSegmentInfos ) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. 
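        // Note: with the deleteTempFiles flag removed, cleanupFiles below now skips any file whose
        // name starts with REPLICATION_PREFIX unconditionally, so in-flight replication temp files
        // are never deleted by this commit-point cleanup path.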
metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles); + cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } @@ -840,8 +838,7 @@ public void cleanupAndPreserveLatestCommitPoint( private void cleanupFiles( String reason, Collection localSnapshot, - @Nullable Collection additionalFiles, - boolean deleteTempFiles + @Nullable Collection additionalFiles ) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { @@ -851,7 +848,7 @@ private void cleanupFiles( // also ensure we are not deleting a file referenced by an active reader. || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false // prevent temporary file deletion during reader cleanup - || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) { + || existingFile.startsWith(REPLICATION_PREFIX)) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 5e278f06cfb8f..e11b8cd982a13 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -396,7 +396,7 @@ void phase1( phase1ExistingFileSizes.add(md.length()); existingTotalSizeInBytes += md.length(); if (logger.isTraceEnabled()) { - logger.trace( + logger.info( "recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," + " size [{}]", md.name(), md.checksum(), @@ -410,21 +410,21 @@ void phase1( phase1Files.addAll(diff.missing); for (StoreFileMetadata md : phase1Files) { if (request.metadataSnapshot().asMap().containsKey(md.name())) { - logger.trace( + logger.info( "recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", md.name(), request.metadataSnapshot().asMap().get(md.name()), md ); } else { - logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); + logger.info("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); } phase1FileNames.add(md.name()); phase1FileSizes.add(md.length()); totalSizeInBytes += md.length(); } - logger.trace( + logger.info( "recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), @@ -472,7 +472,7 @@ void phase1( final long existingTotalSize = existingTotalSizeInBytes; cleanFilesStep.whenComplete(r -> { final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); + logger.info("recovery [phase1]: took [{}]", took); listener.onResponse( new SendFileResult( phase1FileNames, @@ -486,14 +486,14 @@ void phase1( ); }, listener::onFailure); } else { - logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); + logger.info("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); // but we must still create a retention lease final StepListener createRetentionLeaseStep = 
new StepListener<>(); createRetentionLease(startingSeqNo, createRetentionLeaseStep); createRetentionLeaseStep.whenComplete(retentionLease -> { final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); + logger.info("recovery [phase1]: took [{}]", took); listener.onResponse( new SendFileResult( Collections.emptyList(), @@ -533,14 +533,14 @@ void createRetentionLease(final long startingSeqNo, ActionListener cloneRetentionLeaseStep = new StepListener<>(); final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease( request.targetNode().getId(), new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false) ); - logger.trace("cloned primary's retention lease as [{}]", clonedLease); + logger.info("cloned primary's retention lease as [{}]", clonedLease); cloneRetentionLeaseStep.whenComplete(rr -> listener.onResponse(clonedLease), listener::onFailure); } catch (RetentionLeaseNotFoundException e) { // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before @@ -555,7 +555,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false) ); addRetentionLeaseStep.whenComplete(rr -> listener.onResponse(newLease), listener::onFailure); - logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); + logger.info("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); } }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); } @@ -602,12 +602,12 @@ void prepareTargetForTranslog(int totalTranslogOps, ActionListener li final ActionListener wrappedListener = ActionListener.wrap(nullVal -> { stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); - logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); + logger.info("recovery [phase1]: remote engine start took [{}]", tookTime); listener.onResponse(tookTime); }, e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e))); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. 
- logger.trace("recovery [phase1]: prepare remote engine for translog"); + logger.info("recovery [phase1]: prepare remote engine for translog"); cancellableThreads.checkForCancel(); recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener); } @@ -640,7 +640,7 @@ void phase2( if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); + logger.info("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); final StopWatch stopWatch = new StopWatch().start(); final StepListener sendListener = new StepListener<>(); final OperationBatchSender sender = new OperationBatchSender( @@ -667,7 +667,7 @@ void phase2( ); stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); - logger.trace("recovery [phase2]: took [{}]", tookTime); + logger.info("recovery [phase2]: took [{}]", tookTime); listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime)); }, listener::onFailure); sender.start(); @@ -791,7 +791,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis } cancellableThreads.checkForCancel(); StopWatch stopWatch = new StopWatch().start(); - logger.trace("finalizing recovery"); + logger.info("finalizing recovery"); /* * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done @@ -819,7 +819,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis ); if (request.isPrimaryRelocation()) { - logger.trace("performing relocation hand-off"); + logger.info("performing relocation hand-off"); final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() ? recoveryTarget::forceSegmentFileSync : () -> {}; @@ -835,7 +835,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis } else { // Force round of segment replication to update its checkpoint to primary's if (shard.indexSettings().isSegRepEnabled()) { - recoveryTarget.forceSegmentFileSync(); + cancellableThreads.execute(recoveryTarget::forceSegmentFileSync); } } stopWatch.stop(); @@ -865,6 +865,7 @@ static final class SendSnapshotResult { * Cancels the recovery and interrupts all eligible threads. */ public void cancel(String reason) { + logger.info("Cancelling recovery {}", shard.shardId()); cancellableThreads.cancel(reason); recoveryTarget.cancel(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index f754f5f625fb1..873a9e8d4babc 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -180,6 +180,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene * Cancels the replication and interrupts all eligible threads. 
*/ public void cancel(String reason) { + logger.info("Cancelling source handler"); writer.cancel(); cancellableThreads.cancel(reason); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 79186deeeaf0f..e4eb92f080247 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -131,7 +131,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); timer.stop(); - logger.trace( + logger.info( new ParameterizedMessage( "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", request.getReplicationId(), @@ -206,6 +206,7 @@ protected void doStart() { protected void doStop() { final ClusterService clusterService = indicesService.clusterService(); if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + logger.info("Node close - Ongoing Replication events: {} {}", ongoingSegmentReplications.size(), ongoingSegmentReplications.getHandlers().size()); indicesService.clusterService().removeListener(this); } } @@ -222,6 +223,7 @@ protected void doClose() throws IOException { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { + logger.info("Cancelling for shard {}", shardId); ongoingSegmentReplications.cancel(indexShard, "shard is closed"); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 7a996ec7aedaa..226ccbaf01afa 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -45,8 +45,7 @@ public enum Stage { GET_CHECKPOINT_INFO((byte) 3), FILE_DIFF((byte) 4), GET_FILES((byte) 5), - FINALIZE_REPLICATION((byte) 6), - CANCELLED((byte) 7); + FINALIZE_REPLICATION((byte) 6); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -245,14 +244,6 @@ public void setStage(Stage stage) { overallTimer.stop(); timingData.put("OVERALL", overallTimer.time()); break; - case CANCELLED: - if (this.stage == Stage.DONE) { - throw new IllegalStateException("can't move replication to Cancelled state from Done."); - } - this.stage = Stage.CANCELLED; - overallTimer.stop(); - timingData.put("OVERALL", overallTimer.time()); - break; default: throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 22c68ad46fea6..dcd2abc46e7f8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -17,7 +17,6 @@ import org.apache.lucene.store.ByteBuffersDataInput; import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ChecksumIndexInput; 
-import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -38,6 +37,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; /** * Represents the target of a replication event. @@ -101,17 +101,11 @@ public SegmentReplicationTarget retryCopy() { @Override public String description() { - return "Segment replication from " + source.toString(); + return "Segment replication from " + source.getDescription(); } @Override public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) { - // Cancellations still are passed to our SegmentReplicationListener as failures, if we have failed because of cancellation - // update the stage. - final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class); - if (cancelledException != null) { - state.setStage(SegmentReplicationState.Stage.CANCELLED); - } listener.onFailure(state(), e, sendShardFailure); } @@ -140,42 +134,45 @@ public void writeFileChunk( /** * Start the Replication event. + * * @param listener {@link ActionListener} listener. */ public void startReplication(ActionListener listener) { cancellableThreads.setOnCancel((reason, beforeCancelEx) -> { - // This method only executes when cancellation is triggered by this node and caught by a call to checkForCancel, - // SegmentReplicationSource does not share CancellableThreads. - final CancellableThreads.ExecutionCancelledException executionCancelledException = - new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); - notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false); - throw executionCancelledException; + throw new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); }); state.setStage(SegmentReplicationState.Stage.REPLICATING); final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); - final StepListener finalizeListener = new StepListener<>(); - cancellableThreads.checkForCancel(); - logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); + logger.info("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); // Get list of files to copy from this checkpoint. 
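        // Each step below is chained with listener::onFailure, so a failure from the source or the
        // ExecutionCancelledException thrown by checkForCancel() after a cancel always reaches the
        // caller's listener. The ReplicationListener therefore completes even on cancellation, and
        // the target service removes this target from its collection only once that has happened.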
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); + cancellableThreads.checkForCancel(); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); - checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure); + checkpointInfoListener.whenComplete(checkpointInfo -> { + final List filesToFetch = getFiles(checkpointInfo); + state.setStage(SegmentReplicationState.Stage.GET_FILES); + cancellableThreads.checkForCancel(); + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener); + }, listener::onFailure); + getFilesListener.whenComplete( - response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), + response -> { + finalizeReplication(checkpointInfoListener.result()); + listener.onResponse(null); + }, listener::onFailure ); - finalizeListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); } - private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getFilesListener) + private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); - logger.trace("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff); + logger.info("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff); /* * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an @@ -189,95 +186,80 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) { // TODO: Refactor the logic so that finalize doesn't have to be invoked for remote store as source if (source instanceof RemoteStoreReplicationSource) { - ActionListener.completeWith(listener, () -> { - state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); - return null; - }); + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); return; } - ActionListener.completeWith(listener, () -> { + cancellableThreads.checkForCancel(); + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + Store store = null; + try { + multiFileWriter.renameAllTempFiles(); + store = store(); + store.incRef(); + // Deserialize the new SegmentInfos object sent from the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); cancellableThreads.checkForCancel(); - state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); - Store store = null; + indexShard.finalizeReplication(infos); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? 
The Source can handle this and checks + // its content on disk if possible. try { - multiFileWriter.renameAllTempFiles(); - store = store(); - store.incRef(); - // Deserialize the new SegmentInfos object sent from the primary. - final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); - SegmentInfos infos = SegmentInfos.readCommit( - store.directory(), - toIndexInput(checkpointInfoResponse.getInfosBytes()), - responseCheckpoint.getSegmentsGen() - ); - cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos); - } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { - // this is a fatal exception at this stage. - // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. try { - try { - store.removeCorruptionMarker(); - } finally { - Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files - } - } catch (Exception e) { - logger.debug("Failed to clean lucene index", e); - ex.addSuppressed(e); + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files } - ReplicationFailedException rfe = new ReplicationFailedException( - indexShard.shardId(), - "failed to clean after replication", - ex - ); - fail(rfe, true); - throw rfe; - } catch (OpenSearchException ex) { + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + throw new UnrecoverableReplicationException( + shardId(), + "failed to clean after replication", + ex + ); + } catch (OpenSearchException ex) { /* Ignore closed replication target as it can happen due to index shard closed event in a separate thread. In such scenario, ignore the exception */ - assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; - logger.info("Replication target closed", ex); - } catch (Exception ex) { - ReplicationFailedException rfe = new ReplicationFailedException( - indexShard.shardId(), - "failed to clean after replication", - ex - ); - fail(rfe, true); - throw rfe; - } finally { - if (store != null) { - store.decRef(); - } + assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; + logger.info("Replication target already closed", ex); + } catch (Exception ex) { + throw new UnrecoverableReplicationException( + shardId(), + "failed to clean after replication", + ex + ); + } finally { + if (store != null) { + store.decRef(); } - return null; - }); + } } /** @@ -290,10 +272,15 @@ private ChecksumIndexInput toIndexInput(byte[] input) { ); } + /** + * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service. 
+ */ @Override - protected void onCancel(String reason) { - cancellableThreads.cancel(reason); - source.cancel(); - multiFileWriter.close(); + public void cancel(String reason) { + if (finished.get() == false) { + logger.info("Cancelling replication for target {} {}", getId(), source.getDescription()); + cancellableThreads.cancel(reason); + source.cancel(); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index a7e0c0ec887ab..924ab142c0159 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -11,14 +11,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -145,7 +146,8 @@ public SegmentReplicationTargetService( @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { - onGoingReplications.cancelForShard(shardId, "shard closed"); + logger.info("[shardId {}] cancelling replication for shard", indexShard.shardId()); + onGoingReplications.requestCancel(indexShard.shardId(), "Shard closing"); latestReceivedCheckpoint.remove(shardId); } } @@ -167,7 +169,7 @@ public void afterIndexShardStarted(IndexShard indexShard) { @Override public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) { - onGoingReplications.cancelForShard(indexShard.shardId(), "shard has been promoted to primary"); + onGoingReplications.requestCancel(indexShard.shardId(), "Shard closing"); latestReceivedCheckpoint.remove(indexShard.shardId()); } } @@ -207,11 +209,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) { * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { - logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint)); + logger.info(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint)); // if the shard is in any state if (replicaShard.state().equals(IndexShardState.CLOSED)) { // ignore if shard is closed - logger.trace(() -> "Ignoring checkpoint, Shard is closed"); + 
logger.info(() -> "Ignoring checkpoint, Shard is closed"); return; } updateLatestReceivedCheckpoint(receivedCheckpoint, replicaShard); @@ -223,14 +225,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId()); if (ongoingReplicationTarget != null) { if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) { - logger.trace( + logger.info( "Cancelling ongoing replication from old primary with primary term {}", ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() ); onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary"); completedReplications.put(replicaShard.shardId(), ongoingReplicationTarget); } else { - logger.trace( + logger.info( () -> new ParameterizedMessage( "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", replicaShard.getLatestReplicationCheckpoint() @@ -244,7 +246,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe startReplication(replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", replicaShard.shardId().getId(), @@ -268,7 +270,7 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}", replicaShard.shardId().getId(), @@ -285,7 +287,7 @@ public void onReplicationFailure( } } else { - logger.trace( + logger.info( () -> new ParameterizedMessage("Ignoring checkpoint, shard not started {} {}", receivedCheckpoint, replicaShard.state()) ); } @@ -381,7 +383,7 @@ public SegmentReplicationTarget startReplication(final IndexShard indexShard, fi // pkg-private for integration tests void startReplication(final SegmentReplicationTarget target) { - final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); + final long replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout()); threadPool.generic().execute(new ReplicationRunner(replicationId)); } @@ -410,7 +412,7 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boo /** * Runnable implementation to trigger a replication event. */ - private class ReplicationRunner implements Runnable { + private class ReplicationRunner extends AbstractRunnable { final long replicationId; @@ -419,46 +421,51 @@ public ReplicationRunner(long replicationId) { } @Override - public void run() { + public void onFailure(Exception e) { + logger.error("CAUGHT ON FAILURE?", e); + onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication"), false); + } + + @Override + public void doRun() { start(replicationId); } } private void start(final long replicationId) { + final SegmentReplicationTarget target; try (ReplicationRef replicationRef = onGoingReplications.get(replicationId)) { // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the // threadpool. 
if (replicationRef == null) { return; } - SegmentReplicationTarget target = onGoingReplications.getTarget(replicationId); - replicationRef.get().startReplication(new ActionListener<>() { + target = onGoingReplications.getTarget(replicationId); + } + try { + target.startReplication(new ActionListener<>() { @Override public void onResponse(Void o) { + logger.info("Finished replicating marking as done {}", target.shardId()); onGoingReplications.markAsDone(replicationId); if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) { completedReplications.put(target.shardId(), target); } - } @Override public void onFailure(Exception e) { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof CancellableThreads.ExecutionCancelledException) { - if (onGoingReplications.getTarget(replicationId) != null) { - IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard(); - // if the target still exists in our collection, the primary initiated the cancellation, fail the replication - // but do not fail the shard. Cancellations initiated by this node from Index events will be removed with - // onGoingReplications.cancel and not appear in the collection when this listener resolves. - onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false); - completedReplications.put(target.shardId(), target); - } - } else { - onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false); + logger.error("Failed?", e); + if (e instanceof UnrecoverableReplicationException) { + onGoingReplications.fail(replicationId, (UnrecoverableReplicationException) e, true); + return; } + onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false); } }); + } catch (OpenSearchException e) { + logger.error("Replication cancelled", e); + onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false); } } @@ -492,10 +499,11 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha channel.sendResponse(TransportResponse.Empty.INSTANCE); return; } + logger.info("Starting force sync {}", indexShard.shardId()); startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", indexShard.shardId().getId(), @@ -508,19 +516,27 @@ public void onReplicationDone(SegmentReplicationState state) { // Promote engine type for primary target if (indexShard.recoveryState().getPrimary() == true) { indexShard.resetToWriteableEngine(); + logger.info("Shard promoted as primary {}", indexShard.shardId()); } else { // Update the replica's checkpoint on primary's replication tracker. 
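                        // The transport response is acknowledged in the finally block below, so the
                        // primary's force-sync request is always answered even if this checkpoint
                        // update or the engine reset above throws.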
updateVisibleCheckpoint(state.getReplicationId(), indexShard); } - channel.sendResponse(TransportResponse.Empty.INSTANCE); +// channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { throw new RuntimeException(e); + } finally { + try { + logger.info("Ack response"); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (IOException e) { + logger.error("Error sending response to primary"); + } } } @Override public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}", indexShard.shardId().getId(), diff --git a/server/src/main/java/org/opensearch/indices/replication/UnrecoverableReplicationException.java b/server/src/main/java/org/opensearch/indices/replication/UnrecoverableReplicationException.java new file mode 100644 index 0000000000000..84574ce39b114 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/UnrecoverableReplicationException.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationFailedException; + +public class UnrecoverableReplicationException extends ReplicationFailedException { + + public UnrecoverableReplicationException(ShardId shardId, String extraInfo, Throwable cause) { + super(shardId, extraInfo, cause); + } + +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index e918ac0a79691..9ec86a990d0fc 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -70,6 +70,22 @@ public ReplicationCollection(Logger logger, ThreadPool threadPool) { this.threadPool = threadPool; } + /** + * Starts a new target event for a given shard, throws if this shard is already replicating. 
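+     * The existence check and the registration run while synchronized on the collection, so two
+     * replication events for the same shard cannot be started concurrently.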
+ * @param target + * @param activityTimeout + * @return + */ + public long startSafe(T target, TimeValue activityTimeout) { + synchronized (onGoingTargetEvents) { + final T event = getOngoingReplicationTarget(target.shardId()); + if (event != null) { + throw new ReplicationFailedException("Shard already replicating"); + } + return start(target, activityTimeout); + } + } + /** * Starts a new target event for the given shard, source node and state * @@ -83,7 +99,7 @@ public long start(T target, TimeValue activityTimeout) { private void startInternal(T target, TimeValue activityTimeout) { T existingTarget = onGoingTargetEvents.putIfAbsent(target.getId(), target); assert existingTarget == null : "found two Target instances with the same id"; - logger.trace("started {}", target.description()); + logger.info("started {}", target.description()); threadPool.schedule( new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout), activityTimeout, @@ -118,10 +134,10 @@ public T reset(final long id, final TimeValue activityTimeout) { // Closes the current target boolean successfulReset = oldTarget.reset(newTarget.cancellableThreads()); if (successfulReset) { - logger.trace("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId()); + logger.info("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId()); return newTarget; } else { - logger.trace( + logger.info( "{} could not be reset as it is already cancelled, previous id [{}]", newTarget.description(), oldTarget.getId() @@ -171,7 +187,7 @@ public boolean cancel(long id, String reason) { T removed = onGoingTargetEvents.remove(id); boolean cancelled = false; if (removed != null) { - logger.trace("canceled {} (reason [{}])", removed.description(), reason); + logger.info("canceled {} (reason [{}])", removed.description(), reason); removed.cancel(reason); cancelled = true; } @@ -188,7 +204,7 @@ public boolean cancel(long id, String reason) { public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) { T removed = onGoingTargetEvents.remove(id); if (removed != null) { - logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure); + logger.info("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure); removed.fail(e, sendShardFailure); } } @@ -197,7 +213,7 @@ public void fail(long id, ReplicationFailedException e, boolean sendShardFailure public void markAsDone(long id) { T removed = onGoingTargetEvents.remove(id); if (removed != null) { - logger.trace("Marking {} as done", removed.description()); + logger.info("Marking {} as done", removed.description()); removed.markAsDone(); } } @@ -227,13 +243,29 @@ public boolean cancelForShard(ShardId shardId, String reason) { } } for (T removed : matchedTargets) { - logger.trace("canceled {} (reason [{}])", removed.description(), reason); + logger.info("canceled {} (reason [{}])", removed.description(), reason); removed.cancel(reason); cancelled = true; } return cancelled; } + /** + * Trigger cancel on the target but do not remove it from the collection. + * This is intended to be called to ensure replication events are removed from the collection + * only when the target has closed. 
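+     * Unlike {@link #cancelForShard(ShardId, String)}, the entry stays in the collection until the
+     * target itself completes, at which point {@link #fail} or {@link #markAsDone} removes it and
+     * completes its listener.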
+ * + * @param shardId {@link ShardId} shard events to cancel + * @param reason {@link String} reason for cancellation + */ + public void requestCancel(ShardId shardId, String reason) { + for (T value : onGoingTargetEvents.values()) { + if (value.shardId().equals(shardId)) { + value.cancel(reason); + } + } + } + /** * Get target for shard * @@ -289,7 +321,7 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { T status = onGoingTargetEvents.get(id); if (status == null) { - logger.trace("[monitor] no status found for [{}], shutting down", id); + logger.info("[monitor] no status found for [{}], shutting down", id); return; } long accessTime = status.lastAccessTime(); @@ -303,7 +335,7 @@ protected void doRun() throws Exception { return; } lastSeenAccessTime = accessTime; - logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", id, lastSeenAccessTime); + logger.info("[monitor] rescheduling check for [{}]. last access time is [{}]", id, lastSeenAccessTime); threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC); } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0c859c5f6a64a..4bbd24c462001 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -974,6 +974,110 @@ public void getSegmentFiles( } } + public void testCloseShardWhileGettingCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + Assert.fail("Unreachable"); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . 
+ final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + + public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + resolveCheckpointInfoResponseListener(listener, primary); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + public void testPrimaryCancelsExecution() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll(); @@ -1063,7 +1167,6 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertFalse(sendShardFailure); - assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage()); latch.countDown(); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index c632f2843cba2..d4c1a00c70df9 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -62,7 +62,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -304,7 +303,6 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I serviceSpy.startReplication(targetSpy); latch.await(); // wait 
for the new checkpoint to arrive, before the listener completes. - assertEquals(CANCELLED, targetSpy.state().getStage()); verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any()); }
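
For readers tracing the fix, the essential control flow can be summarized outside of the diff. The sketch below is a minimal, self-contained model, not OpenSearch code: Listener and Target are hypothetical stand-ins for ReplicationListener and SegmentReplicationTarget, and the three checkForCancel() calls stand in for the checkpoint-metadata, get-files and finalize steps. It shows why a cancelled target still completes its listener (via onFailure) instead of leaving it hanging, which is the bug this patch addresses.

import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationFlowSketch {

    interface Listener {
        void onResponse();
        void onFailure(Exception e);
    }

    static class Target {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        // Analogous to ReplicationCollection#requestCancel: flag the target, do not remove it.
        void cancel(String reason) {
            cancelled.set(true);
        }

        // Analogous to SegmentReplicationTarget#startReplication: every step checks for
        // cancellation and any exception is routed to the listener instead of being swallowed.
        void startReplication(Listener listener) {
            try {
                checkForCancel();           // get checkpoint metadata
                checkForCancel();           // get segment files
                checkForCancel();           // finalize replication
                listener.onResponse();
            } catch (Exception e) {
                listener.onFailure(e);      // cancellation surfaces here, completing the listener
            }
        }

        private void checkForCancel() {
            if (cancelled.get()) {
                throw new IllegalStateException("replication was canceled");
            }
        }
    }

    public static void main(String[] args) {
        Target target = new Target();
        target.cancel("shard closed");
        target.startReplication(new Listener() {
            @Override
            public void onResponse() {
                System.out.println("marked as done");
            }

            @Override
            public void onFailure(Exception e) {
                // In the real service this is where ReplicationCollection#fail removes the target.
                System.out.println("completed via onFailure: " + e.getMessage());
            }
        });
    }
}

In the real service, the onFailure branch is where SegmentReplicationTargetService calls ReplicationCollection#fail, which removes the target and notifies its listener, matching requestCancel's contract of deferring removal until the target has actually finished.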