Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slightly Smarter Task Shuffling #2057

Merged
merged 20 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ protected void configure() {
bind(SingularityPriorityKillPoller.class).in(Scopes.SINGLETON);
bind(SingularityUsageCleanerPoller.class).in(Scopes.SINGLETON);
bind(SingularityUsagePoller.class).in(Scopes.SINGLETON);
bind(SingularityTaskShuffler.class).in(Scopes.SINGLETON);
bind(SingularityMesosTaskPrioritizer.class).in(Scopes.SINGLETON);
bind(SingularityMesosOfferScheduler.class).in(Scopes.SINGLETON);
bind(SingularityLeaderCache.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
package com.hubspot.singularity.scheduler;

import com.google.inject.Inject;
import com.hubspot.singularity.SingularityPendingRequest;
import com.hubspot.singularity.SingularityPendingRequest.PendingType;
import com.hubspot.singularity.SingularitySlaveUsage;
import com.hubspot.singularity.SingularityTaskCleanup;
import com.hubspot.singularity.TaskCleanupType;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.scheduler.SingularityTaskShuffler.OverusedResource.Type;
import io.dropwizard.util.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class SingularityTaskShuffler {
private static final Logger LOG = LoggerFactory.getLogger(SingularityTaskShuffler.class);

private final SingularityConfiguration configuration;
private final RequestManager requestManager;
private final TaskManager taskManager;

/**
 * Injection constructor; stores the collaborators used when selecting and bouncing tasks.
 *
 * @param configuration source of the shuffle limits and thresholds
 * @param requestManager used to enqueue the pending request for a bounced task
 * @param taskManager used to read existing cleanups and to create new ones
 */
@Inject
SingularityTaskShuffler(
    SingularityConfiguration configuration,
    RequestManager requestManager,
    TaskManager taskManager
) {
  this.taskManager = taskManager;
  this.requestManager = requestManager;
  this.configuration = configuration;
}

/**
 * The resource (memory or CPU) on a slave that is furthest over its target.
 *
 * <p>As constructed by {@code getMostOverusedResource}, {@code usage} holds the amount by which
 * the resource is over its target and {@code total} holds the target itself. {@code usage} is
 * reduced as tasks are selected for shuffling.
 */
static class OverusedResource {
  enum Type {MEMORY, CPU}

  double usage;
  double total;
  Type resourceType;

  OverusedResource(double usage, double total, Type resourceType) {
    this.resourceType = resourceType;
    this.total = total;
    this.usage = usage;
  }

  /** @return {@code usage} divided by {@code total} */
  public double getOverusageRatio() {
    return usage / total;
  }

  /**
   * Comparator ordering MEMORY resources ahead of CPU ones; resources of the same type are
   * ordered by descending overusage ratio.
   */
  public static int prioritize(OverusedResource r1, OverusedResource r2) {
    if (r1.resourceType != r2.resourceType) {
      return r1.resourceType == Type.MEMORY ? -1 : 1;
    }

    return Double.compare(r2.getOverusageRatio(), r1.getOverusageRatio());
  }

  /** Selects whichever of the two arguments corresponds to this resource's type. */
  public double match(double cpuUsage, double memUsageBytes) {
    return resourceType == Type.CPU ? cpuUsage : memUsageBytes;
  }

  /** @return true when the tracked usage is strictly greater than the matching argument */
  public boolean exceeds(double cpuUsage, double memUsageBytes) {
    double relevantUsage = match(cpuUsage, memUsageBytes);
    return usage > relevantUsage;
  }

  /** Subtracts the matching delta from the tracked usage. */
  public void updateOverusage(double cpuUsageDelta, double memUsageDelta) {
    double relevantDelta = match(cpuUsageDelta, memUsageDelta);
    usage -= relevantDelta;
  }

  /** Maps the resource type onto the cleanup type recorded for a shuffled task. */
  public TaskCleanupType toTaskCleanupType() {
    return resourceType == Type.CPU
        ? TaskCleanupType.REBALANCE_CPU_USAGE
        : TaskCleanupType.REBALANCE_MEMORY_USAGE;
  }
}

/**
 * Bundles a slave's usage snapshot, the tasks on it that may be shuffled, and the resource
 * chosen as its most overused one.
 */
static class OverusedSlave {
  SingularitySlaveUsage usage;
  List<TaskIdWithUsage> tasks;
  OverusedResource resource;

  OverusedSlave(
      SingularitySlaveUsage slaveUsage,
      List<TaskIdWithUsage> candidateTasks,
      OverusedResource overusedResource
  ) {
    usage = slaveUsage;
    tasks = candidateTasks;
    resource = overusedResource;
  }
}

/**
 * Selects tasks on overloaded hosts and bounces them, within the configured shuffle limits.
 *
 * <p>Hosts are processed in the order given by {@link OverusedResource#prioritize} applied to
 * each host's most overused resource. Cleanups that are already shuffle cleanups count toward
 * both the cluster-wide limit ({@code getMaxTasksToShuffleTotal}) and the per-host limit
 * ({@code getMaxTasksToShufflePerHost}), and a request that already has a shuffling task is
 * not shuffled again.
 *
 * @param overloadedHosts slave usage mapped to the tasks on that slave with their usage; each
 *     task list is sorted in place while candidates are prioritized
 */
public void shuffle(Map<SingularitySlaveUsage, List<TaskIdWithUsage>> overloadedHosts) {
List<OverusedSlave> slavesToShuffle = overloadedHosts.entrySet().stream()
.map((entry) -> new OverusedSlave(entry.getKey(), entry.getValue(), getMostOverusedResource(entry.getKey())))
.sorted((s1, s2) -> OverusedResource.prioritize(s1.resource, s2.resource))
.collect(Collectors.toList());

// Snapshot of shuffle cleanups that already exist; the counters below are updated locally as
// more tasks are bounced in this pass.
List<SingularityTaskCleanup> shufflingTasks = getShufflingTasks();
Set<String> shufflingRequests = getAssociatedRequestIds(shufflingTasks);
Map<String, Long> shufflingTasksPerHost = getShufflingTaskCountPerHost(shufflingTasks);
long shufflingTasksOnCluster = shufflingTasks.size();

for (OverusedSlave slave : slavesToShuffle) {
// Cluster-wide limit reached: stop looking at any further hosts.
if (shufflingTasksOnCluster >= configuration.getMaxTasksToShuffleTotal()) {
LOG.debug("Not shuffling any more tasks (totalShuffleCleanups: {})", shufflingTasksOnCluster);
break;
}

TaskCleanupType shuffleCleanupType = slave.resource.toTaskCleanupType();
List<TaskIdWithUsage> shuffleCandidates = getPrioritizedShuffleCandidates(slave);

// The per-host count is looked up by the sanitized host of the slave's first task; a slave
// with no tasks falls back to the "" key and therefore a count of 0.
long shufflingTasksOnSlave = shufflingTasksPerHost.getOrDefault(getHostId(slave).orElse(""), 0L);
long availableTasksOnSlave = shuffleCandidates.size();
// Running usage figures for this slave, reduced each time a task is bounced below.
double cpuUsage = getSystemLoadForShuffle(slave.usage);
double memUsageBytes = slave.usage.getMemoryBytesUsed();

for (TaskIdWithUsage task : shuffleCandidates) {
// Decremented up front so that, from here on, it counts the candidates after this one.
availableTasksOnSlave--;

if (shufflingRequests.contains(task.getTaskId().getRequestId())) {
LOG.debug("Request {} already has a shuffling task, skipping", task.getTaskId().getRequestId());
continue;
}

boolean resourceNotOverused = !isOverutilized(slave, cpuUsage, memUsageBytes);
boolean tooManyShufflingTasks = isShufflingTooManyTasks(shufflingTasksOnSlave, shufflingTasksOnCluster);
double taskCpuUsage = task.getUsage().getCpusUsed();
double taskMemUsage = task.getUsage().getMemoryTotalBytes();

// Done with this slave once it is back under its threshold or a shuffle limit is hit.
if (resourceNotOverused || tooManyShufflingTasks) {
LOG.debug("Not shuffling any more tasks on slave {} ({} overage : {}%, shuffledOnHost: {}, totalShuffleCleanups: {})",
task.getTaskId().getSanitizedHost(),
slave.resource.resourceType,
slave.resource.getOverusageRatio() * 100,
shufflingTasksOnSlave,
shufflingTasksOnCluster
);
break;
}

// With exactly one shuffle left on this host, skip a task whose usage is smaller than the
// remaining overage as long as later candidates remain.
long availableShufflesOnSlave = configuration.getMaxTasksToShufflePerHost() - shufflingTasksOnSlave;
if (availableShufflesOnSlave == 1 && availableTasksOnSlave > 0 && slave.resource.exceeds(taskCpuUsage, taskMemUsage)) {
LOG.debug("Skipping shuffling task {} on slave {} to reach threshold ({} overage : {}%, shuffledOnHost: {}, totalShuffleCleanups: {})",
task.getTaskId().getId(),
task.getTaskId().getSanitizedHost(),
slave.resource.resourceType,
slave.resource.getOverusageRatio() * 100,
shufflingTasksOnSlave,
shufflingTasksOnCluster
);
continue;
}

String message = getShuffleMessage(slave, task, cpuUsage, memUsageBytes);
bounce(task, shuffleCleanupType, Optional.of(message));

// Account for the bounced task so later iterations see the reduced usage and overage.
cpuUsage -= taskCpuUsage;
memUsageBytes -= taskMemUsage;
slave.resource.updateOverusage(taskCpuUsage, taskMemUsage);

shufflingTasksOnSlave++;
shufflingTasksOnCluster++;
shufflingRequests.add(task.getTaskId().getRequestId());
}
}
}

/**
 * Orders the slave's tasks for shuffling and returns that same list (it is sorted in place).
 *
 * <p>For a CPU-overused slave, tasks are sorted by descending CPU shuffle score; otherwise they
 * are sorted by ascending memory shuffle score.
 */
private List<TaskIdWithUsage> getPrioritizedShuffleCandidates(OverusedSlave slave) {
  // SingularityUsageHelper ensures that requests flagged as always ineligible for shuffling have been filtered out.
  List<TaskIdWithUsage> candidates = slave.tasks;
  boolean shufflingForCpu = slave.resource.resourceType == Type.CPU;

  candidates.sort((left, right) -> shufflingForCpu
      ? Double.compare(getUtilizationScoreForCPUShuffle(right), getUtilizationScoreForCPUShuffle(left))
      : Double.compare(getUtilizationScoreForMemoryShuffle(left), getUtilizationScoreForMemoryShuffle(right)));

  return candidates;
}

/**
 * Scores a task for CPU-driven shuffling as CPUs used divided by CPUs requested.
 */
private double getUtilizationScoreForCPUShuffle(TaskIdWithUsage task) {
  double cpuUtilization = task.getUsage().getCpusUsed() / task.getRequestedResources().getCpus();
  return cpuUtilization;
}

/**
 * Scores a task for memory-driven shuffling as the mean of its memory ratio and its CPU ratio.
 * Candidates on a memory-overused slave are sorted ascending by this score.
 *
 * NOTE(review): going by the accessor names, the memory ratio divides a byte count
 * (getMemoryTotalBytes) by a megabyte count (getMemoryMb). If so, it is not on the same 0..1
 * scale as the CPU ratio and would dominate the average -- confirm the units.
 */
private double getUtilizationScoreForMemoryShuffle(TaskIdWithUsage task) {
double memoryUtilization = task.getUsage().getMemoryTotalBytes() / task.getRequestedResources().getMemoryMb();
double cpuUtilization = task.getUsage().getCpusUsed() / task.getRequestedResources().getCpus();

return (memoryUtilization + cpuUtilization) / 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still thinking more on the metric for this. You were correct to point out the fact that, by shuffling more/smaller tasks we may very well hit the shuffling limits more often. I'm not certain if it will be often enough to be of consequence given the size of our cluster, but there are 2 things I could see us doing to compensate for it:

  • Possibly make the shuffle limits a percentage of the overall task count (i.e. so we don't have to worry about it being more restrictive as the cluster grows larger)
  • Possibly make a slightly more complex scoring mechanism. I could see a case where there is maybe a threshold for cpu (something simple like a 50-60% cutoff) in memory shuffle. If tasks are below that, they are given a higher priority for high memory usage (inverse of now), if they are above that they are scored as they are now. This could serve to put a few higher memory, but more likely less busy tasks at the top of the queue, resulting in fewer overall shuffles

}

/**
 * Reports whether the slave is still over its limit for its overused resource, given the
 * supplied running usage figures.
 *
 * <p>CPU is compared against the slave's total system CPUs; memory is compared against the
 * target memory utilization for the host.
 */
private boolean isOverutilized(OverusedSlave slave, double cpuUsage, double memUsageBytes) {
  return slave.resource.resourceType == Type.CPU
      ? cpuUsage > slave.usage.getSystemCpusTotal()
      : memUsageBytes > getTargetMemoryUtilizationForHost(slave.usage);
}

/**
 * @return true when either the per-host or the cluster-wide shuffle count has reached its
 *     configured maximum
 */
private boolean isShufflingTooManyTasks(long shuffledOnSlave, long shuffledOnCluster) {
  if (shuffledOnSlave >= configuration.getMaxTasksToShufflePerHost()) {
    return true;
  }

  return shuffledOnCluster >= configuration.getMaxTasksToShuffleTotal();
}

/**
 * Creates a cleanup for the task and adds a TASK_BOUNCE pending request for its request and
 * deploy, both tagged with the same freshly generated action id.
 *
 * @param task the task to bounce
 * @param cleanupType cleanup type recorded on the task cleanup
 * @param message message attached to both the cleanup and the pending request
 */
private void bounce(TaskIdWithUsage task, TaskCleanupType cleanupType, Optional<String> message) {
  final String actionId = UUID.randomUUID().toString();

  SingularityTaskCleanup cleanup = new SingularityTaskCleanup(
      Optional.empty(),
      cleanupType,
      System.currentTimeMillis(),
      task.getTaskId(),
      message,
      Optional.of(actionId),
      Optional.empty(),
      Optional.empty()
  );
  taskManager.createTaskCleanup(cleanup);

  SingularityPendingRequest pendingBounce = new SingularityPendingRequest(
      task.getTaskId().getRequestId(),
      task.getTaskId().getDeployId(),
      System.currentTimeMillis(),
      Optional.empty(),
      PendingType.TASK_BOUNCE,
      Optional.empty(),
      Optional.empty(),
      Optional.empty(),
      message,
      Optional.of(actionId)
  );
  requestManager.addToPendingQueue(pendingBounce);
}

/**
 * Builds the human-readable reason attached to a shuffle, describing slave usage versus its
 * total and the task's usage versus what it requested, for whichever resource is overused.
 */
private String getShuffleMessage(OverusedSlave slave, TaskIdWithUsage task, double cpuUsage, double memUsageBytes) {
  if (slave.resource.resourceType != Type.CPU) {
    return String.format(
        "Mem usage on slave is %sMiB / %sMiB, shuffling task using %sMiB / %sMiB to less busy host",
        SizeUnit.BYTES.toMegabytes(((long) memUsageBytes)),
        SizeUnit.BYTES.toMegabytes(((long) slave.usage.getSystemMemTotalBytes())),
        SizeUnit.BYTES.toMegabytes(task.getUsage().getMemoryTotalBytes()),
        ((long) task.getRequestedResources().getMemoryMb())
    );
  }

  return String.format(
      "Load on slave is %s / %s, shuffling task using %s / %s to less busy host",
      cpuUsage,
      slave.usage.getSystemCpusTotal(),
      task.getUsage().getCpusUsed(),
      task.getRequestedResources().getCpus()
  );
}

/**
 * @return the configured memory-utilization shuffle threshold multiplied by the slave's total
 *     system memory in bytes
 */
private double getTargetMemoryUtilizationForHost(SingularitySlaveUsage usage) {
  double targetMemBytes =
      configuration.getShuffleTasksWhenSlaveMemoryUtilizationPercentageExceeds() * usage.getSystemMemTotalBytes();
  return targetMemBytes;
}

/**
 * Determines whether CPU or memory is proportionally further over its target on the slave.
 *
 * <p>The CPU target is the slave's total system CPUs; the memory target comes from
 * {@code getTargetMemoryUtilizationForHost}. CPU is chosen only when its relative overage is
 * strictly greater than memory's; otherwise memory is chosen.
 *
 * @return an {@link OverusedResource} holding the overage amount and the target for the chosen
 *     resource
 */
private OverusedResource getMostOverusedResource(SingularitySlaveUsage overloadedSlave) {
  double cpuTarget = overloadedSlave.getSystemCpusTotal();
  double cpuLoad = getSystemLoadForShuffle(overloadedSlave);
  double cpuOverage = cpuLoad - cpuTarget;
  double cpuOverageRatio = cpuOverage / cpuTarget;

  double memTargetBytes = getTargetMemoryUtilizationForHost(overloadedSlave);
  double memUsedBytes = overloadedSlave.getMemoryBytesUsed();
  double memOverageBytes = memUsedBytes - memTargetBytes;
  double memOverageRatio = memOverageBytes / memTargetBytes;

  return cpuOverageRatio > memOverageRatio
      ? new OverusedResource(cpuOverage, cpuTarget, Type.CPU)
      : new OverusedResource(memOverageBytes, memTargetBytes, Type.MEMORY);
}

/**
 * @return the current cleanup tasks whose cleanup type is one of the rebalance (shuffle) types
 */
private List<SingularityTaskCleanup> getShufflingTasks() {
  return taskManager
      .getCleanupTasks()
      .stream()
      .filter(cleanup -> isShuffleCleanup(cleanup))
      .collect(Collectors.toList());
}

/**
 * Counts the given shuffle cleanups per sanitized host.
 *
 * @param shufflingTasks cleanups to count
 * @return a mutable map from sanitized host to the number of cleanups on that host; hosts with
 *     no cleanups are absent
 */
private Map<String, Long> getShufflingTaskCountPerHost(List<SingularityTaskCleanup> shufflingTasks) {
  Map<String, Long> out = new HashMap<>();

  for (SingularityTaskCleanup cleanup : shufflingTasks) {
    // Map.replace only updates keys that are already mapped, so it can never create the first
    // entry for a host; merge inserts 1 for a new host and increments an existing count.
    out.merge(cleanup.getTaskId().getSanitizedHost(), 1L, Long::sum);
  }

  return out;
}

/**
 * @return the sanitized host of the slave's first task, or empty when the slave has no tasks
 */
private Optional<String> getHostId(OverusedSlave slave) {
  if (!slave.tasks.isEmpty()) {
    // probably should add slave metadata to SingularitySlaveUsage
    return Optional.of(slave.tasks.get(0).getTaskId().getSanitizedHost());
  }

  return Optional.empty();
}

/**
 * @return true when the cleanup's type is REBALANCE_CPU_USAGE or REBALANCE_MEMORY_USAGE
 */
private static boolean isShuffleCleanup(SingularityTaskCleanup cleanup) {
  TaskCleanupType cleanupType = cleanup.getCleanupType();

  if (cleanupType == TaskCleanupType.REBALANCE_CPU_USAGE) {
    return true;
  }
  return cleanupType == TaskCleanupType.REBALANCE_MEMORY_USAGE;
}

/**
 * Collects the distinct request ids of the given cleanups.
 *
 * @param cleanups cleanups whose task ids supply the request ids
 * @return a mutable set of request ids
 */
private Set<String> getAssociatedRequestIds(List<SingularityTaskCleanup> cleanups) {
  // The caller adds to this set while shuffling. Collectors.toSet() makes no guarantee about
  // the mutability of what it returns, so collect into an explicit HashSet.
  return cleanups.stream()
      .map(taskCleanup -> taskCleanup.getTaskId().getRequestId())
      .collect(Collectors.toCollection(HashSet::new));
}

/**
 * Reads the slave's system load average selected by the Mesos configuration's
 * score-using-system-load setting: 1-minute, 5-minute, or otherwise 15-minute.
 */
private double getSystemLoadForShuffle(SingularitySlaveUsage usage) {
  final double load;

  switch (configuration.getMesosConfiguration().getScoreUsingSystemLoad()) {
    case LOAD_1:
      load = usage.getSystemLoad1Min();
      break;
    case LOAD_5:
      load = usage.getSystemLoad5Min();
      break;
    default:
      // LOAD_15 and any other setting use the 15-minute load.
      load = usage.getSystemLoad15Min();
      break;
  }

  return load;
}
}
Loading