Skip to content

Commit

Permalink
[Segment Replication] Fix bug where ReplicationListeners would not co…
Browse files Browse the repository at this point in the history
…mplete on target cancellation.

This change updates cancellation flow for Segment Replication to ensure all listeners are resolved during cancellation.
It does this by requesting cancellation before shard closure instead of using ReplicationCollection's cancelForShard which immediately removes it from the replicationCollection even though the target is in the process of closing.  This would cause the underlying ReplicationListener to never get invoked on close.

This change includes new tests using suite scope to catch for any open tasks.
This caught other locations where this was possible:
1. On a replica during force sync if the shard was closed while resolving its listeners, it would never call back to the primary. - Fixed by refactoring those paths to use a ChannelActionListener.
2. On the primary during forceSync, the primary was relying on the replica to send cancellation in order to proceed after forceSync, Fixed by wrapping the recoveryTarget::forceSync in cancellableThreads.
3. During cancellation when pterm is greater on an incoming checkpoint, we would remove from collection but the target could still be open - fixed by waiting for the cancelled target to be closed before proceeding.  Also added method to ReplicationTarget to guarantee only a single target can be added to the collection.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Jul 6, 2023
1 parent 9e4e54d commit 17af199
Show file tree
Hide file tree
Showing 15 changed files with 567 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
Expand Down Expand Up @@ -186,9 +187,23 @@ protected void verifyStoreContent() throws Exception {
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.junit.Before;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT {

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
createIndex(INDEX_NAME);
}

@Override
public Settings indexSettings() {
final Settings.Builder builder = Settings.builder()
.put(super.indexSettings())
// reset shard & replica count to random values set by OpenSearchIntegTestCase.
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

// TODO: Randomly enable remote store on these tests.
return builder.build();
}

public void testBasicReplication() throws Exception {
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh();
ensureGreen(INDEX_NAME);
verifyStoreContent();
}

public void testDropRandomNodeDuringReplication() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
internalCluster().startClusterManagerOnlyNodes(1);

final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh();

internalCluster().restartRandomDataNode();

ensureYellow(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get();
internalCluster().startDataOnlyNode();
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testDeleteIndexWhileReplicating() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testFullRestartDuringReplication() throws Exception {
internalCluster().startNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
internalCluster().fullRestart();
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException {
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
getLastCommittedSegmentInfos(),
false
);
store.cleanupAndPreserveLatestCommitPoint("On reader closed", getLatestSegmentInfos(), getLastCommittedSegmentInfos());
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
// If that were to fail, the shard will as well.
Expand Down
23 changes: 7 additions & 16 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo());
}

/**
Expand All @@ -816,33 +816,24 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos
* @param deleteTempFiles Does this clean up delete temporary replication files
*
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
SegmentInfos infos,
SegmentInfos lastCommittedSegmentInfos,
boolean deleteTempFiles
) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos, SegmentInfos lastCommittedSegmentInfos)
throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles);
cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(
String reason,
Collection<String> localSnapshot,
@Nullable Collection<String> additionalFiles,
boolean deleteTempFiles
) throws IOException {
private void cleanupFiles(String reason, Collection<String> localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
Expand All @@ -851,7 +842,7 @@ private void cleanupFiles(
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
// prevent temporary file deletion during reader cleanup
|| deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
|| existingFile.startsWith(REPLICATION_PREFIX)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
} else {
// Force round of segment replication to update its checkpoint to primary's
if (shard.indexSettings().isSegRepEnabled()) {
recoveryTarget.forceSegmentFileSync();
cancellableThreads.execute(recoveryTarget::forceSegmentFileSync);
}
}
stopWatch.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public enum Stage {
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6),
CANCELLED((byte) 7);
FINALIZE_REPLICATION((byte) 6);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -245,14 +244,6 @@ public void setStage(Stage stage) {
overallTimer.stop();
timingData.put("OVERALL", overallTimer.time());
break;
case CANCELLED:
if (this.stage == Stage.DONE) {
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
}
this.stage = Stage.CANCELLED;
overallTimer.stop();
timingData.put("OVERALL", overallTimer.time());
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
Expand Down
Loading

0 comments on commit 17af199

Please sign in to comment.