diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 397949525a3ec..d5ad0eca70208 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -198,6 +198,7 @@ import org.opensearch.plugins.IdentityPlugin; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.IngestPlugin; +import org.opensearch.plugins.LabelingPlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.MetadataUpgrader; import org.opensearch.plugins.NetworkPlugin; @@ -212,6 +213,7 @@ import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.plugins.TelemetryAwarePlugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.querygroup.LabelingService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor; import org.opensearch.repositories.RepositoriesModule; @@ -1114,6 +1116,8 @@ protected Node( transportService.getTaskManager() ); + final LabelingService labelingService = new LabelingService(pluginsService.filterPlugins(LabelingPlugin.class)); + final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, diff --git a/server/src/main/java/org/opensearch/plugins/LabelingPlugin.java b/server/src/main/java/org/opensearch/plugins/LabelingPlugin.java new file mode 100644 index 0000000000000..e059772f7b795 --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/LabelingPlugin.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.action.IndicesRequest; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.querygroup.LabelingService.LabelingImplementationType; + +/** + * This plugin introduces contracts on how should the incoming requests be labeled using implicit/explicit request attributes + * + */ +public interface LabelingPlugin { + + LabelingImplementationType getImplementationName(); + + /** + * This method will compute label value/values and + * put these in the {@link ThreadContext} against {@link org.opensearch.querygroup.LabelingHeader} + * @param request + * @param threadContext + */ + void labelRequest(final IndicesRequest request, final ThreadContext threadContext); +} diff --git a/server/src/main/java/org/opensearch/querygroup/LabelingHeader.java b/server/src/main/java/org/opensearch/querygroup/LabelingHeader.java new file mode 100644 index 0000000000000..7bae5f5c210ca --- /dev/null +++ b/server/src/main/java/org/opensearch/querygroup/LabelingHeader.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.querygroup; + +/** + * Main enum define various headers introduced by {@link org.opensearch.plugins.LabelingPlugin}s + */ +public enum LabelingHeader { + QUERY_GROUP_ID("queryGroupId"); + + private final String name; + + private LabelingHeader(final String name) { + this.name = name; + } + + public static LabelingHeader fromName(String name) { + for (final LabelingHeader header : values()) { + if (header.getName().equals(name)) { + return header; + } + } + throw new IllegalArgumentException(name + " is not a valid [LabelingHeader]"); + } + + private String getName() { + return name; + } +} diff --git a/server/src/main/java/org/opensearch/querygroup/LabelingService.java b/server/src/main/java/org/opensearch/querygroup/LabelingService.java new file mode 100644 index 0000000000000..b8dfe5f81412f --- /dev/null +++ b/server/src/main/java/org/opensearch/querygroup/LabelingService.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.querygroup; + +import org.opensearch.action.IndicesRequest; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.plugins.LabelingPlugin; + +import java.util.EnumMap; +import java.util.List; + +/** + * TODO: Don't know the right package to put this class, need suggestions for maintainers on this + * Main class to hold multiple implementations of {@link org.opensearch.plugins.LabelingPlugin} + * this class will facilitate access and interactions to different implementations + * Usage: This class should be used as a member to orchestrate the working of your {@link LabelingPlugin} + */ +public class LabelingService { + /** + * Enum to define what all are currently implementing the plugin + */ + public enum LabelingImplementationType { + QUERY_GROUP_RESOURCE_MANAGEMENT, + NOOP + } + + EnumMap implementations; + + public LabelingService(List loadedPlugins) { + implementations = new EnumMap<>(LabelingImplementationType.class); + for (LabelingPlugin plugin : loadedPlugins) { + if (implementations.containsKey(plugin.getImplementationName())) { + throw new IllegalArgumentException("There should not be two implementations of a LabelingImplementation type"); + } + implementations.put(plugin.getImplementationName(), plugin); + } + } + + /** + * populates the threadContext with the labels yielded by the {@param type} against the {@link LabelingHeader} keys + * @param type + * @param request + * @param threadContext + */ + public void labelRequestFor(final LabelingImplementationType type, final IndicesRequest request, final ThreadContext threadContext) { + final LabelingPlugin plugin = implementations.get(type); + if (plugin == null) { + throw new IllegalArgumentException(type + " implementation is not enabled"); + } + plugin.labelRequest(request, threadContext); + } +} diff --git a/server/src/main/java/org/opensearch/querygroup/package-info.java b/server/src/main/java/org/opensearch/querygroup/package-info.java new file mode 100644 index 0000000000000..d5210376b1ef7 --- /dev/null +++ b/server/src/main/java/org/opensearch/querygroup/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.querygroup; diff --git a/server/src/test/java/org/opensearch/querygroup/LabelingServiceTests.java b/server/src/test/java/org/opensearch/querygroup/LabelingServiceTests.java new file mode 100644 index 0000000000000..ddd8035e56f89 --- /dev/null +++ b/server/src/test/java/org/opensearch/querygroup/LabelingServiceTests.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.querygroup; + +import org.opensearch.action.IndicesRequest; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.plugins.LabelingPlugin; +import org.opensearch.querygroup.LabelingService.LabelingImplementationType; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class LabelingServiceTests extends OpenSearchTestCase { + + ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("QSB"); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testInvalidInstantiationOfLabelingService() { + assertThrows( + IllegalArgumentException.class, + () -> new LabelingService( + List.of( + getTestImplementation("", "", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT), + getTestImplementation("", "", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT) + ) + ) + ); + } + + public void testExistingImplementationExistingCase() { + LabelingService labelingService = new LabelingService( + List.of( + getTestImplementation("queryGroupId", "akfagagnaga232_2434t", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT) + ) + ); + IndicesRequest request = mock(IndicesRequest.class); + // threadPool = new TestThreadPool("QSB"); + ThreadContext threadContext = threadPool.getThreadContext(); + + labelingService.labelRequestFor(LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT, request, threadContext); + assertEquals(threadContext.getHeader("queryGroupId"), "akfagagnaga232_2434t"); + } + + public void testNonExistingImplementationExistingCase() { + LabelingService labelingService = new LabelingService( + List.of( + getTestImplementation("queryGroupId", "akfagagnaga232_2434t", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT) + ) + ); + IndicesRequest request = mock(IndicesRequest.class); + // threadPool = new TestThreadPool("QSB"); + ThreadContext threadContext = threadPool.getThreadContext(); + + assertThrows( + IllegalArgumentException.class, + () -> labelingService.labelRequestFor(LabelingImplementationType.NOOP, request, threadContext) + ); + + } + + LabelingPlugin getTestImplementation(String header, String value, LabelingImplementationType type) { + return new LabelingPlugin() { + @Override + public LabelingImplementationType getImplementationName() { + return type; + } + + @Override + public void labelRequest(IndicesRequest request, ThreadContext threadContext) { + threadContext.putHeader(header, value); + } + }; + } +}