Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Long balance computation should not delay new index primary assignment #116316

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public void onNewInput(T input) {
}
}

/**
* enqueues {@code input} if {@code expectedLatestKnownInput} is the latest known input.
* Neither of the parameters can be null.
*/
protected boolean compareAndEnqueue(T expectedLatestKnownInput, T input) {
assert expectedLatestKnownInput != null;
assert input != null;
return enqueuedInput.compareAndSet(Objects.requireNonNull(expectedLatestKnownInput), Objects.requireNonNull(input));
}

/**
* @return {@code false} iff there are no active/enqueued computations
*/
Expand All @@ -67,7 +77,7 @@ protected boolean isFresh(T input) {
/**
* Process the given input.
*
* @param input the value that was last received by {@link #onNewInput} before invocation.
* @param input the value that was last received by {@link #onNewInput} or {@link #compareAndEnqueue} before invocation.
*/
protected abstract void processInput(T input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@
*
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
*/
public record DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
public record DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments, ComputationFinishReason finishReason) {

enum ComputationFinishReason {
CONVERGED,
YIELD_TO_NEW_INPUT,
STOP_EARLY
}

public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
this(lastConvergedIndex, assignments, ComputationFinishReason.CONVERGED);
}

public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toUnmodifiableSet;
Expand All @@ -49,8 +50,8 @@ public class DesiredBalanceComputer {

private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);

private final ThreadPool threadPool;
private final ShardsAllocator delegateAllocator;
private final LongSupplier timeSupplierMillis;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -63,12 +64,28 @@ public class DesiredBalanceComputer {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.max_balance_computation_time_during_index_creation",
TimeValue.timeValueSeconds(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;

public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
this.threadPool = threadPool;
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
}

DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
this.delegateAllocator = delegateAllocator;
this.timeSupplierMillis = timeSupplierMillis;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
value -> this.maxBalanceComputationTimeDuringIndexCreationMillis = value.millis()
);
}

public DesiredBalance compute(
Expand All @@ -77,7 +94,6 @@ public DesiredBalance compute(
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
Predicate<DesiredBalanceInput> isFresh
) {

if (logger.isTraceEnabled()) {
logger.trace(
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
Expand All @@ -97,9 +113,10 @@ public DesiredBalance compute(
final var changes = routingAllocation.changes();
final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;

if (routingNodes.size() == 0) {
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), finishReason);
}

// we assume that all ongoing recoveries will complete
Expand Down Expand Up @@ -263,11 +280,12 @@ public DesiredBalance compute(

final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = threadPool.relativeTimeInMillis();
final long computationStartedTime = timeSupplierMillis.getAsLong();
long nextReportTime = computationStartedTime + timeWarningInterval;

int i = 0;
boolean hasChanges = false;
boolean assignedNewlyCreatedPrimaryShards = false;
while (true) {
if (hasChanges) {
// Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
Expand All @@ -293,6 +311,15 @@ public DesiredBalance compute(
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
hasChanges = true;
if (shardRouting.primary()
&& shardRouting.unassignedInfo() != null
&& shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
// TODO: we could include more cases that would cause early publishing of desired balance in case of a long
// computation. e.g.:
// - unassigned search replicas in case the shard has no assigned shard replicas
// - other reasons for an unassigned shard such as NEW_INDEX_RESTORED
assignedNewlyCreatedPrimaryShards = true;
}
clusterInfoSimulator.simulateShardStarted(shardRouting);
routingNodes.startShard(shardRouting, changes, 0L);
}
Expand All @@ -301,14 +328,14 @@ public DesiredBalance compute(

i++;
final int iterations = i;
final long currentTime = threadPool.relativeTimeInMillis();
final long currentTime = timeSupplierMillis.getAsLong();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
nextReportTime = currentTime + timeWarningInterval;
}

if (hasChanges == false) {
if (hasComputationConverged(hasChanges, i)) {
logger.debug(
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
desiredBalanceInput.index(),
Expand All @@ -324,9 +351,25 @@ public DesiredBalance compute(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i
);
finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
break;
}

if (assignedNewlyCreatedPrimaryShards
&& currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
logger.info(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
+ "in order to not delay assignment of newly created index shards for more than [{}]. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
);
finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
break;
}

Expand Down Expand Up @@ -368,7 +411,12 @@ public DesiredBalance compute(
}

long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments);
return new DesiredBalance(lastConvergedIndex, assignments, finishReason);
}

// visible for testing
boolean hasComputationConverged(boolean hasRoutingChanges, int currentIteration) {
return hasRoutingChanges == false;
}

private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,16 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
)
);
computationsExecuted.inc();
if (isFresh(desiredBalanceInput)) {

if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
logger.debug(
"Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation",
index
);
submitReconcileTask(currentDesiredBalance);
var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation());
desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput);
} else if (isFresh(desiredBalanceInput)) {
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
computationsConverged.inc();
submitReconcileTask(currentDesiredBalance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS,
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -73,6 +74,68 @@ protected void processInput(Integer input) {
assertTrue(Arrays.toString(valuePerThread) + " vs " + result.get(), Arrays.stream(valuePerThread).anyMatch(i -> i == result.get()));
}

public void testCompareAndEnqueue() throws Exception {
final var initialInput = new Object();
final var compareAndEnqueueCount = between(1, 10);
final var remaining = new AtomicInteger(compareAndEnqueueCount);
final var computationsExecuted = new AtomicInteger();
final var result = new AtomicReference<>();
final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
result.set(input);
if (remaining.decrementAndGet() >= 0) {
compareAndEnqueue(input, new Object());
}
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);
assertBusy(() -> assertFalse(computation.isActive()));
assertNotEquals(result.get(), initialInput);
assertEquals(computationsExecuted.get(), 1 + compareAndEnqueueCount);
}

public void testCompareAndEnqueueSkipped() throws Exception {
final var barrier = new CyclicBarrier(2);
final var computationsExecuted = new AtomicInteger();
final var initialInput = new Object();
final var conditionalInput = new Object();
final var newInput = new Object();
final var submitConditional = new AtomicBoolean(true);
final var result = new AtomicReference<>();

final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
assertNotEquals(input, conditionalInput);
safeAwait(barrier); // start
safeAwait(barrier); // continue
if (submitConditional.getAndSet(false)) {
compareAndEnqueue(input, conditionalInput);
}
result.set(input);
safeAwait(barrier); // finished
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);

safeAwait(barrier); // start
computation.onNewInput(newInput);
safeAwait(barrier); // continue
safeAwait(barrier); // finished
assertEquals(result.get(), initialInput);

safeAwait(barrier); // start
safeAwait(barrier); // continue
safeAwait(barrier); // finished

assertBusy(() -> assertFalse(computation.isActive()));
assertEquals(result.get(), newInput);
assertEquals(computationsExecuted.get(), 2);
}

public void testSkipsObsoleteValues() throws Exception {
final var barrier = new CyclicBarrier(2);
final Runnable await = () -> safeAwait(barrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,12 @@ private void checkIterationLogging(int iterations, long eachIterationDuration, M
var currentTime = new AtomicLong(0L);
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));

var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), mockThreadPool, new ShardsAllocator() {
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
// prevents interrupting a long computation.
var clusterSettings = createBuiltInClusterSettings(
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
);
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
Expand Down
Loading