Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust scheduler assignment queue for node #15168

Merged
merged 2 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public long getTotalSplitsWeight(InternalNode node)

Dith3r marked this conversation as resolved.
Show resolved Hide resolved
Dith3r marked this conversation as resolved.
Show resolved Hide resolved
Dith3r marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Returns the queued split weight recorded for this stage on the given node.
 * The lookup itself is done by the node-identifier overload.
 *
 * @param node node whose {@code getNodeIdentifier()} value is used as the lookup key
 * @return whatever the {@code String} overload returns for that identifier
 */
public long getQueuedSplitsWeightForStage(InternalNode node)
{
    // No local lookup here: the String overload already reads stageQueuedSplitInfo,
    // and a second copy of that lookup in this method was never used.
    return getQueuedSplitsWeightForStage(node.getNodeIdentifier());
}

/**
 * Looks up the queued split weight tracked for this stage under a node identifier.
 *
 * @param nodeId key into {@code stageQueuedSplitInfo}
 * @return the entry's queued splits weight, or 0 when there is no entry for {@code nodeId}
 */
public long getQueuedSplitsWeightForStage(String nodeId)
{
    PendingSplitInfo pending = stageQueuedSplitInfo.get(nodeId);
    if (pending == null) {
        // nothing has been recorded for this node yet
        return 0;
    }
    return pending.getQueuedSplitsWeight();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public static SplitPlacementResult selectDistributionNodes(
NodeMap nodeMap,
NodeTaskMap nodeTaskMap,
long maxSplitsWeightPerNode,
long maxPendingSplitsWeightPerTask,
Dith3r marked this conversation as resolved.
Show resolved Hide resolved
long minPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
Set<Split> splits,
List<RemoteTask> existingTasks,
Expand All @@ -173,7 +173,7 @@ public static SplitPlacementResult selectDistributionNodes(
SplitWeight splitWeight = split.getSplitWeight();

// if node is full, don't schedule now, which will push back on the scheduling of splits
if (canAssignSplitToDistributionNode(assignmentStats, node, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splitWeight)) {
if (canAssignSplitToDistributionNode(assignmentStats, node, maxSplitsWeightPerNode, minPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splitWeight)) {
assignments.put(node, split);
assignmentStats.addAssignedSplit(node, splitWeight);
}
Expand All @@ -182,15 +182,15 @@ public static SplitPlacementResult selectDistributionNodes(
}
}

ListenableFuture<Void> blocked = toWhenHasSplitQueueSpaceFuture(blockedNodes, existingTasks, calculateLowWatermark(maxPendingSplitsWeightPerTask));
ListenableFuture<Void> blocked = toWhenHasSplitQueueSpaceFuture(blockedNodes, existingTasks, calculateLowWatermark(minPendingSplitsWeightPerTask));
return new SplitPlacementResult(blocked, ImmutableMultimap.copyOf(assignments));
}

private static boolean canAssignSplitToDistributionNode(NodeAssignmentStats assignmentStats, InternalNode node, long maxSplitsWeightPerNode, long maxPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitWeight splitWeight)
private static boolean canAssignSplitToDistributionNode(NodeAssignmentStats assignmentStats, InternalNode node, long maxSplitsWeightPerNode, long minPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitWeight splitWeight)
{
return assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask &&
(canAssignSplitBasedOnWeight(assignmentStats.getTotalSplitsWeight(node), maxSplitsWeightPerNode, splitWeight) ||
canAssignSplitBasedOnWeight(assignmentStats.getQueuedSplitsWeightForStage(node), maxPendingSplitsWeightPerTask, splitWeight));
canAssignSplitBasedOnWeight(assignmentStats.getQueuedSplitsWeightForStage(node), minPendingSplitsWeightPerTask, splitWeight));
}

public static boolean canAssignSplitBasedOnWeight(long currentWeight, long weightLimit, SplitWeight splitWeight)
Expand All @@ -200,9 +200,9 @@ public static boolean canAssignSplitBasedOnWeight(long currentWeight, long weigh
return addExact(currentWeight, splitWeight.getRawValue()) <= weightLimit || (currentWeight == 0 && weightLimit > 0);
}

public static long calculateLowWatermark(long maxPendingSplitsWeightPerTask)
public static long calculateLowWatermark(long minPendingSplitsWeightPerTask)
{
return (long) Math.ceil(maxPendingSplitsWeightPerTask * 0.5);
return (long) Math.ceil(minPendingSplitsWeightPerTask * 0.5);
}

public static ListenableFuture<Void> toWhenHasSplitQueueSpaceFuture(Set<InternalNode> blockedNodes, List<RemoteTask> existingTasks, long weightSpaceThreshold)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public enum SplitsBalancingPolicy
private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private int minPendingSplitsPerTask = 10;
private int maxAdjustedPendingSplitsWeightPerTask = 500;
private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
private boolean optimizedLocalScheduling = true;
private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
Expand Down Expand Up @@ -108,17 +109,30 @@ public NodeSchedulerConfig setIncludeCoordinator(boolean includeCoordinator)
return this;
}

@Config("node-scheduler.max-pending-splits-per-task")
@LegacyConfig({"node-scheduler.max-pending-splits-per-node-per-task", "node-scheduler.max-pending-splits-per-node-per-stage"})
public NodeSchedulerConfig setMaxPendingSplitsPerTask(int maxPendingSplitsPerTask)
@Config("node-scheduler.min-pending-splits-per-task")
Dith3r marked this conversation as resolved.
Show resolved Hide resolved
@LegacyConfig({"node-scheduler.max-pending-splits-per-task", "node-scheduler.max-pending-splits-per-node-per-task", "node-scheduler.max-pending-splits-per-node-per-stage"})
public NodeSchedulerConfig setMinPendingSplitsPerTask(int minPendingSplitsPerTask)
{
this.maxPendingSplitsPerTask = maxPendingSplitsPerTask;
this.minPendingSplitsPerTask = minPendingSplitsPerTask;
return this;
}

public int getMaxPendingSplitsPerTask()
public int getMinPendingSplitsPerTask()
{
return maxPendingSplitsPerTask;
return minPendingSplitsPerTask;
}

@Config("node-scheduler.max-adjusted-pending-splits-per-task")
public NodeSchedulerConfig setMaxAdjustedPendingSplitsWeightPerTask(int maxAdjustedPendingSplitsWeightPerTask)
Dith3r marked this conversation as resolved.
Show resolved Hide resolved
{
this.maxAdjustedPendingSplitsWeightPerTask = maxAdjustedPendingSplitsWeightPerTask;
return this;
}

/**
 * Returns the configured {@code maxAdjustedPendingSplitsWeightPerTask} value.
 * The {@code @Min(0)} constraint on this getter declares that the value must not be negative.
 *
 * @return the current value of the {@code maxAdjustedPendingSplitsWeightPerTask} field
 */
@Min(0)
public int getMaxAdjustedPendingSplitsWeightPerTask()
{
    return this.maxAdjustedPendingSplitsWeightPerTask;
}

public int getMaxSplitsPerNode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ else if (!splitWaitingForAnyNode) {
continue;
}
Set<InternalNode> nodes = nodeMap.getWorkersByNetworkPath().get(location);
chosenNode = bestNodeSplitCount(splitWeight, new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplitsWeightPerTask(i, depth), assignmentStats);
chosenNode = bestNodeSplitCount(splitWeight, new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMinPendingSplitsWeightPerTask(i, depth), assignmentStats);
if (chosenNode != null) {
chosenDepth = i;
break;
Expand All @@ -196,12 +196,12 @@ else if (!splitWaitingForAnyNode) {
}

ListenableFuture<Void> blocked;
long maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplitsWeightPerTask(0, topologicalSplitCounters.size() - 1);
long minPendingForWildcardNetworkAffinity = calculateMinPendingSplitsWeightPerTask(0, topologicalSplitCounters.size() - 1);
if (splitWaitingForAnyNode) {
blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(minPendingForWildcardNetworkAffinity));
}
else {
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(minPendingForWildcardNetworkAffinity));
}
return new SplitPlacementResult(blocked, assignment);
}
Expand All @@ -211,7 +211,7 @@ else if (!splitWaitingForAnyNode) {
* splitAffinity. A split with zero affinity can only fill half the queue, whereas one that matches
* exactly can fill the entire queue.
*/
private long calculateMaxPendingSplitsWeightPerTask(int splitAffinity, int totalDepth)
private long calculateMinPendingSplitsWeightPerTask(int splitAffinity, int totalDepth)
{
if (totalDepth == 0) {
return maxPendingSplitsWeightPerTask;
Expand All @@ -229,7 +229,7 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
}

@Nullable
private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<InternalNode> candidates, int minCandidatesWhenFull, long maxPendingSplitsWeightPerTask, NodeAssignmentStats assignmentStats)
private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<InternalNode> candidates, int minCandidatesWhenFull, long minPendingSplitsWeightPerTask, NodeAssignmentStats assignmentStats)
{
InternalNode bestQueueNotFull = null;
long minWeight = Long.MAX_VALUE;
Expand All @@ -246,7 +246,7 @@ private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<Intern
}
fullCandidatesConsidered++;
long taskQueuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
if (taskQueuedWeight < minWeight && canAssignSplitBasedOnWeight(taskQueuedWeight, maxPendingSplitsWeightPerTask, splitWeight)) {
if (taskQueuedWeight < minWeight && canAssignSplitBasedOnWeight(taskQueuedWeight, minPendingSplitsWeightPerTask, splitWeight)) {
minWeight = taskQueuedWeight;
bestQueueNotFull = node;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class TopologyAwareNodeSelectorFactory
private final int minCandidates;
private final boolean includeCoordinator;
private final long maxSplitsWeightPerNode;
private final long maxPendingSplitsWeightPerTask;
private final long minPendingSplitsWeightPerTask;
private final NodeTaskMap nodeTaskMap;

private final List<CounterStat> placementCounters;
Expand All @@ -87,10 +87,10 @@ public TopologyAwareNodeSelectorFactory(
this.includeCoordinator = schedulerConfig.isIncludeCoordinator();
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
int maxSplitsPerNode = schedulerConfig.getMaxSplitsPerNode();
int maxPendingSplitsPerTask = schedulerConfig.getMaxPendingSplitsPerTask();
checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be > maxPendingSplitsPerTask");
int minPendingSplitsPerTask = schedulerConfig.getMinPendingSplitsPerTask();
checkArgument(maxSplitsPerNode >= minPendingSplitsPerTask, "maxSplitsPerNode must be > minPendingSplitsPerTask");
this.maxSplitsWeightPerNode = SplitWeight.rawValueForStandardSplitCount(maxSplitsPerNode);
this.maxPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxPendingSplitsPerTask);
this.minPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(minPendingSplitsPerTask);

Builder<CounterStat> placementCounters = ImmutableList.builder();
ImmutableMap.Builder<String, CounterStat> placementCountersByName = ImmutableMap.builder();
Expand Down Expand Up @@ -133,7 +133,7 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
nodeMap,
minCandidates,
maxSplitsWeightPerNode,
maxPendingSplitsWeightPerTask,
minPendingSplitsWeightPerTask,
getMaxUnacknowledgedSplitsPerTask(session),
placementCounters,
networkTopology);
Expand Down
Loading