From d4bdc7a601a8e212ddffcdbcd95bba4fff78a872 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:59:54 -0700 Subject: [PATCH] Add queryGroupId to search workload tasks at co-ordinator and data node level (#14708) (#15029) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add logic to add headers to Task * add logic to add queryGroupId to task headers * remove redundant code * add changelog entry * address comments * fix precommit * Add UTs for RemoteIndexMetadataManager (#14660) * Fix match_phrase_prefix_query not working on text field with multiple values and index_prefixes (#10959) * Fix match_phrase_prefix_query not working on text field with multiple values and index_prefixes * Add more test * modify change log * Fix test failure * Change the indexAnalyzer used by prefix field * Skip old version for yaml test * Optimize some code * Fix test failure * Modify yaml test description * Remove the name parameter for setAnalyzer() --------- * Offline calculation of total shard per node and caching it for weight calculation inside LocalShardBalancer (#14675) * [bug fix] validate lower bound for top n size (#14587) * Create SystemIndexRegistry with helper method matchesSystemIndex (#14415) * Create new extension point in SystemIndexPlugin for a single plugin to get registered system indices * Add to CHANGELOG * WIP on system indices from IndexNameExpressionResolver * Add test in IndexNameExpressionResolverTests * Remove changes in SystemIndexPlugin * Add method in IndexNameExpressionResolver to get matching system indices * Show how resolver can be chained to get system indices * Fix forbiddenApis check * Update CHANGELOG * Make SystemIndices internal * Remove unneeded changes * Fix CI failures * Fix precommit errors * Use Regex instead of WildcardMatcher * Address code review feedback * Allow caller to pass index expressions * 
Create SystemIndexRegistry * Update CHANGELOG * Remove singleton limitation * Add javadoc * Add @ExperimentalApi annotation --------- * Refactor Grok validate pattern to iterative approach (#14206) * grok validate patterns recursion to iterative * Add max depth in resolving a pattern to avoid OOM * change path from deque to arraylist * rename queue to stack * Change max depth to 500 * typo originPatternName fix * spotless --------- * Bump opentelemetry from 1.39.0 to 1.40.0 (#14674) * Bump jackson from 2.17.1 to 2.17.2 (#14687) * Add release notes for release 1.3.18 (#14699) * Bump reactor from 3.5.19 to 3.5.20 (#14697) * Add unit tests for read flow of RemoteClusterStateService and bug fix for transient settings (#14476) * Update version check for the bug fix of match_phrase_prefix_query not working on text field with multiple values and index_prefixes (#14703) * Remove unnecessary cast to int from test (#14696) * print reason why parent task was cancelled (#14604) * Use set of shard routing for shard in unassigned shard batch check. 
(#14533) * Add versioning for UploadedIndexMetadata (#14677) * Add versioning for UploadedIndexMetadata * Handle componentPrefix for backward compatibility * Fix: update help output for _cat (#14722) * fixed help output for _cat * updated changelog * updated changelog --------- * Fix hdfs-fixture kerb-admin & hadoop-minicluster dependencies are not being updated / false positive reports on CVEs (#14729) * Update to Gradle 8.9 (#14574) * Fix hdfs-fixture hadoop-minicluster dependencies are not being updated / false positive reports on CVEs (#14732) * Add `strict_allow_templates` dynamic mapping option (#14555) * The dynamic mapping parameter supports strict_allow_templates * Modify change log * Modify skip version in yml test file * Refactor some code * Keep the old methods * change public to private * Optimize some code * Do not override toString method for Dynamic * Optimize some code and modify the changelog --------- * Bump net.minidev:json-smart from 2.5.0 to 2.5.1 in /plugins/repository-azure (#14748) * Bump net.minidev:json-smart in /plugins/repository-azure Bumps [net.minidev:json-smart](https://github.com/netplex/json-smart-v2) from 2.5.0 to 2.5.1. - [Release notes](https://github.com/netplex/json-smart-v2/releases) - [Commits](https://github.com/netplex/json-smart-v2/compare/2.5.0...2.5.1) --- updated-dependencies: - dependency-name: net.minidev:json-smart dependency-type: direct:production update-type: version-update:semver-patch ... 
* Updating SHAs * Update changelog --------- * remove query insights plugin from core (#14743) * Add `strict_allow_templates` dynamic mapping option (#14555) (#14737) (#14742) * The dynamic mapping parameter supports strict_allow_templates * Modify change log * Modify skip version in yml test file * Refactor some code * Keep the old methods * change public to private * Optimize some code * Do not override toString method for Dynamic * Optimize some code and modify the changelog --------- (cherry picked from commit 6b8b3efe01a62c221f308a2e3b019d75a7f5ad8a) * Fix create or update alias API doesn't throw exception for unsupported parameters (#14719) * Fix create or update alias API doesn't throw exception for unsupported parameters * Update version check in yml test * modify change log --------- * Remove query categorization from core (#14759) * Remove query categorization from core * Add changelog * Trigger Build --------- * Add changes to propagate queryGroupId across child requests and nodes (#14614) * add query group header propagator * apply spotless check * add new propagator in ThreadContext * spotlessApply * address comments * Bump com.microsoft.azure:msal4j from 1.15.1 to 1.16.0 in /plugins/repository-azure (#14610) * Bump com.microsoft.azure:msal4j in /plugins/repository-azure Bumps [com.microsoft.azure:msal4j](https://github.com/AzureAD/microsoft-authentication-library-for-java) from 1.15.1 to 1.16.0. - [Release notes](https://github.com/AzureAD/microsoft-authentication-library-for-java/releases) - [Changelog](https://github.com/AzureAD/microsoft-authentication-library-for-java/blob/dev/changelog.txt) - [Commits](https://github.com/AzureAD/microsoft-authentication-library-for-java/compare/v1.15.1...v1.16.0) --- updated-dependencies: - dependency-name: com.microsoft.azure:msal4j dependency-type: direct:production update-type: version-update:semver-minor ... 
* Updating SHAs * Update changelog --------- * [Bugfix] Fix ICacheKeySerializerTests flakiness (#14564) * Fix testInvalidInput flakiness * Addressed andrross's comment * rerun security check --------- * Correct typo in method name (#14621) * Refactoring FilterPath.parse by using an iterative approach instead of recursion. (#14200) * Refactor FilterPath parse function (#12067) * Implement unit tests for FilterPathTests (#12067) * Write warn log if Filter is empty; Add comments (#12067) * Add changelog * Remove unnecessary log statement * Remove unused logger * Spotless apply * Remove incorrect changelog --------- * Removing String format in RemoteStoreMigrationAllocationDecider to optimise performance(#14612) * Clear templates before Adding; Use NamedWriteableAwareStreamInput for RemoteCustomMetadata; Correct the check for deciding upload of HashesOfConsistentSettings (#14513) * Clear templates before Adding; Use NamedWriteableAwareStreamInput for RemoteCustomMetadata * Correct the check for deciding upload of hashes of consistent settings * add changelog * add PR link changelog * Improve reroute performance by optimising List.removeAll in LocalShardsBalancer to filter remote search shard from relocation decision (#14613) * Fix assertion failure while deleting remote backed index (#14601) * Allow system index warning in OpenSearchRestTestCase.refreshAllIndices (#14635) * Allow system index warning * Add to CHANGELOG * Address code review comments --------- * Star tree codec changes (#14514) --------- * Bump com.github.spullara.mustache.java:compiler from 0.9.13 to 0.9.14 in /modules/lang-mustache (#14672) * Bump com.github.spullara.mustache.java:compiler Bumps [com.github.spullara.mustache.java:compiler](https://github.com/spullara/mustache.java) from 0.9.13 to 0.9.14. 
- [Commits](https://github.com/spullara/mustache.java/compare/mustache.java-0.9.13...mustache.java-0.9.14) --- updated-dependencies: - dependency-name: com.github.spullara.mustache.java:compiler dependency-type: direct:production update-type: version-update:semver-patch ... * Updating SHAs * Update changelog --------- * Bump net.minidev:accessors-smart from 2.5.0 to 2.5.1 in /plugins/repository-azure (#14673) * Bump net.minidev:accessors-smart in /plugins/repository-azure Bumps [net.minidev:accessors-smart](https://github.com/netplex/json-smart-v2) from 2.5.0 to 2.5.1. - [Release notes](https://github.com/netplex/json-smart-v2/releases) - [Commits](https://github.com/netplex/json-smart-v2/compare/2.5.0...2.5.1) --- updated-dependencies: - dependency-name: net.minidev:accessors-smart dependency-type: direct:production update-type: version-update:semver-patch ... * Updating SHAs * Update changelog --------- * move query group thread context propagator out of ThreadContext --------- * Add consumers to remote store based index settings (#14764) * Add matchesPluginSystemIndexPattern to SystemIndexRegistry (#14750) * Add matchesPluginSystemIndexPattern to SystemIndexRegistry * Add to CHANGELOG * Use single data structure to keep track of system indices * Address code review comments * Add test for getAllDescriptors * Update server/src/main/java/org/opensearch/indices/SystemIndexRegistry.java --------- * SPI for loading ABC templates (#14659) * SPI for loading ABC templates * Fix bulk upsert ignores the default_pipeline and final_pipeline when the auto-created index matches the index template (#12891) * Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template * Modify changelog & comment * Use new approach * Fix test failure --------- * Fix flaky test due to node being used across all tests (#14787) * Star Tree Implementation [OnHeap] (#14512) --------- * Add Gao Binlong as maintainer (#14796) * Clear ehcache 
disk cache files during initialization (#14738) * Clear ehcache disk cache files during initialization * Adding UT to fix line coverage * Addressing comment * Adding more Uts for better line coverage * Throwing exception in case we fail to clear cache files during startup * Adding more UTs * Adding a UT for more coverage * Fixing gradle build * Update ehcache disk cache close() logic --------- * Refactor remote-routing-table service inline with remote state interfaces (#14668) --------- * Set version to 2.15 for determining metadata during migration to remote store * Fix bulk upsert ignores the default_pipeline and final_pipeline when the auto-created index matches the index template (#14793) * Fix create or update alias API doesn't throw exception for unsupported parameters (#14769) * Change RCSS info logs to debug (#14814) * [Bugfix] Fix NPE in ReplicaShardAllocator (#13993) (#14385) * [Bugfix] Fix NPE in ReplicaShardAllocator (#13993) * Add fix info to CHANGELOG.md --------- * Run performance benchmark on pull requests (#14760) * add performance benchmark workflow for pull requests * Update PERFORMANCE_BENCHMARKS.md * Update PERFORMANCE_BENCHMARKS.md * Update .github/workflows/benchmark-pull-request.yml * Update .github/workflows/benchmark-pull-request.yml * Update .github/workflows/benchmark-pull-request.yml * Update .github/workflows/benchmark-pull-request.yml --------- * fix constant_keyword field type (#14807) test * [Remote Store Migration] Reconcile remote store based index settings during STRICT mode switch (#14792) * Add prefix mode verification setting for repository verification (#14790) * Add prefix mode verification setting for repository verification * Add UTs and randomise prefix mode repository verification * Incorporate PR review feedback --------- * add length check on comment body for benchmark workflow (#14834) * Add restore-from-snapshot test procedure for snapshot run benchmark config (#14842) * Fix env variable name typo (#14843) * Use 
circuit breaker in InternalHistogram when adding empty buckets (#14754) * introduce circuit breaker in InternalHistogram * use circuit breaker from reduce context * add test * revert use_real_memory change in OpenSearchNode * add change log --------- * [Remote State] Create interface RemoteEntitiesManager (#14671) * Create interface RemoteEntitiesManager * Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat… (#14749) * Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call * Enabling term version check on local state for all ClusterManager Read Transport Actions (#14273) * enabling term version check on local state for all admin read actions * Reduce logging in DEBUG for MasterService:run (#14795) * Reduce logging in DEBUG for MasterService:run by introducing short and long summary in TaskBatcher * Add SplitResponseProcessor to Search Pipelines (#14800) * Add SplitResponseProcessor for search pipelines * Register the split processor factory * Address code review comments * Avoid list copy by casting array --------- * Add integration tests for RemoteRoutingTable Service. 
(#14631) * Add SortResponseProcessor to Search Pipelines (#14785) * Add SortResponseProcessor for search pipelines * Add stupid and unnecessary javadocs to satisfy overly strict CI * Split casting and sorting methods for readability * Register the sort processor factory * Address code review comments * Cast individual list elements to avoid creating two lists * Add yamlRestTests * Clarify why there's unusual sorting * Use instanceof instead of isAssignableFrom --------- * Fix allowUnmappedFields, mapUnmappedFieldAsString settings to be applied when parsing query string query (#13957) * Modify to invoke QueryShardContext.fieldMapper() method to apply allowUnmappedFields and mapUnmappedFieldAsString settings * Add test cases to verify returning 400 responses if unmapped fields are included for some types of query * Add changelog --------- * Bump com.microsoft.azure:msal4j from 1.16.0 to 1.16.1 in /plugins/repository-azure (#14857) * Bump com.microsoft.azure:msal4j in /plugins/repository-azure Bumps [com.microsoft.azure:msal4j](https://github.com/AzureAD/microsoft-authentication-library-for-java) from 1.16.0 to 1.16.1. - [Release notes](https://github.com/AzureAD/microsoft-authentication-library-for-java/releases) - [Changelog](https://github.com/AzureAD/microsoft-authentication-library-for-java/blob/dev/changelog.txt) - [Commits](https://github.com/AzureAD/microsoft-authentication-library-for-java/compare/v1.16.0...v1.16.1) --- updated-dependencies: - dependency-name: com.microsoft.azure:msal4j dependency-type: direct:production update-type: version-update:semver-patch ... * Updating SHAs * Update changelog --------- * Bump com.gradle.develocity from 3.17.5 to 3.17.6 (#14856) * Bump com.gradle.develocity from 3.17.5 to 3.17.6 Bumps com.gradle.develocity from 3.17.5 to 3.17.6. --- updated-dependencies: - dependency-name: com.gradle.develocity dependency-type: direct:production update-type: version-update:semver-patch ... 
* Update changelog --------- * Bump org.jline:jline in /test/fixtures/hdfs-fixture (#14859) Bumps [org.jline:jline](https://github.com/jline/jline3) from 3.26.2 to 3.26.3. - [Release notes](https://github.com/jline/jline3/releases) - [Changelog](https://github.com/jline/jline3/blob/master/changelog.md) - [Commits](https://github.com/jline/jline3/compare/jline-parent-3.26.2...jline-parent-3.26.3) --- updated-dependencies: - dependency-name: org.jline:jline dependency-type: direct:production update-type: version-update:semver-patch ... * Use Lucene provided Persian stem (#14847) Lucene provided Persian stem apparently isn't hooked yet and this change is doing that based on what is done for Arabic stem support. * Bump actions/checkout from 2 to 4 (#14858) * Bump actions/checkout from 2 to 4 Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... * Update changelog --------- * Deprecate batch_size parameter on bulk API (#14725) By default the full _bulk payload will be passed to ingest processors as a batch, with any sub batching logic to be implemented by each processor if necessary. 
* Add perms for remote snapshot cache eviction on scripted query (#14411) * add transport interceptor to populate queryGroupId in task headers * Add rest, transport layer changes for Hot to warm tiering - dedicated setup (#13980) * Create listener to refresh search thread resource usage (#14832) * [bug fix] fix incorrect coordinator node search resource usages * fix bug on serialization when passing task resource usage to coordinator * add more unit tests * remove query insights plugin related code * create per request listener to refresh task resource usage * Make new listener API public * Add changelog * Remove wrong files added * Address review comments * Build fix * Make singleton * Address review comments * Make sure listener runs before plugin listeners * Spotless * Minor fix --------- * Caching avg total bytes and avg free bytes inside ClusterInfo (#14851) * Use default value when index.number_of_replicas is null (#14812) * Use default value when index.number_of_replicas is null * Add integration test * Add changelog --------- * [Remote Routing Table] Implement write and read flow for shard diff file. (#14684) * Implement write and read flow to upload/download shard diff file. * Optimized ClusterStatsIndices to precompute shard stats (#14426) * Optimize Cluster Stats Indices to precompute node level stats * Fix constraint bug which allows more primary shards than average primary shards per index (#14908) * Optimising AwarenessAllocationDecider for hashmap.get call (#14761) * update comment * Fix IngestServiceTests.testBulkRequestExecutionWithFailures (#14918) The test would previously fail if the randomness led to only a single indexing request being included in the bulk payload. This change guarantees multiple indexing requests in order to ensure the batch logic kicks in. Also replace some unneeded mocks with real classes. 
* add queryGroupTask * remove unnecessary imports * add QueryGroupTask tests * rename WLM transport request handler * add CHANGELOG entry * fix ut * address comments * fix UT to remove the verify for final method * apply spotless --------- (cherry picked from commit eb306d2bab43de789b59adc01265c683a8fb69fb) Signed-off-by: Kaushal Kumar Signed-off-by: Shivansh Arora Signed-off-by: Gao Binlong Signed-off-by: RS146BIJAY Signed-off-by: Chenyang Ji Signed-off-by: Craig Perkins Signed-off-by: Sandesh Kumar Signed-off-by: Andriy Redko Signed-off-by: Zelin Hao Signed-off-by: Lukáš Vlček Signed-off-by: kkewwei Signed-off-by: Swetha Guptha Signed-off-by: Sooraj Sinha Signed-off-by: ahmedsobeh Signed-off-by: dependabot[bot] Signed-off-by: github-actions[bot] Signed-off-by: Siddhant Deshmukh Signed-off-by: Peter Alfonsi Signed-off-by: vatsal Signed-off-by: Sachin Kale Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Signed-off-by: Craig Perkins Signed-off-by: mgodwan Signed-off-by: Mohit Godwani Signed-off-by: Sagar Upadhyaya Signed-off-by: Sandeep Kumawat Signed-off-by: Daniil Roman Signed-off-by: Daniil Roman Signed-off-by: Rishabh Singh Signed-off-by: Rishabh Singh Signed-off-by: Daniel (dB.) 
Doubrovkine Signed-off-by: Ashish Singh Signed-off-by: bowenlan-amzn Signed-off-by: Pranshu Shukla Signed-off-by: Rajiv Kumar Vaidyanathan Signed-off-by: Sumit Bansal Signed-off-by: Daniel Widdis Signed-off-by: Shailendra Singh Signed-off-by: imyp92 Signed-off-by: gaobinlong Signed-off-by: Ebrahim Byagowi Signed-off-by: Liyun Xiu Signed-off-by: Finn Carroll Signed-off-by: Neetika Singhal Signed-off-by: Jay Deng Signed-off-by: Gaurav Bafna Signed-off-by: Andrew Ross Co-authored-by: github-actions[bot] Co-authored-by: Shivansh Arora Co-authored-by: Arpit-Bandejiya Co-authored-by: gaobinlong Co-authored-by: rishavz_sagar Co-authored-by: Chenyang Ji Co-authored-by: Craig Perkins Co-authored-by: Sandesh Kumar Co-authored-by: Andriy Redko Co-authored-by: Zelin Hao Co-authored-by: Lukáš Vlček Co-authored-by: kkewwei Co-authored-by: SwethaGuptha <156877431+SwethaGuptha@users.noreply.github.com> Co-authored-by: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Co-authored-by: Ahmed Sobeh Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Siddhant Deshmukh Co-authored-by: Peter Alfonsi Co-authored-by: Peter Alfonsi Co-authored-by: Vatsal <36672090+imvtsl@users.noreply.github.com> Co-authored-by: Robin Friedmann Co-authored-by: Sachin Kale Co-authored-by: Bharathwaj G Co-authored-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Co-authored-by: Craig Perkins Co-authored-by: Andriy Redko Co-authored-by: Mohit Godwani <81609427+mgodwan@users.noreply.github.com> Co-authored-by: Sarthak Aggarwal Co-authored-by: Sagar <99425694+sgup432@users.noreply.github.com> Co-authored-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Co-authored-by: Sandeep Kumawat Co-authored-by: Daniil Roman Co-authored-by: Rishabh Singh Co-authored-by: kkewwei Co-authored-by: Daniel (dB.) 
Doubrovkine Co-authored-by: Ashish Singh Co-authored-by: bowenlan-amzn Co-authored-by: Pranshu Shukla <55992439+Pranshu-S@users.noreply.github.com> Co-authored-by: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Co-authored-by: Sumit Bansal Co-authored-by: Daniel Widdis Co-authored-by: shailendra0811 <167273922+shailendra0811@users.noreply.github.com> Co-authored-by: Park, Yeongwu Co-authored-by: ebraminio Co-authored-by: Liyun Xiu Co-authored-by: Finn Co-authored-by: Neetika Singhal Co-authored-by: Jay Deng Co-authored-by: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Co-authored-by: Andrew Ross --- CHANGELOG.md | 1 + .../action/search/SearchShardTask.java | 4 +- .../opensearch/action/search/SearchTask.java | 4 +- .../action/search/TransportSearchAction.java | 7 ++ .../main/java/org/opensearch/node/Node.java | 10 ++- .../org/opensearch/wlm/QueryGroupTask.java | 76 +++++++++++++++++++ ...orkloadManagementTransportInterceptor.java | 64 ++++++++++++++++ .../opensearch/wlm/QueryGroupTaskTests.java | 44 +++++++++++ ...adManagementTransportInterceptorTests.java | 40 ++++++++++ ...anagementTransportRequestHandlerTests.java | 75 ++++++++++++++++++ 10 files changed, 320 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/opensearch/wlm/QueryGroupTask.java create mode 100644 server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java create mode 100644 server/src/test/java/org/opensearch/wlm/QueryGroupTaskTests.java create mode 100644 server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java create mode 100644 server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b51e573aab0..4e5752536f977 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Fix for hasInitiatedFetching to fix 
allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972)) +- [Workload Management] Add queryGroupId to Task ([#14708](https://github.com/opensearch-project/OpenSearch/pull/14708)) - Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991)) - Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618)) - Add ThreadContextPermission for markAsSystemContext and allow core to perform the method ([#15016](https://github.com/opensearch-project/OpenSearch/pull/15016)) diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index dfecf4f462c4d..ed2943db94420 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -37,8 +37,8 @@ import org.opensearch.core.tasks.TaskId; import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; +import org.opensearch.wlm.QueryGroupTask; import java.util.Map; import java.util.function.Supplier; @@ -50,7 +50,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class SearchShardTask extends CancellableTask implements SearchBackpressureTask { +public class SearchShardTask extends QueryGroupTask implements SearchBackpressureTask { // generating metadata in a lazy way since source can be quite big private final MemoizedSupplier metadataSupplier; diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index d3c1043c50cce..2a1a961e7607b 100644 --- 
a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -35,8 +35,8 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.tasks.TaskId; -import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; +import org.opensearch.wlm.QueryGroupTask; import java.util.Map; import java.util.function.Supplier; @@ -49,7 +49,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class SearchTask extends CancellableTask implements SearchBackpressureTask { +public class SearchTask extends QueryGroupTask implements SearchBackpressureTask { // generating description in a lazy way since source can be quite big private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 7d3237d43cd5c..88bf7ebea8e52 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -101,6 +101,7 @@ import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; +import org.opensearch.wlm.QueryGroupTask; import java.util.ArrayList; import java.util.Arrays; @@ -442,6 +443,12 @@ private void executeRequest( ); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); + // At this point either the QUERY_GROUP_ID header will be present in ThreadContext either via ActionFilter + // or HTTP header (HTTP header will be deprecated once ActionFilter is implemented) + if (task instanceof QueryGroupTask) { + ((QueryGroupTask) 
task).setQueryGroupId(threadPool.getThreadContext()); + } + PipelinedRequest searchRequest; ActionListener listener; try { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 205fd34f89c2b..734bb1815b6d4 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -261,6 +261,7 @@ import org.opensearch.transport.TransportService; import org.opensearch.usage.UsageService; import org.opensearch.watcher.ResourceWatcherService; +import org.opensearch.wlm.WorkloadManagementTransportInterceptor; import javax.net.ssl.SNIHostName; @@ -1041,6 +1042,10 @@ protected Node( admissionControlService ); + WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor( + threadPool + ); + final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) .stream() .map(p -> p.getSecureSettingFactory(settings)) @@ -1048,7 +1053,10 @@ protected Node( .map(Optional::get) .collect(Collectors.toList()); - List transportInterceptors = List.of(admissionControlTransportInterceptor); + List transportInterceptors = List.of( + admissionControlTransportInterceptor, + workloadManagementTransportInterceptor + ); final NetworkModule networkModule = new NetworkModule( settings, pluginsService.filterPlugins(NetworkPlugin.class), diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java new file mode 100644 index 0000000000000..4eb413be61b72 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupTask.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.wlm; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.tasks.CancellableTask; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.opensearch.search.SearchService.NO_TIMEOUT; + +/** + * Base class to define QueryGroup tasks + */ +public class QueryGroupTask extends CancellableTask { + + private static final Logger logger = LogManager.getLogger(QueryGroupTask.class); + public static final String QUERY_GROUP_ID_HEADER = "queryGroupId"; + public static final Supplier DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP"; + private String queryGroupId; + + public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); + } + + public QueryGroupTask( + long id, + String type, + String action, + String description, + TaskId parentTaskId, + Map headers, + TimeValue cancelAfterTimeInterval + ) { + super(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval); + } + + /** + * This method should always be called after calling setQueryGroupId at least once on this object + * @return task queryGroupId + */ + public final String getQueryGroupId() { + if (queryGroupId == null) { + logger.warn("QueryGroup _id can't be null, It should be set before accessing it. 
This is abnormal behaviour "); + } + return queryGroupId; + } + + /** + * sets the queryGroupId from threadContext into the task itself, + * This method was defined since the queryGroupId can only be evaluated after task creation + * @param threadContext current threadContext + */ + public final void setQueryGroupId(final ThreadContext threadContext) { + this.queryGroupId = Optional.ofNullable(threadContext) + .map(threadContext1 -> threadContext1.getHeader(QUERY_GROUP_ID_HEADER)) + .orElse(DEFAULT_QUERY_GROUP_ID_SUPPLIER.get()); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } +} diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java new file mode 100644 index 0000000000000..848df8712549a --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.tasks.Task;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportChannel;
+import org.opensearch.transport.TransportInterceptor;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportRequestHandler;
+
+/**
+ * This class is used to intercept search traffic requests and populate the queryGroupId header in task headers.
+ * <p>
+ * Every handler passed to {@link #interceptHandler} is wrapped in a {@link RequestHandler}; the wrapper
+ * itself decides per task whether the queryGroupId has to be populated.
+ */
+public class WorkloadManagementTransportInterceptor implements TransportInterceptor {
+    private final ThreadPool threadPool;
+
+    /**
+     * @param threadPool thread pool whose thread context is read when a request is received
+     */
+    public WorkloadManagementTransportInterceptor(ThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    /**
+     * Wraps {@code actualHandler} in a {@link RequestHandler}.
+     * {@code action}, {@code executor} and {@code forceExecution} are not consulted.
+     */
+    @Override
+    public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
+        String action,
+        String executor,
+        boolean forceExecution,
+        TransportRequestHandler<T> actualHandler
+    ) {
+        return new RequestHandler<>(threadPool, actualHandler);
+    }
+
+    /**
+     * This class is mainly used to populate the queryGroupId header
+     * @param <T> T is Search related request
+     */
+    public static class RequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
+
+        private final ThreadPool threadPool;
+        final TransportRequestHandler<T> actualHandler;
+
+        /**
+         * @param threadPool    thread pool whose thread context supplies the queryGroupId
+         * @param actualHandler handler that every request is delegated to
+         */
+        public RequestHandler(ThreadPool threadPool, TransportRequestHandler<T> actualHandler) {
+            this.threadPool = threadPool;
+            this.actualHandler = actualHandler;
+        }
+
+        /**
+         * Sets the queryGroupId on {@code task} when it is a {@link QueryGroupTask}, then always
+         * delegates to the wrapped handler.
+         */
+        @Override
+        public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
+            if (isSearchWorkloadRequest(task)) {
+                // The id has to be on the task before the wrapped handler starts working with it.
+                ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
+            }
+            actualHandler.messageReceived(request, channel, task);
+        }
+
+        /**
+         * @return {@code true} when {@code task} is a {@link QueryGroupTask}
+         */
+        boolean isSearchWorkloadRequest(Task task) {
+            return task instanceof QueryGroupTask;
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/wlm/QueryGroupTaskTests.java b/server/src/test/java/org/opensearch/wlm/QueryGroupTaskTests.java
new file mode 100644
index 0000000000000..d292809c30124
--- /dev/null
+++ 
b/server/src/test/java/org/opensearch/wlm/QueryGroupTaskTests.java
@@ -0,0 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.wlm.QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER;
+import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER;
+
+/**
+ * Unit tests for setting and reading the queryGroupId of a {@link QueryGroupTask}.
+ */
+public class QueryGroupTaskTests extends OpenSearchTestCase {
+    private ThreadPool threadPool;
+    private QueryGroupTask sut;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool(getTestName());
+        sut = new QueryGroupTask(123, "transport", "Search", "test task", null, Collections.emptyMap());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        // terminate() waits for the pool to stop; shutdown() alone only requests it.
+        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /** Without the header the default id is used; once the header is present its value wins. */
+    public void testSuccessfulSetQueryGroupId() {
+        sut.setQueryGroupId(threadPool.getThreadContext());
+        assertEquals(DEFAULT_QUERY_GROUP_ID_SUPPLIER.get(), sut.getQueryGroupId());
+
+        threadPool.getThreadContext().putHeader(QUERY_GROUP_ID_HEADER, "akfanglkaglknag2332");
+
+        sut.setQueryGroupId(threadPool.getThreadContext());
+        assertEquals("akfanglkaglknag2332", sut.getQueryGroupId());
+    }
+
+    /** A null thread context must fall back to the default id instead of throwing. */
+    public void testSetQueryGroupIdWithNullThreadContext() {
+        sut.setQueryGroupId(null);
+        assertEquals(DEFAULT_QUERY_GROUP_ID_SUPPLIER.get(), sut.getQueryGroupId());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java
new file mode 100644
index 0000000000000..db4e5e45d49ed
--- /dev/null
+++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java
@@ -0,0 +1,40 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be 
licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportRequestHandler;
+import org.opensearch.wlm.WorkloadManagementTransportInterceptor.RequestHandler;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.threadpool.ThreadPool.Names.SAME;
+
+/**
+ * Unit tests for {@link WorkloadManagementTransportInterceptor}.
+ */
+public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestCase {
+
+    private ThreadPool threadPool;
+    private WorkloadManagementTransportInterceptor sut;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool(getTestName());
+        sut = new WorkloadManagementTransportInterceptor(threadPool);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        // terminate() waits for the pool to stop; shutdown() alone only requests it.
+        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /** The interceptor must hand back its own {@link RequestHandler} wrapper. */
+    public void testInterceptHandler() {
+        // The wrapped handler is never invoked in this test, so null is sufficient.
+        TransportRequestHandler<TransportRequest> requestHandler = sut.interceptHandler("Search", SAME, false, null);
+        assertTrue(requestHandler instanceof RequestHandler);
+    }
+}
diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java
new file mode 100644
index 0000000000000..789c02345e774
--- /dev/null
+++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java
@@ -0,0 +1,75 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.search.internal.ShardSearchRequest;
+import org.opensearch.tasks.Task;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportChannel;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportRequestHandler;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Unit tests for {@link WorkloadManagementTransportInterceptor.RequestHandler}.
+ */
+public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase {
+    private WorkloadManagementTransportInterceptor.RequestHandler<TransportRequest> sut;
+    private ThreadPool threadPool;
+
+    private TestTransportRequestHandler<TransportRequest> actualHandler;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool(getTestName());
+        actualHandler = new TestTransportRequestHandler<>();
+
+        sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        // terminate() waits for the pool to stop; shutdown() alone only requests it.
+        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /** A {@link QueryGroupTask} gets its queryGroupId populated and the request is still delegated. */
+    public void testMessageReceivedForSearchWorkload() throws Exception {
+        ShardSearchRequest request = mock(ShardSearchRequest.class);
+        QueryGroupTask spyTask = getSpyTask();
+
+        sut.messageReceived(request, mock(TransportChannel.class), spyTask);
+        assertTrue(sut.isSearchWorkloadRequest(spyTask));
+        // No queryGroupId header was put on the thread context, so the default id is expected.
+        assertEquals(QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get(), spyTask.getQueryGroupId());
+        assertEquals(1, actualHandler.invokeCount);
+    }
+
+    /** A task that is not a {@link QueryGroupTask} is only delegated to the wrapped handler. */
+    public void testMessageReceivedForNonSearchWorkload() throws Exception {
+        IndexRequest indexRequest = mock(IndexRequest.class);
+        Task task = mock(Task.class);
+        sut.messageReceived(indexRequest, mock(TransportChannel.class), task);
+        assertFalse(sut.isSearchWorkloadRequest(task));
+        assertEquals(1, actualHandler.invokeCount);
+    }
+
+    private static QueryGroupTask getSpyTask() {
+        final QueryGroupTask task = new QueryGroupTask(123, "transport", "Search", "test task", null, Collections.emptyMap());
+
+        return spy(task);
+    }
+
+    /** Delegate stub that only counts how often it was invoked. */
+    private static class TestTransportRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
+        int invokeCount = 0;
+
+        @Override
+        public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
+            invokeCount += 1;
+        }
+    }
+}