diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index ee50074bd9f5c..45a0bd7b18afd 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.allocation.allocator; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterName; @@ -101,7 +102,11 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -139,7 +144,11 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -178,7 +187,11 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -218,7 +231,11 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -261,7 +278,11 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -299,7 +320,11 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -340,7 +365,11 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -381,7 +410,11 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -423,7 +456,11 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); @@ -479,7 +516,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertEquals("reroute after balanced shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); rerouteScheduled.compareAndSet(false, true); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 61b53d688d330..d9120e1a6b37f 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -437,7 +438,11 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onResponse(clusterService.state()); + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); assertEquals("reroute after existing shards allocator timed out", reason); assertEquals(Priority.HIGH, priority); @@ -454,7 +459,7 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); - assertEquals(0, rerouteLatch.getCount()); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners final boolean terminated = terminate(threadPool); assert terminated; clusterService.close();