Skip to content

Commit

Permalink
add task cancellation skeleton
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Apr 22, 2024
1 parent 5ba7535 commit d2a19ee
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);

// Propagate the resource limit group from co-ordinator to data nodes
shardRequest.setResourceLimitGroupId(getTask().getResourceLimitGroupId());
shardRequest.setResourceLimitGroupId(getTask().getResourceLimitGroupName());
return shardRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public boolean shouldCancelChildrenOnCancellation() {
return false;
}

public String getResourceLimitGroupId() {
public String getResourceLimitGroupName() {
return resourceLimitGroupId;
}

public void setResourceLimitGroupId(String resourceLimitGroupId) {
public void setResourceLimitGroupName(String resourceLimitGroupId) {
this.resourceLimitGroupId = resourceLimitGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public String getResourceLimitGroupId() {
public String getResourceLimitGroupName() {
return resourceLimitGroupId;
}

public void setResourceLimitGroupId(String resourceLimitGroupId) {
public void setResourceLimitGroupName(String resourceLimitGroupId) {
this.resourceLimitGroupId = resourceLimitGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ private void executeSearch(
localShardIterators.size() + remoteShardIterators.size()
);

task.setResourceLimitGroupId(searchRequest.resourceLimitGroupId());
task.setResourceLimitGroupName(searchRequest.resourceLimitGroupId());

searchAsyncActionProvider.asyncSearchAction(
task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(resourceName, value);
}

public String getResourceName() {
return resourceName;
}

public Double getValue() {
return value;
}
}

/**
Expand Down Expand Up @@ -270,4 +278,11 @@ public ResourceLimitGroupMode getMode() {
public List<ResourceLimit> getResourceLimits() {
return resourceLimits;
}

public ResourceLimit getResourceLimitFor(String resourceName) {
return resourceLimits.stream()
.filter(resourceLimit -> resourceLimit.getResourceName().equals(resourceName))
.findFirst()
.orElseGet(() -> new ResourceLimit(resourceName, 100.0));
}
}
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public void executeQueryPhase(
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
task.setResourceLimitGroupId(request.resourceLimitGroupId());
task.setResourceLimitGroupName(request.resourceLimitGroupId());
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
Expand Down Expand Up @@ -677,7 +677,7 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
task.setResourceLimitGroupId(shardSearchRequest.resourceLimitGroupId());
task.setResourceLimitGroupName(shardSearchRequest.resourceLimitGroupId());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
Expand Down Expand Up @@ -780,7 +780,7 @@ public void executeFetchPhase(
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
task.setResourceLimitGroupId(shardSearchRequest.resourceLimitGroupId());
task.setResourceLimitGroupName(shardSearchRequest.resourceLimitGroupId());
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupRequestCanceller;
import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupTaskCanceller;
import org.opensearch.search.resource_limit_group.tracker.ResourceLimitGroupResourceUsageTracker;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -26,11 +26,12 @@ public class ResourceLimitGroupService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(ResourceLimitGroupService.class);

private final ResourceLimitGroupResourceUsageTracker requestTracker;
private final ResourceLimitGroupRequestCanceller requestCanceller;
private final ResourceLimitGroupTaskCanceller requestCanceller;
private final ResourceLimitGroupPruner resourceLimitGroupPruner;
private volatile Scheduler.Cancellable scheduledFuture;
private final ResourceLimitGroupServiceSettings sandboxServiceSettings;
private final ThreadPool threadPool;
private final ResourceLimitGroupTaskCanceller taskCanceller;

/**
* Guice managed constructor
Expand All @@ -43,24 +44,26 @@ public class ResourceLimitGroupService extends AbstractLifecycleComponent {
@Inject
public ResourceLimitGroupService(
ResourceLimitGroupResourceUsageTracker requestTrackerService,
ResourceLimitGroupRequestCanceller requestCanceller,
ResourceLimitGroupTaskCanceller requestCanceller,
ResourceLimitGroupPruner resourceLimitGroupPruner,
ResourceLimitGroupServiceSettings sandboxServiceSettings,
ThreadPool threadPool
ThreadPool threadPool,
ResourceLimitGroupTaskCanceller taskCanceller
) {
this.requestTracker = requestTrackerService;
this.requestCanceller = requestCanceller;
this.resourceLimitGroupPruner = resourceLimitGroupPruner;
this.sandboxServiceSettings = sandboxServiceSettings;
this.threadPool = threadPool;
this.taskCanceller = taskCanceller;
}

/**
* run at regular interval
*/
private void doRun() {
requestTracker.updateResourceLimitGroupsResourceUsage();
requestCanceller.cancelViolatingTasks();
taskCanceller.cancelTasks();
resourceLimitGroupPruner.pruneResourceLimitGroup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* This interface can be implemented by tasks which will be tracked and monitored using {@link org.opensearch.cluster.metadata.ResourceLimitGroup}
*/
public interface ResourceLimitGroupTask {
public void setResourceLimitGroupId(String sandboxResourceLimitGroup);
public void setResourceLimitGroupName(String sandboxResourceLimitGroup);

public String getResourceLimitGroupId();
public String getResourceLimitGroupName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.resource_limit_group.cancellation;

import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

import java.util.List;

public interface CancellableTaskSelector {
/**
* This method selects tasks which can be cancelled
* @param tasks is list of available tasks to select from
* @param reduceBy is meant to select enough number of tasks consuming {@param reduceBy} resource
* @param resource it is a system resource e,g; "jvm" or "cpu"
* @return
*/
public List<TaskCancellation> selectTasks(List<Task> tasks, Double reduceBy, String resource);

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.resource_limit_group.cancellation;

/**
* This class is used to identify and cancel the violating tasks in a resourceLimitGroup
*/
public abstract class ResourceLimitGroupTaskCanceller {
private CancellableTaskSelector taskSelector;

public ResourceLimitGroupTaskCanceller(CancellableTaskSelector taskSelector) {
this.taskSelector = taskSelector;
}

public abstract void cancelTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.inject.AbstractModule;
import org.opensearch.search.resource_limit_group.ResourceLimitGroupPruner;
import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupRequestCanceller;
import org.opensearch.search.resource_limit_group.cancellation.ResourceLimitGroupTaskCanceller;
import org.opensearch.search.resource_limit_group.tracker.ResourceLimitGroupResourceUsageTracker;
import org.opensearch.search.resource_limit_group.tracker.ResourceLimitsGroupResourceUsageTrackerService;

Expand All @@ -27,7 +27,7 @@ public ResourceLimitGroupModule() {}
@Override
protected void configure() {
bind(ResourceLimitGroupResourceUsageTracker.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton();
bind(ResourceLimitGroupRequestCanceller.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton();
bind(ResourceLimitGroupTaskCanceller.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton();
bind(ResourceLimitGroupPruner.class).to(ResourceLimitsGroupResourceUsageTrackerService.class).asEagerSingleton();
}
}
Loading

0 comments on commit d2a19ee

Please sign in to comment.