Skip to content

Commit

Permalink
Merge pull request #386 from HubSpot/fix-executor-cleanup
Browse files Browse the repository at this point in the history
Fix executor cleanup
  • Loading branch information
tpetr committed Jan 13, 2015
2 parents f8a2382 + c96e624 commit 39454bb
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class SingularityClient {
private static final String TASKS_FORMAT = "http://%s/%s/tasks";
private static final String TASKS_KILL_TASK_FORMAT = TASKS_FORMAT + "/task/%s";
private static final String TASKS_GET_ACTIVE_FORMAT = TASKS_FORMAT + "/active";
private static final String TASKS_GET_ACTIVE_PER_HOST_FORMAT = TASKS_FORMAT + "/active/%s";
private static final String TASKS_GET_ACTIVE_ON_SLAVE_FORMAT = TASKS_FORMAT + "/active/slave/%s";
private static final String TASKS_GET_SCHEDULED_FORMAT = TASKS_FORMAT + "/scheduled";

private static final String HISTORY_FORMAT = "http://%s/%s/history";
Expand Down Expand Up @@ -506,10 +506,10 @@ public Collection<SingularityTask> getActiveTasks() {
return getCollection(requestUri, "active tasks", TASKS_COLLECTION);
}

public Collection<SingularityTask> getActiveTasks(final String host) {
final String requestUri = String.format(TASKS_GET_ACTIVE_PER_HOST_FORMAT, getHost(), contextPath, host);
public Collection<SingularityTask> getActiveTasksOnSlave(final String slaveId) {
final String requestUri = String.format(TASKS_GET_ACTIVE_ON_SLAVE_FORMAT, getHost(), contextPath, slaveId);

return getCollection(requestUri, String.format("active tasks on %s", host), TASKS_COLLECTION);
return getCollection(requestUri, String.format("active tasks on slave %s", slaveId), TASKS_COLLECTION);
}

public Optional<SingularityTaskCleanupResult> killTask(String taskId, Optional<String> user) {
Expand Down
10 changes: 10 additions & 0 deletions SingularityExecutorCleanup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
<artifactId>SingularityClient</artifactId>
</dependency>

<dependency>
<groupId>com.hubspot</groupId>
<artifactId>SingularityMesosClient</artifactId>
</dependency>

<dependency>
<groupId>com.hubspot</groupId>
<artifactId>HorizonCore</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.horizon.HttpClient;
import com.hubspot.horizon.HttpRequest;
import com.hubspot.horizon.HttpResponse;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.mesos.client.MesosClient;
import com.hubspot.mesos.json.MesosSlaveStateObject;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskHistory;
import com.hubspot.singularity.SingularityTaskHistoryUpdate;
import com.hubspot.singularity.client.SingularityClient;
import com.hubspot.singularity.client.SingularityClientException;
import com.hubspot.singularity.client.SingularityClientModule;
import com.hubspot.singularity.executor.SingularityExecutorCleanupStatistics;
import com.hubspot.singularity.executor.SingularityExecutorCleanupStatistics.SingularityExecutorCleanupStatisticsBuilder;
import com.hubspot.singularity.executor.TemplateManager;
Expand All @@ -40,14 +47,16 @@ public class SingularityExecutorCleanup {
private final SingularityClient singularityClient;
private final TemplateManager templateManager;
private final SingularityExecutorCleanupConfiguration cleanupConfiguration;
private final MesosClient mesosClient;

@Inject
public SingularityExecutorCleanup(SingularityClient singularityClient, JsonObjectFileHelper jsonObjectFileHelper, SingularityExecutorConfiguration executorConfiguration, SingularityExecutorCleanupConfiguration cleanupConfiguration, TemplateManager templateManager) {
public SingularityExecutorCleanup(SingularityClient singularityClient, JsonObjectFileHelper jsonObjectFileHelper, SingularityExecutorConfiguration executorConfiguration, SingularityExecutorCleanupConfiguration cleanupConfiguration, TemplateManager templateManager, MesosClient mesosClient) {
this.jsonObjectFileHelper = jsonObjectFileHelper;
this.executorConfiguration = executorConfiguration;
this.cleanupConfiguration = cleanupConfiguration;
this.singularityClient = singularityClient;
this.templateManager = templateManager;
this.mesosClient = mesosClient;
}

public SingularityExecutorCleanupStatistics clean() {
Expand All @@ -58,9 +67,9 @@ public SingularityExecutorCleanupStatistics clean() {

try {
runningTaskIds = getRunningTaskIds();
} catch (SingularityClientException sce) {
LOG.error("While fetching running tasks from singularity", sce);
statisticsBldr.setErrorMessage(sce.getMessage());
} catch (Exception e) {
LOG.error("While fetching running tasks from singularity", e);
statisticsBldr.setErrorMessage(e.getMessage());
return statisticsBldr.build();
}

Expand Down Expand Up @@ -129,24 +138,22 @@ public SingularityExecutorCleanupStatistics clean() {
return statisticsBldr.build();
}

private Collection<SingularityTask> getActiveTasksOnSlave() {
private Set<String> getRunningTaskIds() {
try {
return singularityClient.getActiveTasks(JavaUtils.getHostAddress());
} catch (SocketException e) {
throw Throwables.propagate(e);
}
}
final String slaveId = mesosClient.getSlaveState(mesosClient.getSlaveUri(JavaUtils.getHostAddress())).getId();

private Set<String> getRunningTaskIds() {
final Collection<SingularityTask> activeTasks = getActiveTasksOnSlave();
final Collection<SingularityTask> activeTasks = singularityClient.getActiveTasksOnSlave(slaveId);

final Set<String> runningTaskIds = Sets.newHashSet();
final Set<String> runningTaskIds = Sets.newHashSet();

for (SingularityTask task : activeTasks) {
runningTaskIds.add(task.getTaskId().getId());
}
for (SingularityTask task : activeTasks) {
runningTaskIds.add(task.getTaskId().getId());
}

return runningTaskIds;
return runningTaskIds;
} catch (SocketException se) {
throw Throwables.propagate(se);
}
}

private boolean cleanTask(SingularityExecutorTaskDefinition taskDefinition, Optional<SingularityTaskHistory> taskHistory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.inject.Injector;
import com.google.inject.Stage;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.mesos.client.SingularityMesosClientModule;
import com.hubspot.singularity.client.SingularityClientModule;
import com.hubspot.singularity.executor.SingularityExecutorCleanupStatistics;
import com.hubspot.singularity.executor.cleanup.config.SingularityExecutorCleanupConfiguration;
Expand All @@ -27,7 +28,7 @@ public static void main(String... args) {
final long start = System.currentTimeMillis();

try {
final Injector injector = Guice.createInjector(Stage.PRODUCTION, new SingularityRunnerBaseModule(new SingularityS3ConfigurationLoader(), new SingularityExecutorConfigurationLoader(), new SingularityExecutorCleanupConfigurationLoader()), new SingularityExecutorModule(), new SingularityClientModule());
final Injector injector = Guice.createInjector(Stage.PRODUCTION, new SingularityRunnerBaseModule(new SingularityS3ConfigurationLoader(), new SingularityExecutorConfigurationLoader(), new SingularityExecutorCleanupConfigurationLoader()), new SingularityExecutorModule(), new SingularityClientModule(), new SingularityMesosClientModule());
final SingularityExecutorCleanupRunner runner = injector.getInstance(SingularityExecutorCleanupRunner.class);

LOG.info("Starting cleanup");
Expand Down

0 comments on commit 39454bb

Please sign in to comment.