Skip to content

Commit

Permalink
Merge pull request #1412 from HubSpot/executor_slow_launch
Browse files Browse the repository at this point in the history
fix duplicate exit checker, add longer initial task wait
  • Loading branch information
ssalinas authored Feb 16, 2017
2 parents db9b556 + 44eb96a commit c9900f2
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.hubspot.mesos.MesosUtils;
import com.hubspot.singularity.executor.SingularityExecutorMonitor.KillState;
Expand Down Expand Up @@ -94,6 +93,7 @@ public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInf
task.getLog().info("Launched task {} with data {}", taskId, task.getExecutorData());
break;
}

} catch (Throwable t) {
LOG.error("Unexpected exception starting task {}", taskId, t);

Expand Down Expand Up @@ -149,7 +149,7 @@ public void frameworkMessage(ExecutorDriver executorDriver, byte[] bytes) {
public void shutdown(ExecutorDriver executorDriver) {
LOG.info("Asked to shutdown executor...");

monitor.shutdown(Optional.of(executorDriver));
monitor.shutdown(executorDriver);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -66,7 +68,8 @@ public class SingularityExecutorMonitor {
private final Map<String, ListeningExecutorService> taskToShellCommandPool;

@Inject
public SingularityExecutorMonitor(@Named(SingularityExecutorModule.ALREADY_SHUT_DOWN) AtomicBoolean alreadyShutDown, SingularityExecutorLogging logging, ExecutorUtils executorUtils, SingularityExecutorProcessKiller processKiller, SingularityExecutorThreadChecker threadChecker, SingularityExecutorConfiguration configuration) {
public SingularityExecutorMonitor(@Named(SingularityExecutorModule.ALREADY_SHUT_DOWN) AtomicBoolean alreadyShutDown, SingularityExecutorLogging logging, ExecutorUtils executorUtils,
SingularityExecutorProcessKiller processKiller, SingularityExecutorThreadChecker threadChecker, SingularityExecutorConfiguration configuration) {
this.logging = logging;
this.configuration = configuration;
this.executorUtils = executorUtils;
Expand All @@ -83,23 +86,27 @@ public SingularityExecutorMonitor(@Named(SingularityExecutorModule.ALREADY_SHUT_
this.processBuilderPool = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("SingularityExecutorProcessBuilder-%d").build()));
this.runningProcessPool = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("SingularityExecutorProcessRunner-%d").build()));

this.runState = RunState.RUNNING;
this.runState = RunState.STARTING;
this.exitLock = new ReentrantLock();
this.alreadyShutDown = alreadyShutDown;
this.latch = new CountDownLatch(4);
}

this.exitCheckerFuture = Optional.of(startExitChecker(Optional.<ExecutorDriver> absent()));
public void start(ExecutorDriver driver) {
Preconditions.checkState(runState == RunState.STARTING);
this.runState = RunState.RUNNING;
this.exitCheckerFuture = Optional.of(startExitChecker(driver, configuration.getInitialIdleExecutorShutdownWaitMillis()));
}

public enum RunState {
RUNNING, SHUTDOWN;
STARTING, RUNNING, SHUTDOWN;
}

public enum SubmitState {
SUBMITTED, REJECTED, TASK_ALREADY_EXISTED;
}

public void shutdown(Optional<ExecutorDriver> driver) {
public void shutdown(ExecutorDriver driver) {
if (!alreadyShutDown.compareAndSet(false, true)) {
LOG.info("Already ran shut down process");
return;
Expand Down Expand Up @@ -151,15 +158,12 @@ public void shutdown(Optional<ExecutorDriver> driver) {
LOG.warn("While waiting to exit", t);
}

if (driver.isPresent()) {
LOG.info("Stopping driver {}", driver.get());
driver.get().stop();
} else {
logAndExit(1, "No driver present on shutdown, exiting");
}
LOG.info("Stopping driver {}", driver);
Status status = driver.stop();
LOG.info("Driver stopped with status {}", status);
}

private void checkForExit(final Optional<ExecutorDriver> driver) {
private void checkForExit(final ExecutorDriver driver, final long waitMillis) {
try {
exitLock.lockInterruptibly();
} catch (InterruptedException e) {
Expand All @@ -171,7 +175,7 @@ private void checkForExit(final Optional<ExecutorDriver> driver) {

try {
if (tasks.isEmpty()) {
LOG.info("Shutting down executor due to no tasks being submitted within {}", JavaUtils.durationFromMillis(configuration.getIdleExecutorShutdownWaitMillis()));
LOG.info("Shutting down executor due to no tasks being submitted within {}", JavaUtils.durationFromMillis(waitMillis));
runState = RunState.SHUTDOWN;
shuttingDown = true;
}
Expand All @@ -189,8 +193,8 @@ private void checkForExit(final Optional<ExecutorDriver> driver) {
}

@SuppressWarnings("rawtypes")
private Future startExitChecker(final Optional<ExecutorDriver> driver) {
LOG.info("Starting an exit checker that will run in {}", JavaUtils.durationFromMillis(configuration.getIdleExecutorShutdownWaitMillis()));
private Future startExitChecker(final ExecutorDriver driver, final long waitTimeMillis) {
LOG.info("Starting an exit checker that will run in {}", JavaUtils.durationFromMillis(waitTimeMillis));

return exitChecker.schedule(new Runnable() {

Expand All @@ -199,12 +203,12 @@ public void run() {
LOG.info("Exit checker running...");

try {
checkForExit(driver);
checkForExit(driver, waitTimeMillis);
} catch (Throwable t) {
logAndExit(2, "While shutting down", t);
}
}
}, configuration.getIdleExecutorShutdownWaitMillis(), TimeUnit.MILLISECONDS);
}, waitTimeMillis, TimeUnit.MILLISECONDS);
}

private void clearExitCheckerUnsafe() {
Expand Down Expand Up @@ -388,8 +392,8 @@ private void checkIdleExecutorShutdown(ExecutorDriver driver) {
try {
clearExitCheckerUnsafe();

if (tasks.isEmpty()) {
exitCheckerFuture = Optional.of(startExitChecker(Optional.of(driver)));
if (tasks.isEmpty() && runState == RunState.RUNNING) {
exitCheckerFuture = Optional.of(startExitChecker(driver, configuration.getIdleExecutorShutdownWaitMillis()));
}
} finally {
exitLock.unlock();
Expand Down Expand Up @@ -469,9 +473,9 @@ public KillState requestKill(String taskId, Optional<String> user, boolean destr
} else {
task.getLog().info("Killing process for task {}", taskId);
}

processKiller.submitKillRequest(runningProcess);
return KillState.KILLING_PROCESS;

}

return KillState.INCONSISTENT_STATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.executor.config.SingularityExecutorConfiguration;
import com.hubspot.singularity.executor.task.SingularityExecutorTaskProcessCallable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.hubspot.singularity.executor;

import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Inject;
Expand All @@ -20,6 +19,8 @@
import com.hubspot.singularity.runner.base.configuration.BaseRunnerConfiguration;
import com.hubspot.singularity.s3.base.config.SingularityS3Configuration;

import ch.qos.logback.classic.LoggerContext;

public class SingularityExecutorRunner {

private static final Logger LOG = LoggerFactory.getLogger(SingularityExecutorRunner.class);
Expand All @@ -35,13 +36,26 @@ public static void main(String... args) {

LOG.info("Executor finished after {} with status: {}", JavaUtils.duration(start), driverStatus);

stopLog();

System.exit(driverStatus == Protos.Status.DRIVER_STOPPED ? 0 : 1);
} catch (Throwable t) {
LOG.error("Finished after {} with error", JavaUtils.duration(start), t);

stopLog();

System.exit(1);
}
}

private static void stopLog() {
ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
if (loggerFactory instanceof LoggerContext) {
LoggerContext context = (LoggerContext) loggerFactory;
context.stop();
}
}

private final String name;
private final SingularityExecutor singularityExecutor;
private final SingularityExecutorMonitor monitor;
Expand All @@ -57,13 +71,14 @@ public Protos.Status run() {
LOG.info("{} starting MesosExecutorDriver...", name);

final MesosExecutorDriver driver = new MesosExecutorDriver(singularityExecutor);
monitor.start(driver);

Runtime.getRuntime().addShutdownHook(new Thread("SingularityExecutorRunnerGracefulShutdown") {

@Override
public void run() {
LOG.info("Executor is shutting down, ensuring shutdown via shutdown hook");
monitor.shutdown(Optional.of((ExecutorDriver) driver));
monitor.shutdown(driver);
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public class SingularityExecutorConfiguration extends BaseRunnerConfiguration {
@JsonProperty
private long shutdownTimeoutWaitMillis = TimeUnit.MINUTES.toMillis(5);

@Min(0)
@JsonProperty
private long initialIdleExecutorShutdownWaitMillis = TimeUnit.MINUTES.toMillis(1);

@Min(0)
@JsonProperty
private long idleExecutorShutdownWaitMillis = TimeUnit.SECONDS.toMillis(10);
Expand Down Expand Up @@ -636,6 +640,14 @@ public void setLogrotateCompressionSettings(LogrotateCompressionSettings logrota
this.logrotateCompressionSettings = logrotateCompressionSettings;
}

public long getInitialIdleExecutorShutdownWaitMillis() {
return initialIdleExecutorShutdownWaitMillis;
}

public void setInitialIdleExecutorShutdownWaitMillis(long initialIdleExecutorShutdownWaitMillis) {
this.initialIdleExecutorShutdownWaitMillis = initialIdleExecutorShutdownWaitMillis;
}

@Override
public String toString() {
return "SingularityExecutorConfiguration{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;

Expand Down

0 comments on commit c9900f2

Please sign in to comment.