Skip to content

Commit

Permalink
[Segment Replication] Add logic back to update tracking replication c…
Browse files Browse the repository at this point in the history
…heckpoint on source (opensearch-project#8560)

* [Segment Replication] Add logic back to update tracking replication checkpoint on source

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update comment

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments & mute breaking bwc-test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless check

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Stop timer inside try to prevent double stop on timer

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update PressureITs to wait for appropriate transport call for replica update

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless check

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
dreamer-89 authored and shiv0408 committed Apr 25, 2024
1 parent 4a2942b commit c663661
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public void testIndexing() throws IOException, ParseException {
*
* @throws Exception
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8322")
public void testIndexingWithSegRep() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -222,6 +223,11 @@ protected IndexShard getIndexShard(String node, String indexName) {
return indexService.getShard(shardId.get());
}

protected boolean segmentReplicationWithRemoteEnabled() {
return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue()
&& "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL));
}

protected Releasable blockReplication(List<String> nodes, CountDownLatch latch) {
CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size());
for (String node : nodes) {
Expand All @@ -231,7 +237,11 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) {
String actionToWaitFor = SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES;
if (segmentReplicationWithRemoteEnabled()) {
actionToWaitFor = SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;
}
if (action.equals(actionToWaitFor)) {
try {
latch.countDown();
pauseReplicationLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand Down Expand Up @@ -1324,9 +1323,4 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
ensureGreen(INDEX_NAME);
waitForSearchableDocs(2, nodes);
}

private boolean segmentReplicationWithRemoteEnabled() {
return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue()
&& "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
removeCopyState(sourceHandler.getCopyState());
}
});
if (request.getFilesToFetch().isEmpty()) {
wrappedListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
} else {
handler.sendFiles(request, wrappedListener);
}
handler.sendFiles(request, wrappedListener);
} else {
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -107,6 +108,14 @@ class SegmentReplicationSourceHandler {
* @param listener {@link ActionListener} that completes with the list of files sent.
*/
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
// Short circuit when no files to transfer
if (request.getFilesToFetch().isEmpty()) {
// before completion, alert the primary of the replica's state.
shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint());
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
return;
}

final ReplicationTimer timer = new ReplicationTimer();
if (isReplicating.compareAndSet(false, true) == false) {
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
Expand Down Expand Up @@ -159,10 +168,11 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene

sendFileStep.whenComplete(r -> {
try {
shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint());
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
timer.stop();
} finally {
IOUtils.close(resources);
timer.stop();
logger.trace(
"[replication id {}] Source node completed sending files to target node [{}], timing: {}",
request.getReplicationId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public void onReplicationFailure(
}

protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
// Update replication checkpoint on source via transport call only supported for remote store integration. For node-
// node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
return;
}
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();

final UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,13 @@ public void testReplicationAlreadyRunning() throws IOException {
1
);

final List<StoreFileMetadata> expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion));

final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
Collections.emptyList(),
expectedFiles,
latestReplicationCheckpoint
);

Expand All @@ -224,11 +226,12 @@ public void testCancelReplication() throws IOException, InterruptedException {
1
);

final List<StoreFileMetadata> expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion));
final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
Collections.emptyList(),
expectedFiles,
latestReplicationCheckpoint
);

Expand Down

0 comments on commit c663661

Please sign in to comment.