diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index d9120e1a6b37f..ebc2e59fa5a30 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -432,17 +432,42 @@ public void testReplicaAllocatorTimeout() { assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } - public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedException { + public void testCollectTimedOutShardsAndScheduleReroute_Success() throws InterruptedException { createIndexAndUpdateClusterState(2, 5, 2); TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); final RerouteService rerouteService = (reason, priority, listener) -> { - if (randomBoolean()) { - listener.onFailure(new OpenSearchException("simulated")); - } else { - listener.onResponse(clusterService.state()); - } + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + assertEquals("reroute after existing shards allocator timed out", reason); + assertEquals(Priority.HIGH, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); + } + + public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onFailure(new OpenSearchException("simulated")); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); assertEquals("reroute after existing shards allocator timed out", reason); assertEquals(Priority.HIGH, priority);