From 8446946462e56a653d30d8bf4655327adc48871b Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Sun, 15 Jan 2023 10:47:51 -0800 Subject: [PATCH 1/8] Update force segment replication sync to be synchronous Signed-off-by: Suraj Singh --- .../SegmentReplicationRelocationIT.java | 113 +++++++++ .../opensearch/index/shard/IndexShard.java | 49 ++-- .../recovery/RecoverySourceHandler.java | 22 +- .../indices/recovery/RecoveryTarget.java | 2 +- .../recovery/RecoveryTargetHandler.java | 3 +- .../recovery/RemoteRecoveryTargetHandler.java | 15 +- .../SegmentReplicationSourceHandler.java | 24 +- .../index/shard/IndexShardTests.java | 220 +++--------------- .../SegmentReplicationIndexShardTests.java | 139 +++++++++-- ...alStorePeerRecoverySourceHandlerTests.java | 2 +- ...enSearchIndexLevelReplicationTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 32 +-- .../indices/recovery/AsyncRecoveryTarget.java | 12 +- 13 files changed, 339 insertions(+), 296 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index fb8b6a7150b9a..6f151e1465528 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -22,13 +22,17 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -276,4 +280,113 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E }, 1, TimeUnit.MINUTES); assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) + ).get(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush to have segments on disk"); + client().admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> index more docs so there are ops in the transaction log"); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + 
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + final String replica = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, replica)); + // Block segment replication to allow adding operations during handoff + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch relocationLatch = new CountDownLatch(1); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, primary), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + latch.countDown(); + try { + relocationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + int totalDocCount = 2000; + Runnable relocationThread = () -> { + // Wait for relocation to halt at SegRep. Ingest docs at that point. + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int i = 20; i < totalDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute() + ); + } + IndexShard indexShard = getIndexShard(replica); + logger.info("--> Active operations {}", indexShard.getActiveOperationsCount()); + assertTrue(indexShard.getActiveOperationsCount() > 0); + // Post ingestion, let SegRep to complete. + relocationLatch.countDown(); + }; + + logger.info("--> relocate the shard from primary to replica"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .execute(); + + // This thread first waits for recovery to halt during segment replication. After which it ingests data to ensure + // have some documents queued. 
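+        // Note: relocationThread is a plain Runnable here, so run() executes inline on the test
+        // thread and returns only after the queued docs are ingested and relocationLatch is released.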
+ relocationThread.run(); + + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> verifying count"); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertEquals(totalDocCount, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 37f92471d0cde..431519658edd1 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -61,7 +61,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -754,7 +753,6 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta * * @param performSegRep a {@link Runnable} that is executed after operations are blocked * @param consumer a {@link Runnable} that is executed after performSegRep - * @param listener ActionListener * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalStateException if the relocation target is no longer part of the replication group * @throws InterruptedException if blocking operations is interrupted @@ -762,8 +760,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated( final String targetAllocationId, final Consumer consumer, - final Consumer performSegRep, - final ActionListener listener + final Runnable performSegRep ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { @@ -781,32 +778,28 @@ public void relocated( assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated"; - final StepListener segRepSyncListener = new StepListener<>(); - performSegRep.accept(segRepSyncListener); - segRepSyncListener.whenComplete(r -> { - /* - * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a - * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. - */ - verifyRelocatingState(); - final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); + performSegRep.run(); + /* + * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a + * network operation. 
Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); + try { + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under + // mutex + } + } catch (final Exception e) { try { - consumer.accept(primaryContext); - synchronized (mutex) { - verifyRelocatingState(); - replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under - // mutex - } - } catch (final Exception e) { - try { - replicationTracker.abortRelocationHandoff(); - } catch (final Exception inner) { - e.addSuppressed(inner); - } - throw e; + replicationTracker.abortRelocationHandoff(); + } catch (final Exception inner) { + e.addSuppressed(inner); } - listener.onResponse(null); - }, listener::onFailure); + throw e; + } }); } catch (TimeoutException e) { logger.warn("timed out waiting for relocation hand-off to complete"); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 276821dfb09b4..832e17eded8fb 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -818,34 +818,24 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis logger ); - final StepListener handoffListener = new StepListener<>(); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); - final Consumer forceSegRepConsumer = shard.indexSettings().isSegRepEnabled() + final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() ? recoveryTarget::forceSegmentFileSync - : res -> res.onResponse(null); + : () -> {}; // TODO: make relocated async // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done cancellableThreads.execute( - () -> shard.relocated( - request.targetAllocationId(), - recoveryTarget::handoffPrimaryContext, - forceSegRepConsumer, - handoffListener - ) + () -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext, forceSegRepRunnable) ); /* * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). 
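             * Note that the forceSegRepRunnable passed to relocated() has already run by this point,
             * so the target's store was brought up to date before the primary context hand-off.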
*/ - } else { - handoffListener.onResponse(null); } - handoffListener.whenComplete(res -> { - stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); - listener.onResponse(null); - }, listener::onFailure); + stopWatch.stop(); + logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + listener.onResponse(null); }, listener::onFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index c8cc5c4409e6b..7d63b4a67a694 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -217,7 +217,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { + public void forceSegmentFileSync() { throw new UnsupportedOperationException("Method not supported on target!"); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index ef0d4abc44c7d..b43253c32d844 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -58,9 +58,8 @@ public interface RecoveryTargetHandler extends FileChunkWriter { * * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files * conflict with replicas when target is promoted as primary. - * @param listener segment replication event listener */ - void forceSegmentFileSync(ActionListener listener); + void forceSegmentFileSync(); /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 5f638103a021c..a8dd083ba838e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -192,16 +192,17 @@ public void indexTranslogOperations( * * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files * conflict with replicas when target is promoted as primary. 
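     * With this change the call blocks on the transport response via {@code txGet()} rather than
     * completing a listener asynchronously.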
- * @param listener segment replication event listener */ @Override - public void forceSegmentFileSync(ActionListener listener) { - final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC; + public void forceSegmentFileSync() { final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = ActionListener.map(listener, r -> null); - retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + transportService.submitRequest( + targetNode, + SegmentReplicationTargetService.Actions.FORCE_SYNC, + new ForceSyncRequest(requestSeqNo, recoveryId, shardId), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME + ).txGet(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index b63b84a5c1eab..750e7629783e7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -22,7 +22,6 @@ import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; @@ -138,23 +137,12 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene ); }; - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying replication of {} as it is not listed as assigned to target node {}", - shard.shardId(), - targetNode - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - }, - shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", - shard, - cancellableThreads, - logger - ); + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug("delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), targetNode); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } final StepListener sendFileStep = new StepListener<>(); Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index e7df266ca7133..575e90129418b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ 
b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -50,7 +50,6 @@ import org.apache.lucene.util.Constants; import org.junit.Assert; import org.opensearch.Assertions; -import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -939,12 +938,6 @@ public void onResponse(final Releasable releasable) { closeShards(indexShard); } - private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { - PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); - return fut.get(); - } - private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); @@ -997,22 +990,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(indexShard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); engineClosed = false; break; } @@ -2025,22 +2003,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2075,18 +2038,7 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { shard.relocated( routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown(), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } + () -> {} ); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -2175,26 +2127,11 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - relocated.set(true); - } - - @Override - public void onFailure(Exception e) { - relocated.set(false); - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } + 
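+                    // relocated() now completes synchronously, so the flag is set directly once the
+                    // call returns instead of inside an ActionListener callback.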
relocated.set(true); }); // ensure we wait for all primary operation locks to be acquired allPrimaryOperationLocksAcquired.await(); @@ -2225,48 +2162,25 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } - public void testRelocatedSegRepConsumerError() throws IOException, InterruptedException { + public void testRelocatedSegRepError() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onFailure(new ReplicationFailedException("Segment replication failed")), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - fail("Expected failure"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException); - assertEquals(e.getMessage(), "Segment replication failed"); - } - } + ReplicationFailedException segRepException = expectThrows( + ReplicationFailedException.class, + () -> shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + () -> { throw new ReplicationFailedException("Segment replication failed"); } + ) ); + assertTrue(segRepException.getMessage().equals("Segment replication failed")); closeShards(shard); } @@ -2276,21 +2190,9 @@ public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOE final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("IllegalIndexShardStateException expected!"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException); - } - } + expectThrows( + IllegalIndexShardStateException.class, + () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}) ); closeShards(shard); } @@ -2305,28 +2207,13 @@ public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, Thread relocationThread = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { - 
fail(e.toString()); + relocationException.set(e); } @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated( - relocationRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - relocationException.set(e); - } - } - ); + shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } }); relocationThread.start(); @@ -2374,48 +2261,22 @@ public void testRelocateMissingTarget() throws Exception { final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); IndexShardTestCase.updateRoutingEntry(shard, toNode2); final AtomicBoolean relocated = new AtomicBoolean(); - shard.relocated( - toNode1.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("Expected IllegalStateException!"); - } - @Override - public void onFailure(Exception e) { - assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalStateException); - assertThat( - e.getMessage(), - equalTo( - "relocation target [" - + toNode1.getTargetRelocatingShard().allocationId().getId() - + "] is no longer part of the replication group" - ) - ); - } - } + final IllegalStateException error = expectThrows( + IllegalStateException.class, + () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}) + ); + assertThat( + error.getMessage(), + equalTo( + "relocation target [" + + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group" + ) ); assertFalse(relocated.get()); - shard.relocated( - toNode2.getTargetRelocatingShard().allocationId().getId(), - ctx -> relocated.set(true), - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(relocated.get()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true), () -> {}); assertTrue(relocated.get()); closeShards(shard); } @@ -2808,22 +2669,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated( - inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener() { - @Override - public void onResponse(Void unused) { - assertTrue(shard.isRelocatedPrimary()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 44771faf36871..54e8682f90d22 
100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -15,8 +15,10 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; @@ -54,17 +56,21 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -307,15 +313,12 @@ public void testPrimaryRelocation() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { + Function, List> replicatePrimaryFunction = (shardList) -> { try { assert shardList.size() >= 2; final IndexShard primary = shardList.get(0); - return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + return replicateSegments(primary, shardList.subList(1, shardList.size())); } catch (IOException | InterruptedException e) { - listener.onFailure(e); throw new RuntimeException(e); } }; @@ -347,11 +350,12 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { ); updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); - BiFunction, ActionListener, List> replicatePrimaryFunction = ( - shardList, - listener) -> { - listener.onFailure(new IOException("Expected failure")); - return null; + Function, List> replicatePrimaryFunction = (shardList) -> { + try { + throw new IOException("Expected failure"); + } catch (IOException e) { + throw new RuntimeException(e); + } }; Exception e = expectThrows( Exception.class, @@ -366,7 +370,7 @@ public void onDone(ReplicationState state) { @Override public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("Expected failure")); + assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure"); } }), true, @@ -374,10 +378,119 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool replicatePrimaryFunction ) ); - assertThat(e, hasToString(containsString("Expected failure"))); closeShards(primarySource, primaryTarget); } + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + 
// strategy. + public void testLockingBeforeAndAfterRelocated() throws Exception { + final IndexShard shard = newStartedShard(true, settings); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + CountDownLatch latch = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + latch.countDown(); + try { + shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { + // start finalization of recovery + recoveryThread.start(); + latch.await(); + // recovery can only be finalized after we release the current primaryOperationLock + assertFalse(shard.isRelocatedPrimary()); + } + // recovery can be now finalized + recoveryThread.join(); + assertTrue(shard.isRelocatedPrimary()); + final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + + closeShards(shard); + } + + // Todo: Remove this test when there is a better mechanism to test a functionality passing in different replication + // strategy. + public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { + final IndexShard shard = newStartedShard(true, settings); + final ShardRouting routing = ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + final CountDownLatch startRecovery = new CountDownLatch(1); + final CountDownLatch relocationStarted = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + try { + startRecovery.await(); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> relocationStarted.countDown(), + () -> {} + ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + recoveryThread.start(); + + final int numberOfAcquisitions = randomIntBetween(1, 10); + final List assertions = new ArrayList<>(numberOfAcquisitions); + final int recoveryIndex = randomIntBetween(0, numberOfAcquisitions - 1); + + for (int i = 0; i < numberOfAcquisitions; i++) { + final PlainActionFuture onLockAcquired; + if (i < recoveryIndex) { + final AtomicBoolean invoked = new AtomicBoolean(); + onLockAcquired = new PlainActionFuture() { + + @Override + public void onResponse(Releasable releasable) { + invoked.set(true); + releasable.close(); + super.onResponse(releasable); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(); + } + + }; + assertions.add(() -> assertTrue(invoked.get())); + } else if (recoveryIndex == i) { + startRecovery.countDown(); + relocationStarted.await(); + onLockAcquired = new PlainActionFuture<>(); + assertions.add(() -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }); + } else { + onLockAcquired = new PlainActionFuture<>(); + assertions.add(() -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> 
onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }); + } + + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); + } + + for (final Runnable assertion : assertions) { + assertion.run(); + } + + recoveryThread.join(); + + closeShards(shard); + } + public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index ba54d3eb3dba8..307cb854f000a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -1117,7 +1117,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) {} @Override - public void forceSegmentFileSync(ActionListener listener) {} + public void forceSegmentFileSync() {} @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index ad19473380063..6b0f50d80fba2 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -545,7 +545,7 @@ public void recoverReplica( markAsRecovering, inSyncIds, routingTable, - (a, b) -> null + (a) -> null ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 3ae79a8a17879..954e36bb38116 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -66,6 +66,7 @@ import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -150,11 +151,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; @@ -250,6 +253,12 @@ protected Store createStore(ShardId shardId, IndexSettings indexSettings, Direct return new Store(shardId, indexSettings, 
directory, new DummyShardLock(shardId)); } + protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { + PlainActionFuture fut = new PlainActionFuture<>(); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); + return fut.get(); + } + /** * Creates a new initializing shard. The shard will have its own unique data path. * @@ -838,7 +847,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { } protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, startReplica, (a, b) -> null); + recoverReplica(replica, primary, startReplica, (a) -> null); } /** recovers a replica from the given primary **/ @@ -846,7 +855,7 @@ protected void recoverReplica( IndexShard replica, IndexShard primary, boolean startReplica, - BiFunction, ActionListener, List> replicatePrimaryFunction + Function, List> replicatePrimaryFunction ) throws IOException { recoverReplica( replica, @@ -865,7 +874,7 @@ protected void recoverReplica( final boolean markAsRecovering, final boolean markAsStarted ) throws IOException { - recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null); + recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a) -> null); } /** recovers a replica from the given primary **/ @@ -875,7 +884,7 @@ protected void recoverReplica( final BiFunction targetSupplier, final boolean markAsRecovering, final boolean markAsStarted, - final BiFunction, ActionListener, List> replicatePrimaryFunction + final Function, List> replicatePrimaryFunction ) throws IOException { IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId()); newRoutingTable.addShard(primary.routingEntry()); @@ -908,7 +917,7 @@ protected final void recoverUnstartedReplica( final boolean markAsRecovering, final Set inSyncIds, final IndexShardRoutingTable routingTable, - final BiFunction, ActionListener, List> replicatePrimaryFunction + final Function, List> replicatePrimaryFunction ) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); @@ -1319,11 +1328,8 @@ public void getSegmentFiles( * @param replicaShards - Replicas that will be updated. * @return {@link List} List of target components orchestrating replication. */ - public final List replicateSegments( - IndexShard primaryShard, - List replicaShards, - ActionListener... 
listeners - ) throws IOException, InterruptedException { + public final List replicateSegments(IndexShard primaryShard, List replicaShards) + throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); Map primaryMetadata; try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { @@ -1346,13 +1352,7 @@ public void onReplicationDone(SegmentReplicationState state) { assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); - for (ActionListener listener : listeners) { - listener.onResponse(null); - } } catch (Exception e) { - for (ActionListener listener : listeners) { - listener.onFailure(e); - } throw ExceptionsHelper.convertToRuntime(e); } finally { countDownLatch.countDown(); diff --git a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java index 45b11c95b4102..fa8b3c9e3a2c3 100644 --- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java @@ -46,7 +46,7 @@ import java.util.List; import java.util.concurrent.Executor; -import java.util.function.BiFunction; +import java.util.function.Function; /** * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}. @@ -59,14 +59,14 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { private final IndexShard replica; - private final BiFunction, ActionListener, List> replicatePrimaryFunction; + private final Function, List> replicatePrimaryFunction; public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.executor = executor; this.target = target; this.primary = null; this.replica = null; - this.replicatePrimaryFunction = (a, b) -> null; + this.replicatePrimaryFunction = (a) -> null; } public AsyncRecoveryTarget( @@ -74,7 +74,7 @@ public AsyncRecoveryTarget( Executor executor, IndexShard primary, IndexShard replica, - BiFunction, ActionListener, List> replicatePrimaryFunction + Function, List> replicatePrimaryFunction ) { this.executor = executor; this.target = target; @@ -89,8 +89,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { - executor.execute(() -> this.replicatePrimaryFunction.apply(List.of(primary, replica), listener)); + public void forceSegmentFileSync() { + this.replicatePrimaryFunction.apply(List.of(primary, replica)); } @Override From d5e44deda4248566ad951dba81e03cb5f66f70c0 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 18 Jan 2023 10:16:49 -0800 Subject: [PATCH 2/8] Add logs and fix spotlessApply Signed-off-by: Suraj Singh pick da8cb72ab4f Update unit test post rebase --- .../SegmentReplicationRelocationIT.java | 30 ++++++++----------- .../recovery/RecoverySourceHandler.java | 4 +-- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 6f151e1465528..9c89d8aad2a62 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ 
b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -22,15 +22,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -281,6 +278,10 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); } + /** + * This test verifies delayed operations are replayed and searchable on target + * + */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); @@ -321,16 +322,15 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { .actionGet(); assertEquals(clusterHealthResponse.isTimedOut(), false); - // Mock transport service to add behaviour of throwing corruption exception during segment replication process. + // Mock transport service to halt round of segment replication to allow indexing in parallel. MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, replica)); - // Block segment replication to allow adding operations during handoff - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch blockSegRepLatch = new CountDownLatch(1); CountDownLatch relocationLatch = new CountDownLatch(1); mockTransportService.addSendBehavior( internalCluster().getInstance(TransportService.class, primary), (connection, requestId, action, request, options) -> { if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { - latch.countDown(); + blockSegRepLatch.countDown(); try { relocationLatch.await(); } catch (InterruptedException e) { @@ -340,11 +340,11 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { connection.sendRequest(requestId, action, request, options); } ); - int totalDocCount = 2000; - Runnable relocationThread = () -> { + int totalDocCount = 200; + Thread relocationThread = new Thread(() -> { // Wait for relocation to halt at SegRep. Ingest docs at that point. try { - latch.await(); + blockSegRepLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -353,12 +353,8 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute() ); } - IndexShard indexShard = getIndexShard(replica); - logger.info("--> Active operations {}", indexShard.getActiveOperationsCount()); - assertTrue(indexShard.getActiveOperationsCount() > 0); - // Post ingestion, let SegRep to complete. 
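+            // Release relocationLatch so the blocked GET_SEGMENT_FILES round can proceed.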
relocationLatch.countDown(); - }; + }); logger.info("--> relocate the shard from primary to replica"); ActionFuture relocationListener = client().admin() @@ -369,8 +365,8 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { // This thread first waits for recovery to halt during segment replication. After which it ingests data to ensure // have some documents queued. - relocationThread.run(); - + relocationThread.start(); + relocationThread.join(); relocationListener.actionGet(); clusterHealthResponse = client().admin() .cluster() diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 832e17eded8fb..3f0cf544994a1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -819,7 +819,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis ); if (request.isPrimaryRelocation()) { - logger.trace("performing relocation hand-off"); + logger.info("performing relocation hand-off"); final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() ? recoveryTarget::forceSegmentFileSync : () -> {}; @@ -834,7 +834,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis */ } stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + logger.info("finalizing recovery took [{}]", stopWatch.totalTime()); listener.onResponse(null); }, listener::onFailure); } From 790627de80487c8c33416b3e95eea8a3751a3b74 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 18 Jan 2023 23:04:53 -0800 Subject: [PATCH 3/8] Update unit test post rebase Signed-off-by: Suraj Singh --- .../index/shard/IndexShardTests.java | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 575e90129418b..0abbcd1595bb1 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2703,27 +2703,13 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw IndexShardTestCase.updateRoutingEntry(indexShard, routing); assertTrue(indexShard.isSyncNeeded()); try { - indexShard.relocated( - routing.getTargetRelocatingShard().allocationId().getId(), - primaryContext -> {}, - r -> r.onResponse(null), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - assertTrue(indexShard.isRelocatedPrimary()); - assertFalse(indexShard.isSyncNeeded()); - assertFalse(indexShard.getReplicationTracker().isPrimaryMode()); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - } - ); + indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); } catch (InterruptedException e) { throw new RuntimeException(e); } + assertTrue(indexShard.isRelocatedPrimary()); + assertFalse(indexShard.isSyncNeeded()); + assertFalse(indexShard.getReplicationTracker().isPrimaryMode()); closeShards(indexShard); } From e2df180fef15e49255f76eb672962227cf0c352e Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 19 Jan 2023 10:17:51 -0800 Subject: [PATCH 4/8] Update integration tests Signed-off-by: Suraj Singh --- 
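Notes: the updated tests share one pattern: queue index requests with
RefreshPolicy.WAIT_UNTIL, trigger the relocation, then resolve the queued
futures with explicit refreshes. A condensed sketch of that pattern follows;
the names "pending" and "docCount" are placeholders for the per-test
pendingIndexResponses lists and doc counts, everything else is as in the
diff below:

    final List<ActionFuture<IndexResponse>> pending = new ArrayList<>();
    for (int i = 0; i < docCount; i++) {
        pending.add(
            client().prepareIndex(INDEX_NAME)
                .setId(Integer.toString(i))
                .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) // future resolves on refresh
                .setSource("field", "value" + i)
                .execute()
        );
    }
    // ... relocate the shard ...
    assertBusy(() -> {
        // each refresh allows queued WAIT_UNTIL futures to complete; retry until all are done
        client().admin().indices().prepareRefresh().execute().actionGet();
        assertTrue(pending.stream().allMatch(ActionFuture::isDone));
    }, 1, TimeUnit.MINUTES);
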
.../SegmentReplicationRelocationIT.java | 144 +++++++++--------- .../recovery/RecoverySourceHandler.java | 2 +- .../test/OpenSearchIntegTestCase.java | 118 ++++++++++++++ 3 files changed, 191 insertions(+), 73 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 9c89d8aad2a62..11636872be512 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -41,14 +41,15 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationIT { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); - private void createIndex() { + private void createIndex(int replicaCount) { prepareCreate( INDEX_NAME, Settings.builder() .put("index.number_of_shards", 1) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 1) + .put("index.number_of_replicas", replicaCount) + .put("index.refresh_interval", -1) ).get(); } @@ -58,16 +59,21 @@ private void createIndex() { */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocation() throws Exception { - final String oldPrimary = internalCluster().startNode(); - createIndex(); - final String replica = internalCluster().startNode(); + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(1); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(0, 200); - ingestDocs(initialDocCount); - - logger.info("--> verifying count {}", initialDocCount); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + final int initialDocCount = scaledRandomIntBetween(100, 1000); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("--> start another node"); final String newPrimary = internalCluster().startNode(); @@ -99,28 +105,29 @@ public void testPrimaryRelocation() throws Exception { logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); - - logger.info("--> state {}", state); - assertEquals( state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(), ShardRoutingState.STARTED ); - final int finalDocCount = initialDocCount; - ingestDocs(finalDocCount); + for (int i = initialDocCount; i < 2 * initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + assertBusy(() -> { + 
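+            // Refreshing lets the queued WAIT_UNTIL index futures complete; retry until all are done.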
client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); refresh(INDEX_NAME); - - logger.info("--> verifying count again {}", initialDocCount + finalDocCount); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount( - client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); + logger.info("--> verifying count again {}", 2 * initialDocCount); + assertHitCount(client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); + waitForReplicaUpdate(); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); } /** @@ -128,18 +135,22 @@ public void testPrimaryRelocation() throws Exception { * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the * replicas. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocationWithSegRepFailure() throws Exception { - final String oldPrimary = internalCluster().startNode(); - createIndex(); - final String replica = internalCluster().startNode(); + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(1); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(1, 100); - ingestDocs(initialDocCount); - - logger.info("--> verifying count {}", initialDocCount); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + final int initialDocCount = scaledRandomIntBetween(100, 1000); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("--> start another node"); final String newPrimary = internalCluster().startNode(); @@ -184,20 +195,25 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { .actionGet(); assertEquals(clusterHealthResponse.isTimedOut(), false); - final int finalDocCount = initialDocCount; - ingestDocs(finalDocCount); - refresh(INDEX_NAME); + for (int i = initialDocCount; i < 2 * initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } logger.info("Verify older primary is still refreshing replica nodes"); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount( - client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); + 
refresh(INDEX_NAME); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); + waitForReplicaUpdate(); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); } /** @@ -205,16 +221,8 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { * */ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { - final String primary = internalCluster().startNode(); - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - ).get(); + final String primary = internalCluster().startNode(featureFlagSettings()); + createIndex(0); for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); @@ -250,7 +258,7 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E .prepareReroute() .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) .execute(); - for (int i = 20; i < 120; i++) { + for (int i = 20; i < 1000; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) @@ -275,25 +283,16 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 1000L); } /** * This test verifies delayed operations are replayed and searchable on target * */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - ).get(); + createIndex(0); for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); @@ -340,7 +339,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { connection.sendRequest(requestId, action, request, options); } ); - int totalDocCount = 200; + int totalDocCount = 2000; Thread relocationThread = new Thread(() -> { // Wait for relocation to halt at SegRep. Ingest docs at that point. 
try { @@ -383,6 +382,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); + refresh(INDEX_NAME); assertEquals(totalDocCount, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 3f0cf544994a1..4f3e68aba16f3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -819,7 +819,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis ); if (request.isPrimaryRelocation()) { - logger.info("performing relocation hand-off"); + logger.trace("performing relocation hand-off"); final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() ? recoveryTarget::forceSegmentFileSync : () -> {}; diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 86123012fee5d..9d561a1d60030 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -62,6 +62,7 @@ import org.opensearch.action.admin.indices.segments.IndexSegments; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.opensearch.action.bulk.BulkRequestBuilder; @@ -84,6 +85,7 @@ import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -125,6 +127,7 @@ import org.opensearch.http.HttpInfo; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; @@ -132,10 +135,12 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.MockFieldFilterPlugin; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.IndicesStore; import org.opensearch.monitor.os.OsInfo; import org.opensearch.node.NodeMocksPlugin; @@ -185,6 +190,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Random; import 
java.util.Set; import java.util.concurrent.Callable; @@ -1016,6 +1022,118 @@ public ClusterHealthStatus waitForRelocation() { return waitForRelocation(null); } + protected void assertSegmentStats(int numberOfReplicas) throws IOException { + final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); + + List segmentsByIndex = getShardSegments(indicesSegmentResponse); + + // There will be an entry in the list for each index. + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + + // Separate Primary & replica shards ShardSegments. + final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegments = segmentListMap.get(false); + + assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); + + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegments.size() + ); + + for (ShardSegments shardSegment : replicaShardSegments) { + final Map latestReplicaSegments = getLatestSegments(shardSegment); + for (Segment replicaSegment : latestReplicaSegments.values()) { + final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); + assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); + assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); + assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); + assertEquals(replicaSegment.getSize(), primarySegment.getSize()); + } + + // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point. + // This ensures the previous commit point is not wiped. + final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); + ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); + final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); + IndexShard indexShard = getIndexShard(replicaNode.getName(), replicaShardRouting.getIndexName()); + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. 
+ indexShard.store().readLastCommittedSegmentsInfo(); + } + } + } + + protected IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); + } + + private Map getLatestSegments(ShardSegments segments) { + final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); + final Long latestPrimaryGen = generation.get(); + return segments.getSegments() + .stream() + .filter(s -> s.getGeneration() == latestPrimaryGen) + .collect(Collectors.toMap(Segment::getName, Function.identity())); + } + + private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { + return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + } + + private List getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { + return indicesSegmentResponse.getIndices() + .values() + .stream() // get list of IndexSegments + .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group + .map(IndexShardSegments::getShards) // get list of segments across replication group + .collect(Collectors.toList()); + } + + /** + * Used for segment replication only + * + * Waits until the replica is caught up to the latest primary segments gen. + * @throws Exception if assertion fails + */ + protected void waitForReplicaUpdate() throws Exception { + // wait until the replica has the latest segment generation. + assertBusy(() -> { + final IndicesSegmentResponse indicesSegmentResponse = client().admin() + .indices() + .segments(new IndicesSegmentsRequest()) + .actionGet(); + List segmentsByIndex = getShardSegments(indicesSegmentResponse); + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegments = segmentListMap.get(false); + // if we don't have any segments yet, proceed. + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { + final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); + final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); + for (ShardSegments shardSegments : replicaShardSegments) { + logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); + final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() + .stream() + .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); + assertTrue(isReplicaCaughtUpToPrimary); + } + } + } + }); + } + + /** * Waits for all relocating shards to become active and the cluster has reached the given health status * using the cluster health API. 
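A note on the test mechanics introduced above: the hand-off tests all lean on the same MockTransportService hook, parking the GET_SEGMENT_FILES round-trip so indexing can race the relocation while the primary holds its permits. A condensed sketch of that pattern, assuming an OpenSearchIntegTestCase with MockTransportService.TestPlugin installed (the helper name and latch parameters are illustrative, not part of the patch):

    import java.util.concurrent.CountDownLatch;
    import org.opensearch.indices.replication.SegmentReplicationSourceService;
    import org.opensearch.test.transport.MockTransportService;
    import org.opensearch.transport.TransportService;

    // Park segment replication between targetNode and sourceNode until `release` is counted down;
    // `reachedSegRep` signals the test thread that replication has hit the blocked step.
    private void blockSegmentReplication(String targetNode, String sourceNode,
                                         CountDownLatch reachedSegRep, CountDownLatch release) {
        MockTransportService mockTransportService =
            (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
        mockTransportService.addSendBehavior(
            internalCluster().getInstance(TransportService.class, sourceNode),
            (connection, requestId, action, request, options) -> {
                if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
                    reachedSegRep.countDown(); // replication has reached the blocked step
                    try {
                        release.await();       // hold the file fetch until the test releases it
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                // forward the (possibly delayed) request to the source node
                connection.sendRequest(requestId, action, request, options);
            }
        );
    }

The indexing thread waits on reachedSegRep, queues its writes, then counts down release, which unblocks the segment file fetch and lets finalizeRecovery complete the hand-off.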
From f336cf3f7a2268af2c61544d488ddf08dd1153b1 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 19 Jan 2023 13:31:46 -0800 Subject: [PATCH 5/8] Mute testPrimaryRelocationWithSegRepFailure Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationRelocationIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 11636872be512..42cc07ea765fb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -135,6 +135,7 @@ public void testPrimaryRelocation() throws Exception { * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the * replicas. */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocationWithSegRepFailure() throws Exception { final String oldPrimary = internalCluster().startNode(featureFlagSettings()); createIndex(1); From 419c3aa8dd5e302d5f28185613236b2cde533c05 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 19 Jan 2023 14:45:31 -0800 Subject: [PATCH 6/8] Remove extra closing bracket after main merge Signed-off-by: Suraj Singh --- .../indices/replication/SegmentReplicationRelocationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 42cc07ea765fb..52975f0a003a0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -59,7 +59,7 @@ private void createIndex(int replicaCount) { */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocation() throws Exception { - final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + final String oldPrimary = internalCluster().startNode(); createIndex(1); final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); From fb0b5f2fb9be32984ab23af1f4effae69608ffe9 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 27 Jan 2023 09:15:38 -0800 Subject: [PATCH 7/8] PR feedback Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationBaseIT.java | 180 ++++++++++++++++++ .../replication/SegmentReplicationIT.java | 172 +---------------- .../SegmentReplicationRelocationIT.java | 93 ++++----- .../test/OpenSearchIntegTestCase.java | 118 ------------ 4 files changed, 232 insertions(+), 331 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java new file mode 100644 index 0000000000000..5f7433126db57 --- /dev/null +++ 
b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static org.opensearch.test.OpenSearchIntegTestCase.client; +import static org.opensearch.test.OpenSearchTestCase.assertBusy; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { + + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + + @Override + protected Collection> nodePlugins() { + return asList(MockTransportService.TestPlugin.class); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Nullable + protected ShardRouting getShardRoutingForNodeName(String nodeName) { + final ClusterState state = getClusterState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { + for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { + final String nodeId = shardRouting.currentNodeId(); + final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId); + if (discoveryNode.getName().equals(nodeName)) { + return shardRouting; + } + } + } + return null; + } + + protected void assertDocCounts(int expectedDocCount, String... 
nodeNames) { + for (String node : nodeNames) { + assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount); + } + } + + protected ClusterState getClusterState() { + return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + } + + protected DiscoveryNode getNodeContainingPrimaryShard() { + final ClusterState state = getClusterState(); + final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); + return state.nodes().resolveNode(primaryShard.currentNodeId()); + } + + /** + * Waits until all given nodes have at least the expected docCount. + * + * @param docCount - Expected Doc count. + * @param nodes - List of node names. + */ + protected void waitForSearchableDocs(long docCount, List nodes) throws Exception { + // wait until the replica has the latest segment generation. + assertBusy(() -> { + for (String node : nodes) { + final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + final long hits = response.getHits().getTotalHits().value; + if (hits < docCount) { + fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); + } + } + }, 1, TimeUnit.MINUTES); + } + + protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception { + waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); + } + + protected void verifyStoreContent() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = getClusterState(); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + final String indexName = primaryRouting.getIndexName(); + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for (ShardRouting replica : replicaRouting) { + IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() + ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. 
+ replicaShard.store().readLastCommittedSegmentsInfo(); + } + } + } + }, 1, TimeUnit.MINUTES); + } + + private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { + return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); + } + + protected IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 043a5850aef05..0101379321932 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -11,47 +11,29 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.IndexRoutingTable; -import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.common.Nullable; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.index.Index; import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.plugins.Plugin; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import java.util.Collection; -import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.equalTo; @@ -61,56 +43,8 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SegmentReplicationIT extends OpenSearchIntegTestCase { - - protected 
static final String INDEX_NAME = "test-idx-1"; - protected static final int SHARD_COUNT = 1; - protected static final int REPLICA_COUNT = 1; - - @Override - protected Collection> nodePlugins() { - return asList(MockTransportService.TestPlugin.class); - } - - @Override - public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build(); - } - - @Override - protected boolean addMockInternalEngine() { - return false; - } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); - } - - public void ingestDocs(int docCount) throws Exception { - try ( - BackgroundIndexer indexer = new BackgroundIndexer( - INDEX_NAME, - "_doc", - client(), - -1, - RandomizedTest.scaledRandomIntBetween(2, 5), - false, - random() - ) - ) { - indexer.start(docCount); - waitForDocs(docCount, indexer); - refresh(INDEX_NAME); - } - } - +public class SegmentReplicationIT extends SegmentReplicationBaseIT { + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -660,106 +594,4 @@ public void testDropPrimaryDuringReplication() throws Exception { verifyStoreContent(); } } - - /** - * Waits until all given nodes have at least the expected docCount. - * - * @param docCount - Expected Doc count. - * @param nodes - List of node names. - */ - private void waitForSearchableDocs(long docCount, List nodes) throws Exception { - // wait until the replica has the latest segment generation. - assertBusy(() -> { - for (String node : nodes) { - final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); - final long hits = response.getHits().getTotalHits().value; - if (hits < docCount) { - fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); - } - } - }, 1, TimeUnit.MINUTES); - } - - private void waitForSearchableDocs(long docCount, String... 
nodes) throws Exception { - waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); - } - - private void verifyStoreContent() throws Exception { - assertBusy(() -> { - final ClusterState clusterState = getClusterState(); - for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { - for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { - final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); - final String indexName = primaryRouting.getIndexName(); - final List replicaRouting = shardRoutingTable.replicaShards(); - final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); - final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); - for (ShardRouting replica : replicaRouting) { - IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); - final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( - primarySegmentMetadata, - replicaShard.getSegmentMetadataMap() - ); - if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { - fail( - "Expected no missing or different segments between primary and replica but diff was missing: " - + recoveryDiff.missing - + " Different: " - + recoveryDiff.different - + " Primary Replication Checkpoint : " - + primaryShard.getLatestReplicationCheckpoint() - + " Replica Replication Checkpoint: " - + replicaShard.getLatestReplicationCheckpoint() - ); - } - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - replicaShard.store().readLastCommittedSegmentsInfo(); - } - } - } - }, 1, TimeUnit.MINUTES); - } - - private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { - return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); - } - - private IndexShard getIndexShard(String node, String indexName) { - final Index index = resolveIndex(indexName); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); - } - - @Nullable - private ShardRouting getShardRoutingForNodeName(String nodeName) { - final ClusterState state = getClusterState(); - for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { - for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { - final String nodeId = shardRouting.currentNodeId(); - final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId); - if (discoveryNode.getName().equals(nodeName)) { - return shardRouting; - } - } - } - return null; - } - - private void assertDocCounts(int expectedDocCount, String... 
nodeNames) { - for (String node : nodeNames) { - assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount); - } - } - - private ClusterState getClusterState() { - return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); - } - - private DiscoveryNode getNodeContainingPrimaryShard() { - final ClusterState state = getClusterState(); - final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); - return state.nodes().resolveNode(primaryShard.currentNodeId()); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 52975f0a003a0..1bb822a2bf40f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -38,7 +39,7 @@ * This test class verifies primary shard relocation with segment replication as replication strategy. */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SegmentReplicationRelocationIT extends SegmentReplicationIT { +public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT { private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); private void createIndex(int replicaCount) { @@ -57,11 +58,10 @@ private void createIndex(int replicaCount) { * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. 
Before * relocation and after relocation documents are indexed and documents are verified */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocation() throws Exception { final String oldPrimary = internalCluster().startNode(); createIndex(1); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); final List> pendingIndexResponses = new ArrayList<>(); @@ -123,11 +123,10 @@ public void testPrimaryRelocation() throws Exception { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - refresh(INDEX_NAME); - logger.info("--> verifying count again {}", 2 * initialDocCount); - assertHitCount(client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); - waitForReplicaUpdate(); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); + flushAndRefresh(INDEX_NAME); + logger.info("--> verify count again {}", 2 * initialDocCount); + waitForSearchableDocs(2 * initialDocCount, newPrimary, replica); + verifyStoreContent(); } /** @@ -135,11 +134,10 @@ public void testPrimaryRelocation() throws Exception { * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the * replicas. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryRelocationWithSegRepFailure() throws Exception { - final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + final String oldPrimary = internalCluster().startNode(); createIndex(1); - final String replica = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(100, 1000); final List> pendingIndexResponses = new ArrayList<>(); @@ -207,14 +205,13 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { } logger.info("Verify older primary is still refreshing replica nodes"); - refresh(INDEX_NAME); assertBusy(() -> { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); - waitForReplicaUpdate(); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2 * initialDocCount); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(2 * initialDocCount, oldPrimary, replica); + verifyStoreContent(); } /** @@ -222,9 +219,11 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { * */ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); - createIndex(0); - + final String primary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int totalDocCount = 1000; for (int i = 0; i < 10; i++) { 
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } @@ -243,12 +242,12 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E ); } - final String replica = internalCluster().startNode(); + final String newPrimary = internalCluster().startNode(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("2") + .setWaitForNodes("3") .execute() .actionGet(); assertEquals(clusterHealthResponse.isTimedOut(), false); @@ -257,9 +256,9 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E ActionFuture relocationListener = client().admin() .cluster() .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary)) .execute(); - for (int i = 20; i < 1000; i++) { + for (int i = 20; i < totalDocCount; i++) { pendingIndexResponses.add( client().prepareIndex(INDEX_NAME) .setId(Integer.toString(i)) @@ -284,16 +283,23 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 1000L); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(totalDocCount, newPrimary, replica); + verifyStoreContent(); } /** - * This test verifies delayed operations are replayed and searchable on target + * This test verifies that operations delayed during primary handoff are replayed and searchable. It does so by halting + * segment replication, which is performed while holding primary indexing permits, so that operations queue up + * during handoff. The test then verifies all ingested docs are searchable on the new primary. + * */ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { - final String primary = internalCluster().startNode(featureFlagSettings()); - createIndex(0); + final String primary = internalCluster().startNode(); + createIndex(1); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int totalDocCount = 2000; for (int i = 0; i < 10; i++) {
+ MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, newPrimary)); CountDownLatch blockSegRepLatch = new CountDownLatch(1); - CountDownLatch relocationLatch = new CountDownLatch(1); - mockTransportService.addSendBehavior( + CountDownLatch waitForIndexingLatch = new CountDownLatch(1); + mockTargetTransportService.addSendBehavior( internalCluster().getInstance(TransportService.class, primary), (connection, requestId, action, request, options) -> { if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { blockSegRepLatch.countDown(); try { - relocationLatch.await(); + waitForIndexingLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -340,8 +347,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { connection.sendRequest(requestId, action, request, options); } ); - int totalDocCount = 2000; - Thread relocationThread = new Thread(() -> { + Thread indexingThread = new Thread(() -> { // Wait for relocation to halt at SegRep. Ingest docs at that point. try { blockSegRepLatch.await(); @@ -353,20 +359,20 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute() ); } - relocationLatch.countDown(); + waitForIndexingLatch.countDown(); }); - logger.info("--> relocate the shard from primary to replica"); + logger.info("--> relocate the shard from primary to newPrimary"); ActionFuture relocationListener = client().admin() .cluster() .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, newPrimary)) .execute(); // This thread first waits for recovery to halt during segment replication. After which it ingests data to ensure - // have some documents queued. - relocationThread.start(); - relocationThread.join(); + // documents are queued. 
+ indexingThread.start(); + indexingThread.join(); relocationListener.actionGet(); clusterHealthResponse = client().admin() .cluster() @@ -383,7 +389,8 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { client().admin().indices().prepareRefresh().execute().actionGet(); assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); - refresh(INDEX_NAME); - assertEquals(totalDocCount, client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value); + flushAndRefresh(INDEX_NAME); + waitForSearchableDocs(totalDocCount, replica, newPrimary); + verifyStoreContent(); } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 9d561a1d60030..86123012fee5d 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -62,7 +62,6 @@ import org.opensearch.action.admin.indices.segments.IndexSegments; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.opensearch.action.bulk.BulkRequestBuilder; @@ -85,7 +84,6 @@ import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -127,7 +125,6 @@ import org.opensearch.http.HttpInfo; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; @@ -135,12 +132,10 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.MockFieldFilterPlugin; -import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.IndicesStore; import org.opensearch.monitor.os.OsInfo; import org.opensearch.node.NodeMocksPlugin; @@ -190,7 +185,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -1022,118 +1016,6 @@ public ClusterHealthStatus waitForRelocation() { return waitForRelocation(null); } - protected void assertSegmentStats(int numberOfReplicas) throws IOException { - final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); - - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - - // There will be an entry in the list for each index. 
- for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - - // Separate Primary & replica shards ShardSegments. - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - - assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - - assertEquals( - "There should be a ShardSegment entry for each replica in the replicationGroup", - numberOfReplicas, - replicaShardSegments.size() - ); - - for (ShardSegments shardSegment : replicaShardSegments) { - final Map latestReplicaSegments = getLatestSegments(shardSegment); - for (Segment replicaSegment : latestReplicaSegments.values()) { - final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); - assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); - assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); - assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); - assertEquals(replicaSegment.getSize(), primarySegment.getSize()); - } - - // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point. - // This ensures the previous commit point is not wiped. - final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); - ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); - final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - IndexShard indexShard = getIndexShard(replicaNode.getName(), replicaShardRouting.getIndexName()); - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. 
- indexShard.store().readLastCommittedSegmentsInfo(); - } - } - } - - protected IndexShard getIndexShard(String node, String indexName) { - final Index index = resolveIndex(indexName); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); - } - - private Map getLatestSegments(ShardSegments segments) { - final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); - final Long latestPrimaryGen = generation.get(); - return segments.getSegments() - .stream() - .filter(s -> s.getGeneration() == latestPrimaryGen) - .collect(Collectors.toMap(Segment::getName, Function.identity())); - } - - private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { - return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); - } - - private List getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { - return indicesSegmentResponse.getIndices() - .values() - .stream() // get list of IndexSegments - .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group - .map(IndexShardSegments::getShards) // get list of segments across replication group - .collect(Collectors.toList()); - } - - /** - * Used for segment replication only - * - * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception if assertion fails - */ - protected void waitForReplicaUpdate() throws Exception { - // wait until the replica has the latest segment generation. - assertBusy(() -> { - final IndicesSegmentResponse indicesSegmentResponse = client().admin() - .indices() - .segments(new IndicesSegmentsRequest()) - .actionGet(); - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - // if we don't have any segments yet, proceed. - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); - for (ShardSegments shardSegments : replicaShardSegments) { - logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); - final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() - .stream() - .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); - assertTrue(isReplicaCaughtUpToPrimary); - } - } - } - }); - } - - /** * Waits for all relocating shards to become active and the cluster has reached the given health status * using the cluster health API. 
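With the helpers consolidated into SegmentReplicationBaseIT, every relocation test now converges through the same three calls. A minimal sketch of that closing sequence, assuming a SegmentReplicationBaseIT subclass (the doc count is illustrative):

    // Commit and refresh so every acked operation becomes searchable.
    flushAndRefresh(INDEX_NAME);
    // Poll (up to one minute) until each named node locally reports at least the expected hits.
    waitForSearchableDocs(2 * initialDocCount, newPrimary, replica);
    // Diff segment metadata between primary and replicas, failing on any missing or different
    // files, and re-read each replica's last commit point to prove it was not wiped.
    verifyStoreContent();

Compared to the removed waitForReplicaUpdate, which only checked that a replica had reached the primary's latest segment generation, verifyStoreContent compares the actual StoreFileMetadata of both shards, so it also catches files that diverge within the same generation.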
From 2084a842be396a00f8a87750666f5b5a5521f16c Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 27 Jan 2023 13:54:52 -0800 Subject: [PATCH 8/8] Spotless fix Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationRelocationIT.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 1bb822a2bf40f..5b0948dace75d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -31,9 +31,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; /** * This test class verifies primary shard relocation with segment replication as replication strategy. @@ -329,8 +326,12 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception { assertEquals(clusterHealthResponse.isTimedOut(), false); ensureGreen(INDEX_NAME); - // Get mock transport service from newPrimary, halt recovery during segment replication (during handoff) to allow indexing in parallel. - MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, newPrimary)); + // Get mock transport service from newPrimary, halt recovery during segment replication (during handoff) to allow indexing in + // parallel. + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + newPrimary + )); CountDownLatch blockSegRepLatch = new CountDownLatch(1); CountDownLatch waitForIndexingLatch = new CountDownLatch(1); mockTargetTransportService.addSendBehavior(