Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Sep 9, 2024
1 parent a6b1afd commit 981b15f
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE;

/**
* Represents the highest resource consuming task first selection strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.wlm.QueryGroupTask;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -44,28 +45,29 @@
* @see QueryGroup
* @see ResourceType
*/
public class TaskCanceller {
public class TaskCancellationService {
public static final double MIN_VALUE = 1e-9;

private final WorkloadManagementSettings workloadManagementSettings;
private final TaskSelectionStrategy taskSelectionStrategy;
private final QueryGroupResourceUsageTrackerService resourceUsageTrackerService;
// a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object
private final Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews;
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews;
private final Collection<QueryGroup> activeQueryGroups;
private final Collection<QueryGroup> deletedQueryGroups;
private BooleanSupplier isNodeInDuress;

public TaskCanceller(
public TaskCancellationService(
WorkloadManagementSettings workloadManagementSettings,
MaximumResourceTaskSelectionStrategy taskSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews,
TaskSelectionStrategy taskSelectionStrategy,
QueryGroupResourceUsageTrackerService resourceUsageTrackerService,
Collection<QueryGroup> activeQueryGroups,
Collection<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
) {
this.workloadManagementSettings = workloadManagementSettings;
this.taskSelectionStrategy = taskSelectionStrategy;
this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews;
this.resourceUsageTrackerService = resourceUsageTrackerService;
this.activeQueryGroups = activeQueryGroups;
this.deletedQueryGroups = deletedQueryGroups;
this.isNodeInDuress = isNodeInDuress;
Expand All @@ -75,6 +77,7 @@ public TaskCanceller(
* Cancel tasks based on the implemented strategy.
*/
public final void cancelTasks() {
queryGroupLevelResourceUsageViews = resourceUsageTrackerService.constructQueryGroupLevelUsageViews();
// cancel tasks from QueryGroups that are in Enforced mode that are breaching their resource limits
cancelTasks(ResiliencyMode.ENFORCED);
// if the node is in duress, cancel tasks accordingly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) {

@Override
public double calculateResourceUsage(List<QueryGroupTask> tasks) {
assert nanoTimeSupplier != null : "nanoTimeSupplier has to be set in order to calculate the resource usage";
double usage = tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum();

usage /= PROCESSOR_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE;
import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT;
import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES;
import static org.opensearch.wlm.tracker.ResourceUsageCalculatorTests.createMockTaskWithResourceStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.List;
import java.util.stream.IntStream;

import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE;
import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES;

public class MaximumResourceTaskSelectionStrategyTests extends OpenSearchTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.wlm.QueryGroupTask;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;
import org.opensearch.wlm.tracker.ResourceUsageCalculatorTrackerServiceTests.TestClock;
import org.junit.Before;

Expand All @@ -30,44 +31,23 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TaskCancellerTests extends OpenSearchTestCase {
public class TaskCancellationServiceTests extends OpenSearchTestCase {
private static final String queryGroupId1 = "queryGroup1";
private static final String queryGroupId2 = "queryGroup2";

private TestClock clock;

private static class TestTaskCancellerImpl extends TaskCanceller {

public TestTaskCancellerImpl(
WorkloadManagementSettings workloadManagementSettings,
MaximumResourceTaskSelectionStrategy highestResourceConsumingTaskFirstSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelViews,
Set<QueryGroup> activeQueryGroups,
Set<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
) {
super(
workloadManagementSettings,
highestResourceConsumingTaskFirstSelectionStrategy,
queryGroupLevelViews,
activeQueryGroups,
deletedQueryGroups,
isNodeInDuress
);
}
}

private Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelViews;
private Set<QueryGroup> activeQueryGroups;
private Set<QueryGroup> deletedQueryGroups;
private TaskCanceller taskCancellation;
private TaskCancellationService taskCancellation;
private WorkloadManagementSettings workloadManagementSettings;
private QueryGroupResourceUsageTrackerService resourceUsageTrackerService;

@Before
public void setup() {
Expand All @@ -77,12 +57,14 @@ public void setup() {
deletedQueryGroups = new HashSet<>();

clock = new TestClock();
ResourceType.CPU.getResourceUsageCalculator().setNanoTimeSupplier(clock::getTime);
when(workloadManagementSettings.getNodeLevelCpuCancellationThreshold()).thenReturn(0.9);
when(workloadManagementSettings.getNodeLevelMemoryCancellationThreshold()).thenReturn(0.9);
taskCancellation = new TestTaskCancellerImpl(
resourceUsageTrackerService = mock(QueryGroupResourceUsageTrackerService.class);
taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> false
Expand All @@ -106,6 +88,7 @@ public void testGetCancellableTasksFrom_setupAppropriateCancellationReasonAndSco
QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock();
when(mockView.getResourceUsageData()).thenReturn(Map.of(resourceType, cpuUsage, ResourceType.MEMORY, memoryUsage));
queryGroupLevelViews.put(queryGroupId1, mockView);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1);
assertEquals(2, cancellableTasksFrom.size());
Expand Down Expand Up @@ -137,6 +120,7 @@ public void testGetCancellableTasksFrom_returnsTasksWhenBreachingThreshold() {
QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock();
when(mockView.getResourceUsageData()).thenReturn(Map.of(resourceType, cpuUsage, ResourceType.MEMORY, memoryUsage));
queryGroupLevelViews.put(queryGroupId1, mockView);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1);
assertEquals(2, cancellableTasksFrom.size());
Expand All @@ -162,6 +146,7 @@ public void testGetCancellableTasksFrom_returnsTasksWhenBreachingThresholdForMem

queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, cancellableTasksFrom.size());
Expand All @@ -185,6 +170,7 @@ public void testGetCancellableTasksFrom_returnsNoTasksWhenNotBreachingThreshold(
when(mockView.getResourceUsageData()).thenReturn(Map.of(ResourceType.CPU, cpuUsage, ResourceType.MEMORY, memoryUsage));
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1);
assertTrue(cancellableTasksFrom.isEmpty());
Expand All @@ -205,11 +191,12 @@ public void testGetCancellableTasksFrom_filtersQueryGroupCorrectly() {
QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock();
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl(
TaskCancellationService taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> false
Expand Down Expand Up @@ -239,20 +226,23 @@ public void testCancelTasks_cancelsGivenTasks() {
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);

TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl(
TaskCancellationService taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> false
);

taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, cancellableTasksFrom.size());
assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId());
assertEquals(4321, cancellableTasksFrom.get(1).getTask().getId());

when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews);
taskCancellation.cancelTasks();
assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled());
assertTrue(cancellableTasksFrom.get(1).getTask().isCancelled());
Expand Down Expand Up @@ -299,15 +289,17 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() {
activeQueryGroups.add(activeQueryGroup);
deletedQueryGroups.add(deletedQueryGroup);

TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl(
TaskCancellationService taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> true
);

taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, cancellableTasksFrom.size());
assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId());
Expand All @@ -320,6 +312,7 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() {
assertEquals(1000, cancellableTasksFromDeletedQueryGroups.get(0).getTask().getId());
assertEquals(1001, cancellableTasksFromDeletedQueryGroups.get(1).getTask().getId());

when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews);
taskCancellation.cancelTasks();

assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled());
Expand Down Expand Up @@ -370,14 +363,15 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN
activeQueryGroups.add(activeQueryGroup);
deletedQueryGroups.add(deletedQueryGroup);

TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl(
TaskCancellationService taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> false
);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, cancellableTasksFrom.size());
Expand All @@ -391,6 +385,7 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN
assertEquals(1000, cancellableTasksFromDeletedQueryGroups.get(0).getTask().getId());
assertEquals(1001, cancellableTasksFromDeletedQueryGroups.get(1).getTask().getId());

when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews);
taskCancellation.cancelTasks();

assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled());
Expand Down Expand Up @@ -430,15 +425,17 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() {
queryGroupLevelViews.put(queryGroupId2, mockView);
Collections.addAll(activeQueryGroups, queryGroup1, queryGroup2);

TestTaskCancellerImpl taskCancellation = new TestTaskCancellerImpl(
TaskCancellationService taskCancellation = new TaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupLevelViews,
resourceUsageTrackerService,
activeQueryGroups,
deletedQueryGroups,
() -> true
);

taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, cancellableTasksFrom.size());
assertEquals(1234, cancellableTasksFrom.get(0).getTask().getId());
Expand All @@ -449,6 +446,7 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() {
assertEquals(5678, cancellableTasksFrom1.get(0).getTask().getId());
assertEquals(8765, cancellableTasksFrom1.get(1).getTask().getId());

when(resourceUsageTrackerService.constructQueryGroupLevelUsageViews()).thenReturn(queryGroupLevelViews);
taskCancellation.cancelTasks();
assertTrue(cancellableTasksFrom.get(0).getTask().isCancelled());
assertTrue(cancellableTasksFrom.get(1).getTask().isCancelled());
Expand All @@ -475,6 +473,7 @@ public void testGetAllCancellableTasks_ReturnsNoTasksWhenNotBreachingThresholds(
);
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> allCancellableTasks = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertTrue(allCancellableTasks.isEmpty());
Expand All @@ -497,6 +496,7 @@ public void testGetAllCancellableTasks_ReturnsTasksWhenBreachingThresholds() {
when(mockView.getResourceUsageData()).thenReturn(Map.of(ResourceType.CPU, cpuUsage, ResourceType.MEMORY, memoryUsage));
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> allCancellableTasks = taskCancellation.getAllCancellableTasks(ResiliencyMode.ENFORCED);
assertEquals(2, allCancellableTasks.size());
Expand Down Expand Up @@ -526,6 +526,7 @@ public void testGetCancellableTasksFrom_doesNotReturnTasksWhenQueryGroupIdNotFou
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
activeQueryGroups.add(queryGroup2);
taskCancellation.queryGroupLevelResourceUsageViews = queryGroupLevelViews;

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup2);
assertEquals(0, cancellableTasksFrom.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE;
import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT;
import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER;
import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
import static org.opensearch.wlm.cancellation.TaskCancellationService.MIN_VALUE;
import static org.opensearch.wlm.tracker.CpuUsageCalculator.PROCESSOR_COUNT;
import static org.opensearch.wlm.tracker.MemoryUsageCalculator.HEAP_SIZE_BYTES;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down

0 comments on commit 981b15f

Please sign in to comment.