Skip to content

Commit

Permalink
Resubmit Worker Allocations (#725)
Browse files Browse the repository at this point in the history
A recent change removed worker resubmits when workers are stuck in accepted: #719

Our underlying scheduler will not retry the allocations, so we need a way to conditionally enable the ability to resubmit.
  • Loading branch information
kmg-stripe authored Nov 13, 2024
1 parent a5874b2 commit 4da2e05
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1906,13 +1906,29 @@ public void checkHeartBeats(Instant currentTime) {
for (JobWorker worker : stage.getAllWorkers()) {
IMantisWorkerMetadata workerMeta = worker.getMetadata();
if (!workerMeta.getLastHeartbeatAt().isPresent()) {
// the worker is still waiting for resource allocation and the scheduler should take care of
// the retry logic.
Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt());
LOGGER.warn("Job {}, Worker {} stuck in accepted state since {}",
this.jobMgr.getJobId(),
workerMeta.getWorkerId(),
acceptedAt);
if(!scheduler.schedulerHandlesAllocationRetries()) {
// worker stuck in accepted and the scheduler will not retry allocation requests, so
// we must resubmit
if (Duration.between(acceptedAt, currentTime).getSeconds() > stuckInSubmitToleranceSecs) {
LOGGER.info("Resubmitting Job {}, Worker {} that has been stuck in accepted state for {}", this.jobMgr.getJobId(),
workerMeta.getWorkerId(), Duration.between(acceptedAt, currentTime).getSeconds());
workersToResubmit.add(worker);
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
WARN,
"worker stuck in Accepted state, resubmitting worker",
workerMeta.getStageNum(),
workerMeta.getWorkerId(),
workerMeta.getState()));
}
} else {
// the worker is still waiting for resource allocation and the scheduler should take care of
// the retry logic.
LOGGER.warn("Job {}, Worker {} stuck in accepted state since {}",
this.jobMgr.getJobId(),
workerMeta.getWorkerId(),
acceptedAt);
}
} else {
if (Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds()
> missedHeartBeatToleranceSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("3")
int getSlaMaxHeadroomForAccepted();

@Config("mantis.scheduler.handlesAllocationRetries")
@Default("true")
boolean getSchedulerHandlesAllocationRetries();

default Duration getHeartbeatInterval() {
return Duration.ofMillis(getHeartbeatIntervalInMs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,11 @@ public interface MantisScheduler {
*/
void initializeRunningWorker(final ScheduleRequest scheduleRequest, final String hostname, final String hostID);

/**
* This should return true if the underlying scheduler handles retrying worker allocations.
*
* @return If there are not enough resources to schedule the worker and the scheduler automatically retries until
* the worker is assigned, then return true; otherwise, return false.
*/
boolean schedulerHandlesAllocationRetries();
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public MantisScheduler forClusterID(ClusterID clusterID) {
executeStageRequestFactory,
jobMessageRouter,
metricsRegistry),
"scheduler-for-" + cid.getResourceID()));
"scheduler-for-" + cid.getResourceID()),
masterConfiguration.getSchedulerHandlesAllocationRetries());
});
} else {
log.error("Scheduler gets unexpected null clusterID");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class ResourceClusterAwareScheduler implements MantisScheduler {

private final ActorRef schedulerActor;
private final boolean handlesAllocationRetries;

@Override
public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
Expand Down Expand Up @@ -67,4 +68,9 @@ public void initializeRunningWorker(ScheduleRequest scheduleRequest, String host
new InitializeRunningWorkerRequestEvent(scheduleRequest, TaskExecutorID.of(hostID)),
null);
}

@Override
public boolean schedulerHandlesAllocationRetries() {
return handlesAllocationRetries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ public void initializeRunningWorker(ScheduleRequest scheduleRequest, String host
// TODO Auto-generated method stub

}

@Override
public boolean schedulerHandlesAllocationRetries(){
return false;
}
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public void updateWorkerSchedulingReadyTime(final WorkerId workerId, final long
public void initializeRunningWorker(final ScheduleRequest scheduleRequest, final String hostname, final String hostID) {
// no-op
}

@Override
public boolean schedulerHandlesAllocationRetries() {
return false;
}
}

0 comments on commit 4da2e05

Please sign in to comment.