Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/QuerySandbox(ResourceLimitGroup) Add sandbox schema and tracking framework #13311

Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
// than creating an empty response in the search thread pool.
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);

return shardRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ public SearchRequest pipeline(String pipeline) {
public String pipeline() {
return pipeline;
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.resource_limit_group.ResourceLimitGroupTask;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;

Expand All @@ -50,9 +51,10 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, ResourceLimitGroupTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;
private String resourceLimitGroupId;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
Expand Down Expand Up @@ -84,4 +86,12 @@ public boolean supportsResourceTracking() {
public boolean shouldCancelChildrenOnCancellation() {
return false;
}

public String getResourceLimitGroupName() {
return resourceLimitGroupId;
}

public void setResourceLimitGroupName(String resourceLimitGroupId) {
this.resourceLimitGroupId = resourceLimitGroupId;
}
Comment on lines +89 to +96
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class does not have any other setters. Why not set the resourceLimitGroupId within the constructor itself?

Copy link
Contributor Author

@kaushalmahi12 kaushalmahi12 May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't do that because of following reasons

but we will change the information is coming in the searchRequest so this will likely not be present in final change.

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.resource_limit_group.ResourceLimitGroupTask;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;

Expand All @@ -49,10 +50,11 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
public class SearchTask extends CancellableTask implements SearchBackpressureTask, ResourceLimitGroupTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private String resourceLimitGroupId;

public SearchTask(
long id,
Expand Down Expand Up @@ -106,4 +108,12 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public String getResourceLimitGroupName() {
return resourceLimitGroupId;
}

public void setResourceLimitGroupName(String resourceLimitGroupId) {
this.resourceLimitGroupId = resourceLimitGroupId;
}
Comment on lines +112 to +118
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as SearchShardTask

Copy link
Contributor Author

@kaushalmahi12 kaushalmahi12 May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above,
This is where shard level task are created : https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java#L91

so basically the tasks are created by TaskManager using TaskAwareRequest.

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.query.Rewriteable;
import org.opensearch.search.MultiTenantLabel;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
Expand Down Expand Up @@ -122,8 +123,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.*;
import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;

/**
Expand Down Expand Up @@ -166,6 +166,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final String NOT_PROVIDED = "NOT_PROVIDED";

private final NodeClient client;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -1104,6 +1105,15 @@ private void executeSearch(
concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size()
);

// Set tenant for this request in the task for tracking the tasks across tenants
Map<String, Object> multiTenantLabels = searchRequest.source().multiTenantLabels();
String tenant = NOT_PROVIDED;
if (multiTenantLabels != null) {
tenant = (String) multiTenantLabels.get(MultiTenantLabel.TENANT_LABEL.name());
}
task.setResourceLimitGroupName(tenant);

searchAsyncActionProvider.asyncSearchAction(
task,
searchRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,13 @@ public Map<String, View> views() {
return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap());
}

public Set<ResourceLimitGroup> resourceLimitGroups() {
return Optional.ofNullable((ResourceLimitGroupMetadata) this.custom(ResourceLimitGroupMetadata.TYPE))
.map(ResourceLimitGroupMetadata::resourceLimitGroups)
.orElse(Collections.emptySet());

}

public DecommissionAttributeMetadata decommissionAttributeMetadata() {
return custom(DecommissionAttributeMetadata.TYPE);
}
Expand Down Expand Up @@ -1329,6 +1336,25 @@ public Builder removeDataStream(String name) {
return this;
}

public Builder resourceLimitGroups(final Set<ResourceLimitGroup> resourceLimitGroups) {
this.customs.put(ResourceLimitGroupMetadata.TYPE, new ResourceLimitGroupMetadata(resourceLimitGroups));
return this;
}

public Builder put(final ResourceLimitGroup resourceLimitGroup) {
Objects.requireNonNull(resourceLimitGroup, "resourceLimitGroup should not be null");
Set<ResourceLimitGroup> existing = getResourceLimitGroups();
existing.add(resourceLimitGroup);
return resourceLimitGroups(existing);
}

private Set<ResourceLimitGroup> getResourceLimitGroups() {
return Optional.ofNullable(this.customs.get(ResourceLimitGroupMetadata.TYPE))
.map(o -> (ResourceLimitGroupMetadata) o)
.map(ResourceLimitGroupMetadata::resourceLimitGroups)
.orElse(Collections.emptySet());
}

private Map<String, View> getViews() {
return Optional.ofNullable(customs.get(ViewMetadata.TYPE))
.map(o -> (ViewMetadata) o)
Expand Down
Loading
Loading