Fix bug where ReplicationListeners would not complete on target cancellation.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jul 5, 2023
1 parent 9e4e54d commit fff6bac
Showing 16 changed files with 478 additions and 177 deletions.
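
The commit addresses a hang: when a replication target was cancelled, its ReplicationListeners could be left pending and callers would wait forever. Below is a minimal sketch of the invariant the fix restores -- every pending listener is completed (with a failure) on cancellation. All names here are illustrative, not the actual OpenSearch classes:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only; not the OpenSearch implementation.
class ReplicationTargetSketch {
    private final List<Consumer<Exception>> pendingListeners = new ArrayList<>();
    private boolean cancelled;

    synchronized void addListener(Consumer<Exception> onFailure) {
        if (cancelled) {
            // Already cancelled: complete immediately so the caller never blocks.
            onFailure.accept(new IllegalStateException("replication cancelled"));
            return;
        }
        pendingListeners.add(onFailure);
    }

    synchronized void cancel(String reason) {
        cancelled = true;
        // Fail every pending listener exactly once instead of silently dropping it.
        for (Consumer<Exception> listener : pendingListeners) {
            listener.accept(new IllegalStateException("replication cancelled: " + reason));
        }
        pendingListeners.clear();
    }
}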
@@ -110,6 +110,12 @@ public void testBasicUsage() throws Exception {
assertOrderedSearchHits(response, "2", "1");
}


@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testMultipleValues() throws Exception {
String index = "foo";

@@ -22,9 +22,11 @@
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
@@ -186,15 +188,25 @@ protected void verifyStoreContent() throws Exception {
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
-return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
+return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream()
.filter(sid -> sid == shardId.id())
.findFirst();
return indexService.getShard(id.get());
}

protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
-final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
-return indexService.getShard(shardId.get());
+final Optional<Integer> id = indexService.shardIds().stream().findFirst();
+return indexService.getShard(id.get());
}

protected Releasable blockReplication(List<String> nodes, CountDownLatch latch) {
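
A hypothetical call site for the new shard-aware overload, as it might appear in a subclass of this base test class (the routing lookup is illustrative, not part of the diff):

// Hypothetical usage inside a test: resolve the exact shard a routing entry
// points at, rather than whichever shard id the stream happens to yield first.
ClusterState state = client().admin().cluster().prepareState().get().getState();
ShardRouting routing = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
IndexShard shard = getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), INDEX_NAME);
assertEquals(routing.shardId(), shard.shardId());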
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.AbstractDisruptionTestCase;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT {

protected static final String REPOSITORY_NAME = "test-remote-store-repo";

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
// Set up a remote store repo; tests in this suite may randomly enable it.
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
createIndex(INDEX_NAME);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

@Override
public Settings indexSettings() {
final Settings.Builder builder = Settings.builder()
.put(super.indexSettings())
// reset shard & replica count to random values set by OpenSearchIntegTestCase.
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

//TODO: Randomly enable remote store on these tests.
// if (randomBoolean()) {
// builder
// .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
// .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME);
// }
return builder.build();
}

public void testBasicReplication() throws Exception {
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh();
ensureGreen(INDEX_NAME);
verifyStoreContent();
}

public void testDropReplicaDuringCopy() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
internalCluster().startClusterManagerOnlyNodes(1);

final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}

internalCluster().restartRandomDataNode();

ensureYellow(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get();
internalCluster().startDataOnlyNode();
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testDeleteIndexWhileReplicating() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testCancelWhileOngoingRecovery() throws Exception {
internalCluster().startNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
// start a node to trigger a recovery
internalCluster().startNode();
internalCluster().fullRestart();
}
}
@@ -129,8 +129,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
-getLastCommittedSegmentInfos(),
-false
+getLastCommittedSegmentInfos()
);
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
@@ -816,7 +816,13 @@ public void relocated(
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";

-performSegRep.run();
+try {
+    logger.info("Starting force sync from relocated");
+    performSegRep.run();
+} catch (Exception e) {
+    logger.error("Force sync from relocated failed", e);
+}
+logger.info("Finished force sync from relocated -- continue handoff");
/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
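
The comment above captures the sequencing constraint: the force sync runs while operation permits are blocked, but the primary-context hand-off is a network call and must not run under a mutex. A sketch of that ordering, with illustrative names only:

// Illustrative ordering sketch (not the real IndexShard code): sync first,
// then hand off outside any mutex so the cluster state update thread is
// never blocked on network I/O.
final class RelocationHandoffSketch {
    void relocate(Runnable performSegRep, Runnable networkHandoff) {
        performSegRep.run();  // final segment-replication sync while operations are blocked
        networkHandoff.run(); // network hand-off; must not hold a mutex here
    }
}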
13 changes: 5 additions & 8 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -799,7 +799,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
-this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
+this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo());
}

/**
@@ -816,22 +816,20 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos
-* @param deleteTempFiles Does this clean up delete temporary replication files
*
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
SegmentInfos infos,
-SegmentInfos lastCommittedSegmentInfos,
-boolean deleteTempFiles
+SegmentInfos lastCommittedSegmentInfos
) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles);
+cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true));
} finally {
metadataLock.writeLock().unlock();
}
@@ -840,8 +838,7 @@ public void cleanupAndPreserveLatestCommitPoint(
private void cleanupFiles(
String reason,
Collection<String> localSnapshot,
-@Nullable Collection<String> additionalFiles,
-boolean deleteTempFiles
+@Nullable Collection<String> additionalFiles
) throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
@@ -851,7 +848,7 @@ private void cleanupFiles(
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
// prevent temporary file deletion during reader cleanup
-|| deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
+|| existingFile.startsWith(REPLICATION_PREFIX)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
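
With the deleteTempFiles flag removed, this cleanup path now always preserves temporary replication files (those with REPLICATION_PREFIX). A sketch of a post-change call site, assuming access to the store and to segment infos obtained from the replica's engine; the helper class and its context are assumptions, not part of the diff:

import java.io.IOException;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.store.Store;

// Sketch of a call site after this change.
final class StoreCleanupSketch {
    static void cleanupAfterReplication(Store store, SegmentInfos latestInfos) throws IOException {
        // Files referenced by latestInfos and by the last commit point are preserved;
        // temp replication files are now always preserved as well (no boolean flag).
        store.cleanupAndPreserveLatestCommitPoint("cleanup after replication round", latestInfos);
    }
}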