From d2a19ee6898d14769534662ee1d1340ae703c8fc Mon Sep 17 00:00:00 2001 From: Kaushal Kumar Date: Mon, 22 Apr 2024 13:28:26 -0700 Subject: [PATCH] add task cancellation skeleton Signed-off-by: Kaushal Kumar --- .../search/AbstractSearchAsyncAction.java | 2 +- .../action/search/SearchShardTask.java | 4 +- .../opensearch/action/search/SearchTask.java | 4 +- .../action/search/TransportSearchAction.java | 2 +- .../cluster/metadata/ResourceLimitGroup.java | 15 ++ .../org/opensearch/search/SearchService.java | 6 +- .../ResourceLimitGroupService.java | 13 +- .../ResourceLimitGroupTask.java | 4 +- .../cancellation/CancellableTaskSelector.java | 26 +++ .../ResourceLimitGroupRequestCanceller.java | 19 -- .../ResourceLimitGroupTaskCanceller.java | 22 +++ .../module/ResourceLimitGroupModule.java | 4 +- ...imitsGroupResourceUsageTrackerService.java | 162 +++++++++++++++--- 13 files changed, 224 insertions(+), 59 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/CancellableTaskSelector.java delete mode 100644 server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupRequestCanceller.java create mode 100644 server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupTaskCanceller.java diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index c706189f993a2..338ddaa46d067 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -841,7 +841,7 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null); // Propagate the resource limit group from co-ordinator to data nodes - shardRequest.setResourceLimitGroupId(getTask().getResourceLimitGroupId()); + shardRequest.setResourceLimitGroupId(getTask().getResourceLimitGroupName()); return shardRequest; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index 17c32d09d15d6..5ea916f46c05b 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -87,11 +87,11 @@ public boolean shouldCancelChildrenOnCancellation() { return false; } - public String getResourceLimitGroupId() { + public String getResourceLimitGroupName() { return resourceLimitGroupId; } - public void setResourceLimitGroupId(String resourceLimitGroupId) { + public void setResourceLimitGroupName(String resourceLimitGroupId) { this.resourceLimitGroupId = resourceLimitGroupId; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 775eadd38ef53..c580a82d728c2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -109,11 +109,11 @@ public boolean shouldCancelChildrenOnCancellation() { return true; } - public String getResourceLimitGroupId() { + public String getResourceLimitGroupName() { return resourceLimitGroupId; } - public void setResourceLimitGroupId(String resourceLimitGroupId) { + public void setResourceLimitGroupName(String resourceLimitGroupId) { this.resourceLimitGroupId = resourceLimitGroupId; } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 7ee7292176473..b8e11a339ee50 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -1105,7 +1105,7 @@ private void executeSearch( localShardIterators.size() + remoteShardIterators.size() ); - task.setResourceLimitGroupId(searchRequest.resourceLimitGroupId()); + task.setResourceLimitGroupName(searchRequest.resourceLimitGroupId()); searchAsyncActionProvider.asyncSearchAction( task, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/ResourceLimitGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/ResourceLimitGroup.java index fbe76175e7826..b6215ce49961d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/ResourceLimitGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/ResourceLimitGroup.java @@ -174,6 +174,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(resourceName, value); } + + public String getResourceName() { + return resourceName; + } + + public Double getValue() { + return value; + } } /** @@ -270,4 +278,11 @@ public ResourceLimitGroupMode getMode() { public List getResourceLimits() { return resourceLimits; } + + public ResourceLimit getResourceLimitFor(String resourceName) { + return resourceLimits.stream() + .filter(resourceLimit -> resourceLimit.getResourceName().equals(resourceName)) + .findFirst() + .orElseGet(() -> new ResourceLimit(resourceName, 100.0)); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 1167dd43784aa..214673bae36f6 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -568,7 +568,7 @@ public void executeQueryPhase( assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; final IndexShard shard = getShard(request); - task.setResourceLimitGroupId(request.resourceLimitGroupId()); + task.setResourceLimitGroupName(request.resourceLimitGroupId()); rewriteAndFetchShardRequest(shard, request, new ActionListener() { @Override public void onResponse(ShardSearchRequest orig) { @@ -677,7 +677,7 @@ public void executeQueryPhase( } runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - task.setResourceLimitGroupId(shardSearchRequest.resourceLimitGroupId()); + task.setResourceLimitGroupName(shardSearchRequest.resourceLimitGroupId()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext) @@ -780,7 +780,7 @@ public void executeFetchPhase( public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); - task.setResourceLimitGroupId(shardSearchRequest.resourceLimitGroupId()); + task.setResourceLimitGroupName(shardSearchRequest.resourceLimitGroupId()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); runAsync(getExecutor(readerContext.indexShard()), () -> { try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupService.java b/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupService.java index 1dc3dba1c1a0d..98103e9a599e2 100644 --- a/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupService.java +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupService.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupRequestCanceller; +import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupTaskCanceller; import org.opensearch.search.resource_limit_group.tracker.ResourceLimitGroupResourceUsageTracker; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -26,11 +26,12 @@ public class ResourceLimitGroupService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(ResourceLimitGroupService.class); private final ResourceLimitGroupResourceUsageTracker requestTracker; - private final ResourceLimitGroupRequestCanceller requestCanceller; + private final ResourceLimitGroupTaskCanceller requestCanceller; private final ResourceLimitGroupPruner resourceLimitGroupPruner; private volatile Scheduler.Cancellable scheduledFuture; private final ResourceLimitGroupServiceSettings sandboxServiceSettings; private final ThreadPool threadPool; + private final ResourceLimitGroupTaskCanceller taskCanceller; /** * Guice managed constructor @@ -43,16 +44,18 @@ public class ResourceLimitGroupService extends AbstractLifecycleComponent { @Inject public ResourceLimitGroupService( ResourceLimitGroupResourceUsageTracker requestTrackerService, - ResourceLimitGroupRequestCanceller requestCanceller, + ResourceLimitGroupTaskCanceller requestCanceller, ResourceLimitGroupPruner resourceLimitGroupPruner, ResourceLimitGroupServiceSettings sandboxServiceSettings, - ThreadPool threadPool + ThreadPool threadPool, + ResourceLimitGroupTaskCanceller taskCanceller ) { this.requestTracker = requestTrackerService; this.requestCanceller = requestCanceller; this.resourceLimitGroupPruner = resourceLimitGroupPruner; this.sandboxServiceSettings = sandboxServiceSettings; this.threadPool = threadPool; + this.taskCanceller = taskCanceller; } /** @@ -60,7 +63,7 @@ public ResourceLimitGroupService( */ private void doRun() { requestTracker.updateResourceLimitGroupsResourceUsage(); - requestCanceller.cancelViolatingTasks(); + taskCanceller.cancelTasks(); resourceLimitGroupPruner.pruneResourceLimitGroup(); } diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupTask.java b/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupTask.java index 0925ad6a3950f..66b034ecd8e82 100644 --- a/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupTask.java +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/ResourceLimitGroupTask.java @@ -12,7 +12,7 @@ * This interface can be implemented by tasks which will be tracked and monitored using {@link org.opensearch.cluster.metadata.ResourceLimitGroup} */ public interface ResourceLimitGroupTask { - public void setResourceLimitGroupId(String sandboxResourceLimitGroup); + public void setResourceLimitGroupName(String sandboxResourceLimitGroup); - public String getResourceLimitGroupId(); + public String getResourceLimitGroupName(); } diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/CancellableTaskSelector.java b/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/CancellableTaskSelector.java new file mode 100644 index 0000000000000..b40ad3dc91771 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/CancellableTaskSelector.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.resource_limit_group.cancellation; + +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.List; + +public interface CancellableTaskSelector { + /** + * This method selects tasks which can be cancelled + * @param tasks is list of available tasks to select from + * @param reduceBy is meant to select enough number of tasks consuming {@param reduceBy} resource + * @param resource it is a system resource e,g; "jvm" or "cpu" + * @return + */ + public List selectTasks(List tasks, Double reduceBy, String resource); + +} diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupRequestCanceller.java b/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupRequestCanceller.java deleted file mode 100644 index 77be6104c9600..0000000000000 --- a/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupRequestCanceller.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.resource_limit_group.cancellation; - -/** - * This interface is used to identify and cancel the violating tasks in a resourceLimitGroup - */ -public interface ResourceLimitGroupRequestCanceller { - /** - * Cancels the tasks from conteded resourceLimitGroups - */ - void cancelViolatingTasks(); -} diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupTaskCanceller.java b/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupTaskCanceller.java new file mode 100644 index 0000000000000..6fd3a4c822a95 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/cancellation/ResourceLimitGroupTaskCanceller.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.resource_limit_group.cancellation; + +/** + * This class is used to identify and cancel the violating tasks in a resourceLimitGroup + */ +public abstract class ResourceLimitGroupTaskCanceller { + private CancellableTaskSelector taskSelector; + + public ResourceLimitGroupTaskCanceller(CancellableTaskSelector taskSelector) { + this.taskSelector = taskSelector; + } + + public abstract void cancelTasks(); +} diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/module/ResourceLimitGroupModule.java b/server/src/main/java/org/opensearch/search/resource_limit_group/module/ResourceLimitGroupModule.java index fb2d607e4f774..0a59df4565ef2 100644 --- a/server/src/main/java/org/opensearch/search/resource_limit_group/module/ResourceLimitGroupModule.java +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/module/ResourceLimitGroupModule.java @@ -10,7 +10,7 @@ import org.opensearch.common.inject.AbstractModule; import org.opensearch.search.resource_limit_group.ResourceLimitGroupPruner; -import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupRequestCanceller; +import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupTaskCanceller; import org.opensearch.search.resource_limit_group.tracker.ResourceLimitGroupResourceUsageTracker; import org.opensearch.search.resource_limit_group.tracker.ResourceLimitsGroupResourceUsageTrackerService; @@ -27,7 +27,7 @@ public ResourceLimitGroupModule() {} @Override protected void configure() { bind(ResourceLimitGroupResourceUsageTracker.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton(); - bind(ResourceLimitGroupRequestCanceller.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton(); + bind(ResourceLimitGroupTaskCanceller.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton(); bind(ResourceLimitGroupPruner.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/search/resource_limit_group/tracker/ResourceLimitsGroupResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/search/resource_limit_group/tracker/ResourceLimitsGroupResourceUsageTrackerService.java index 4f11b6e2a678f..d91d08b39c903 100644 --- a/server/src/main/java/org/opensearch/search/resource_limit_group/tracker/ResourceLimitsGroupResourceUsageTrackerService.java +++ b/server/src/main/java/org/opensearch/search/resource_limit_group/tracker/ResourceLimitsGroupResourceUsageTrackerService.java @@ -8,33 +8,38 @@ package org.opensearch.search.resource_limit_group.tracker; +import org.opensearch.cluster.metadata.ResourceLimitGroup; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.search.resource_limit_group.ResourceLimitGroupPruner; -import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupRequestCanceller; +import org.opensearch.search.resource_limit_group.ResourceLimitGroupTask; +import org.opensearch.search.resource_limit_group.cancellation.CancellableTaskSelector; +import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupTaskCanceller; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.LongSupplier; import java.util.stream.Collectors; /** * This class tracks requests per resourceLimitGroups */ -public class ResourceLimitsGroupResourceUsageTrackerService +public class ResourceLimitsGroupResourceUsageTrackerService extends ResourceLimitGroupTaskCanceller implements TaskManager.TaskEventListeners, ResourceLimitGroupResourceUsageTracker, - ResourceLimitGroupRequestCanceller, ResourceLimitGroupPruner { - private static final String CPU = "CPU"; - private static final String JVM_ALLOCATIONS = "JVM_Allocations"; + private static final String CPU = "cpu"; + private static final String JVM = "jvm"; + private static final List TRACKED_RESOURCES = List.of(JVM); private static final int numberOfAvailableProcessors = Runtime.getRuntime().availableProcessors(); private static final long totalAvailableJvmMemory = Runtime.getRuntime().totalMemory(); private final LongSupplier timeNanosSupplier; @@ -43,29 +48,94 @@ public class ResourceLimitsGroupResourceUsageTrackerService * {@link org.opensearch.search.resource_limit_group.ResourceLimitGroupService} runs */ private List toDeleteResourceLimitGroups; - private List activeResourceLimitGroups; + private List activeResourceLimitGroups; + /** + * This var will hold the sandbox level resource usage as per last run of + * {@link org.opensearch.search.resource_limit_group.ResourceLimitGroupService} + */ + private Map> resourceUsage; + /** + * We are keeping this as instance member as we will need this to identify the contended resource limit groups + * resourceLimitGroupName -> List<ResourceLimitGroupTask> + */ + private Map> resourceLimitGroupTasks; private final TaskManager taskManager; private final TaskResourceTrackingService taskResourceTrackingService; + private final ClusterService clusterService; + private final CancellableTaskSelector taskSelector; /** * SandboxResourceTrackerService constructor * @param taskManager * @param taskResourceTrackingService + * @param clusterService */ @Inject public ResourceLimitsGroupResourceUsageTrackerService( - TaskManager taskManager, - TaskResourceTrackingService taskResourceTrackingService + final TaskManager taskManager, + final TaskResourceTrackingService taskResourceTrackingService, + final ClusterService clusterService, + final LongSupplier timeNanosSupplier, + final CancellableTaskSelector taskSelector ) { + super(taskSelector); this.taskManager = taskManager; this.taskResourceTrackingService = taskResourceTrackingService; - toDeleteResourceLimitGroups = Collections.synchronizedList(new ArrayList<>()); - this.timeNanosSupplier = System::nanoTime; + toDeleteResourceLimitGroups = new ArrayList<>(); + this.clusterService = clusterService; + this.timeNanosSupplier = timeNanosSupplier; + this.taskSelector = taskSelector; } @Override public void updateResourceLimitGroupsResourceUsage() { + activeResourceLimitGroups = new ArrayList<>(clusterService.state().metadata().resourceLimitGroups().values()); + + updateResourceLimitGroupTasks(); + + refreshResourceLimitGroupsUsage(resourceLimitGroupTasks); + } + private void updateResourceLimitGroupTasks() { + List activeTasks = taskResourceTrackingService.getResourceAwareTasks() + .values() + .stream() + .filter(task -> task instanceof ResourceLimitGroupTask) + .map(task -> (ResourceLimitGroupTask) task) + .collect(Collectors.toList()); + + Map> newResourceLimitGroupTasks = new HashMap<>(); + for (Map.Entry> entry : activeTasks.stream() + .collect(Collectors.groupingBy(ResourceLimitGroupTask::getResourceLimitGroupName)) + .entrySet()) { + newResourceLimitGroupTasks.put(entry.getKey(), entry.getValue().stream().map(task -> (Task) task).collect(Collectors.toList())); + } + + resourceLimitGroupTasks = newResourceLimitGroupTasks; + } + + private void refreshResourceLimitGroupsUsage(Map> resourceLimitGroupTasks) { + /** + * remove the deleted resource limit groups + */ + final List nonExistingResourceLimitGroups = new ArrayList<>(resourceUsage.keySet()); + for (String activeResourceLimitGroup : resourceLimitGroupTasks.keySet()) { + nonExistingResourceLimitGroups.remove(activeResourceLimitGroup); + } + nonExistingResourceLimitGroups.forEach(resourceUsage::remove); + + for (Map.Entry> resourceLimitGroup : resourceLimitGroupTasks.entrySet()) { + final String resourceLimitGroupName = resourceLimitGroup.getKey(); + + Map resourceLimitGroupUsage = resourceLimitGroup.getValue() + .stream() + .map(this::calculateAbsoluteResourceUsageFor) + .reduce(new AbsoluteResourceUsage(0, 0), AbsoluteResourceUsage::merge) + .toMap(); + + resourceUsage.put(resourceLimitGroupName, resourceLimitGroupUsage); + + } } // @Override @@ -110,6 +180,18 @@ public double getAbsoluteCpuUsageInPercentage() { public double getAbsoluteJvmAllocationsUsageInPercent() { return absoluteJvmAllocationsUsage * 100; } + + /** + * + * @return {@code Map} + * which captures key as resource and value as the percentage value + */ + public Map toMap() { + Map map = new HashMap<>(); + // We can put the additional resources into this map in the future + map.put(JVM, getAbsoluteJvmAllocationsUsageInPercent()); + return map; + } } /** @@ -140,7 +222,7 @@ public void onTaskCompleted(Task task) {} * */ @Override - public void cancelViolatingTasks() { + public void cancelTasks() { List cancellableTasks = getCancellableTasks(); for (TaskCancellation taskCancellation : cancellableTasks) { taskCancellation.cancel(); @@ -153,10 +235,14 @@ public void cancelViolatingTasks() { */ private List getCancellableTasks() { // get cancellations from enforced type sandboxes - List inViolationSandboxes = getBreachingSandboxIds(); + final List inViolationResourceLimitGroups = getBreachingResourceLimitGroups(); + final List enforcedResourceLimitGroups = inViolationResourceLimitGroups.stream() + .filter(resourceLimitGroup -> resourceLimitGroup.getMode().equals(ResourceLimitGroup.ResourceLimitGroupMode.ENFORCED)) + .collect(Collectors.toList()); List cancellableTasks = new ArrayList<>(); - for (String sandboxId : inViolationSandboxes) { - cancellableTasks.addAll(getCancellableTasksFrom(sandboxId)); + + for (ResourceLimitGroup resourceLimitGroup : enforcedResourceLimitGroups) { + cancellableTasks.addAll(getCancellableTasksFrom(resourceLimitGroup)); } // get cancellations from soft type sandboxes if the node is in duress (hitting node level cancellation @@ -165,18 +251,50 @@ private List getCancellableTasks() { return cancellableTasks; } - public void deleteSandbox(String sandboxId) { - if (hasUnfinishedTasks(sandboxId)) { - toDeleteResourceLimitGroups.add(sandboxId); + public void deleteSandbox(String sandboxName) { + if (hasUnfinishedTasks(sandboxName)) { + toDeleteResourceLimitGroups.add(sandboxName); } // remove this sandbox from the active sandboxes + activeResourceLimitGroups = activeResourceLimitGroups.stream() + .filter(resourceLimitGroup -> !resourceLimitGroup.getName().equals(sandboxName)) + .collect(Collectors.toList()); } - private List getBreachingSandboxIds() { - return Collections.emptyList(); + private List getBreachingResourceLimitGroups() { + final List breachingResourceLimitGroupNames = new ArrayList<>(); + + for (ResourceLimitGroup resourceLimitGroup : activeResourceLimitGroups) { + Map currentResourceUsage = resourceUsage.get(resourceLimitGroup.getName()); + boolean isBreaching = false; + + for (ResourceLimitGroup.ResourceLimit resourceLimit : resourceLimitGroup.getResourceLimits()) { + if (currentResourceUsage.get(resourceLimit.getResourceName()) > resourceLimit.getValue()) { + isBreaching = true; + break; + } + } + + if (isBreaching) breachingResourceLimitGroupNames.add(resourceLimitGroup); + } + + return breachingResourceLimitGroupNames; } - private List getCancellableTasksFrom(String sandboxId) { - return Collections.emptyList(); + List getCancellableTasksFrom(ResourceLimitGroup resourceLimitGroup) { + List cancellations = new ArrayList<>(); + for (String resource : TRACKED_RESOURCES) { + final double reduceBy = resourceUsage.get(resourceLimitGroup.getName()).get(resource) - resourceLimitGroup.getResourceLimitFor( + resource + ).getValue(); + /** + * if the resource is not defined for this sandbox then ignore cancellations from it + */ + if (reduceBy < 0.0) { + continue; + } + cancellations.addAll(taskSelector.selectTasks(resourceLimitGroupTasks.get(resourceLimitGroup.getName()), reduceBy, resource)); + } + return cancellations; } }