Skip to content

Commit

Permalink
Do not pass ThreadPool to DesiredBalanceComputer (#116590)
Browse files Browse the repository at this point in the history
Relates
#115511 (comment).
`ThreadPool` is used here only to get time. (I've extracted this out of
#116333).
  • Loading branch information
pxsalehi authored and jozala committed Nov 13, 2024
1 parent 8e177b3 commit 8c8ad7a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -74,11 +73,7 @@ public class DesiredBalanceComputer {
private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;

public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
}

DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
public DesiredBalanceComputer(ClusterSettings clusterSettings, LongSupplier timeSupplierMillis, ShardsAllocator delegateAllocator) {
this.delegateAllocator = delegateAllocator;
this.timeSupplierMillis = timeSupplierMillis;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator),
reconciler,
telemetryProvider
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testDeleteDesiredBalance() throws Exception {
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

var delegate = new BalancedShardsAllocator();
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {
var computer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegate) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -85,8 +84,6 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DesiredBalanceComputerTests extends ESAllocationTestCase {

Expand Down Expand Up @@ -1205,43 +1202,43 @@ public void testShouldLogComputationIteration() {
}

private void checkIterationLogging(int iterations, long eachIterationDuration, MockLog.AbstractEventExpectation expectation) {

var mockThreadPool = mock(ThreadPool.class);
var currentTime = new AtomicLong(0L);
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));

// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
// prevents interrupting a long computation.
var clusterSettings = createBuiltInClusterSettings(
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
);
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
final var shardRouting = unassignedIterator.next();
if (shardRouting.primary()) {
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
} else {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
var desiredBalanceComputer = new DesiredBalanceComputer(
clusterSettings,
() -> currentTime.addAndGet(eachIterationDuration),
new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
final var shardRouting = unassignedIterator.next();
if (shardRouting.primary()) {
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
} else {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}
}

// move shard on each iteration
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
}
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
// move shard on each iteration
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
}
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
}
}
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
}
});
);

assertThatLogger(() -> {
var iteration = new AtomicInteger(0);
Expand Down Expand Up @@ -1349,7 +1346,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}

private static DesiredBalanceComputer createDesiredBalanceComputer(ShardsAllocator allocator) {
return new DesiredBalanceComputer(createBuiltInClusterSettings(), mock(ThreadPool.class), allocator);
return new DesiredBalanceComputer(createBuiltInClusterSettings(), () -> 0L, allocator);
}

private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map<ShardId, ShardAssignment> expected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, shardsAllocator, time::get) {
new DesiredBalanceComputer(clusterSettings, time::get, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -522,7 +522,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -625,7 +625,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -712,7 +712,7 @@ public void testResetDesiredBalance() {
var delegateAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();

var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down Expand Up @@ -780,7 +780,11 @@ public void testResetDesiredBalanceOnNoLongerMaster() {
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);

var delegateAllocator = createShardsAllocator();
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
var desiredBalanceComputer = new DesiredBalanceComputer(
createBuiltInClusterSettings(),
threadPool::relativeTimeInMillis,
delegateAllocator
);
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
Expand Down Expand Up @@ -829,7 +833,11 @@ public void testResetDesiredBalanceOnNodeShutdown() {

final var resetCalled = new AtomicBoolean();
var delegateAllocator = createShardsAllocator();
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
var desiredBalanceComputer = new DesiredBalanceComputer(
createBuiltInClusterSettings(),
threadPool::relativeTimeInMillis,
delegateAllocator
);
var desiredBalanceAllocator = new DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
Expand Down

0 comments on commit 8c8ad7a

Please sign in to comment.