Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework cooldown logic #1961

Merged
merged 6 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public String getDeployId() {
return deployId;
}

@Schema(description = "Number of tasks that have finished successfully for this deploy")
@Schema(description = "Number of sequential successful tasks (used in cooldown calculations)")
public int getNumSuccess() {
return numSuccess;
}

@Schema(description = "Number of tasks that have finished with a failure for this deploy")
@Schema(description = "Number of sequential failed tasks (used in cooldown calculations)")
public int getNumFailures() {
return numFailures;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,17 @@ public class SingularityConfiguration extends Configuration {

private long considerTaskHealthyAfterRunningForSeconds = 5;

private int cooldownAfterFailures = 3;
private int fastFailureCooldownCount = 3;

private double cooldownAfterPctOfInstancesFail = 1.0;
private long fastFailureCooldownMs = 60000;

private long cooldownExpiresAfterMinutes = 15;
private long fastCooldownExpiresMinutesWithoutFailure = 5;

private int slowFailureCooldownCount = 5;

private long slowFailureCooldownMs = 600000;

private long slowCooldownExpiresMinutesWithoutFailure = 8;

private long cooldownMinScheduleSeconds = 120;

Expand Down Expand Up @@ -468,10 +474,6 @@ public long getConsiderTaskHealthyAfterRunningForSeconds() {
return considerTaskHealthyAfterRunningForSeconds;
}

public int getCooldownAfterFailures() {
return cooldownAfterFailures;
}

/**
 * Returns the configured {@code debugCuratorCallOverBytes} value.
 *
 * @return the current value of {@code debugCuratorCallOverBytes}
 */
public long getDebugCuratorCallOverBytes() {
return debugCuratorCallOverBytes;
}
Expand Down Expand Up @@ -504,16 +506,56 @@ public void setDebugCuratorCallOverMillis(long debugCuratorCallOverMillis) {
this.debugCuratorCallOverMillis = debugCuratorCallOverMillis;
}

public double getCooldownAfterPctOfInstancesFail() {
return cooldownAfterPctOfInstancesFail;
/**
 * Returns the configured {@code cooldownMinScheduleSeconds} value (field default: 120).
 * The mailer converts this value from seconds to milliseconds when formatting the
 * "cooldownDelayFormat" property of the request-in-cooldown email.
 *
 * @return the value of {@code cooldownMinScheduleSeconds}, in seconds
 */
public long getCooldownMinScheduleSeconds() {
return cooldownMinScheduleSeconds;
}

public long getCooldownExpiresAfterMinutes() {
return cooldownExpiresAfterMinutes;
/**
 * Returns the failure count used for the "fast" failure loop check (field default: 3).
 * {@code SingularityCooldown} reports a fast failure loop when at least this many recorded
 * failure timestamps fall inside the {@code fastFailureCooldownMs} window.
 *
 * @return the number of failures inside the fast window that triggers cooldown
 */
public int getFastFailureCooldownCount() {
return fastFailureCooldownCount;
}

public long getCooldownMinScheduleSeconds() {
return cooldownMinScheduleSeconds;
/**
 * Sets the failure count used for the "fast" failure loop check.
 *
 * @param fastFailureCooldownCount number of failures inside the fast window that triggers cooldown
 */
public void setFastFailureCooldownCount(int fastFailureCooldownCount) {
this.fastFailureCooldownCount = fastFailureCooldownCount;
}

/**
 * Returns the length of the "fast" failure window in milliseconds (field default: 60000).
 * {@code SingularityCooldown} only counts failure timestamps newer than
 * {@code now - fastFailureCooldownMs} towards the fast failure loop.
 *
 * @return the fast failure window, in milliseconds
 */
public long getFastFailureCooldownMs() {
return fastFailureCooldownMs;
}

/**
 * Sets the length of the "fast" failure window.
 *
 * @param fastFailureCooldownMs the fast failure window, in milliseconds
 */
public void setFastFailureCooldownMs(long fastFailureCooldownMs) {
this.fastFailureCooldownMs = fastFailureCooldownMs;
}

/**
 * Returns the expiry period, in minutes, for the "fast" failure loop (field default: 5).
 * {@code SingularityCooldown} stops reporting a fast failure loop once the most recent
 * failure timestamp is older than this many minutes.
 *
 * @return minutes without a new failure after which the fast failure loop no longer applies
 */
public long getFastCooldownExpiresMinutesWithoutFailure() {
return fastCooldownExpiresMinutesWithoutFailure;
}

/**
 * Sets the expiry period for the "fast" failure loop.
 *
 * @param fastCooldownExpiresMinutesWithoutFailure minutes without a new failure after which
 *        the fast failure loop no longer applies
 */
public void setFastCooldownExpiresMinutesWithoutFailure(long fastCooldownExpiresMinutesWithoutFailure) {
this.fastCooldownExpiresMinutesWithoutFailure = fastCooldownExpiresMinutesWithoutFailure;
}

/**
 * Returns the failure count used for the "slow" failure loop check (field default: 5).
 * {@code SingularityCooldown} reports a slow failure loop when at least this many recorded
 * failure timestamps fall inside the {@code slowFailureCooldownMs} window. The same value is
 * also used as the unhealthy-kill threshold for the replacement-tasks-failing mail and as the
 * "numFailures" property of the cooldown emails.
 *
 * @return the number of failures inside the slow window that triggers cooldown
 */
public int getSlowFailureCooldownCount() {
return slowFailureCooldownCount;
}

/**
 * Sets the failure count used for the "slow" failure loop check.
 *
 * @param slowFailureCooldownCount number of failures inside the slow window that triggers cooldown
 */
public void setSlowFailureCooldownCount(int slowFailureCooldownCount) {
this.slowFailureCooldownCount = slowFailureCooldownCount;
}

/**
 * Returns the length of the "slow" failure window in milliseconds (field default: 600000).
 * {@code SingularityCooldown} only counts failure timestamps newer than
 * {@code now - slowFailureCooldownMs} towards the slow failure loop.
 *
 * @return the slow failure window, in milliseconds
 */
public long getSlowFailureCooldownMs() {
return slowFailureCooldownMs;
}

/**
 * Sets the length of the "slow" failure window.
 *
 * @param slowFailureCooldownMs the slow failure window, in milliseconds
 */
public void setSlowFailureCooldownMs(long slowFailureCooldownMs) {
this.slowFailureCooldownMs = slowFailureCooldownMs;
}

/**
 * Returns the expiry period, in minutes, for the "slow" failure loop (field default: 8).
 * {@code SingularityCooldown} stops reporting a slow failure loop once the most recent
 * failure timestamp is older than this many minutes. The mailer also formats this value as
 * the "cooldownExpiresFormat" property of the request-in-cooldown email.
 *
 * @return minutes without a new failure after which the slow failure loop no longer applies
 */
public long getSlowCooldownExpiresMinutesWithoutFailure() {
return slowCooldownExpiresMinutesWithoutFailure;
}

/**
 * Sets the expiry period for the "slow" failure loop.
 *
 * @param slowCooldownExpiresMinutesWithoutFailure minutes without a new failure after which
 *        the slow failure loop no longer applies
 */
public void setSlowCooldownExpiresMinutesWithoutFailure(long slowCooldownExpiresMinutesWithoutFailure) {
this.slowCooldownExpiresMinutesWithoutFailure = slowCooldownExpiresMinutesWithoutFailure;
}

public int getCacheTasksMaxSize() {
Expand Down Expand Up @@ -980,18 +1022,6 @@ public void setConsiderTaskHealthyAfterRunningForSeconds(long considerTaskHealth
this.considerTaskHealthyAfterRunningForSeconds = considerTaskHealthyAfterRunningForSeconds;
}

public void setCooldownAfterFailures(int cooldownAfterFailures) {
this.cooldownAfterFailures = cooldownAfterFailures;
}

public void setCooldownAfterPctOfInstancesFail(double cooldownAfterPctOfInstancesFail) {
this.cooldownAfterPctOfInstancesFail = cooldownAfterPctOfInstancesFail;
}

public void setCooldownExpiresAfterMinutes(long cooldownExpiresAfterMinutes) {
this.cooldownExpiresAfterMinutes = cooldownExpiresAfterMinutes;
}

/**
 * Sets the {@code cooldownMinScheduleSeconds} value.
 *
 * @param cooldownMinScheduleSeconds the new value, in seconds
 */
public void setCooldownMinScheduleSeconds(long cooldownMinScheduleSeconds) {
this.cooldownMinScheduleSeconds = cooldownMinScheduleSeconds;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.hubspot.singularity.scheduler;

import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.inject.Singleton;

Expand All @@ -9,11 +13,9 @@

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.RequestState;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityRequest;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;

@Singleton
Expand All @@ -22,79 +24,54 @@ public class SingularityCooldown {
private static final Logger LOG = LoggerFactory.getLogger(SingularityCooldown.class);

private final SingularityConfiguration configuration;
private final long cooldownExpiresAfterMillis;

@Inject
public SingularityCooldown(SingularityConfiguration configuration) {
this.configuration = configuration;
this.cooldownExpiresAfterMillis = TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes());
}

public boolean shouldEnterCooldown(SingularityRequest request, SingularityTaskId taskId, RequestState requestState, SingularityDeployStatistics deployStatistics, long failureTimestamp) {
boolean shouldEnterCooldown(SingularityRequest request, RequestState requestState, SingularityDeployStatistics deployStatistics, long failureTimestamp) {
if (requestState != RequestState.ACTIVE || !request.isAlwaysRunning()) {
return false;
}

if (configuration.getCooldownAfterFailures() < 1 || configuration.getCooldownExpiresAfterMinutes() < 1) {
return false;
}

final boolean failedTooManyTimes = hasFailedTooManyTimes(request, deployStatistics, Optional.of(taskId.getInstanceNo()), Optional.of(failureTimestamp));

if (failedTooManyTimes) {
LOG.trace("Request {} has failed at least {} times in {}", request.getId(), configuration.getCooldownAfterFailures(), configuration.getCooldownAfterFailures());
}

return failedTooManyTimes;
return hasFailedTooManyTimes(deployStatistics, Optional.of(failureTimestamp));
}

private boolean hasFailedTooManyTimes(SingularityRequest request, SingularityDeployStatistics deployStatistics, Optional<Integer> instanceNo, Optional<Long> recentFailureTimestamp) {
final long now = System.currentTimeMillis();

int numInstancesThatMustFail = (int) Math.ceil(request.getInstancesSafe() * configuration.getCooldownAfterPctOfInstancesFail());
int numInstancesThatFailed = 0;

for (int i = 1; i < request.getInstancesSafe() + 1; i++) {
int numFailuresInsideCooldown = 0;

for (long failureTimestamp : deployStatistics.getInstanceSequentialFailureTimestamps().get(i)) {
if (hasFailedInsideCooldown(now, failureTimestamp)) {
numFailuresInsideCooldown++;
}
}

if (instanceNo.isPresent() && instanceNo.get() == i && recentFailureTimestamp.isPresent()) {
if (hasFailedInsideCooldown(now, recentFailureTimestamp.get())) {
numFailuresInsideCooldown++;
}
}

if (numFailuresInsideCooldown >= configuration.getCooldownAfterFailures()) {
numInstancesThatFailed++;
}
}
private boolean hasFailedTooManyTimes(SingularityDeployStatistics deployStatistics, Optional<Long> recentFailureTimestamp) {
return hasFastFailureLoop(deployStatistics, recentFailureTimestamp) || hasSlowFailureLoop(deployStatistics, recentFailureTimestamp);
}

return numInstancesThatFailed >= numInstancesThatMustFail;
/**
 * Checks for a failure loop using the "slow" thresholds from the configuration: the slow
 * failure window (ms), the slow failure count, and the slow expiry period (minutes).
 *
 * @param deployStatistics statistics holding the recorded sequential failure timestamps
 * @param recentFailureTimestamp an additional failure timestamp to include, if present
 * @return {@code true} if {@code hasFailureLoop} reports a loop for the slow thresholds
 */
private boolean hasSlowFailureLoop(SingularityDeployStatistics deployStatistics, Optional<Long> recentFailureTimestamp) {
  final long windowMs = configuration.getSlowFailureCooldownMs();
  final int requiredFailures = configuration.getSlowFailureCooldownCount();
  final long expiresAfterMins = configuration.getSlowCooldownExpiresMinutesWithoutFailure();

  return hasFailureLoop(deployStatistics, recentFailureTimestamp, windowMs, requiredFailures, expiresAfterMins);
}

public boolean hasCooldownExpired(SingularityRequest request, SingularityDeployStatistics deployStatistics, Optional<Integer> instanceNo, Optional<Long> recentFailureTimestamp) {
if (configuration.getCooldownExpiresAfterMinutes() < 1) {
return true;
}
private boolean hasFastFailureLoop(SingularityDeployStatistics deployStatistics, Optional<Long> recentFailureTimestamp) {
return hasFailureLoop(deployStatistics, recentFailureTimestamp, configuration.getFastFailureCooldownMs(), configuration.getFastFailureCooldownCount(), configuration.getFastCooldownExpiresMinutesWithoutFailure());

final boolean hasCooldownExpired = !hasFailedTooManyTimes(request, deployStatistics, instanceNo, recentFailureTimestamp);
}

if (hasCooldownExpired) {
LOG.trace("Request {} cooldown has expired or is not valid because {} tasks have not failed in the last {}", deployStatistics.getRequestId(), configuration.getCooldownAfterFailures(), JavaUtils.durationFromMillis(cooldownExpiresAfterMillis));
private boolean hasFailureLoop(SingularityDeployStatistics deployStatistics, Optional<Long> recentFailureTimestamp, long cooldownPeriod, int cooldownCount, long expiresAfterMins) {
final long now = System.currentTimeMillis();
long thresholdTime = now - cooldownPeriod;
List<Long> failureTimestamps = deployStatistics.getInstanceSequentialFailureTimestamps().asMap()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (recentFailureTimestamp.isPresent()) {
failureTimestamps.add(recentFailureTimestamp.get());
}
long failureCount = failureTimestamps.stream()
.filter((t) -> t > thresholdTime)
.count();
java.util.Optional<Long> mostRecentFailure = failureTimestamps.stream().max(Comparator.comparingLong(Long::valueOf));

return hasCooldownExpired;
return failureCount >= cooldownCount
&& (!mostRecentFailure.isPresent() || mostRecentFailure.get() > System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(expiresAfterMins));
}

private boolean hasFailedInsideCooldown(long now, long failureTimestamp) {
final long timeSinceFailure = now - failureTimestamp;

return timeSinceFailure < cooldownExpiresAfterMillis;
/**
 * Reports whether cooldown no longer applies, i.e. neither the fast nor the slow failure
 * loop check currently matches.
 *
 * @param deployStatistics statistics holding the recorded sequential failure timestamps
 * @param recentFailureTimestamp an additional failure timestamp to include, if present
 * @return {@code true} when {@code hasFailedTooManyTimes} is {@code false}
 */
boolean hasCooldownExpired(SingularityDeployStatistics deployStatistics, Optional<Long> recentFailureTimestamp) {
  final boolean stillFailing = hasFailedTooManyTimes(deployStatistics, recentFailureTimestamp);
  return !stillFailing;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ private boolean shouldExitCooldown(SingularityRequestWithState cooldownRequest)
return true;
}

if (cooldown.hasCooldownExpired(cooldownRequest.getRequest(), maybeDeployStatistics.get(), Optional.<Integer> absent(), Optional.<Long> absent())) {
return true;
}

return false;
return cooldown.hasCooldownExpired(maybeDeployStatistics.get(), Optional.absent());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
import javax.inject.Singleton;

import com.google.inject.Inject;
import com.hubspot.singularity.config.SingularityConfiguration;

@Singleton
public class SingularityCooldownPoller extends SingularityLeaderOnlyPoller {

private final SingularityCooldownChecker checker;

@Inject
SingularityCooldownPoller(SingularityConfiguration configuration, SingularityCooldownChecker checker) {
super(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes()) / 2, TimeUnit.MILLISECONDS, true);

SingularityCooldownPoller(SingularityCooldownChecker checker) {
super(1, TimeUnit.MINUTES, true);
this.checker = checker;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ boolean checkTask(SingularityTask task, Optional<SingularityRequestWithState> re
private void checkForRepeatedFailures(Optional<SingularityRequestWithState> requestWithState, SingularityTaskId taskId) {
taskManager.markUnhealthyKill(taskId);

if (requestWithState.isPresent() && taskManager.getNumUnhealthyKills(taskId.getRequestId()) > configuration.getCooldownAfterFailures()) {
if (requestWithState.isPresent() && taskManager.getNumUnhealthyKills(taskId.getRequestId()) > configuration.getSlowFailureCooldownCount()) {
mailer.sendReplacementTasksFailingMail(requestWithState.get().getRequest());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ private RequestState checkCooldown(RequestState requestState, SingularityRequest
return requestState;
}

if (cooldown.hasCooldownExpired(request, deployStatistics, Optional.<Integer>absent(), Optional.<Long>absent())) {
requestManager.exitCooldown(request, System.currentTimeMillis(), Optional.<String>absent(), Optional.<String>absent());
if (cooldown.hasCooldownExpired(deployStatistics, Optional.absent())) {
requestManager.exitCooldown(request, System.currentTimeMillis(), Optional.absent(), Optional.absent());
return RequestState.ACTIVE;
}

Expand Down Expand Up @@ -591,7 +591,7 @@ private Optional<PendingType> handleCompletedTaskWithStatistics(Optional<Singula
}

if (!status.hasReason() || !status.getReason().equals(Reason.REASON_INVALID_OFFERS)) {
if (!state.isSuccess() && taskHistoryUpdateCreateResult == SingularityCreateResult.CREATED && cooldown.shouldEnterCooldown(request, taskId, requestState, deployStatistics, timestamp)) {
if (!state.isSuccess() && taskHistoryUpdateCreateResult == SingularityCreateResult.CREATED && cooldown.shouldEnterCooldown(request, requestState, deployStatistics, timestamp)) {
LOG.info("Request {} is entering cooldown due to task {}", request.getId(), taskId);
requestState = RequestState.SYSTEM_COOLDOWN;
requestManager.cooldown(request, System.currentTimeMillis());
Expand Down Expand Up @@ -735,18 +735,14 @@ private void updateDeployStatistics(SingularityDeployStatistics deployStatistics
if (SingularityTaskHistoryUpdate.getUpdate(taskManager.getTaskHistoryUpdates(taskId), ExtendedTaskState.TASK_CLEANING).isPresent()) {
LOG.debug("{} failed with {} after cleaning - ignoring it for cooldown", taskId, state);
} else {

if (sequentialFailureTimestamps.size() < configuration.getCooldownAfterFailures()) {
sequentialFailureTimestamps.add(timestamp);
} else if (timestamp > sequentialFailureTimestamps.get(0)) {
sequentialFailureTimestamps.set(0, timestamp);
}

sequentialFailureTimestamps.add(timestamp);
bldr.setNumSuccess(0);
bldr.setNumFailures(bldr.getNumFailures() + 1);
Collections.sort(sequentialFailureTimestamps);
}
} else {
bldr.setNumSuccess(bldr.getNumSuccess() + 1);
bldr.setNumFailures(0);
sequentialFailureTimestamps.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ private void prepareReplacementTaskFailedMail(SingularityRequest request) {

final String subject = String.format("Replacement tasks for request %s are unhealthy — Singularity", request.getId());

templateProperties.put("numFailures", configuration.getCooldownAfterFailures());
templateProperties.put("numFailures", configuration.getSlowFailureCooldownCount());

final String body = Jade4J.render(replacementTasksFailingTemplate, templateProperties);

Expand Down Expand Up @@ -562,9 +562,9 @@ private void prepareRequestInCooldownMail(SingularityRequest request) {

final String subject = String.format("Request %s has entered system cooldown — Singularity", request.getId());

templateProperties.put("numFailures", configuration.getCooldownAfterFailures());
templateProperties.put("numFailures", configuration.getSlowFailureCooldownCount());
templateProperties.put("cooldownDelayFormat", DurationFormatUtils.formatDurationHMS(TimeUnit.SECONDS.toMillis(configuration.getCooldownMinScheduleSeconds())));
templateProperties.put("cooldownExpiresFormat", DurationFormatUtils.formatDurationHMS(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes())));
templateProperties.put("cooldownExpiresFormat", DurationFormatUtils.formatDurationHMS(TimeUnit.MINUTES.toMillis(configuration.getSlowCooldownExpiresMinutesWithoutFailure())));

final String body = Jade4J.render(requestInCooldownTemplate, templateProperties);

Expand Down
Loading