Skip to content

Commit

Permalink
Randomly fail reroute
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Sep 4, 2024
1 parent 0278778 commit 00b809e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterName;
Expand Down Expand Up @@ -101,7 +102,11 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled(
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -139,7 +144,11 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() {
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -178,7 +187,11 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() {
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -218,7 +231,11 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -261,7 +278,11 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -299,7 +320,11 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled()
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -340,7 +365,11 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -381,7 +410,11 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() {
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -423,7 +456,11 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() {
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down Expand Up @@ -479,7 +516,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
rerouteScheduled.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.ClusterInfo;
Expand Down Expand Up @@ -437,7 +438,11 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce
ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
final CountDownLatch rerouteLatch = new CountDownLatch(2);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L));
assertEquals("reroute after existing shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
Expand All @@ -454,7 +459,7 @@ public void testCollectTimedOutShardsAndScheduleReroute() throws InterruptedExce
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false);
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 0);
assertEquals(0, rerouteLatch.getCount());
assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners
final boolean terminated = terminate(threadPool);
assert terminated;
clusterService.close();
Expand Down

0 comments on commit 00b809e

Please sign in to comment.