Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the broker round robin selection in task execution configurable #2223

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,13 @@ public final class ExecutorConfig {
public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users "
+ "can dynamically set the execution progress check interval to.";

/**
* <code>prefer.broker.roundrobin.in.execution</code>
*/
public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution";
public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true;
public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution.";

/**
* <code>slow.task.alerting.backoff.ms</code>
*/
Expand Down Expand Up @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.BOOLEAN,
DEFAULT_AUTO_STOP_EXTERNAL_AGENT,
ConfigDef.Importance.MEDIUM,
AUTO_STOP_EXTERNAL_AGENT_DOC);
AUTO_STOP_EXTERNAL_AGENT_DOC)
.define(PREFER_BROKER_ROUND_ROBIN_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_PREFER_BROKER_ROUND_ROBIN,
ConfigDef.Importance.MEDIUM,
PREFER_BROKER_ROUND_ROBIN_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ExecutionProposal {
private final Set<ReplicaPlacementInfo> _replicasToRemove;
// Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker.
private final Map<Integer, ReplicaPlacementInfo> _replicasToMoveBetweenDisksByBroker;
private final Set<Integer> _oldReplicasSet;
private final Set<Integer> _newReplicasSet;

/**
* Construct an execution proposals.
Expand All @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp,
validate();

// Populate replicas to add, to remove and to move across disk.
Set<Integer> newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
Set<Integer> oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToMoveBetweenDisksByBroker = new HashMap<>();
newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r))
.forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r));
Expand Down Expand Up @@ -177,6 +179,20 @@ public List<ReplicaPlacementInfo> oldReplicas() {
return Collections.unmodifiableList(_oldReplicas);
}

/**
 * Returns an unmodifiable view of the IDs of the brokers hosting this partition's replicas
 * before the proposal is executed (i.e. the current placement).
 *
 * @return The broker ID set of the partition's replicas before executing the proposal.
 */
public Set<Integer> oldReplicasBrokerIdSet() {
return Collections.unmodifiableSet(_oldReplicasSet);
}

/**
 * Returns an unmodifiable view of the IDs of the brokers that will host this partition's
 * replicas after the proposal is executed (i.e. the target placement).
 *
 * @return The broker ID set of the partition's replicas after executing the proposal.
 */
public Set<Integer> newReplicasBrokerIdSet() {
return Collections.unmodifiableSet(_newReplicasSet);
}

/**
 * @return The new replica list of the partition after executing the proposal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection<ExecutionProposal> pro
}
}

/**
 * Package-private accessor that delegates to the task planner.
 *
 * @return Broker ID to pending inter-broker replica move task count, in descending order of count.
 */
Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
}

/**
* Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
Expand All @@ -39,11 +40,14 @@
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;

/**
Expand Down Expand Up @@ -80,6 +84,7 @@ public class ExecutionTaskPlanner {
private final long _taskExecutionAlertingThresholdMs;
private final double _interBrokerReplicaMovementRateAlertingThreshold;
private final double _intraBrokerReplicaMovementRateAlertingThreshold;
private final boolean _preferRoundRobin;
private static final int PRIORITIZE_BROKER_1 = -1;
private static final int PRIORITIZE_BROKER_2 = 1;
private static final int PRIORITIZE_NONE = 0;
Expand Down Expand Up @@ -120,6 +125,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co

_defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent();
}
_preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG);
}

/**
Expand Down Expand Up @@ -379,11 +385,13 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
break;
}
// If this broker has already involved in this round, skip it.
if (brokerInvolved.contains(brokerId)) {
if (_preferRoundRobin && brokerInvolved.contains(brokerId)) {
continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
SortedSet<ExecutionTask> proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId);
// Make a TreeSet copy of the proposals for this broker to avoid ConcurrentModificationException and
// keep the same order of proposals.
SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
for (ExecutionTask task : proposalsForBroker) {
// Break if max cap reached
Expand All @@ -398,8 +406,8 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
int sourceBroker = task.proposal().oldLeader().brokerId();
Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId)
.boxed().collect(Collectors.toSet());
if (brokerInvolved.contains(sourceBroker)
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker)
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) {
continue;
}
TopicPartition tp = task.proposal().topicPartition();
Expand Down Expand Up @@ -429,8 +437,10 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
newTaskAdded = true;
numInProgressPartitions++;
LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers);
// We can stop the check for proposals for this broker because we have found a proposal.
break;
if (_preferRoundRobin) {
// We can stop the check for proposals for this broker because we have found a proposal.
break;
}
}
}
}
Expand Down Expand Up @@ -537,4 +547,34 @@ private Comparator<Integer> brokerComparator(StrategyOptions strategyOptions, Re
: broker1 - broker2;
};
}

/**
 * Computes, for each broker with pending inter-broker replica movement tasks, the number of
 * such tasks, sorted by task count in descending order. Useful for logging the degree of
 * task-count skew across brokers before execution starts.
 *
 * @return A map from broker ID to pending inter-broker move task count, whose iteration
 *         order is descending by count; an empty map if there are no pending tasks.
 */
Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
  if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) {
    return Collections.emptyMap();
  }
  // Sort the broker entries by task count (descending) in a single pass instead of collecting
  // into an intermediate map first, and collect into a LinkedHashMap so the sorted order is
  // preserved on iteration.
  return _interPartMoveTasksByBrokerId.entrySet()
                                      .stream()
                                      .sorted((e1, e2) -> Integer.compare(e2.getValue().size(), e1.getValue().size()))
                                      .collect(Collectors.toMap(Map.Entry::getKey,
                                                                entry -> entry.getValue().size(),
                                                                (v1, v2) -> v1,
                                                                // Maintain the sorted (descending) order.
                                                                LinkedHashMap::new));
}

/*
 * Package private for testing.
 * NOTE: returns the live internal map (not a copy); callers must not mutate it,
 * since changes would directly alter the planner's pending-task bookkeeping.
 */
Map<Integer, SortedSet<ExecutionTask>> getInterPartMoveTasksByBrokerId() {
return _interPartMoveTasksByBrokerId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.*;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.UNIT_INTERVAL_TO_PERCENTAGE;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler.SamplingMode.*;
import static com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager.TaskState.COMPLETED;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;

Expand Down Expand Up @@ -830,6 +831,10 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
if (removedBrokers != null && !removedBrokers.isEmpty()) {
int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals);
LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]);
}
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
Expand All @@ -842,6 +847,32 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
}
}

/**
 * Counts how many of the partition-move proposals do not touch any of the removed brokers.
 * A proposal counts as a partition move only when its old and new broker sets differ.
 *
 * @param removedBrokers Brokers being removed from the cluster.
 * @param proposals Proposals to execute.
 * @return A two-element array: index 0 holds the number of partition-move proposals unrelated
 *         to the removed brokers, index 1 holds the total number of partition-move proposals.
 */
private int[] getNumTasksUnrelatedToBrokerRemoval(Set<Integer> removedBrokers, Collection<ExecutionProposal> proposals) {
int unrelated = 0;
int total = 0;
for (ExecutionProposal proposal : proposals) {
Set<Integer> brokersBefore = proposal.oldReplicasBrokerIdSet();
Set<Integer> brokersAfter = proposal.newReplicasBrokerIdSet();
if (brokersBefore.equals(brokersAfter)) {
// Identical placement before and after: not a partition move, so it is not counted.
continue;
}
total++;
boolean touchesRemovedBroker = brokersBefore.stream().anyMatch(removedBrokers::contains)
|| brokersAfter.stream().anyMatch(removedBrokers::contains);
if (!touchesRemovedBroker) {
unrelated++;
}
}
return new int[] {unrelated, total};
}

private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException {
if (_hasOngoingExecution) {
throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution.");
Expand Down Expand Up @@ -1362,8 +1393,8 @@ private class ProposalExecutionRunnable implements Runnable {
public void run() {
LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
final long start = System.currentTimeMillis();
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
try {
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
execute(userTaskInfo);
} catch (Exception e) {
LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
Expand All @@ -1381,7 +1412,12 @@ public void run() {
if (_executionException != null) {
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
String status = "succeeded";
if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
// The task may be in state of COMPLETED_WITH_ERROR if the user requested to stop the execution.
status = userTaskInfo.state().toString();
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
}
// Clear completed execution.
clearCompletedExecution();
Expand Down Expand Up @@ -1611,6 +1647,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
Map<Integer, Integer> map = _executionTaskManager.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map);
if (!map.isEmpty()) {
LOG.info("User task {}: Degree of task count skew towards the largest single broker", _uuid,
map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements);
}

int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,

@Override
protected OptimizationResult getResult() throws Exception {
try {
return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
} catch (Exception e) {
// Pass the throwable as the final argument WITHOUT a placeholder so SLF4J logs the full
// stack trace; with "due to {}" the exception was consumed as a placeholder argument and
// only its toString() was logged.
LOG.error("User task {}: failed to remove brokers.", _uuid, e);
// Re-throw so the caller still observes the failure; this method only adds logging.
throw e;
}
}

@Override
Expand Down
Loading
Loading