diff --git a/CHANGELOG.md b/CHANGELOG.md index 60f16de7c3623..60ff5fa026455 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514)) - Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956)) - Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) +- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020)) - Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index 98541310649db..9a8cdc384b107 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -123,9 +123,9 @@ public void testWritesRejected() throws Exception { assertEquals(perGroupStats.getRejectedRequestCount(), 2L); } refresh(INDEX_NAME); - // wait for the replicas to catch up after block is released. - waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {})); + // wait for the replicas to catch up after block is released. + assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. indexDoc(); refresh(INDEX_NAME); @@ -179,7 +179,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { } refresh(INDEX_NAME); // wait for the replicas to catch up after block is released. - waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {})); + assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. indexDoc(); @@ -258,6 +258,10 @@ public void testFailStaleReplica() throws Exception { } public void testWithDocumentReplicationEnabledIndex() throws Exception { + assumeTrue( + "Can't create DocRep index with remote store enabled. Skipping.", + indexSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false + ); Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build(); // Starts a primary and replica node. final String primaryNode = internalCluster().startNode(settings); @@ -313,7 +317,7 @@ public void testBulkWritesRejected() throws Exception { } refresh(INDEX_NAME); // wait for the replicas to catch up after block is released. - waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {})); + assertReplicaCheckpointUpdated(primaryShard); // index another doc showing there is no pressure enforced. executeBulkRequest(nodes, totalDocs); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 4c839a4c6d9dd..7675e454a0696 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -23,6 +23,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -204,7 +206,7 @@ protected Releasable blockReplication(List nodes, CountDownLatch latch) node )); mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) { try { latch.countDown(); pauseReplicationLatch.await(); @@ -222,4 +224,13 @@ protected Releasable blockReplication(List nodes, CountDownLatch latch) }; } + protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception { + assertBusy(() -> { + Set groupStats = primaryShard.getReplicationStats(); + assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size()); + for (SegmentReplicationShardStats shardStat : groupStats) { + assertEquals(0, shardStat.getCheckpointsBehindCount()); + } + }, 30, TimeUnit.SECONDS); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 818da5ceea356..13c5e8c7bdd9e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -797,10 +797,6 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } public void testPressureServiceStats() throws Exception { - assumeFalse( - "Skipping the test as pressure service is not compatible with SegRep and Remote store yet.", - segmentReplicationWithRemoteEnabled() - ); final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); final String replicaNode = internalCluster().startNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java new file mode 100644 index 0000000000000..e7c9c811b7734 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationWithRemoteStorePressureIT.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.SegmentReplicationPressureIT; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * This class executes the SegmentReplicationPressureIT suite with remote store integration enabled. + * Setup is similar to SegmentReplicationPressureIT but this also enables the segment replication using remote store which + * is behind SEGMENT_REPLICATION_EXPERIMENTAL flag. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationWithRemoteStorePressureIT extends SegmentReplicationPressureIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 6b34d6641fcf2..c1b2a66223043 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1176,13 +1176,18 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation assert handoffInProgress == false; assert invariant(); final CheckpointState cps = checkpoints.get(allocationId); - assert !this.shardAllocationId.equals(allocationId) && cps != null; + assert !this.shardAllocationId.equals(allocationId); + // Ignore if the cps is null (i.e. replica shard not in active state). + if (cps == null) { + logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", allocationId); + return; + } if (cps.checkpointTimers.isEmpty() == false) { // stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers. // Compute the max lag from the set of completed timers. final AtomicLong lastFinished = new AtomicLong(0L); cps.checkpointTimers.entrySet().removeIf((entry) -> { - boolean result = visibleCheckpoint.equals(entry.getKey()) || visibleCheckpoint.isAheadOf(entry.getKey()); + boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false; if (result) { final ReplicationTimer timer = entry.getValue(); timer.stop(); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6f04c6cf6f665..050a66bedcf5d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -121,10 +121,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { try { - shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 0e62a4320e3f3..79186deeeaf0f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -34,6 +34,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import java.io.IOException; @@ -63,6 +64,7 @@ public static class Actions { public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; + public static final String UPDATE_VISIBLE_CHECKPOINT = "internal:index/shard/replication/update_visible_checkpoint"; } private final OngoingSegmentReplications ongoingSegmentReplications; @@ -89,6 +91,12 @@ protected SegmentReplicationSourceService( GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); + transportService.registerRequestHandler( + Actions.UPDATE_VISIBLE_CHECKPOINT, + ThreadPool.Names.GENERIC, + UpdateVisibleCheckpointRequest::new, + new UpdateVisibleCheckpointRequestHandler() + ); } public SegmentReplicationSourceService( @@ -142,6 +150,20 @@ public void messageReceived(GetSegmentFilesRequest request, TransportChannel cha } } + private class UpdateVisibleCheckpointRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(UpdateVisibleCheckpointRequest request, TransportChannel channel, Task task) throws Exception { + try { + IndexService indexService = indicesService.indexServiceSafe(request.getPrimaryShardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.getPrimaryShardId().id()); + indexShard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), request.getCheckpoint()); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Exception e) { + channel.sendResponse(e); + } + } + } + @Override public void clusterChanged(ClusterChangedEvent event) { if (event.nodesRemoved()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 7567991e4f9bc..a7e0c0ec887ab 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -13,7 +13,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; @@ -26,6 +28,7 @@ import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.ForceSyncRequest; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; @@ -36,6 +39,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; @@ -45,6 +49,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT; + /** * Service class that orchestrates replication events on replicas. * @@ -66,6 +72,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { protected final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); private final IndicesService indicesService; + private final ClusterService clusterService; + private final TransportService transportService; public ReplicationRef get(long replicationId) { return onGoingReplications.get(replicationId); @@ -86,7 +94,8 @@ public SegmentReplicationTargetService( final RecoverySettings recoverySettings, final TransportService transportService, final SegmentReplicationSourceFactory sourceFactory, - final IndicesService indicesService + final IndicesService indicesService, + final ClusterService clusterService ) { this( threadPool, @@ -94,6 +103,7 @@ public SegmentReplicationTargetService( transportService, sourceFactory, indicesService, + clusterService, new ReplicationCollection<>(logger, threadPool) ); } @@ -104,6 +114,7 @@ public SegmentReplicationTargetService( final TransportService transportService, final SegmentReplicationSourceFactory sourceFactory, final IndicesService indicesService, + final ClusterService clusterService, final ReplicationCollection ongoingSegmentReplications ) { this.threadPool = threadPool; @@ -111,6 +122,8 @@ public SegmentReplicationTargetService( this.onGoingReplications = ongoingSegmentReplications; this.sourceFactory = sourceFactory; this.indicesService = indicesService; + this.clusterService = clusterService; + this.transportService = transportService; transportService.registerRequestHandler( Actions.FILE_CHUNK, @@ -240,6 +253,10 @@ public void onReplicationDone(SegmentReplicationState state) { state.getTimingData() ) ); + + // update visible checkpoint to primary + updateVisibleCheckpoint(state.getReplicationId(), replicaShard); + // if we received a checkpoint during the copy event that is ahead of this // try and process it. processLatestReceivedCheckpoint(replicaShard, thread); @@ -274,6 +291,61 @@ public void onReplicationFailure( } } + protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { + ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); + + final UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest( + replicationId, + replicaShard.routingEntry().allocationId().getId(), + primaryShard.shardId(), + getPrimaryNode(primaryShard), + replicaShard.getLatestReplicationCheckpoint() + ); + + final TransportRequestOptions options = TransportRequestOptions.builder() + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); + logger.debug("Updating replication checkpoint to {}", request.getCheckpoint()); + RetryableTransportClient transportClient = new RetryableTransportClient( + transportService, + getPrimaryNode(primaryShard), + recoverySettings.internalActionRetryTimeout(), + logger + ); + final ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.debug( + "Successfully updated replication checkpoint {} for replica {}", + replicaShard.shardId(), + request.getCheckpoint() + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + "Failed to update visible checkpoint for replica {}, {}: {}", + replicaShard.shardId(), + request.getCheckpoint(), + e + ); + } + }; + + transportClient.executeRetryableAction( + UPDATE_VISIBLE_CHECKPOINT, + request, + options, + ActionListener.map(listener, r -> null), + in -> TransportResponse.Empty.INSTANCE + ); + } + + private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) { + return clusterService.state().nodes().get(primaryShard.currentNodeId()); + } + // visible to tests protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) { final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId()); @@ -436,6 +508,9 @@ public void onReplicationDone(SegmentReplicationState state) { // Promote engine type for primary target if (indexShard.recoveryState().getPrimary() == true) { indexShard.resetToWriteableEngine(); + } else { + // Update the replica's checkpoint on primary's replication tracker. + updateVisibleCheckpoint(state.getReplicationId(), indexShard); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/UpdateVisibleCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/UpdateVisibleCheckpointRequest.java new file mode 100644 index 0000000000000..2674adf711406 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/UpdateVisibleCheckpointRequest.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest; + +import java.io.IOException; + +/** + * Request object for updating the replica's checkpoint on primary for tracking. + * + * @opensearch.internal + */ +public class UpdateVisibleCheckpointRequest extends SegmentReplicationTransportRequest { + + private final ReplicationCheckpoint checkpoint; + private final ShardId primaryShardId; + + public UpdateVisibleCheckpointRequest(StreamInput in) throws IOException { + super(in); + checkpoint = new ReplicationCheckpoint(in); + primaryShardId = new ShardId(in); + } + + public UpdateVisibleCheckpointRequest( + long replicationId, + String targetAllocationId, + ShardId primaryShardId, + DiscoveryNode targetNode, + ReplicationCheckpoint checkpoint + ) { + super(replicationId, targetAllocationId, targetNode); + this.checkpoint = checkpoint; + this.primaryShardId = primaryShardId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + checkpoint.writeTo(out); + primaryShardId.writeTo(out); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } + + public ShardId getPrimaryShardId() { + return primaryShardId; + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 17582bff1fc97..eb1b97647896d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1105,7 +1105,8 @@ protected Node( recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), - indicesService + indicesService, + clusterService ) ); b.bind(SegmentReplicationSourceService.class) diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 7cfc95d7f5cff..d26d652451f35 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -1868,6 +1868,64 @@ public void testSegmentReplicationCheckpointTracking() { } } + public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() { + Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + final long initialClusterStateVersion = randomNonNegativeLong(); + final int numberOfActiveAllocationsIds = randomIntBetween(2, 16); + final int numberOfInitializingIds = randomIntBetween(2, 16); + final Tuple, Set> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds( + numberOfActiveAllocationsIds, + numberOfInitializingIds + ); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingIds = activeAndInitializingAllocationIds.v2(); + AllocationId primaryId = activeAllocationIds.iterator().next(); + IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId); + final ReplicationTracker tracker = newTracker(primaryId, settings); + + tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); + assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); + assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable)); + assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync)); + + // get insync ids, filter out the primary. + final Set inSyncAllocationIds = tracker.getReplicationGroup() + .getInSyncAllocationIds() + .stream() + .filter(id -> tracker.shardAllocationId.equals(id) == false) + .collect(Collectors.toSet()); + + final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 1, + 1, + 1L, + Codec.getDefault().getName() + ); + tracker.setLatestReplicationCheckpoint(initialCheckpoint); + + Set groupStats = tracker.getSegmentReplicationStats(); + assertEquals(inSyncAllocationIds.size(), groupStats.size()); + for (SegmentReplicationShardStats shardStat : groupStats) { + assertEquals(1, shardStat.getCheckpointsBehindCount()); + } + + // simulate replicas moved up to date. + final Map checkpoints = tracker.checkpoints; + for (String id : inSyncAllocationIds) { + final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id); + assertEquals(1, checkpointState.checkpointTimers.size()); + tracker.updateVisibleCheckpointForShard(id, initialCheckpoint); + assertEquals(0, checkpointState.checkpointTimers.size()); + } + + // Unknown allocation ID will be ignored. + tracker.updateVisibleCheckpointForShard("randomAllocationID", initialCheckpoint); + assertThrows(AssertionError.class, () -> tracker.updateVisibleCheckpointForShard(tracker.shardAllocationId, initialCheckpoint)); + } + public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOException { Settings settings = Settings.builder().put("index.remote_store.translog.enabled", "true").build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", settings); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index cfd24a6858348..8d4a2046b72d8 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; @@ -295,7 +296,13 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class)); + sut = prepareForReplication( + primaryShard, + null, + mock(TransportService.class), + mock(IndicesService.class), + mock(ClusterService.class) + ); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. @@ -1011,6 +1018,7 @@ private SegmentReplicationTargetService newTargetService(SegmentReplicationSourc new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), mock(TransportService.class), sourceFactory, + null, null ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 76481ebbecea3..1ceba05f16369 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -33,6 +34,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; @@ -57,12 +59,13 @@ public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase { private DiscoveryNode localNode; private SegmentReplicationSourceService segmentReplicationSourceService; private OngoingSegmentReplications ongoingSegmentReplications; + private IndexShard mockIndexShard; @Override public void setUp() throws Exception { super.setUp(); // setup mocks - IndexShard mockIndexShard = CopyStateTests.createMockIndexShard(); + mockIndexShard = CopyStateTests.createMockIndexShard(); ShardId testShardId = mockIndexShard.shardId(); IndicesService mockIndicesService = mock(IndicesService.class); IndexService mockIndexService = mock(IndexService.class); @@ -145,6 +148,27 @@ public void onFailure(Exception e) { }); } + public void testUpdateVisibleCheckpoint() { + UpdateVisibleCheckpointRequest request = new UpdateVisibleCheckpointRequest( + 0L, + "", + mockIndexShard.shardId(), + localNode, + testCheckpoint + ); + executeUpdateVisibleCheckpoint(request, new ActionListener<>() { + @Override + public void onResponse(TransportResponse transportResponse) { + assertTrue(TransportResponse.Empty.INSTANCE.equals(transportResponse)); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + public void testCheckpointInfo() { executeGetCheckpointInfo(new ActionListener<>() { @Override @@ -226,4 +250,44 @@ public GetSegmentFilesResponse read(StreamInput in) throws IOException { } ); } + + private void executeUpdateVisibleCheckpoint(UpdateVisibleCheckpointRequest request, ActionListener listener) { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + UpdateVisibleCheckpointRequest newRequest = new UpdateVisibleCheckpointRequest(in); + assertTrue(newRequest.getCheckpoint().equals(request.getCheckpoint())); + assertTrue(newRequest.getTargetAllocationId().equals(request.getTargetAllocationId())); + } + } catch (IOException e) { + fail("Failed to parse UpdateVisibleCheckpointRequest " + e); + } + + transportService.sendRequest( + localNode, + SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT, + request, + new TransportResponseHandler<>() { + @Override + public void handleResponse(TransportResponse response) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void handleException(TransportException e) { + listener.onFailure(e); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public CheckpointInfoResponse read(StreamInput in) throws IOException { + return new CheckpointInfoResponse(in); + } + } + ); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 9c796ec05c22a..725fcd8f35693 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -13,8 +13,12 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; @@ -29,6 +33,7 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -44,12 +49,14 @@ import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -75,6 +82,9 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private DiscoveryNode localNode; private IndicesService indicesService; + private ClusterService clusterService; + + private SegmentReplicationState state; private static long TRANSPORT_TIMEOUT = 30000;// 30sec @@ -101,7 +111,11 @@ public void setUp() throws Exception { when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); testThreadPool = new TestThreadPool("test", Settings.EMPTY); - localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + localNode = new DiscoveryNode( + primaryShard.getReplicationGroup().getRoutingTable().primaryShard().currentNodeId(), + buildNewFakeTransportAddress(), + Version.CURRENT + ); CapturingTransport transport = new CapturingTransport(); transportService = transport.createTransportService( Settings.EMPTY, @@ -115,8 +129,15 @@ public void setUp() throws Exception { transportService.acceptIncomingRequests(); indicesService = mock(IndicesService.class); - - sut = prepareForReplication(primaryShard, null, transportService, indicesService); + clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + RoutingTable mockRoutingTable = mock(RoutingTable.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.routingTable()).thenReturn(mockRoutingTable); + when(mockRoutingTable.shardRoutingTable(any())).thenReturn(primaryShard.getReplicationGroup().getRoutingTable()); + + when(clusterState.nodes()).thenReturn(DiscoveryNodes.builder().add(localNode).build()); + sut = prepareForReplication(primaryShard, replicaShard, transportService, indicesService, clusterService); initialCheckpoint = replicaShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), @@ -132,6 +153,14 @@ public void setUp() throws Exception { initialCheckpoint.getSegmentInfosVersion() + 1, primaryCodec ); + + state = new SegmentReplicationState( + replicaShard.routingEntry(), + new ReplicationLuceneIndex(), + 0L, + "", + new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT) + ); } @Override @@ -330,6 +359,42 @@ public void testAfterIndexShardStartedProcessesLatestReceivedCheckpoint() { verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any()); } + public void testStartReplicationListenerSuccess() throws InterruptedException { + sut.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard); + SegmentReplicationTargetService spy = spy(sut); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(i -> { + ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(1)).onReplicationDone(state); + latch.countDown(); + return null; + }).when(spy).startReplication(any(), any()); + doNothing().when(spy).updateVisibleCheckpoint(eq(0L), any()); + spy.afterIndexShardStarted(replicaShard); + + latch.await(2, TimeUnit.SECONDS); + verify(spy, (atLeastOnce())).updateVisibleCheckpoint(eq(0L), eq(replicaShard)); + } + + public void testStartReplicationListenerFailure() throws InterruptedException { + sut.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard); + SegmentReplicationTargetService spy = spy(sut); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(i -> { + ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(1)).onReplicationFailure( + state, + new ReplicationFailedException(replicaShard, null), + false + ); + latch.countDown(); + return null; + }).when(spy).startReplication(any(), any()); + doNothing().when(spy).updateVisibleCheckpoint(eq(0L), any()); + spy.afterIndexShardStarted(replicaShard); + + latch.await(2, TimeUnit.SECONDS); + verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard)); + } + public void testDoNotProcessLatestCheckpointIfItIsbehind() { sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null)); @@ -345,6 +410,7 @@ public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOExcept public void testBeforeIndexShardClosed_DoesNothingForDocRepIndex() throws IOException { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final IndicesService indicesService = mock(IndicesService.class); + final ClusterService clusterService = mock(ClusterService.class); final ReplicationCollection ongoingReplications = mock(ReplicationCollection.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( threadPool, @@ -352,6 +418,7 @@ public void testBeforeIndexShardClosed_DoesNothingForDocRepIndex() throws IOExce mock(TransportService.class), sourceFactory, indicesService, + clusterService, ongoingReplications ); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); @@ -364,6 +431,7 @@ public void testBeforeIndexShardClosed_DoesNothingForDocRepIndex() throws IOExce public void testShardRoutingChanged_DoesNothingForDocRepIndex() throws IOException { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final IndicesService indicesService = mock(IndicesService.class); + final ClusterService clusterService = mock(ClusterService.class); final ReplicationCollection ongoingReplications = mock(ReplicationCollection.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( threadPool, @@ -371,6 +439,7 @@ public void testShardRoutingChanged_DoesNothingForDocRepIndex() throws IOExcepti mock(TransportService.class), sourceFactory, indicesService, + clusterService, ongoingReplications ); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index efa1db17d0461..0bb2b604e8f1a 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1867,7 +1867,8 @@ public void onFailure(final Exception e) { recoverySettings, transportService, new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), - indicesService + indicesService, + clusterService ), mock(SegmentReplicationSourceService.class), shardStateAction, diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 64f1947044c94..4f4ad6e90abfb 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1302,7 +1302,8 @@ public final SegmentReplicationTargetService prepareForReplication( IndexShard primaryShard, IndexShard target, TransportService transportService, - IndicesService indicesService + IndicesService indicesService, + ClusterService clusterService ) { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( @@ -1310,7 +1311,8 @@ public final SegmentReplicationTargetService prepareForReplication( new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), transportService, sourceFactory, - indicesService + indicesService, + clusterService ); final SegmentReplicationSource replicationSource = new TestReplicationSource() { @Override @@ -1379,7 +1381,8 @@ public final List replicateSegments(IndexShard primary primaryShard, replica, mock(TransportService.class), - mock(IndicesService.class) + mock(IndicesService.class), + mock(ClusterService.class) ); final SegmentReplicationTarget target = targetService.startReplication( replica,