Skip to content

Commit

Permalink
Use a time supplier interface instead of passing ThreadPool (elastic#…
Browse files Browse the repository at this point in the history
…116333)

An attempt to use a basic interface for time supplier based on
elastic#115511 (comment).
(TLDR: sometimes we pass around a ThreadPool instance just to be able to
get time. It might be more reasonable to separate those use cases)
  • Loading branch information
pxsalehi authored and afoucret committed Nov 14, 2024
1 parent 30c2370 commit d569b1f
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -37,7 +38,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toUnmodifiableSet;
Expand All @@ -50,7 +50,7 @@ public class DesiredBalanceComputer {
private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);

private final ShardsAllocator delegateAllocator;
private final LongSupplier timeSupplierMillis;
private final TimeProvider timeProvider;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -73,9 +73,9 @@ public class DesiredBalanceComputer {
private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;

public DesiredBalanceComputer(ClusterSettings clusterSettings, LongSupplier timeSupplierMillis, ShardsAllocator delegateAllocator) {
public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
this.delegateAllocator = delegateAllocator;
this.timeSupplierMillis = timeSupplierMillis;
this.timeProvider = timeProvider;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
Expand Down Expand Up @@ -275,7 +275,7 @@ public DesiredBalance compute(

final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = timeSupplierMillis.getAsLong();
final long computationStartedTime = timeProvider.relativeTimeInMillis();
long nextReportTime = computationStartedTime + timeWarningInterval;

int i = 0;
Expand Down Expand Up @@ -323,7 +323,7 @@ public DesiredBalance compute(

i++;
final int iterations = i;
final long currentTime = timeSupplierMillis.getAsLong();
final long currentTime = timeProvider.relativeTimeInMillis();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator),
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
reconciler,
telemetryProvider,
nodeAllocationStatsProvider
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.time;

/**
* An interface encapsulating the different methods for getting relative and absolute time. The main
* implementation of this is {@link org.elasticsearch.threadpool.ThreadPool}. To make it clear that a
* {@code ThreadPool} is being passed around only to get time, it is preferred to use this interface.
*/
public interface TimeProvider {

/**
* Returns a value of milliseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
long relativeTimeInMillis();

/**
* Returns a value of nanoseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
long relativeTimeInNanos();

/**
* Returns a value of milliseconds that may be used for relative time calculations. Similar to {@link #relativeTimeInMillis()} except
* that this method is more expensive: the return value is computed directly from {@link System#nanoTime} and is not cached. You should
* use {@link #relativeTimeInMillis()} unless the extra accuracy offered by this method is worth the costs.
*
* When computing a time interval by comparing relative times in milliseconds, you should make sure that both endpoints use cached
* values returned from {@link #relativeTimeInMillis()} or that they both use raw values returned from this method. It doesn't really
* make sense to compare a raw value to a cached value, even if in practice the result of such a comparison will be approximately
* sensible.
*/
long rawRelativeTimeInMillis();

/**
* Returns the value of milliseconds since UNIX epoch.
*
* This method should only be used for exact date/time formatting. For calculating
* time deltas that should not suffer from negative deltas, which are possible with
* this method, see {@link #relativeTimeInMillis()}.
*/
long absoluteTimeInMillis();
}
36 changes: 6 additions & 30 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.SizeValue;
Expand Down Expand Up @@ -65,7 +66,7 @@
* Manages all the Java thread pools we create. {@link Names} contains a list of the thread pools, but plugins can dynamically add more
* thread pools to instantiate.
*/
public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler, TimeProvider {

private static final Logger logger = LogManager.getLogger(ThreadPool.class);

Expand Down Expand Up @@ -362,12 +363,7 @@ protected ThreadPool() {
this.scheduler = null;
}

/**
* Returns a value of milliseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
@Override
public long relativeTimeInMillis() {
return cachedTimeThread.relativeTimeInMillis();
}
Expand All @@ -379,37 +375,17 @@ public LongSupplier relativeTimeInMillisSupplier() {
return relativeTimeInMillisSupplier;
}

/**
* Returns a value of nanoseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
@Override
public long relativeTimeInNanos() {
return cachedTimeThread.relativeTimeInNanos();
}

/**
* Returns a value of milliseconds that may be used for relative time calculations. Similar to {@link #relativeTimeInMillis()} except
* that this method is more expensive: the return value is computed directly from {@link System#nanoTime} and is not cached. You should
* use {@link #relativeTimeInMillis()} unless the extra accuracy offered by this method is worth the costs.
*
* When computing a time interval by comparing relative times in milliseconds, you should make sure that both endpoints use cached
* values returned from {@link #relativeTimeInMillis()} or that they both use raw values returned from this method. It doesn't really
* make sense to compare a raw value to a cached value, even if in practice the result of such a comparison will be approximately
* sensible.
*/
@Override
public long rawRelativeTimeInMillis() {
return TimeValue.nsecToMSec(System.nanoTime());
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
* This method should only be used for exact date/time formatting. For calculating
* time deltas that should not suffer from negative deltas, which are possible with
* this method, see {@link #relativeTimeInMillis()}.
*/
@Override
public long absoluteTimeInMillis() {
return cachedTimeThread.absoluteTimeInMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testDeleteDesiredBalance() throws Exception {
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

var delegate = new BalancedShardsAllocator();
var computer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegate) {
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.time.TimeProviderUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -1203,42 +1205,40 @@ public void testShouldLogComputationIteration() {

private void checkIterationLogging(int iterations, long eachIterationDuration, MockLog.AbstractEventExpectation expectation) {
var currentTime = new AtomicLong(0L);
TimeProvider timeProvider = TimeProviderUtils.create(() -> currentTime.addAndGet(eachIterationDuration));

// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
// prevents interrupting a long computation.
var clusterSettings = createBuiltInClusterSettings(
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
);
var desiredBalanceComputer = new DesiredBalanceComputer(
clusterSettings,
() -> currentTime.addAndGet(eachIterationDuration),
new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
final var shardRouting = unassignedIterator.next();
if (shardRouting.primary()) {
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
} else {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}

// move shard on each iteration
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
}
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, timeProvider, new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
final var shardRouting = unassignedIterator.next();
if (shardRouting.primary()) {
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
} else {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
// move shard on each iteration
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
}
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
}
}
);

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
});

assertThatLogger(() -> {
var iteration = new AtomicInteger(0);
Expand Down Expand Up @@ -1346,7 +1346,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}

private static DesiredBalanceComputer createDesiredBalanceComputer(ShardsAllocator allocator) {
return new DesiredBalanceComputer(createBuiltInClusterSettings(), () -> 0L, allocator);
return new DesiredBalanceComputer(createBuiltInClusterSettings(), TimeProviderUtils.create(() -> 0L), allocator);
}

private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map<ShardId, ShardAssignment> expected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.TimeProviderUtils;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -398,7 +399,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, time::get, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, TimeProviderUtils.create(time::get), shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -525,7 +526,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -629,7 +630,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
shardsAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
@Override
public DesiredBalance compute(
DesiredBalance previousDesiredBalance,
Expand Down Expand Up @@ -717,7 +718,7 @@ public void testResetDesiredBalance() {
var delegateAllocator = createShardsAllocator();
var clusterSettings = createBuiltInClusterSettings();

var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator) {
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down Expand Up @@ -786,11 +787,7 @@ public void testResetDesiredBalanceOnNoLongerMaster() {
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);

var delegateAllocator = createShardsAllocator();
var desiredBalanceComputer = new DesiredBalanceComputer(
createBuiltInClusterSettings(),
threadPool::relativeTimeInMillis,
delegateAllocator
);
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
Expand Down Expand Up @@ -840,11 +837,7 @@ public void testResetDesiredBalanceOnNodeShutdown() {

final var resetCalled = new AtomicBoolean();
var delegateAllocator = createShardsAllocator();
var desiredBalanceComputer = new DesiredBalanceComputer(
createBuiltInClusterSettings(),
threadPool::relativeTimeInMillis,
delegateAllocator
);
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
var desiredBalanceAllocator = new DesiredBalanceShardsAllocator(
delegateAllocator,
threadPool,
Expand Down
Loading

0 comments on commit d569b1f

Please sign in to comment.