From feca82b6e6bb5cad0e66b965a5737183113d8754 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Mon, 11 Nov 2024 15:58:50 +0100 Subject: [PATCH] Do not pass ThreadPool to DesiredBalanceComputer (#116590) Relates https://github.com/elastic/elasticsearch/pull/115511#discussion_r1814819721. `ThreadPool` is used here only to get time. (I've extracted this out of https://github.com/elastic/elasticsearch/pull/116333). --- .../allocator/DesiredBalanceComputer.java | 7 +-- .../DesiredBalanceShardsAllocator.java | 2 +- ...nsportDeleteDesiredBalanceActionTests.java | 2 +- .../DesiredBalanceComputerTests.java | 57 +++++++++---------- .../DesiredBalanceShardsAllocatorTests.java | 20 +++++-- 5 files changed, 44 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 42240a996c531..682dc85ccd00f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -26,7 +26,6 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.HashMap; @@ -74,11 +73,7 @@ public class DesiredBalanceComputer { private TimeValue progressLogInterval; private long maxBalanceComputationTimeDuringIndexCreationMillis; - public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) { - this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis); - } - - DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) { + public DesiredBalanceComputer(ClusterSettings clusterSettings, LongSupplier timeSupplierMillis, ShardsAllocator delegateAllocator) { this.delegateAllocator = delegateAllocator; this.timeSupplierMillis = timeSupplierMillis; clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 0cfb3af87f012..5ccb59e29d7dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -91,7 +91,7 @@ public DesiredBalanceShardsAllocator( delegateAllocator, threadPool, clusterService, - new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator), + new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator), reconciler, telemetryProvider ); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java index 17fab91d97cad..bb4aa9beeb42e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java @@ -101,7 +101,7 @@ public void testDeleteDesiredBalance() throws Exception { var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); var delegate = new BalancedShardsAllocator(); - var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) { + var computer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegate) { final AtomicReference lastComputationInput = new AtomicReference<>(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 56a687646b364..51401acabb0ac 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -53,7 +53,6 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.test.MockLog; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.HashMap; @@ -85,8 +84,6 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class DesiredBalanceComputerTests extends ESAllocationTestCase { @@ -1205,43 +1202,43 @@ public void testShouldLogComputationIteration() { } private void checkIterationLogging(int iterations, long eachIterationDuration, MockLog.AbstractEventExpectation expectation) { - - var mockThreadPool = mock(ThreadPool.class); var currentTime = new AtomicLong(0L); - when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration)); - // Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting // prevents interrupting a long computation. var clusterSettings = createBuiltInClusterSettings( Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build() ); - var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() { - @Override - public void allocate(RoutingAllocation allocation) { - final var unassignedIterator = allocation.routingNodes().unassigned().iterator(); - while (unassignedIterator.hasNext()) { - final var shardRouting = unassignedIterator.next(); - if (shardRouting.primary()) { - unassignedIterator.initialize("node-0", null, 0L, allocation.changes()); - } else { - unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); + var desiredBalanceComputer = new DesiredBalanceComputer( + clusterSettings, + () -> currentTime.addAndGet(eachIterationDuration), + new ShardsAllocator() { + @Override + public void allocate(RoutingAllocation allocation) { + final var unassignedIterator = allocation.routingNodes().unassigned().iterator(); + while (unassignedIterator.hasNext()) { + final var shardRouting = unassignedIterator.next(); + if (shardRouting.primary()) { + unassignedIterator.initialize("node-0", null, 0L, allocation.changes()); + } else { + unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); + } } - } - // move shard on each iteration - for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) { - allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes()); - } - for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) { - allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes()); + // move shard on each iteration + for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) { + allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes()); + } + for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) { + allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes()); + } } - } - @Override - public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) { - throw new AssertionError("only used for allocation explain"); + @Override + public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) { + throw new AssertionError("only used for allocation explain"); + } } - }); + ); assertThatLogger(() -> { var iteration = new AtomicInteger(0); @@ -1349,7 +1346,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } private static DesiredBalanceComputer createDesiredBalanceComputer(ShardsAllocator allocator) { - return new DesiredBalanceComputer(createBuiltInClusterSettings(), mock(ThreadPool.class), allocator); + return new DesiredBalanceComputer(createBuiltInClusterSettings(), () -> 0L, allocator); } private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map expected) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 27c430131ff07..2cb3204787ce1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -396,7 +396,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing shardsAllocator, threadPool, clusterService, - new DesiredBalanceComputer(clusterSettings, shardsAllocator, time::get) { + new DesiredBalanceComputer(clusterSettings, time::get, shardsAllocator) { @Override public DesiredBalance compute( DesiredBalance previousDesiredBalance, @@ -522,7 +522,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo shardsAllocator, threadPool, clusterService, - new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) { + new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) { @Override public DesiredBalance compute( DesiredBalance previousDesiredBalance, @@ -625,7 +625,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo shardsAllocator, threadPool, clusterService, - new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) { + new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) { @Override public DesiredBalance compute( DesiredBalance previousDesiredBalance, @@ -712,7 +712,7 @@ public void testResetDesiredBalance() { var delegateAllocator = createShardsAllocator(); var clusterSettings = createBuiltInClusterSettings(); - var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) { + var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator) { final AtomicReference lastComputationInput = new AtomicReference<>(); @@ -780,7 +780,11 @@ public void testResetDesiredBalanceOnNoLongerMaster() { var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); var delegateAllocator = createShardsAllocator(); - var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator); + var desiredBalanceComputer = new DesiredBalanceComputer( + createBuiltInClusterSettings(), + threadPool::relativeTimeInMillis, + delegateAllocator + ); var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator( delegateAllocator, threadPool, @@ -829,7 +833,11 @@ public void testResetDesiredBalanceOnNodeShutdown() { final var resetCalled = new AtomicBoolean(); var delegateAllocator = createShardsAllocator(); - var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator); + var desiredBalanceComputer = new DesiredBalanceComputer( + createBuiltInClusterSettings(), + threadPool::relativeTimeInMillis, + delegateAllocator + ); var desiredBalanceAllocator = new DesiredBalanceShardsAllocator( delegateAllocator, threadPool,