Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid Request Locks With Task Lag #2244

Merged
merged 9 commits into from
Dec 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void runActionOnPoll() {
}
},
requestId,
getClass().getSimpleName()
getClass().getSimpleName(),
SingularitySchedulerLock.Priority.LOW
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Singleton;
import org.slf4j.Logger;
Expand Down Expand Up @@ -167,7 +168,8 @@ public void runActionOnPoll() {
}
},
requestHistoryParent.requestId,
"request history purger"
"request history purger",
SingularitySchedulerLock.Priority.LOW
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.hubspot.singularity.helpers;

import com.google.inject.Inject;
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.TaskManager;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class TaskLagGuardrail {
private final TaskManager taskManager;
private final SingularityConfiguration configuration;
private final ScheduledExecutorService executor;
private ConcurrentMap<String, Integer> lateTasksByRequestId;

@Inject
public TaskLagGuardrail(
SingularityConfiguration configuration,
SingularityManagedScheduledExecutorServiceFactory factory,
TaskManager taskManager
) {
this.configuration = configuration;
this.taskManager = taskManager;
this.lateTasksByRequestId = new ConcurrentHashMap<>();
this.executor = factory.getSingleThreaded(getClass().getSimpleName());
executor.scheduleWithFixedDelay(
this::updateLateTasksByRequestId,
0,
configuration.getRequestCacheTtl(),
TimeUnit.SECONDS
);
}

public boolean isLagged(String requestId) {
return lateTasksByRequestId.containsKey(requestId);
}

public void updateLateTasksByRequestId() {
long now = System.currentTimeMillis();
List<SingularityPendingTaskId> allPendingTaskIds = taskManager.getPendingTaskIds();

// not a thread safe assignment, but should be fine for periodic updates
this.lateTasksByRequestId =
allPendingTaskIds
.stream()
.filter(
p ->
now - p.getNextRunAt() > configuration.getDeltaAfterWhichTasksAreLateMillis()
)
.collect(
Collectors.toConcurrentMap(
SingularityPendingTaskId::getRequestId,
p -> 1,
Integer::sum
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Named;
import com.hubspot.singularity.helpers.MesosProtosUtils;
import com.hubspot.singularity.helpers.TaskLagGuardrail;
import com.hubspot.singularity.hooks.DeployAcceptanceHook;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,6 +35,7 @@ public void configure() {
bind(SingularityStartup.class).in(Scopes.SINGLETON);
bind(SingularitySchedulerLock.class).in(Scopes.SINGLETON);
bind(SingularityMesosSchedulerClient.class).in(Scopes.SINGLETON);
bind(TaskLagGuardrail.class).in(Scopes.SINGLETON);

Multibinder.newSetBinder(binder(), DeployAcceptanceHook.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.helpers.TaskLagGuardrail;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,9 +19,11 @@ public class SingularitySchedulerLock {
private final ReentrantLock stateLock;
private final ReentrantLock offersLock;
private final ConcurrentHashMap<String, ReentrantLock> requestLocks;
private final TaskLagGuardrail taskLag;

@Inject
public SingularitySchedulerLock() {
public SingularitySchedulerLock(TaskLagGuardrail taskLag) {
this.taskLag = taskLag;
this.stateLock = new ReentrantLock();
this.offersLock = new ReentrantLock();
this.requestLocks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -96,7 +99,38 @@ private void unlock(String requestId, String name, long start) {
lock.unlock();
}

/**
* Run the given function with the specified request lock.
*
* @param function The function to run.
* @param requestId Request to lock.
* @param name Description of this request lock.
*/
public void runWithRequestLock(Runnable function, String requestId, String name) {
runWithRequestLock(function, requestId, name, Priority.HIGH);
}

/**
* Run the given function with the specified request lock, unless run with low priority.
* If run with low priority, the function will not run if the request is lagged
* to allow higher priority components to acquire the lock.
*
* @param function The function to run.
* @param requestId Request to lock.
* @param name Description of this request lock.
* @param priority Priority of this request lock.
*/
public void runWithRequestLock(
Runnable function,
String requestId,
String name,
Priority priority
) {
if (priority == Priority.LOW && isLocked(requestId) && taskLag.isLagged(requestId)) {
LOG.info("{} - Skipping low priority lock on {}", name, requestId);
return;
}

long start = lock(requestId, name);
try {
function.run();
Expand Down Expand Up @@ -208,4 +242,14 @@ private void unlockOffers(String name, long start) {
LOG.debug("{} - Unlocking offers lock ({})", name, JavaUtils.duration(start));
offersLock.unlock();
}

private boolean isLocked(String requestId) {
ReentrantLock lock = requestLocks.get(requestId);
return lock != null && lock.isLocked();
}

public enum Priority {
LOW,
HIGH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void runActionOnPoll() {
}
},
requestWithState.getRequest().getId(),
getClass().getSimpleName()
getClass().getSimpleName(),
SingularitySchedulerLock.Priority.LOW
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void checkCooldowns() {
}
},
cooldownRequest.getRequest().getId(),
getClass().getSimpleName()
getClass().getSimpleName(),
SingularitySchedulerLock.Priority.LOW
),
cooldownExecutor
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ protected void checkExpiringObjects() {
);
},
requestWithState.get().getRequest().getId(),
getClazz().getSimpleName()
getClazz().getSimpleName(),
SingularitySchedulerLock.Priority.LOW
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.inject.Scopes;
import com.hubspot.singularity.data.history.SingularityHistoryPurger;
import com.hubspot.singularity.helpers.RebalancingHelper;
import com.hubspot.singularity.helpers.TaskLagGuardrail;
import com.hubspot.singularity.mesos.SingularityMesosOfferScheduler;
import com.hubspot.singularity.mesos.SingularityMesosTaskPrioritizer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ public void syncUpstreams() {
lock.runWithRequestLock(
() -> syncUpstreamsForService(singularityRequest),
singularityRequest.getId(),
getClass().getSimpleName()
getClass().getSimpleName(),
SingularitySchedulerLock.Priority.LOW
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.hubspot.singularity.scheduler;

import com.google.inject.Inject;
import com.hubspot.singularity.SingularityPendingRequest;
import com.hubspot.singularity.SingularityPendingTaskBuilder;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.helpers.TaskLagGuardrail;
import com.hubspot.singularity.mesos.SingularitySchedulerLock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SingularitySchedulerLockTest extends SingularitySchedulerTestBase {
private final ExecutorService executor;

@Inject
private SingularitySchedulerLock lock;

@Inject
private TaskLagGuardrail lagGuardrail;

private String name = SingularitySchedulerLockTest.class.getSimpleName();

public SingularitySchedulerLockTest() {
super(false);
this.executor = Executors.newSingleThreadExecutor();
}

@Test
public void testLowPriorityLockNormal() {
TestRunnable runnable = new TestRunnable();
lock.runWithRequestLock(
runnable,
requestId,
name,
SingularitySchedulerLock.Priority.LOW
);
Assertions.assertTrue(runnable.ran);
}

@Test
public void testLowPriorityLockLaggedWithoutContention() {
laggedRequest();
TestRunnable runnable = new TestRunnable();
lock.runWithRequestLock(
runnable,
requestId,
name,
SingularitySchedulerLock.Priority.LOW
);
Assertions.assertTrue(runnable.ran);
}

@Test
public void testLowPriorityLockLaggedWithContention() {
laggedRequest();

CountDownLatch started = new CountDownLatch(1);
CountDownLatch finished = new CountDownLatch(1);
hold(started, finished);

TestRunnable runnable = new TestRunnable();
lock.runWithRequestLock(
runnable,
requestId,
name,
SingularitySchedulerLock.Priority.LOW
);
Assertions.assertFalse(runnable.ran);
finished.countDown();
}

private void hold(CountDownLatch started, CountDownLatch finished) {
executor.submit(
() -> {
lock.runWithRequestLock(
() -> {
try {
started.countDown();
finished.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
requestId,
name
);
}
);

try {
started.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void laggedRequest() {
taskManager.savePendingTask(
new SingularityPendingTaskBuilder()
.setPendingTaskId(
new SingularityPendingTaskId(
requestId,
firstDeployId,
0,
0,
SingularityPendingRequest.PendingType.NEW_DEPLOY,
0
)
)
.build()
);
lagGuardrail.updateLateTasksByRequestId();
}

private static class TestRunnable implements Runnable {
boolean ran = false;

@Override
public void run() {
ran = true;
}
}
}