Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More efficient active tasks call for executor cleanup #1972

Merged
merged 2 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class SingularityClient {
private static final String TASKS_KILL_TASK_FORMAT = TASKS_FORMAT + "/task/%s";
private static final String TASKS_GET_ACTIVE_FORMAT = TASKS_FORMAT + "/active";
private static final String TASKS_GET_ACTIVE_ON_SLAVE_FORMAT = TASKS_FORMAT + "/active/slave/%s";
private static final String TASKS_GET_ACTIVE_IDS_ON_SLAVE_FORMAT = TASKS_GET_ACTIVE_ON_SLAVE_FORMAT + "/ids";
private static final String TASKS_GET_SCHEDULED_FORMAT = TASKS_FORMAT + "/scheduled";
private static final String TASKS_GET_SCHEDULED_IDS_FORMAT = TASKS_GET_SCHEDULED_FORMAT + "/ids";
private static final String TASKS_BY_STATE_FORMAT =TASKS_FORMAT + "/ids/request/%s";
Expand Down Expand Up @@ -219,6 +220,7 @@ public class SingularityClient {
private static final TypeReference<Collection<SingularityPendingRequest>> PENDING_REQUESTS_COLLECTION = new TypeReference<Collection<SingularityPendingRequest>>() {};
private static final TypeReference<Collection<SingularityRequestCleanup>> CLEANUP_REQUESTS_COLLECTION = new TypeReference<Collection<SingularityRequestCleanup>>() {};
private static final TypeReference<Collection<SingularityTask>> TASKS_COLLECTION = new TypeReference<Collection<SingularityTask>>() {};
private static final TypeReference<Collection<SingularityTaskId>> TASK_IDS_COLLECTION = new TypeReference<Collection<SingularityTaskId>>() {};
private static final TypeReference<Collection<SingularityTaskIdHistory>> TASKID_HISTORY_COLLECTION = new TypeReference<Collection<SingularityTaskIdHistory>>() {};
private static final TypeReference<Collection<SingularityRack>> RACKS_COLLECTION = new TypeReference<Collection<SingularityRack>>() {};
private static final TypeReference<Collection<SingularitySlave>> SLAVES_COLLECTION = new TypeReference<Collection<SingularitySlave>>() {};
Expand Down Expand Up @@ -868,6 +870,12 @@ public Collection<SingularityTask> getActiveTasksOnSlave(final String slaveId) {
return getCollection(requestUri, String.format("active tasks on slave %s", slaveId), TASKS_COLLECTION);
}

public Collection<SingularityTaskId> getActiveTaskIdsOnSlave(final String slaveId) {
final Function<String, String> requestUri = (host) -> String.format(TASKS_GET_ACTIVE_IDS_ON_SLAVE_FORMAT, getApiBase(host), slaveId);

return getCollection(requestUri, String.format("active tasks on slave %s", slaveId), TASK_IDS_COLLECTION);
}

public Optional<SingularityTaskCleanupResult> killTask(String taskId, Optional<SingularityKillTaskRequest> killTaskRequest) {
final Function<String, String> requestUri = (host) -> String.format(TASKS_KILL_TASK_FORMAT, getApiBase(host), taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,7 +31,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
Expand All @@ -39,10 +39,10 @@
import com.hubspot.singularity.MachineState;
import com.hubspot.singularity.SingularityClientCredentials;
import com.hubspot.singularity.SingularitySlave;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskExecutorData;
import com.hubspot.singularity.SingularityTaskHistory;
import com.hubspot.singularity.SingularityTaskHistoryUpdate;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.client.SingularityClient;
import com.hubspot.singularity.client.SingularityClientException;
import com.hubspot.singularity.client.SingularityClientProvider;
Expand Down Expand Up @@ -317,15 +317,10 @@ private boolean isDecommissioned() {
private Set<String> getRunningTaskIds() {
final String slaveId = mesosClient.getSlaveState(mesosClient.getSlaveUri(hostname)).getId();

final Collection<SingularityTask> activeTasks = singularityClient.getActiveTasksOnSlave(slaveId);

final Set<String> runningTaskIds = Sets.newHashSet();

for (SingularityTask task : activeTasks) {
runningTaskIds.add(task.getTaskId().getId());
}

return runningTaskIds;
return singularityClient.getActiveTaskIdsOnSlave(slaveId)
.stream()
.map(SingularityTaskId::getId)
.collect(Collectors.toSet());
}

private boolean executorStillRunning(SingularityExecutorTaskDefinition taskDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,14 @@ public List<SingularityTask> getTasksOnSlave(Collection<SingularityTaskId> activ
return tasks;
}

public List<SingularityTaskId> getTaskIdsOnSlave(Collection<SingularityTaskId> activeTaskIds, SingularitySlave slave) {
final String sanitizedHost = JavaUtils.getReplaceHyphensWithUnderscores(slave.getHost());

return activeTaskIds.stream()
.filter((t) -> t.getSanitizedHost().equals(sanitizedHost))
.collect(Collectors.toList());
}

public List<SingularityTaskHistoryUpdate> getTaskHistoryUpdates(SingularityTaskId taskId) {
if (leaderCache.active()) {
return leaderCache.getTaskHistoryUpdates(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,25 @@ public List<SingularityTask> getTasksForSlave(
return authorizationHelper.filterByAuthorizedRequests(user, taskManager.getTasksOnSlave(taskManager.getActiveTaskIds(useWebCache(useWebCache)), maybeSlave.get()), SingularityTransformHelpers.TASK_TO_REQUEST_ID, SingularityAuthorizationScope.READ);
}

@GET
@Path("/active/slave/{slaveId}/ids")
@Operation(
summary = "Retrieve list of active tasks on a specific slave",
responses = {
@ApiResponse(responseCode = "404", description = "A slave with the specified id was not found")
}
)
public List<SingularityTaskId> getTaskIdsForSlave(
@Parameter(hidden = true) @Auth SingularityUser user,
@Parameter(description = "The mesos slave id to retrieve task ids for") @PathParam("slaveId") String slaveId,
@Parameter(description = "Use the cached version of this data to limit expensive api calls") @QueryParam("useWebCache") Boolean useWebCache) {
Optional<SingularitySlave> maybeSlave = slaveManager.getObject(slaveId);

checkNotFound(maybeSlave.isPresent(), "Couldn't find a slave in any state with id %s", slaveId);

return authorizationHelper.filterByAuthorizedRequests(user, taskManager.getTaskIdsOnSlave(taskManager.getActiveTaskIds(useWebCache(useWebCache)), maybeSlave.get()), SingularityTransformHelpers.TASK_ID_TO_REQUEST_ID, SingularityAuthorizationScope.READ);
}

@GET
@PropertyFiltering
@Path("/active")
Expand Down