Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better preferential scheduling #2050

Merged
merged 12 commits into from
Jan 28, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public enum SlaveMatchState {
OK(true),
PREFERRED_SLAVE(true),
NOT_RACK_OR_SLAVE_PARTICULAR(true),
RESOURCES_DO_NOT_MATCH(false),
RACK_SATURATED(false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ public class SingularityConfiguration extends Configuration {

private boolean proxyRunNowToLeader = true;

private double preferredSlaveScaleFactor = 1.5;

// high cpu slave, based on cpu to memory ratio
private double highCpuSlaveCutOff = 1.5; //TODO

// high memory slave, based on cpu to memory ratio
private double highMemorySlaveCutOff = 0.5; //TODO

@JsonProperty("crashLoop")
private CrashLoopConfiguration crashLoopConfiguration = new CrashLoopConfiguration();

Expand Down Expand Up @@ -1693,6 +1701,30 @@ public void setSqlFallBackToBytesFields(boolean sqlFallBackToBytesFields) {
this.sqlFallBackToBytesFields = sqlFallBackToBytesFields;
}

/**
 * Multiplier applied to an offer's calculated score when the slave match state is
 * PREFERRED_SLAVE. Defaults to 1.5.
 */
public double getPreferredSlaveScaleFactor() {
  return this.preferredSlaveScaleFactor;
}

/** Overrides the score multiplier used for preferred slaves. */
public void setPreferredSlaveScaleFactor(double scaleFactor) {
  this.preferredSlaveScaleFactor = scaleFactor;
}

/**
 * Cutoff above which a cpu/memory ratio classifies a slave or request as HIGH_CPU.
 * Defaults to 1.5 (marked TODO at the field declaration — tuning pending).
 */
public double getHighCpuSlaveCutOff() {
  return this.highCpuSlaveCutOff;
}

/** Overrides the HIGH_CPU classification cutoff. */
public void setHighCpuSlaveCutOff(double cutOff) {
  this.highCpuSlaveCutOff = cutOff;
}

/**
 * Cutoff below which a cpu/memory ratio classifies a slave or request as HIGH_MEMORY.
 * Defaults to 0.5 (marked TODO at the field declaration — tuning pending).
 */
public double getHighMemorySlaveCutOff() {
  return this.highMemorySlaveCutOff;
}

/** Overrides the HIGH_MEMORY classification cutoff. */
public void setHighMemorySlaveCutOff(double cutOff) {
  this.highMemorySlaveCutOff = cutOff;
}

/** Returns the crash-loop detection settings (bound from the "crashLoop" config property). */
public CrashLoopConfiguration getCrashLoopConfiguration() {
  return this.crashLoopConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -473,7 +472,7 @@ private MaxProbableUsage getMaxProbableUsageForSlave(List<SingularityTaskId> act
if (taskId.getSanitizedHost().equals(sanitizedHostname)) {
if (requestUtilizations.containsKey(taskId.getRequestId())) {
RequestUtilization utilization = requestUtilizations.get(taskId.getRequestId());
cpu += getEstimatedCpuUsageForRequest(utilization);
cpu += slaveAndRackHelper.getEstimatedCpuUsageForRequest(utilization);
memBytes += utilization.getMaxMemBytesUsed();
diskBytes += utilization.getMaxDiskBytesUsed();
} else {
Expand Down Expand Up @@ -539,11 +538,6 @@ private List<SingularityTaskRequestHolder> getSortedDueTaskRequests() {
.collect(Collectors.toList());
}

private double getEstimatedCpuUsageForRequest(RequestUtilization requestUtilization) {
// To account for cpu bursts, tend towards max usage if the app is consistently over-utilizing cpu, tend towards avg if it is over-utilized in short bursts
return (requestUtilization.getMaxCpuUsed() - requestUtilization.getAvgCpuUsed()) * requestUtilization.getCpuBurstRating() + requestUtilization.getAvgCpuUsed();
}

private double score(SingularityOfferHolder offerHolder, SingularityTaskRequestHolder taskRequestHolder,
Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage, List<SingularityTaskId> activeTaskIdsForRequest,
RequestUtilization requestUtilization, Map<SingularityDeployKey, Optional<SingularityDeployStatistics>> deployStatsCache) {
Expand All @@ -553,7 +547,7 @@ private double score(SingularityOfferHolder offerHolder, SingularityTaskRequestH

double estimatedCpusToAdd = taskRequestHolder.getTotalResources().getCpus();
if (requestUtilization != null) {
estimatedCpusToAdd = getEstimatedCpuUsageForRequest(requestUtilization);
estimatedCpusToAdd = slaveAndRackHelper.getEstimatedCpuUsageForRequest(requestUtilization);
}
if (mesosConfiguration.isOmitOverloadedHosts() && maybeSlaveUsage.isPresent() && maybeSlaveUsage.get().isCpuOverloaded(estimatedCpusToAdd)) {
LOG.debug("Slave {} is overloaded (load5 {}/{}, load1 {}/{}, estimated cpus to add: {}, already committed cpus: {}), ignoring offer",
Expand All @@ -576,10 +570,10 @@ private double score(SingularityOfferHolder offerHolder, SingularityTaskRequestH
if (!matchesResources) {
return 0;
}
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, activeTaskIdsForRequest, isPreemptibleTask(taskRequest, deployStatsCache));
final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, activeTaskIdsForRequest, isPreemptibleTask(taskRequest, deployStatsCache), requestUtilization);

if (slaveMatchState.isMatchAllowed()) {
return score(offerHolder.getHostname(), maybeSlaveUsage);
return score(offerHolder.getHostname(), maybeSlaveUsage, slaveMatchState);
} else if (LOG.isTraceEnabled()) {
LOG.trace("Ignoring offer on host {} with roles {} on {} for task {}; matched resources: {}, slave match state: {}", offerHolder.getHostname(),
offerHolder.getRoles(), offerHolder.getHostname(), pendingTaskId, matchesResources, slaveMatchState);
Expand All @@ -605,7 +599,7 @@ private boolean isPreemptibleTask(SingularityTaskRequest taskRequest, Map<Singul
}

@VisibleForTesting
double score(String hostname, Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage) {
double score(String hostname, Optional<SingularitySlaveUsageWithCalculatedScores> maybeSlaveUsage, SlaveMatchState slaveMatchState) {
if (!maybeSlaveUsage.isPresent() || maybeSlaveUsage.get().isMissingUsageData()) {
if (mesosConfiguration.isOmitForMissingUsageData()) {
LOG.info("Skipping slave {} with missing usage data ({})", hostname, maybeSlaveUsage);
Expand All @@ -618,11 +612,18 @@ private boolean isPreemptibleTask(SingularityTaskRequest taskRequest, Map<Singul

SingularitySlaveUsageWithCalculatedScores slaveUsageWithScores = maybeSlaveUsage.get();

return calculateScore(
double calculatedScore = calculateScore(
1 - slaveUsageWithScores.getMemAllocatedScore(), slaveUsageWithScores.getMemInUseScore(),
1 - slaveUsageWithScores.getCpusAllocatedScore(), slaveUsageWithScores.getCpusInUseScore(),
1 - slaveUsageWithScores.getDiskAllocatedScore(), slaveUsageWithScores.getDiskInUseScore(),
mesosConfiguration.getInUseResourceWeight(), mesosConfiguration.getAllocatedResourceWeight());

if (slaveMatchState == SlaveMatchState.PREFERRED_SLAVE) {
LOG.info("Slave {} is preferred, will scale score by {}", hostname, configuration.getPreferredSlaveScaleFactor());
calculatedScore *= configuration.getPreferredSlaveScaleFactor();
}

return calculatedScore;
}

private double calculateScore(double memAllocatedScore, double memInUseScore, double cpusAllocatedScore, double cpusInUseScore, double diskAllocatedScore, double diskInUseScore, double inUseResourceWeight, double allocatedResourceWeight) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.apache.mesos.v1.Protos.Offer;

import com.google.inject.Inject;
import com.hubspot.singularity.RequestUtilization;
import com.hubspot.singularity.config.MesosConfiguration;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.helpers.MesosUtils;

@Singleton
public class SingularitySlaveAndRackHelper {
Expand Down Expand Up @@ -116,4 +118,57 @@ public boolean containsAllAttributes(Map<String, String> attributes, Map<String,
return true;
}

/**
 * Returns true if at least one key/value pair in {@code otherAttributes} is present
 * with the identical value in {@code attributes}; false otherwise (including when
 * {@code otherAttributes} is empty).
 */
public boolean containsAtLeastOneMatchingAttribute(Map<String, String> attributes, Map<String, String> otherAttributes) {
  for (Map.Entry<String, String> candidate : otherAttributes.entrySet()) {
    String key = candidate.getKey();
    // containsKey check first so an absent key never matches, even for equal values elsewhere
    if (attributes.containsKey(key) && attributes.get(key).equals(candidate.getValue())) {
      return true;
    }
  }
  return false;
}

/**
 * Rough classification of a slave's offered resources, or a request's observed usage,
 * by cpu/memory ratio relative to the configured highCpuSlaveCutOff and
 * highMemorySlaveCutOff thresholds.
 */
public enum CpuMemoryPreference {
  AVERAGE,     // ratio falls between the two cutoffs
  HIGH_MEMORY, // ratio below highMemorySlaveCutOff
  HIGH_CPU     // ratio above highCpuSlaveCutOff
}

/**
 * Classifies a request's resource profile from its utilization history:
 * HIGH_CPU above highCpuSlaveCutOff, HIGH_MEMORY below highMemorySlaveCutOff,
 * otherwise AVERAGE.
 */
public CpuMemoryPreference getCpuMemoryPreferenceForRequest(RequestUtilization requestUtilization) {
  double ratio = getCpuMemoryRatioForRequest(requestUtilization);
  if (ratio > configuration.getHighCpuSlaveCutOff()) {
    return CpuMemoryPreference.HIGH_CPU;
  }
  if (ratio < configuration.getHighMemorySlaveCutOff()) {
    return CpuMemoryPreference.HIGH_MEMORY;
  }
  return CpuMemoryPreference.AVERAGE;
}

/**
 * Classifies a slave's currently offered resources against the same cutoffs used for
 * requests: HIGH_CPU above highCpuSlaveCutOff, HIGH_MEMORY below highMemorySlaveCutOff,
 * otherwise AVERAGE.
 */
public CpuMemoryPreference getCpuMemoryPreferenceForSlave(SingularityOfferHolder offerHolder) {
  double ratio = getCpuMemoryRatioForSlave(offerHolder);
  if (ratio > configuration.getHighCpuSlaveCutOff()) {
    return CpuMemoryPreference.HIGH_CPU;
  }
  if (ratio < configuration.getHighMemorySlaveCutOff()) {
    return CpuMemoryPreference.HIGH_MEMORY;
  }
  return CpuMemoryPreference.AVERAGE;
}

// Ratio of offered memory to offered cpus for this slave's current offer.
// NOTE(review): this computes memory/cpus, while getCpuMemoryRatioForRequest computes
// cpu-usage/memory-usage — the two ratios look inverted relative to each other yet are
// compared against the same cutoffs; confirm which direction is intended (the config
// fields are documented as "cpu to memory ratio" and are marked TODO).
private double getCpuMemoryRatioForSlave(SingularityOfferHolder offerHolder) {
  double offeredMemory = MesosUtils.getMemory(offerHolder.getCurrentResources(), Optional.empty());
  double offeredCpus = MesosUtils.getNumCpus(offerHolder.getCurrentResources(), Optional.empty());
  return offeredMemory / offeredCpus;
}

// Ratio of estimated cpu usage to average memory usage for a request.
// NOTE(review): memory here is in bytes (getAvgMemBytesUsed), so this ratio is in
// cores-per-byte — verify the highCpuSlaveCutOff/highMemorySlaveCutOff values and the
// slave-side ratio use comparable units before relying on the classification.
private double getCpuMemoryRatioForRequest(RequestUtilization requestUtilization) {
  double estimatedCpuUsage = getEstimatedCpuUsageForRequest(requestUtilization);
  double averageMemoryUsage = requestUtilization.getAvgMemBytesUsed();
  return estimatedCpuUsage / averageMemoryUsage;
}

public double getEstimatedCpuUsageForRequest(RequestUtilization requestUtilization) {
// To account for cpu bursts, tend towards max usage if the app is consistently over-utilizing cpu, tend towards avg if it is over-utilized in short bursts
sjeropkipruto marked this conversation as resolved.
Show resolved Hide resolved
return (requestUtilization.getMaxCpuUsed() - requestUtilization.getAvgCpuUsed()) * requestUtilization.getCpuBurstRating() + requestUtilization.getAvgCpuUsed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.hubspot.mesos.json.MesosMasterSlaveObject;
import com.hubspot.mesos.json.MesosMasterStateObject;
import com.hubspot.singularity.MachineState;
import com.hubspot.singularity.RequestUtilization;
import com.hubspot.singularity.SingularityMachineAbstraction;
import com.hubspot.singularity.SingularityPendingRequest.PendingType;
import com.hubspot.singularity.SingularityPendingTask;
Expand All @@ -46,6 +47,7 @@
import com.hubspot.singularity.data.RackManager;
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.mesos.SingularitySlaveAndRackHelper.CpuMemoryPreference;
import com.hubspot.singularity.scheduler.SingularityLeaderCache;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

Expand Down Expand Up @@ -83,7 +85,7 @@ public class SingularitySlaveAndRackManager {
this.leaderCache = leaderCache;
}

SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest, List<SingularityTaskId> activeTaskIdsForRequest, boolean isPreemptibleTask) {
SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest, List<SingularityTaskId> activeTaskIdsForRequest, boolean isPreemptibleTask, RequestUtilization requestUtilization) {
final String host = offerHolder.getHostname();
final String rackId = offerHolder.getRackId();
final String slaveId = offerHolder.getSlaveId();
Expand Down Expand Up @@ -247,9 +249,36 @@ SlaveMatchState doesOfferMatch(SingularityOfferHolder offerHolder, SingularityTa
case GREEDY:
}

if (isSlavePreferred(offerHolder, taskRequest, requestUtilization)) {
LOG.info("Slave {} is preferred", offerHolder.getHostname());
return SlaveMatchState.PREFERRED_SLAVE;
}

return SlaveMatchState.OK;
}

// A slave is preferred when it carries one of the request's allowed attributes, or when
// its cpu/memory profile matches the request's observed usage profile. Attribute check
// runs first, short-circuiting the cpu/memory check exactly as the original || did.
private boolean isSlavePreferred(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest, RequestUtilization requestUtilization) {
  if (isSlavePreferredByAllowedAttributes(offerHolder, taskRequest)) {
    return true;
  }
  return isSlavePreferredByCpuMemory(offerHolder, requestUtilization);
}

/**
 * Returns true if the offer's host advertises at least one of the request's allowed
 * attributes (key and value both matching).
 */
private boolean isSlavePreferredByAllowedAttributes(SingularityOfferHolder offerHolder, SingularityTaskRequest taskRequest) {
  Map<String, String> allowedAttributes = getAllowedAttributes(taskRequest);
  Map<String, String> hostAttributes = offerHolder.getTextAttributes();
  boolean matches = slaveAndRackHelper.containsAtLeastOneMatchingAttribute(hostAttributes, allowedAttributes);
  // debug, not info: this runs for every offer/task pairing and would flood the logs.
  // Message also fixed — it previously read "is slave {} by allowed attributes?".
  LOG.debug("Is slave {} preferred by allowed attributes? {}", offerHolder.getHostname(), matches);
  return matches;
}

/**
 * Returns true if the slave's cpu/memory profile (from its current offer) matches the
 * request's cpu/memory profile (from utilization history). Returns false when no
 * utilization history exists for the request.
 */
public boolean isSlavePreferredByCpuMemory(SingularityOfferHolder offerHolder, RequestUtilization requestUtilization) {
  if (requestUtilization == null) {
    // No usage history yet, so there is no basis for a cpu/memory preference
    return false;
  }
  CpuMemoryPreference slavePreference = slaveAndRackHelper.getCpuMemoryPreferenceForSlave(offerHolder);
  CpuMemoryPreference requestPreference = slaveAndRackHelper.getCpuMemoryPreferenceForRequest(requestUtilization);
  // debug, not info: runs for every offer/task pairing. SLF4J calls toString() itself,
  // so the explicit .toString() calls were dropped.
  LOG.debug("CpuMemoryPreference for slave {} is {}, CpuMemoryPreference for request {} is {}",
      offerHolder.getHostname(), slavePreference, requestUtilization.getRequestId(), requestPreference);
  return slavePreference == requestPreference;
}

private boolean isSlaveAttributesMatch(SingularityOfferHolder offer, SingularityTaskRequest taskRequest, boolean isPreemptibleTask) {
final Map<String, String> requiredAttributes = getRequiredAttributes(taskRequest);
final Map<String, String> allowedAttributes = getAllowedAttributes(taskRequest);
Expand Down Expand Up @@ -288,7 +317,6 @@ private boolean isSlaveAttributesMatch(SingularityOfferHolder offer, Singularity
}

return true;

}

private Map<String, String> getRequiredAttributes(SingularityTaskRequest taskRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTaskRequest;
import com.hubspot.singularity.SingularityUser;
import com.hubspot.singularity.SlaveMatchState;
import com.hubspot.singularity.api.SingularityScaleRequest;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
Expand Down Expand Up @@ -100,47 +101,56 @@ public void itCorrectlyUsesDefaults() {
setRequestType(RequestType.SERVICE);

// LR - no usage tracked -> default score
assertValueIs(0.50, scheduler.score(SLAVE_ID, Optional.empty()));
assertValueIs(0.50, scheduler.score(SLAVE_ID, Optional.empty(), SlaveMatchState.OK));

// NLR - no deployStatistics -> default weights
setRequestType(RequestType.ON_DEMAND);
assertValueIs(0.5, scheduler.score(SLAVE_ID, Optional.of(getUsage(5, 10, 5, 5, 10, 5, 5, 10, 5))));
assertValueIs(0.5, scheduler.score(SLAVE_ID, Optional.of(getUsage(5, 10, 5, 5, 10, 5, 5, 10, 5)), SlaveMatchState.OK));
}

@Test
public void itCorrectlyScoresLongRunningTasks() {
  // The scrape of this diff retained both the pre-change (two-arg score) and
  // post-change (three-arg) assert lines; only the merged three-arg form is kept here.
  // getUsage args read as three (used, total, allocated) triples: mem, cpu, disk.
  // SlaveMatchState.OK means no preferred-slave scaling is applied to the score.
  setRequestType(RequestType.SERVICE);

  // new slave (no resources used) -> perfect score
  assertValueIs(1, scheduler.score(SLAVE_ID, Optional.of(getUsage(0,10, 0, 0,10, 0, 0, 10, 0)), SlaveMatchState.OK));

  // cpu used, no mem used, no disk used
  assertValueIs(0.85, scheduler.score(SLAVE_ID, Optional.of(getUsage(0, 10, 0, 5, 10, 5, 0, 10, 0)), SlaveMatchState.OK));
  assertValueIs(0.76, scheduler.score(SLAVE_ID, Optional.of(getUsage(0, 10, 0, 8, 10, 8, 0, 10, 0)), SlaveMatchState.OK));

  // no cpu used, mem used, no disk used
  assertValueIs(0.75, scheduler.score(SLAVE_ID, Optional.of(getUsage(5, 10, 5, 0, 10, 0, 0, 10, 0)), SlaveMatchState.OK));
  assertValueIs(0.60, scheduler.score(SLAVE_ID, Optional.of(getUsage(8, 10, 8, 0, 10, 0, 0, 10, 0)), SlaveMatchState.OK));

  // no cpu used, no mem used, disk used
  assertValueIs(0.90, scheduler.score(SLAVE_ID, Optional.of(getUsage(0, 10, 0, 0, 10, 0, 5, 10, 5)), SlaveMatchState.OK));
  assertValueIs(0.84, scheduler.score(SLAVE_ID, Optional.of(getUsage(0, 10, 0, 0, 10, 0, 8, 10, 8)), SlaveMatchState.OK));

  // cpu used, mem used, no disk used
  assertValueIs(0.60, scheduler.score(SLAVE_ID, Optional.of(getUsage(5, 10, 5, 5, 10, 5, 0, 10, 0)), SlaveMatchState.OK));
  assertValueIs(0.36, scheduler.score(SLAVE_ID, Optional.of(getUsage(8, 10, 8, 8, 10, 8, 0, 10, 0)), SlaveMatchState.OK));

  // no cpu used, mem used, disk used
  assertValueIs(0.65, scheduler.score(SLAVE_ID, Optional.of(getUsage(5,10, 5, 0, 10,0, 5, 10, 5)), SlaveMatchState.OK));
  assertValueIs(0.44, scheduler.score(SLAVE_ID, Optional.of(getUsage(8,10, 8, 0,10, 0, 8, 10, 8)), SlaveMatchState.OK));

  // cpu used, no mem used, disk used
  assertValueIs(0.75, scheduler.score(SLAVE_ID, Optional.of(getUsage(0,10, 0, 5,10, 5, 5, 10, 5)), SlaveMatchState.OK));
  assertValueIs(0.60, scheduler.score(SLAVE_ID, Optional.of(getUsage(0,10, 0, 8,10, 8, 8, 10, 8)), SlaveMatchState.OK));

  // cpu used, mem used, disk used
  assertValueIs(0.5, scheduler.score(SLAVE_ID, Optional.of(getUsage(5,10, 5, 5,10, 5, 5, 10, 5)), SlaveMatchState.OK));
  assertValueIs(0.2, scheduler.score(SLAVE_ID, Optional.of(getUsage(8,10, 8, 8,10, 8, 8, 10, 8)), SlaveMatchState.OK));
}

@Test
public void itCorrectlyScalesScoresForPreferredHosts() {
// PREFERRED_SLAVE multiplies the base score by preferredSlaveScaleFactor (default 1.5):
// 0.50 -> 0.75 for the moderately used slave
assertValueIs(0.50, scheduler.score(SLAVE_ID, Optional.of(getUsage(5,10, 5, 5,10, 5, 5, 10, 5)), SlaveMatchState.OK));
assertValueIs(0.75, scheduler.score(SLAVE_ID, Optional.of(getUsage(5,10, 5, 5,10, 5, 5, 10, 5)), SlaveMatchState.PREFERRED_SLAVE));

// 0.20 -> 0.30 for the heavily used slave
assertValueIs(0.20, scheduler.score(SLAVE_ID, Optional.of(getUsage(8,10, 8, 8,10, 8, 8, 10, 8)), SlaveMatchState.OK));
assertValueIs(0.30, scheduler.score(SLAVE_ID, Optional.of(getUsage(8,10, 8, 8,10, 8, 8, 10, 8)), SlaveMatchState.PREFERRED_SLAVE));
}

@Test
Expand Down