Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add logic back to update tracking replication checkpoint on source #8560

Merged
merged 7 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public void testIndexing() throws IOException, ParseException {
*
* @throws Exception
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8322")
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
public void testIndexingWithSegRep() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -197,6 +198,11 @@ protected IndexShard getIndexShard(String node, String indexName) {
return indexService.getShard(shardId.get());
}

/**
 * Reports whether this test runs segment replication backed by a remote store.
 *
 * @return {@code true} only when {@code INDEX_REMOTE_STORE_ENABLED_SETTING} resolves to true for
 *         {@link #indexSettings()} and the {@code SEGMENT_REPLICATION_EXPERIMENTAL} entry of
 *         {@link #featureFlagSettings()} equals "true" (case-insensitive)
 */
protected boolean segmentReplicationWithRemoteEnabled() {
    // Guard first: the feature flag is only consulted when the remote store setting is on.
    if (IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue() == false) {
        return false;
    }
    final String experimentalFlagValue = featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
    // A missing (null) flag value compares as not enabled.
    return "true".equalsIgnoreCase(experimentalFlagValue);
}

protected Releasable blockReplication(List<String> nodes, CountDownLatch latch) {
CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size());
for (String node : nodes) {
Expand All @@ -206,7 +212,11 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) {
String actionToWaitFor = SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES;
if (segmentReplicationWithRemoteEnabled()) {
actionToWaitFor = SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;
}
if (action.equals(actionToWaitFor)) {
try {
latch.countDown();
pauseReplicationLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand Down Expand Up @@ -1324,9 +1323,4 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
ensureGreen(INDEX_NAME);
waitForSearchableDocs(2, nodes);
}

/**
 * Tells whether both the remote store index setting and the experimental segment replication
 * feature flag are switched on for this test.
 *
 * @return {@code true} when {@code INDEX_REMOTE_STORE_ENABLED_SETTING} is true for the settings returned by
 *         {@code indexSettings()} and {@code featureFlagSettings()} maps
 *         {@code SEGMENT_REPLICATION_EXPERIMENTAL} to "true", ignoring case
 */
private boolean segmentReplicationWithRemoteEnabled() {
    final boolean remoteStoreEnabled = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue();
    if (remoteStoreEnabled == false) {
        // Feature flag settings are not read when the remote store is disabled.
        return false;
    }
    final String flagValue = featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
    return "true".equalsIgnoreCase(flagValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
removeCopyState(sourceHandler.getCopyState());
}
});
if (request.getFilesToFetch().isEmpty()) {
wrappedListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
} else {
handler.sendFiles(request, wrappedListener);
}
handler.sendFiles(request, wrappedListener);
} else {
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -107,6 +108,14 @@ class SegmentReplicationSourceHandler {
* @param listener {@link ActionListener} that completes with the list of files sent.
*/
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
// Short circuit when no files to transfer
if (request.getFilesToFetch().isEmpty()) {
// before completion, alert the primary of the replica's state.
shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint());
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
return;
}

final ReplicationTimer timer = new ReplicationTimer();
if (isReplicating.compareAndSet(false, true) == false) {
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
Expand Down Expand Up @@ -159,10 +168,11 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene

sendFileStep.whenComplete(r -> {
try {
shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint());
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
timer.stop();
} finally {
IOUtils.close(resources);
timer.stop();
logger.trace(
"[replication id {}] Source node completed sending files to target node [{}], timing: {}",
request.getReplicationId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public void onReplicationFailure(
}

protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
// Updating the replication checkpoint on the source via a dedicated transport call is only supported for the
// remote store integration. For node-to-node communication, the checkpoint update is piggy-backed on the
// GET_SEGMENT_FILES transport call.
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
return;
}
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();

final UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,13 @@ public void testReplicationAlreadyRunning() throws IOException {
1
);

final List<StoreFileMetadata> expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion));

final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
Collections.emptyList(),
expectedFiles,
latestReplicationCheckpoint
);

Expand All @@ -224,11 +226,12 @@ public void testCancelReplication() throws IOException, InterruptedException {
1
);

final List<StoreFileMetadata> expectedFiles = List.of(new StoreFileMetadata("_0.si", 20, "test", Version.CURRENT.luceneVersion));
final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
Collections.emptyList(),
expectedFiles,
latestReplicationCheckpoint
);

Expand Down
Loading