Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add multitenant support for search workloads #13713

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.ResourceLimitGroupTask;
import org.opensearch.tasks.SearchBackpressureTask;

import java.util.Map;
Expand All @@ -50,9 +51,10 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, ResourceLimitGroupTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;
private String resourceLimitGroupId;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
Expand Down Expand Up @@ -84,4 +86,12 @@ public boolean supportsResourceTracking() {
public boolean shouldCancelChildrenOnCancellation() {
// This task type always answers false here.
return false;
}

/**
 * Returns the resource limit group recorded on this task.
 *
 * @return the value last passed to {@link #setResourceLimitGroupName(String)}; may be {@code null} if it has not been set
 */
@Override
public String getResourceLimitGroupName() {
    return resourceLimitGroupId;
}

/**
 * Records the resource limit group this task is accounted under.
 *
 * @param resourceLimitGroupId the group value to store; it is kept as-is, with no validation
 */
@Override
public void setResourceLimitGroupName(String resourceLimitGroupId) {
    this.resourceLimitGroupId = resourceLimitGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.ResourceLimitGroupTask;
import org.opensearch.tasks.SearchBackpressureTask;

import java.util.Map;
Expand All @@ -49,10 +50,11 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
public class SearchTask extends CancellableTask implements SearchBackpressureTask, ResourceLimitGroupTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private String resourceLimitGroupId;

public SearchTask(
long id,
Expand Down Expand Up @@ -106,4 +108,12 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
// This task type always answers true here.
return true;
}

/**
 * Returns the resource limit group recorded on this task.
 *
 * @return the value last passed to {@link #setResourceLimitGroupName(String)}; may be {@code null} if it has not been set
 */
@Override
public String getResourceLimitGroupName() {
    return resourceLimitGroupId;
}

/**
 * Records the resource limit group this task is accounted under.
 *
 * @param resourceLimitGroupId the group value to store; it is kept as-is, with no validation
 */
@Override
public void setResourceLimitGroupName(String resourceLimitGroupId) {
    this.resourceLimitGroupId = resourceLimitGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.query.Rewriteable;
import org.opensearch.search.MultiTenantLabel;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
Expand Down Expand Up @@ -166,6 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final String NOT_PROVIDED = "NOT_PROVIDED";

private final NodeClient client;
private final ThreadPool threadPool;
Expand Down Expand Up @@ -1104,6 +1106,15 @@ private void executeSearch(
concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size()
);

// Set tenant for this request in the task for tracking the tasks across tenants
String tenant = NOT_PROVIDED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering this logic can also be extended to other places like indexing etc, defining this here doesn't make sense.
Either

  1. Define a global string/enum for this in tenant related class
    OR
  2. Leave the tenant name as null, make it Optional, and handle it accordingly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good catch! I have defined a global string; I just forgot to use it here.

if (searchRequest.source() != null) {
Map<String, Object> multiTenantLabels = searchRequest.source().multiTenantLabels();
tenant = (String) multiTenantLabels.get(MultiTenantLabel.TENANT.name());
}
task.setResourceLimitGroupName(tenant);

searchAsyncActionProvider.asyncSearchAction(
task,
searchRequest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

/**
 * Enum to hold all multitenant labels in workloads.
 * <p>
 * Each label carries a string value, which is what {@link #writeTo(StreamOutput)} sends over
 * the wire and what {@link #fromName(String)} matches against (ignoring case).
 */
public enum MultiTenantLabel implements Writeable {
    // This label is used to define tenancy for multiple features, e.g. Query Sandboxing, Query Insights
    TENANT("tenant");

    private final String value;

    MultiTenantLabel(String name) {
        this.value = name;
    }

    /**
     * Returns the string value of this label.
     *
     * @return the value given to the constant at declaration, e.g. {@code "tenant"}
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a label from its string value, ignoring case.
     *
     * @param name the value to look up
     * @return the label whose {@link #getValue()} equals {@code name} ignoring case
     * @throws IllegalArgumentException if no label matches, including when {@code name} is {@code null}
     */
    public static MultiTenantLabel fromName(String name) {
        for (MultiTenantLabel label : values()) {
            if (label.getValue().equalsIgnoreCase(name)) {
                return label;
            }
        }
        // The original literal had the concatenation operator inside the quotes ("Illegal name + ").
        throw new IllegalArgumentException("Illegal name: " + name);
    }

    /**
     * Reads one string from the stream and resolves it with {@link #fromName(String)}.
     *
     * @param in the stream to read the label value from
     * @return the matching label
     * @throws IOException if reading from the stream fails
     * @throws IllegalArgumentException if the string read does not match any label
     */
    public static MultiTenantLabel fromName(StreamInput in) throws IOException {
        return fromName(in.readString());
    }

    /**
     * Write this into the {@linkplain StreamOutput}.
     *
     * @param out the stream that receives this label's string value
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(value);
    }
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

import static org.opensearch.action.search.TransportSearchAction.NOT_PROVIDED;
import static org.opensearch.common.unit.TimeValue.timeValueHours;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
Expand Down Expand Up @@ -568,6 +569,7 @@ public void executeQueryPhase(
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
setTenantInTask(task, request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not also instrument this tenant logic in task for other phases like fetch etc?
If yes, we should ideally populate this tenant info from coordinator level itself so that all tasks automatically have this info?

Copy link
Contributor Author

@kaushalmahi12 kaushalmahi12 May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do populate the tenancy info at the coordinator level, but that alone would not suffice, for the following reasons:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. But should we also instrument this tenant logic for other phases as well like fetch etc?

rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
Expand Down Expand Up @@ -598,6 +600,14 @@ public void onFailure(Exception exc) {
});
}

/**
 * Records on {@code task} the tenant label carried by {@code request}, keeping
 * {@code NOT_PROVIDED} when the request has no source, no label map, or no usable tenant entry.
 *
 * @param task    the shard task to tag
 * @param request the shard request whose source may carry multitenant labels
 */
private void setTenantInTask(SearchShardTask task, ShardSearchRequest request) {
    String tenant = NOT_PROVIDED;
    if (request.source() != null && request.source().multiTenantLabels() != null) {
        // NOTE(review): the key looked up is the enum constant name ("TENANT"), not
        // MultiTenantLabel.TENANT.getValue() ("tenant") — confirm which key requests actually carry.
        final Object label = request.source().multiTenantLabels().get(MultiTenantLabel.TENANT.name());
        // Override the default only with a real String. Previously a missing entry replaced
        // NOT_PROVIDED with null, and a non-String value failed the cast with ClassCastException.
        if (label instanceof String) {
            tenant = (String) label;
        }
    }
Comment on lines +604 to +607
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] Can be done inline like below:

String tenant = Optional.ofNullable(request.source())
.map(source -> source.multiTenantLabels())
.map(tenantLabels -> tenantLabels.get(MultiTenantLabel.TENANT.name()))
.orElse(NOT_PROVIDED);

task.setResourceLimitGroupName(tenant);
}

private IndexShard getShard(ShardSearchRequest request) {
if (request.readerId() != null) {
return findReaderContext(request.readerId(), request).indexShard();
Expand Down Expand Up @@ -676,6 +686,7 @@ public void executeQueryPhase(
}
runAsync(getExecutor(readerContext.indexShard()), () -> {
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
setTenantInTask(task, shardSearchRequest);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
Expand Down Expand Up @@ -778,6 +789,7 @@ public void executeFetchPhase(
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
setTenantInTask(task, shardSearchRequest);
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
runAsync(getExecutor(readerContext.indexShard()), () -> {
try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -136,6 +137,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
public static final ParseField MULTI_TENANT_LABELS = new ParseField("multitenant_attrs");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
Expand Down Expand Up @@ -223,6 +225,7 @@ public static HighlightBuilder highlight() {
private PointInTimeBuilder pointInTimeBuilder = null;

private Map<String, Object> searchPipelineSource = null;
private Map<String, Object> multiTenantLabels = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just define it to Map<String, String>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping it generic in case we want to provide some complex objects as part of the tenancy definition.


/**
* Constructs a new search source builder.
Expand Down Expand Up @@ -297,6 +300,10 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
derivedFields = in.readList(DerivedField::new);
}
}

if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need to set this to Version.V_3_0_0 initially in the main branch; after backporting to 2.x, you will then need to change it to Version.V_2_14_0.

multiTenantLabels = in.readMap();
}
}

@Override
Expand Down Expand Up @@ -377,6 +384,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeList(derivedFields);
}
}

if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeMap(multiTenantLabels);
}
}

/**
Expand Down Expand Up @@ -1088,6 +1099,14 @@ public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipeli
return this;
}

/**
 * Returns the multitenant labels held by this search source.
 *
 * @return the label map, keyed by label name; values are typed {@code Object}, so they are
 *         not guaranteed to be strings. This is the builder's own map, not a copy.
 */
public Map<String, Object> multiTenantLabels() {
return multiTenantLabels;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1334,6 +1353,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
searchPipelineSource = parser.mapOrdered();
} else if (DERIVED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
derivedFieldsObject = parser.map();
} else if (MULTI_TENANT_LABELS.match(currentFieldName, parser.getDeprecationHandler())) {
multiTenantLabels = parser.map();
} else {
throw new ParsingException(
parser.getTokenLocation(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

/**
* Tasks which should be grouped
*/
public interface ResourceLimitGroupTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are essentially grouping by "tenants" which internally have resource limits. So this naming doesn't look right to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming part we are still discussing.

/**
 * Records the resource limit group for this task.
 *
 * @param name the group value to associate with the task
 */
void setResourceLimitGroupName(String name);

/**
 * Returns the resource limit group recorded for this task.
 *
 * @return the group value; presumably whatever was last given to
 *         {@link #setResourceLimitGroupName(String)} — implementations decide, so confirm there
 */
String getResourceLimitGroupName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

/**
 * Unit tests for {@link MultiTenantLabel} lookup by name, both from a plain string and from a
 * (mocked) {@link StreamInput}.
 */
public class MultiTenantLabelTests extends AbstractSearchTestCase {

    /** A known value resolves to the matching constant. */
    public void testValidMultiTenantLabel() {
        MultiTenantLabel label = MultiTenantLabel.fromName("tenant");
        assertSame(MultiTenantLabel.TENANT, label);
        // expected value first, actual second, so failure messages read correctly
        assertEquals("tenant", label.getValue());
    }

    /** Lookup ignores case, since fromName compares with equalsIgnoreCase. */
    public void testValidMultiTenantLabelIgnoresCase() {
        assertSame(MultiTenantLabel.TENANT, MultiTenantLabel.fromName("TENANT"));
        assertSame(MultiTenantLabel.TENANT, MultiTenantLabel.fromName("Tenant"));
    }

    /** An unknown value is rejected. */
    public void testInvalidMultiTenantLabel() {
        assertThrows(IllegalArgumentException.class, () -> MultiTenantLabel.fromName("foo"));
    }

    /** A known value read from a stream resolves to the matching constant. */
    public void testValidMultiTenantLabelWithStreamInput() throws IOException {
        StreamInput streamInput = mock(StreamInput.class);
        doReturn("tenant").when(streamInput).readString();

        MultiTenantLabel label = MultiTenantLabel.fromName(streamInput);
        assertSame(MultiTenantLabel.TENANT, label);
        assertEquals("tenant", label.getValue());
    }

    /** An unknown value read from a stream is rejected. */
    public void testInvalidMultiTenantLabelWithStreamInput() throws IOException {
        StreamInput streamInput = mock(StreamInput.class);
        doReturn("foo").when(streamInput).readString();

        assertThrows(IllegalArgumentException.class, () -> MultiTenantLabel.fromName(streamInput));
    }

}
Loading