From 72901d41400e1fa5f1dd3af82f7fa765a4bec122 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 18 Jul 2023 20:38:31 +0200 Subject: [PATCH 01/61] Change downsample threadpool size. (#97714) Changed the downsample threadpool size to one eighth of the allocated processors. Downsampling is a cpu intensive operation (search + heavy aggregation that rolls up documents) and could overwhelm a node. On top of this the downsample operation also delegates to force merge api, which has a threadpool that is also bounded to one eighth of the allocated processors. Relates to #97141 --- .../src/main/java/org/elasticsearch/threadpool/ThreadPool.java | 2 +- .../src/main/java/org/elasticsearch/xpack/rollup/Rollup.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index e0fc75db3306..f0d2aa6ca018 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -601,7 +601,7 @@ static int twiceAllocatedProcessors(final int allocatedProcessors) { return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE); } - static int oneEighthAllocatedProcessors(final int allocatedProcessors) { + public static int oneEighthAllocatedProcessors(final int allocatedProcessors) { return boundedBy(allocatedProcessors / 8, 1, Integer.MAX_VALUE); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index b642b7a9d51f..7e6f807e5c4d 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -146,7 +146,7 @@ public List> getExecutorBuilders(Settings settingsToUse) { final FixedExecutorBuilder downsample = new 
FixedExecutorBuilder( settingsToUse, Rollup.DOWSAMPLE_TASK_THREAD_POOL_NAME, - ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(settingsToUse)), + ThreadPool.oneEighthAllocatedProcessors(EsExecutors.allocatedProcessors(settingsToUse)), Rollup.DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE, "xpack.downsample.thread_pool", EsExecutors.TaskTrackingConfig.DO_NOT_TRACK From b9f89f45165f39b33cf0ae854d0a0009ea6c425d Mon Sep 17 00:00:00 2001 From: Athena Brown Date: Tue, 18 Jul 2023 14:46:18 -0600 Subject: [PATCH 02/61] Improve how connections are rejected after shutdown (#97737) Prior to this PR, when a channel was accepted after Elasticsearch received a shutdown signal, `AbstractHttpServerTransport` would directly close the connection. This PR makes it throw an exception instead, which results in the connection being closed by Netty. This keeps the flow of control more consistent and readable, as well as logging additional details such as the host which made the request. This PR doesn't include any test changes because the behavior doesn't really change in a way that's easy to test. 
--- .../org/elasticsearch/http/AbstractHttpServerTransport.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 528efe8fa8b0..8ef43a447019 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -388,9 +388,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { try { rlock.lock(); if (shuttingDown) { - logger.warn("server accepted channel after shutting down"); - httpChannel.close(); - return; + throw new IllegalStateException("Server cannot accept new channel while shutting down"); } RequestTrackingHttpChannel trackingChannel = httpChannels.putIfAbsent(httpChannel, new RequestTrackingHttpChannel(httpChannel)); assert trackingChannel == null : "Channel should only be added to http channel set once"; From ab65e6f15edd96b1a1daadc0a172c637880c9660 Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Tue, 18 Jul 2023 13:48:02 -0700 Subject: [PATCH 03/61] Fix `sub_searches` serialization bug (#97587) We have allowed hybrid search since 8.4. This means the internal changes for sub_searches must be able to write a compound query as early as 8.4, but currently we only do that back to 8.8. This change fixes that issue. 
Closes #97144 --- docs/changelog/97587.yaml | 5 +++++ .../test/search.vectors/40_knn_search.yml | 10 ++++------ .../test/search.vectors/45_knn_search_byte.yml | 3 --- .../search/builder/SearchSourceBuilder.java | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) create mode 100644 docs/changelog/97587.yaml diff --git a/docs/changelog/97587.yaml b/docs/changelog/97587.yaml new file mode 100644 index 000000000000..39906bc4a441 --- /dev/null +++ b/docs/changelog/97587.yaml @@ -0,0 +1,5 @@ +pr: 97587 +summary: Fix `sub_searches` serialization bug +area: Ranking +type: bug +issues: [] diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/40_knn_search.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/40_knn_search.yml index c67ae7c0bfd5..f34aef9b8332 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/40_knn_search.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/40_knn_search.yml @@ -94,9 +94,8 @@ setup: --- "kNN search plus query": - skip: - version: all #' - 8.3.99' - reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/97144" - # reason: 'kNN added to search endpoint in 8.4' + version: ' - 8.3.99' + reason: 'kNN added to search endpoint in 8.4' - do: search: index: test @@ -122,9 +121,8 @@ setup: --- "kNN multi-field search with query": - skip: - version: all #' - 8.6.99' - reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/97144" - # reason: 'multi-field kNN search added to search endpoint in 8.7' + version: ' - 8.6.99' + reason: 'multi-field kNN search added to search endpoint in 8.7' - do: search: index: test diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/45_knn_search_byte.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/45_knn_search_byte.yml index 4d003a5c3b7b..873b6d87cac6 100644 --- 
a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/45_knn_search_byte.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/45_knn_search_byte.yml @@ -66,9 +66,6 @@ setup: --- "kNN search plus query": - - skip: - version: all - reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/97144" - do: search: index: test diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index a03c24e755cd..a04d514982ea 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -292,7 +292,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalNamedWriteable(postQueryBuilder); if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_013)) { out.writeList(subSearchSourceBuilders); - } else if (out.getTransportVersion().before(TransportVersion.V_8_8_0) && subSearchSourceBuilders.size() >= 2) { + } else if (out.getTransportVersion().before(TransportVersion.V_8_4_0) && subSearchSourceBuilders.size() >= 2) { throw new IllegalArgumentException("cannot serialize [sub_searches] to version [" + out.getTransportVersion() + "]"); } else { out.writeOptionalNamedWriteable(query()); From 760efbf1e58311d4c8a436920d1f281c50410a72 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 19 Jul 2023 11:20:02 +0300 Subject: [PATCH 04/61] Mute some PkiAuthDelegationIntegTests (#97774) Relates #97772 --- .../xpack/security/authc/pki/PkiAuthDelegationIntegTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/pki/PkiAuthDelegationIntegTests.java 
b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/pki/PkiAuthDelegationIntegTests.java index ffc47fc996e1..25de26d4bf1a 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/pki/PkiAuthDelegationIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/pki/PkiAuthDelegationIntegTests.java @@ -151,6 +151,7 @@ void clearRealmCache() { new ClearRealmCacheRequestBuilder(client()).get(); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97772") public void testDelegateThenAuthenticate() throws Exception { final X509Certificate clientCertificate = readCertForPkiDelegation("testClient.crt"); final X509Certificate intermediateCA = readCertForPkiDelegation("testIntermediateCA.crt"); @@ -193,6 +194,7 @@ public void testDelegateThenAuthenticate() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97772") public void testTokenInvalidate() throws Exception { final X509Certificate clientCertificate = readCertForPkiDelegation("testClient.crt"); final X509Certificate intermediateCA = readCertForPkiDelegation("testIntermediateCA.crt"); @@ -296,6 +298,7 @@ public void testDelegateUnauthorized() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97772") public void testDelegatePkiWithRoleMapping() throws Exception { X509Certificate clientCertificate = readCertForPkiDelegation("testClient.crt"); X509Certificate intermediateCA = readCertForPkiDelegation("testIntermediateCA.crt"); From 57fd6b84fb98d96ad4d28e01720452cfb122133e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20Zolt=C3=A1n=20Szab=C3=B3?= Date: Wed, 19 Jul 2023 10:38:11 +0200 Subject: [PATCH 05/61] [DOCS] Expands ELSER tutorial with optimization info (#97392) Co-authored-by: David Kyle --- .../query-dsl/text-expansion-query.asciidoc | 33 ++++++- 
.../semantic-search-elser.asciidoc | 85 ++++++++++++++++--- 2 files changed, 101 insertions(+), 17 deletions(-) diff --git a/docs/reference/query-dsl/text-expansion-query.asciidoc b/docs/reference/query-dsl/text-expansion-query.asciidoc index 91d597b79ace..74ee80ba821a 100644 --- a/docs/reference/query-dsl/text-expansion-query.asciidoc +++ b/docs/reference/query-dsl/text-expansion-query.asciidoc @@ -54,8 +54,8 @@ The query text you want to use for search. [discrete] -[[text-expansion-query-notes]] -=== Notes +[[text-expansion-query-example]] +=== Example The following is an example of the `text_expansion` query that references the ELSER model to perform semantic search. For a more detailed description of how @@ -76,4 +76,31 @@ GET my-index/_search } } ---- -// TEST[skip: TBD] \ No newline at end of file +// TEST[skip: TBD] + +[discrete] +[[optimizing-text-expansion]] +=== Optimizing the search performance of the text_expansion query + +https://www.elastic.co/blog/faster-retrieval-of-top-hits-in-elasticsearch-with-block-max-wand[Max WAND] +is an optimization technique used by {es} to skip documents that cannot score +competitively against the current best matching documents. However, the tokens +generated by the ELSER model don't work well with the Max WAND optimization. +Consequently, enabling Max WAND can actually increase query latency for +`text_expansion`. For datasets of a significant size, disabling Max +WAND leads to lower query latencies. + +Max WAND is controlled by the +<> query parameter. Setting track_total_hits +to true forces {es} to consider all documents, resulting in lower query +latencies for the `text_expansion` query. However, other {es} queries run slower +when Max WAND is disabled. + +If you are combining the `text_expansion` query with standard text queries in a +compound search, it is recommended to measure the query performance before +deciding which setting to use. 
+ +NOTE: The `track_total_hits` option applies to all queries in the search request +and may be optimal for some queries but not for others. Take into account the +characteristics of all your queries to determine the most suitable +configuration. diff --git a/docs/reference/search/search-your-data/semantic-search-elser.asciidoc b/docs/reference/search/search-your-data/semantic-search-elser.asciidoc index caac27929482..0ae2a7904e7b 100644 --- a/docs/reference/search/search-your-data/semantic-search-elser.asciidoc +++ b/docs/reference/search/search-your-data/semantic-search-elser.asciidoc @@ -49,9 +49,10 @@ to index the ELSER output. NOTE: ELSER output must be ingested into a field with the `rank_features` field type. Otherwise, {es} interprets the token-weight pairs as a massive amount of -fields in a document. If you get an error similar to this `"Limit of total fields [1000] has been exceeded while adding -new fields"` then the ELSER output field is not mapped properly and it has a -field type different than `rank_features`. +fields in a document. If you get an error similar to this +`"Limit of total fields [1000] has been exceeded while adding new fields"` then +the ELSER output field is not mapped properly and it has a field type different +than `rank_features`. [source,console] ---- @@ -59,21 +60,24 @@ PUT my-index { "mappings": { "properties": { - "ml.tokens": { - "type": "rank_features" <1> + "ml.tokens": { <1> + "type": "rank_features" <2> }, - "text": { <2> - "type": "text" <3> + "text": { <3> + "type": "text" <4> } } } } ---- // TEST[skip:TBD] -<1> The field that contains the prediction is a `rank_features` field. -<2> The name of the field from which to create the sparse vector representation. +<1> The name of the field to contain the generated tokens. +<2> The field to contain the tokens is a `rank_features` field. +<3> The name of the field from which to create the sparse vector representation. In this example, the name of the field is `text`. 
-<3> The field type which is text in this example. +<4> The field type which is text in this example. + +To learn how to optimize space, refer to the <> section. [discrete] @@ -176,9 +180,10 @@ follow the progress. It may take a couple of minutes to complete the process. [[text-expansion-query]] === Semantic search by using the `text_expansion` query -To perform semantic search, use the `text_expansion` query, -and provide the query text and the ELSER model ID. The example below uses -the query text "How to avoid muscle soreness after running?": +To perform semantic search, use the `text_expansion` query, and provide the +query text and the ELSER model ID. The example below uses the query text "How to +avoid muscle soreness after running?"; the `ml.tokens` field contains the +generated ELSER output: [source,console] ---- @@ -240,6 +245,9 @@ weights. ---- // NOTCONSOLE +To learn about optimizing your `text_expansion` query, refer to +<>. + [discrete] [[text-expansion-compound-query]] @@ -297,10 +305,59 @@ search results. <4> Only the results with a score equal to or higher than `10` are displayed. +[discrete] +[[optimization]] +=== Optimizing performance + +[discrete] +[[save-space]] +==== Saving disk space by excluding the ELSER tokens from document source + +The tokens generated by ELSER must be indexed for use in the +<>. However, it is not +necessary to retain those terms in the document source. You can save disk space +by using the <> mapping to remove the ELSER +terms from the document source. + +WARNING: Reindex uses the document source to populate the destination index. +Once the ELSER terms have been excluded from the source, they cannot be +recovered through reindexing. Excluding the tokens from the source is a +space-saving optimisation that should only be applied if you are certain that +reindexing will not be required in the future! 
It's important to carefully +consider this trade-off and make sure that excluding the ELSER terms from the +source aligns with your specific requirements and use case. + +The mapping that excludes `ml.tokens` from the `_source` field can be created +by the following API call: + +[source,console] +---- +PUT my-index +{ + "mappings": { + "_source": { + "excludes": [ + "ml.tokens" + ] + }, + "properties": { + "ml.tokens": { + "type": "rank_features" + }, + "text": { + "type": "text" + } + } + } +} +---- +// TEST[skip:TBD] + + [discrete] [[further-reading]] === Further reading * {ml-docs}/ml-nlp-elser.html[How to download and deploy ELSER] * {ml-docs}/ml-nlp-limitations.html#ml-nlp-elser-v1-limit-512[ELSER v1 limitation] -// TO DO: refer to the ELSER blog post +* https://www.elastic.co/blog/may-2023-launch-information-retrieval-elasticsearch-ai-model[Improving information retrieval in the Elastic Stack: Introducing Elastic Learned Sparse Encoder, our new retrieval model] From f36542980a60e65fde58c4498ffeb0ef00597bb2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jul 2023 10:56:38 +0200 Subject: [PATCH 06/61] Fix SharedBytesTests on Windows (#97720) Can't use mmap on Windows, so disabling it here like was done for other tests. 
closes #97626 --- .../org/elasticsearch/blobcache/shared/SharedBytesTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBytesTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBytesTests.java index c20cdbcabfd7..81899f7891e8 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBytesTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBytesTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.blobcache.shared; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; @@ -32,7 +33,7 @@ public void testReleasesFileCorrectly() throws Exception { nodeEnv, ignored -> {}, ignored -> {}, - randomBoolean() + IOUtils.WINDOWS == false && randomBoolean() ); final var sharedBytesPath = nodeEnv.nodeDataPaths()[0].resolve("shared_snapshot_cache"); assertTrue(Files.exists(sharedBytesPath)); From 1ef7d3f419a79726c19d924a743d41caa24cc05d Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Jul 2023 10:01:28 +0100 Subject: [PATCH 07/61] Fix NPE in Desired Balance API (#97775) If a shard has no desired assignment then we cannot send the desired balance over the wire or render it as JSON because the `ShardAssignmentView` is `null`. This commit replaces the spurious `null` with an empty object, and cleans up the tests a little to remove some unnecessary mocking. 
--- docs/changelog/97775.yaml | 5 ++ .../allocation/DesiredBalanceResponse.java | 12 +++- .../TransportGetDesiredBalanceAction.java | 6 +- ...TransportGetDesiredBalanceActionTests.java | 69 +++++++++++-------- 4 files changed, 59 insertions(+), 33 deletions(-) create mode 100644 docs/changelog/97775.yaml diff --git a/docs/changelog/97775.yaml b/docs/changelog/97775.yaml new file mode 100644 index 000000000000..9d6eee2569c3 --- /dev/null +++ b/docs/changelog/97775.yaml @@ -0,0 +1,5 @@ +pr: 97775 +summary: Fix NPE in Desired Balance API +area: Allocation +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java index 28760cdb0d20..72b69e059741 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java @@ -289,8 +289,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public record ShardAssignmentView(Set nodeIds, int total, int unassigned, int ignored) implements Writeable, ToXContentObject { + public static final ShardAssignmentView EMPTY = new ShardAssignmentView(Set.of(), 0, 0, 0); + public static ShardAssignmentView from(StreamInput in) throws IOException { - return new ShardAssignmentView(in.readSet(StreamInput::readString), in.readVInt(), in.readVInt(), in.readVInt()); + final var nodeIds = in.readSet(StreamInput::readString); + final var total = in.readVInt(); + final var unassigned = in.readVInt(); + final var ignored = in.readVInt(); + if (nodeIds.isEmpty() && total == 0 && unassigned == 0 && ignored == 0) { + return EMPTY; + } else { + return new ShardAssignmentView(nodeIds, total, unassigned, ignored); + } } @Override diff --git 
a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java index 6075ee833dda..15ddc51e5c3b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java @@ -138,14 +138,14 @@ private Map> createRo shardId, new DesiredBalanceResponse.DesiredShards( shardViews, - shardAssignment != null - ? new DesiredBalanceResponse.ShardAssignmentView( + shardAssignment == null + ? DesiredBalanceResponse.ShardAssignmentView.EMPTY + : new DesiredBalanceResponse.ShardAssignmentView( shardAssignment.nodeIds(), shardAssignment.total(), shardAssignment.unassigned(), shardAssignment.ignored() ) - : null ) ); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java index ac42fcf38463..08a051ebf8e1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java @@ -9,8 +9,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterName; @@ -40,15 +40,17 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import 
org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.mockito.ArgumentCaptor; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.elasticsearch.cluster.ClusterModule.BALANCED_ALLOCATOR; @@ -57,7 +59,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase { @@ -74,13 +75,28 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase clusterInfoService, TEST_WRITE_LOAD_FORECASTER ); - @SuppressWarnings("unchecked") - private final ActionListener listener = mock(ActionListener.class); + + private static DesiredBalanceResponse execute(TransportGetDesiredBalanceAction action, ClusterState clusterState) throws Exception { + return PlainActionFuture.get( + future -> action.masterOperation( + new Task(1, "test", GetDesiredBalanceAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()), + new DesiredBalanceRequest(), + clusterState, + future + ), + 10, + TimeUnit.SECONDS + ); + } + + private DesiredBalanceResponse executeAction(ClusterState clusterState) throws Exception { + return execute(transportGetDesiredBalanceAction, clusterState); + } public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() throws Exception { var clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadataWithConfiguredAllocator(BALANCED_ALLOCATOR)).build(); - new TransportGetDesiredBalanceAction( + final var action = new TransportGetDesiredBalanceAction( 
mock(TransportService.class), mock(ClusterService.class), mock(ThreadPool.class), @@ -89,12 +105,9 @@ public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() throws Exception { mock(ShardsAllocator.class), mock(ClusterInfoService.class), mock(WriteLoadForecaster.class) - ).masterOperation(mock(Task.class), mock(DesiredBalanceRequest.class), clusterState, listener); - - ArgumentCaptor exceptionArgumentCaptor = ArgumentCaptor.forClass(ResourceNotFoundException.class); - verify(listener).onFailure(exceptionArgumentCaptor.capture()); + ); - final var exception = exceptionArgumentCaptor.getValue(); + final var exception = expectThrows(ResourceNotFoundException.class, () -> execute(action, clusterState)); assertEquals("Desired balance allocator is not in use, no desired balance found", exception.getMessage()); assertThat(exception.status(), equalTo(RestStatus.NOT_FOUND)); } @@ -104,12 +117,10 @@ public void testReturnsErrorIfDesiredBalanceIsNotAvailable() throws Exception { .metadata(metadataWithConfiguredAllocator(DESIRED_BALANCE_ALLOCATOR)) .build(); - transportGetDesiredBalanceAction.masterOperation(mock(Task.class), mock(DesiredBalanceRequest.class), clusterState, listener); - - ArgumentCaptor exceptionArgumentCaptor = ArgumentCaptor.forClass(ResourceNotFoundException.class); - verify(listener).onFailure(exceptionArgumentCaptor.capture()); - - assertEquals("Desired balance is not computed yet", exceptionArgumentCaptor.getValue().getMessage()); + assertEquals( + "Desired balance is not computed yet", + expectThrows(ResourceNotFoundException.class, () -> executeAction(clusterState)).getMessage() + ); } public void testGetDesiredBalance() throws Exception { @@ -220,15 +231,15 @@ public void testGetDesiredBalance() throws Exception { .routingTable(routingTable) .build(); - transportGetDesiredBalanceAction.masterOperation(mock(Task.class), mock(DesiredBalanceRequest.class), clusterState, listener); - - ArgumentCaptor desiredBalanceResponseCaptor = 
ArgumentCaptor.forClass(DesiredBalanceResponse.class); - verify(listener).onResponse(desiredBalanceResponseCaptor.capture()); - DesiredBalanceResponse desiredBalanceResponse = desiredBalanceResponseCaptor.getValue(); + final var desiredBalanceResponse = executeAction(clusterState); assertThat(desiredBalanceResponse.getStats(), equalTo(desiredBalanceStats)); assertThat(desiredBalanceResponse.getClusterBalanceStats(), notNullValue()); assertThat(desiredBalanceResponse.getClusterInfo(), equalTo(clusterInfo)); assertEquals(indexShards.keySet(), desiredBalanceResponse.getRoutingTable().keySet()); + + assertEquals(desiredBalanceResponse, copyWriteable(desiredBalanceResponse, writableRegistry(), DesiredBalanceResponse::from)); + AbstractChunkedSerializingTestCase.assertChunkCount(desiredBalanceResponse, r -> 2 + r.getRoutingTable().size()); + for (var e : desiredBalanceResponse.getRoutingTable().entrySet()) { String index = e.getKey(); Map shardsMap = e.getValue(); @@ -267,14 +278,14 @@ public void testGetDesiredBalance() throws Exception { ); assertEquals(indexMetadata.getTierPreference(), shardView.tierPreference()); } - Optional shardAssignment = Optional.ofNullable(shardAssignments.get(indexShardRoutingTable.shardId())); - if (shardAssignment.isPresent()) { - assertEquals(shardAssignment.get().nodeIds(), desiredShard.desired().nodeIds()); - assertEquals(shardAssignment.get().total(), desiredShard.desired().total()); - assertEquals(shardAssignment.get().unassigned(), desiredShard.desired().unassigned()); - assertEquals(shardAssignment.get().ignored(), desiredShard.desired().ignored()); + final var shardAssignment = shardAssignments.get(indexShardRoutingTable.shardId()); + if (shardAssignment == null) { + assertSame(desiredShard.desired(), DesiredBalanceResponse.ShardAssignmentView.EMPTY); } else { - assertNull(desiredShard.desired()); + assertEquals(shardAssignment.nodeIds(), desiredShard.desired().nodeIds()); + assertEquals(shardAssignment.total(), 
desiredShard.desired().total()); + assertEquals(shardAssignment.unassigned(), desiredShard.desired().unassigned()); + assertEquals(shardAssignment.ignored(), desiredShard.desired().ignored()); } } } From c78ff28b0f32dfbd1a1005c261914a92c47bba4e Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 19 Jul 2023 12:07:32 +0300 Subject: [PATCH 08/61] Mute testTimeThrottle (#97786) Relates #97518 --- .../xpack/watcher/actions/TimeThrottleIntegrationTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java index 3b8385f79c4f..328fd585fa18 100644 --- a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java +++ b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/actions/TimeThrottleIntegrationTests.java @@ -33,6 +33,7 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTestCase { + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97518") public void testTimeThrottle() throws Exception { String id = randomAlphaOfLength(20); PutWatchResponse putWatchResponse = new PutWatchRequestBuilder(client()).setId(id) From 32f7ae6d010ee98aa9e8993a904e9ca6093cbc49 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 19 Jul 2023 12:18:30 +0300 Subject: [PATCH 09/61] Mute testScheduleNowWithSystemClock (#97788) Relates #95445 --- .../transform/transforms/scheduling/TransformSchedulerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java 
b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java index 8c1064bd596b..57426dae8206 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -327,6 +327,7 @@ public void testSchedulingWithSystemClock() throws Exception { transformScheduler.stop(); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95445") public void testScheduleNowWithSystemClock() throws Exception { String transformId = "test-schedule-now-with-system-clock"; TimeValue frequency = TimeValue.timeValueHours(1); // Very long pause between checkpoints From bf786f6df0e5412af69d95d3ff38c542b06e3d01 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 19 Jul 2023 12:22:58 +0300 Subject: [PATCH 10/61] Mute testDieWithDignity (#97790) Relates #97789 --- .../org/elasticsearch/qa/die_with_dignity/DieWithDignityIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/external-modules/die-with-dignity/src/javaRestTest/java/org/elasticsearch/qa/die_with_dignity/DieWithDignityIT.java b/test/external-modules/die-with-dignity/src/javaRestTest/java/org/elasticsearch/qa/die_with_dignity/DieWithDignityIT.java index 7eb577eb97b9..6d83424724d8 100644 --- a/test/external-modules/die-with-dignity/src/javaRestTest/java/org/elasticsearch/qa/die_with_dignity/DieWithDignityIT.java +++ b/test/external-modules/die-with-dignity/src/javaRestTest/java/org/elasticsearch/qa/die_with_dignity/DieWithDignityIT.java @@ -45,6 +45,7 @@ protected String getTestRestCluster() { return cluster.getHttpAddresses(); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97789") public void testDieWithDignity() throws Exception { final long pid = cluster.getPid(0); assertJvmArgs(pid, 
containsString("-Ddie.with.dignity.test=true")); From 7e16d2759dd837b1ab7383d44e4b0badc38cd391 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jul 2023 11:37:14 +0200 Subject: [PATCH 11/61] Speedup SharedBlobCacheService further (#97782) * Avoid creating capturing lambda for hot-path region looking * Made assertion code not run if asserts are off * and used proper atomic array instead of manually messing with an array of atomics in assertions * Extracted cold path for init cache chunk --- .../blobcache/common/SparseFileTracker.java | 1 + .../shared/SharedBlobCacheService.java | 110 +++++++++--------- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java index 27d5b26b37e3..45352ed71642 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java @@ -260,6 +260,7 @@ private void determineStartingRange(ByteRange range, List pendingRanges, } public boolean checkAvailable(long upTo) { + assert upTo <= length : "tried to check availability up to [" + upTo + "] but length is only [" + length + "]"; return complete >= upTo; } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 73bd89e2c7f3..038d5714d227 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -53,7 +53,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongConsumer; import java.util.function.Predicate; @@ -264,7 +264,7 @@ public void validate(ByteSizeValue value, Map, Object> settings, bool private final int maxFreq; private final long minTimeDelta; - private final AtomicReference[] regionOwners; // to assert exclusive access of regions + private final AtomicReferenceArray regionOwners; // to assert exclusive access of regions private final CacheDecayTask decayTask; @@ -291,10 +291,7 @@ public SharedBlobCacheService(NodeEnvironment environment, Settings settings, Th this.numRegions = Math.toIntExact(cacheSize / regionSize); keyMapping = new ConcurrentHashMap<>(); if (Assertions.ENABLED) { - regionOwners = new AtomicReference[numRegions]; - for (int i = 0; i < numRegions; i++) { - regionOwners[i] = new AtomicReference<>(); - } + regionOwners = new AtomicReferenceArray<>(numRegions); } else { regionOwners = null; } @@ -395,48 +392,24 @@ private long getRegionSize(long fileLength, int region) { } public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { - final long effectiveRegionSize = getRegionSize(fileLength, region); final RegionKey regionKey = new RegionKey<>(cacheKey, region); final long now = threadPool.relativeTimeInMillis(); - final Entry entry = keyMapping.computeIfAbsent( - regionKey, - key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now) - ); + // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path if we did not + // find an entry + var entry = keyMapping.get(regionKey); + if (entry == null) { + final long effectiveRegionSize = getRegionSize(fileLength, region); + entry = keyMapping.computeIfAbsent(regionKey, key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)); + } 
// sharedBytesPos is volatile, double locking is fine, as long as we assign it last. if (entry.chunk.sharedBytesPos == -1) { synchronized (entry.chunk) { if (entry.chunk.sharedBytesPos == -1) { - if (keyMapping.get(regionKey) != entry) { - throw new AlreadyClosedException("no free region found (contender)"); - } - // new item - assert entry.freq == 0; - assert entry.prev == null; - assert entry.next == null; - final Integer freeSlot = freeRegions.poll(); - if (freeSlot != null) { - // no need to evict an item, just add - assignToSlot(entry, freeSlot); - } else { - // need to evict something - synchronized (this) { - maybeEvict(); - } - final Integer freeSlotRetry = freeRegions.poll(); - if (freeSlotRetry != null) { - assignToSlot(entry, freeSlotRetry); - } else { - boolean removed = keyMapping.remove(regionKey, entry); - assert removed; - throw new AlreadyClosedException("no free region found"); - } - } - - return entry.chunk; + return initChunk(entry); } } } - assertChunkActiveOrEvicted(entry); + assert assertChunkActiveOrEvicted(entry); // existing item, check if we need to promote item if (now - entry.lastAccessed >= minTimeDelta) { @@ -446,6 +419,38 @@ public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { return entry.chunk; } + private CacheFileRegion initChunk(Entry entry) { + assert Thread.holdsLock(entry.chunk); + RegionKey regionKey = entry.chunk.regionKey; + if (keyMapping.get(regionKey) != entry) { + throw new AlreadyClosedException("no free region found (contender)"); + } + // new item + assert entry.freq == 0; + assert entry.prev == null; + assert entry.next == null; + final Integer freeSlot = freeRegions.poll(); + if (freeSlot != null) { + // no need to evict an item, just add + assignToSlot(entry, freeSlot); + } else { + // need to evict something + synchronized (this) { + maybeEvict(); + } + final Integer freeSlotRetry = freeRegions.poll(); + if (freeSlotRetry != null) { + assignToSlot(entry, freeSlotRetry); + } else { + 
boolean removed = keyMapping.remove(regionKey, entry); + assert removed; + throw new AlreadyClosedException("no free region found"); + } + } + + return entry.chunk; + } + private void maybePromote(long now, Entry entry) { synchronized (this) { if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) { @@ -458,10 +463,10 @@ private void maybePromote(long now, Entry entry) { } private void assignToSlot(Entry entry, int freeSlot) { - assert regionOwners[freeSlot].compareAndSet(null, entry.chunk); + assert regionOwners.compareAndSet(freeSlot, null, entry.chunk); synchronized (this) { if (entry.chunk.isEvicted()) { - assert regionOwners[freeSlot].compareAndSet(entry.chunk, null); + assert regionOwners.compareAndSet(freeSlot, entry.chunk, null); freeRegions.add(freeSlot); keyMapping.remove(entry.chunk.regionKey, entry); throw new AlreadyClosedException("evicted during free region allocation"); @@ -472,21 +477,20 @@ private void assignToSlot(Entry entry, int freeSlot) { } } - private void assertChunkActiveOrEvicted(Entry entry) { - if (Assertions.ENABLED) { - synchronized (this) { - // assert linked (or evicted) - assert entry.prev != null || entry.chunk.isEvicted(); + private boolean assertChunkActiveOrEvicted(Entry entry) { + synchronized (this) { + // assert linked (or evicted) + assert entry.prev != null || entry.chunk.isEvicted(); - } } - assert regionOwners[entry.chunk.sharedBytesPos].get() == entry.chunk || entry.chunk.isEvicted(); + assert regionOwners.get(entry.chunk.sharedBytesPos) == entry.chunk || entry.chunk.isEvicted(); + return true; } public void onClose(CacheFileRegion chunk) { // we held the "this" lock when this was evicted, hence if sharedBytesPos is not filled in, chunk will never be registered. 
if (chunk.sharedBytesPos != -1) { - assert regionOwners[chunk.sharedBytesPos].compareAndSet(chunk, null); + assert regionOwners.compareAndSet(chunk.sharedBytesPos, chunk, null); freeRegions.add(chunk.sharedBytesPos); } } @@ -812,7 +816,7 @@ void populateAndRead( rangeToRead, ActionListener.runBefore(listener, () -> Releasables.close(resources)).delegateFailureAndWrap((l, success) -> { final long physicalStartOffset = physicalStartOffset(); - assert regionOwners[sharedBytesPos].get() == this; + assert regionOwners.get(sharedBytesPos) == this; final int read = reader.onRangeAvailable( fileChannel, physicalStartOffset + rangeToRead.start(), @@ -852,7 +856,7 @@ protected void doRun() throws Exception { try { ensureOpen(); final long start = gap.start(); - assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this; + assert regionOwners.get(sharedBytesPos) == CacheFileRegion.this; writer.fillCacheRange( fileChannel, physicalStartOffset() + gap.start(), @@ -916,7 +920,7 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException { return false; } final CacheFileRegion fileRegion = get(cacheKey, length, startRegion); - if (fileRegion.tracker.checkAvailable(end) == false) { + if (fileRegion.tracker.checkAvailable(getRegionRelativePosition(end)) == false) { return false; } return fileRegion.tryRead(buf, offset); @@ -1032,7 +1036,7 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac } private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, long channelPos, long len) { - assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion; + assert regionOwners.get(fileRegion.sharedBytesPos) == fileRegion; assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset(); return true; } From 00b5050cb23d3201b012a03ac332b57f68363342 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Jul 2023 10:43:32 +0100 Subject: [PATCH 12/61] Fix NPE in Desired Balance API 
(#97775) If a shard has no desired assignment then we cannot send the desired balance over the wire or render it as JSON because the `ShardAssignmentView` is `null`. This commit replaces the spurious `null` with an empty object, and cleans up the tests a little to remove some unnecessary mocking. From 0b6055d65da9cd06d4b1594ae632c5147582e9d1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 19 Jul 2023 11:49:41 +0100 Subject: [PATCH 13/61] Improve chunking in desired balance API (#97800) The desired balance API response is chunked but it includes the `ClusterInfo` in a single chunk, and only splits up the routing table into per-index chunks. This commit makes the chunking more fine-grained. --- .../ClusterAllocationExplanation.java | 4 +- .../allocation/DesiredBalanceResponse.java | 59 ++++---- .../elasticsearch/cluster/ClusterInfo.java | 132 +++++++++++------- .../DesiredBalanceResponseTests.java | 6 +- ...TransportGetDesiredBalanceActionTests.java | 10 +- .../cluster/ClusterInfoTests.java | 10 ++ .../ReactiveStorageDeciderService.java | 4 +- 7 files changed, 139 insertions(+), 86 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java index fd2721241fb9..3200b41d23fe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xcontent.ToXContentObject; @@ -182,7 
+183,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (this.clusterInfo != null) { builder.startObject("cluster_info"); { - this.clusterInfo.toXContent(builder, params); + // This field might be huge, TODO add chunking support here + ChunkedToXContent.wrapAsToXContent(clusterInfo).toXContent(builder, params); } builder.endObject(); // end "cluster_info" } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java index 72b69e059741..168dc5c054dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponse.java @@ -31,7 +31,9 @@ import java.util.Objects; import java.util.Set; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject; import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.singleChunk; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject; public class DesiredBalanceResponse extends ActionResponse implements ChunkedToXContentObject { @@ -95,26 +97,26 @@ public Iterator toXContentChunked(ToXContent.Params params (builder, p) -> builder.field("cluster_balance_stats", clusterBalanceStats), (builder, p) -> builder.startObject("routing_table") ), - routingTableToXContentChunked(), - singleChunk( - (builder, p) -> builder.endObject(), - (builder, p) -> builder.startObject("cluster_info").value(clusterInfo).endObject(), - (builder, p) -> builder.endObject() - ) + Iterators.flatMap( + routingTable.entrySet().iterator(), + indexEntry -> Iterators.concat( + startObject(indexEntry.getKey()), + Iterators.flatMap( + indexEntry.getValue().entrySet().iterator(), + shardEntry -> Iterators.concat( + singleChunk((builder, p) -> 
builder.field(String.valueOf(shardEntry.getKey()))), + shardEntry.getValue().toXContentChunked(params) + ) + ), + endObject() + ) + ), + singleChunk((builder, p) -> builder.endObject().startObject("cluster_info")), + clusterInfo.toXContentChunked(params), + singleChunk((builder, p) -> builder.endObject().endObject()) ); } - private Iterator routingTableToXContentChunked() { - return routingTable.entrySet().stream().map(indexEntry -> (ToXContent) (builder, p) -> { - builder.startObject(indexEntry.getKey()); - for (Map.Entry shardEntry : indexEntry.getValue().entrySet()) { - builder.field(String.valueOf(shardEntry.getKey())); - shardEntry.getValue().toXContent(builder, p); - } - return builder.endObject(); - }).iterator(); - } - public DesiredBalanceStats getStats() { return stats; } @@ -159,7 +161,7 @@ public String toString() { + "}"; } - public record DesiredShards(List current, ShardAssignmentView desired) implements Writeable, ToXContentObject { + public record DesiredShards(List current, ShardAssignmentView desired) implements Writeable, ChunkedToXContentObject { public static DesiredShards from(StreamInput in) throws IOException { return new DesiredShards(in.readList(ShardView::from), ShardAssignmentView.from(in)); @@ -172,17 +174,18 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startArray("current"); - for (ShardView shardView : current) { - shardView.toXContent(builder, params); - } - builder.endArray(); - desired.toXContent(builder.field("desired"), params); - return builder.endObject(); + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + singleChunk((builder, p) -> builder.startObject(), (builder, p) -> builder.startArray("current")), + current().iterator(), + singleChunk( + (builder, p) -> builder.endArray(), + (builder, p) -> builder.field("desired"), + 
desired, + (builder, p) -> builder.endObject() + ) + ); } - } public record ShardView( diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 7fc85f64fee8..a166771a7e29 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -13,17 +13,20 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreStats; -import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -31,6 +34,9 @@ import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned; import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endArray; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.singleChunk; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject; /** * ClusterInfo is an object representing a map of nodes to {@link DiskUsage} @@ -38,7 +44,7 @@ * InternalClusterInfoService.shardIdentifierFromRouting(String) * for the key used in the shardSizes map */ -public class ClusterInfo implements 
ToXContentFragment, Writeable { +public class ClusterInfo implements ChunkedToXContent, Writeable { public static final ClusterInfo EMPTY = new ClusterInfo(); @@ -132,66 +138,79 @@ private static ShardRouting createFakeShardRoutingFromNodeAndShard(NodeAndShard ).initialize(nodeAndShard.nodeId, null, 0L).moveToStarted(0L); } - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("nodes"); - { - for (Map.Entry c : this.leastAvailableSpaceUsage.entrySet()) { - builder.startObject(c.getKey()); - { // node - builder.field("node_name", c.getValue().getNodeName()); - builder.startObject("least_available"); - { - c.getValue().toShortXContent(builder); - } - builder.endObject(); // end "least_available" - builder.startObject("most_available"); - { - DiskUsage most = this.mostAvailableSpaceUsage.get(c.getKey()); - if (most != null) { - most.toShortXContent(builder); - } + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(startObject("nodes"), leastAvailableSpaceUsage.entrySet().stream().map(c -> (builder, p) -> { + builder.startObject(c.getKey()); + { // node + builder.field("node_name", c.getValue().getNodeName()); + builder.startObject("least_available"); + { + c.getValue().toShortXContent(builder); + } + builder.endObject(); // end "least_available" + builder.startObject("most_available"); + { + DiskUsage most = this.mostAvailableSpaceUsage.get(c.getKey()); + if (most != null) { + most.toShortXContent(builder); } - builder.endObject(); // end "most_available" } - builder.endObject(); // end $nodename - } - } - builder.endObject(); // end "nodes" - builder.startObject("shard_sizes"); - { - for (Map.Entry c : this.shardSizes.entrySet()) { - builder.humanReadableField(c.getKey() + "_bytes", c.getKey(), ByteSizeValue.ofBytes(c.getValue())); - } - } - builder.endObject(); // end "shard_sizes" - builder.startObject("shard_data_set_sizes"); - { - for (Map.Entry c : 
this.shardDataSetSizes.entrySet()) { - builder.humanReadableField(c.getKey() + "_bytes", c.getKey().toString(), ByteSizeValue.ofBytes(c.getValue())); + builder.endObject(); // end "most_available" } - } - builder.endObject(); // end "shard_data_set_sizes" - builder.startObject("shard_paths"); - { - for (Map.Entry c : this.dataPath.entrySet()) { - builder.field(c.getKey().toString(), c.getValue()); - } - } - builder.endObject(); // end "shard_paths" - builder.startArray("reserved_sizes"); - { - for (Map.Entry c : this.reservedSpace.entrySet()) { + builder.endObject(); // end $nodename + return builder; + }).iterator(), + singleChunk( + (builder, p) -> builder.endObject(), // end "nodes" + (builder, p) -> builder.startObject("shard_sizes") + ), + + shardSizes.entrySet() + .stream() + .map( + c -> (builder, p) -> builder.humanReadableField(c.getKey() + "_bytes", c.getKey(), ByteSizeValue.ofBytes(c.getValue())) + ) + .iterator(), + singleChunk( + (builder, p) -> builder.endObject(), // end "shard_sizes" + (builder, p) -> builder.startObject("shard_data_set_sizes") + ), + shardDataSetSizes.entrySet() + .stream() + .map( + c -> (builder, p) -> builder.humanReadableField( + c.getKey() + "_bytes", + c.getKey().toString(), + ByteSizeValue.ofBytes(c.getValue()) + ) + ) + .iterator(), + singleChunk( + (builder, p) -> builder.endObject(), // end "shard_data_set_sizes" + (builder, p) -> builder.startObject("shard_paths") + ), + dataPath.entrySet() + .stream() + .map(c -> (builder, p) -> builder.field(c.getKey().toString(), c.getValue())) + .iterator(), + singleChunk( + (builder, p) -> builder.endObject(), // end "shard_paths" + (builder, p) -> builder.startArray("reserved_sizes") + ), + reservedSpace.entrySet().stream().map(c -> (builder, p) -> { builder.startObject(); { builder.field("node_id", c.getKey().nodeId); builder.field("path", c.getKey().path); c.getValue().toXContent(builder, params); } - builder.endObject(); // NodeAndPath - } - } - builder.endArray(); // end 
"reserved_sizes" - return builder; + return builder.endObject(); // NodeAndPath + }).iterator(), + + endArray() // end "reserved_sizes" + + ); } /** @@ -290,6 +309,11 @@ public String toString() { return Strings.toString(this, true, false); } + // exposed for tests, computed here rather than exposing all the collections separately + int getChunkCount() { + return leastAvailableSpaceUsage.size() + shardSizes.size() + shardDataSetSizes.size() + dataPath.size() + reservedSpace.size() + 6; + } + public record NodeAndShard(String nodeId, ShardId shardId) implements Writeable { public NodeAndShard { @@ -358,7 +382,7 @@ public boolean containsShardId(ShardId shardId) { return shardIds.contains(shardId); } - void toXContent(XContentBuilder builder, Params params) throws IOException { + void toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.field("total", total); builder.startArray("shards"); { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponseTests.java index f40dcbe3bc66..2154381d497c 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/DesiredBalanceResponseTests.java @@ -320,7 +320,11 @@ public void testToXContent() throws IOException { public void testChunking() { AbstractChunkedSerializingTestCase.assertChunkCount( new DesiredBalanceResponse(randomDesiredBalanceStats(), randomClusterBalanceStats(), randomRoutingTable(), randomClusterInfo()), - response -> response.getRoutingTable().size() + 2 + response -> 3 + ClusterInfoTests.getChunkCount(response.getClusterInfo()) + response.getRoutingTable() + .values() + .stream() + .mapToInt(indexEntry -> 2 + indexEntry.values().stream().mapToInt(shardEntry -> 3 + 
shardEntry.current().size()).sum()) + .sum() ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java index 08a051ebf8e1..bfaac7cfe4a8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterInfoTests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; @@ -238,7 +239,14 @@ public void testGetDesiredBalance() throws Exception { assertEquals(indexShards.keySet(), desiredBalanceResponse.getRoutingTable().keySet()); assertEquals(desiredBalanceResponse, copyWriteable(desiredBalanceResponse, writableRegistry(), DesiredBalanceResponse::from)); - AbstractChunkedSerializingTestCase.assertChunkCount(desiredBalanceResponse, r -> 2 + r.getRoutingTable().size()); + AbstractChunkedSerializingTestCase.assertChunkCount( + desiredBalanceResponse, + response -> 3 + ClusterInfoTests.getChunkCount(response.getClusterInfo()) + response.getRoutingTable() + .values() + .stream() + .mapToInt(indexEntry -> 2 + indexEntry.values().stream().mapToInt(shardEntry -> 3 + shardEntry.current().size()).sum()) + .sum() + ); for (var e : desiredBalanceResponse.getRoutingTable().entrySet()) { String index = e.getKey(); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 09e3fc71e033..8a772ab3e8d5 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.HashMap; @@ -106,4 +107,13 @@ private static Map randomRes } return builder; } + + public void testChunking() { + AbstractChunkedSerializingTestCase.assertChunkCount(createTestInstance(), ClusterInfoTests::getChunkCount); + } + + // exposing this to tests in other packages + public static int getChunkCount(ClusterInfo clusterInfo) { + return clusterInfo.getChunkCount(); + } } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index f66b21fc9b23..f80758351000 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; @@ -61,6 +62,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -952,7 
+954,7 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public Iterator toXContentChunked(ToXContent.Params params) { throw new UnsupportedOperationException(); } } From 3fd94678e69da569b9d9cb0706577f88f9becf10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20Zolt=C3=A1n=20Szab=C3=B3?= Date: Wed, 19 Jul 2023 13:32:06 +0200 Subject: [PATCH 14/61] [DOCS] Adds important admonition to handling delayed data page (#97753) --- .../ml/anomaly-detection/ml-delayed-data-detection.asciidoc | 6 ++++++ .../elasticsearch/xpack/core/ml/job/messages/Messages.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/reference/ml/anomaly-detection/ml-delayed-data-detection.asciidoc b/docs/reference/ml/anomaly-detection/ml-delayed-data-detection.asciidoc index 60b3b5b16366..a5a7b01f095a 100644 --- a/docs/reference/ml/anomaly-detection/ml-delayed-data-detection.asciidoc +++ b/docs/reference/ml/anomaly-detection/ml-delayed-data-detection.asciidoc @@ -16,6 +16,12 @@ if it is set too high, analysis drifts farther away from real-time. The balance that is struck depends upon each use case and the environmental factors of the cluster. +IMPORTANT: If you get an error that says +`Datafeed missed XXXX documents due to ingest latency`, consider increasing +the value of `query_delay`. If it doesn't help, investigate the ingest latency and its +cause. You can do this by comparing event and ingest timestamps. High latency +is often caused by bursts of ingested documents, misconfiguration of the ingest +pipeline, or misalignment of system clocks. == Why worry about delayed data? 
If data are delayed randomly (and consequently are missing from analysis), the diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 77b4dcaaac83..5d0a4aebcca2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -154,7 +154,7 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while"; public static final String JOB_AUDIT_DATAFEED_MISSING_DATA = "Datafeed has missed {0} documents due to ingest latency, latest bucket with missing data is [{1}]." - + " Consider increasing query_delay"; + + " Consider increasing query_delay and investigate the cause of high latency in your ingestion process."; public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis"; public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; From d7bb3a99ebaf7ad406b453c91d83ff7efa4d3297 Mon Sep 17 00:00:00 2001 From: Jonathan Buttner <56361221+jonathan-buttner@users.noreply.github.com> Date: Wed, 19 Jul 2023 08:25:55 -0400 Subject: [PATCH 15/61] [ML] Create task for model download (#97698) * Adding task and parent association * Experimenting with deleting on cancel * Removing comments * Refactoring tests * Addressing PR feedback --- .../elasticsearch/xpack/core/ml/MlTasks.java | 3 +- .../packageloader/action/ModelImporter.java | 32 ++++-- .../TransportLoadTrainedModelPackage.java | 99 +++++++++++++++---- ...TransportLoadTrainedModelPackageTests.java | 74 ++++++++++---- 4 files changed, 162 insertions(+), 46 
deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 8c3d5a8ddabe..cc25b5c61d33 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -28,7 +28,8 @@ import java.util.stream.Collectors; public final class MlTasks { - + public static final String MODEL_IMPORT_TASK_TYPE = "model_import"; + public static final String MODEL_IMPORT_TASK_ACTION = "xpack/ml/model_import[n]"; public static final String TRAINED_MODEL_ASSIGNMENT_TASK_TYPE = "trained_model_assignment"; public static final String TRAINED_MODEL_ASSIGNMENT_TASK_ACTION = "xpack/ml/trained_model_assignment[n]"; diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java index 2d0a2b196d62..9f2c7c4dee2d 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java +++ b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java @@ -11,11 +11,16 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.core.Tuple; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskCancelledException; import 
org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.ml.action.AuditMlNotificationAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; @@ -40,11 +45,13 @@ class ModelImporter { private final Client client; private final String modelId; private final ModelPackageConfig config; + private final CancellableTask task; - ModelImporter(Client client, String modelId, ModelPackageConfig packageConfig) { + ModelImporter(Client client, String modelId, ModelPackageConfig packageConfig, CancellableTask task) { this.client = client; this.modelId = Objects.requireNonNull(modelId); this.config = Objects.requireNonNull(packageConfig); + this.task = Objects.requireNonNull(task); } public void doImport() throws URISyntaxException, IOException, ElasticsearchStatusException { @@ -73,7 +80,7 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat for (int part = 0; part < totalParts - 1; ++part) { BytesArray definition = chunkIterator.next(); - PutTrainedModelDefinitionPartAction.Request r = new PutTrainedModelDefinitionPartAction.Request( + PutTrainedModelDefinitionPartAction.Request modelPartRequest = new PutTrainedModelDefinitionPartAction.Request( modelId, definition, part, @@ -81,7 +88,7 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat totalParts ); - client.execute(PutTrainedModelDefinitionPartAction.INSTANCE, r).actionGet(); + executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, modelPartRequest); } // get the last part, this time verify the checksum and size @@ -107,7 +114,7 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat throw new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR); } - PutTrainedModelDefinitionPartAction.Request r = new PutTrainedModelDefinitionPartAction.Request( + PutTrainedModelDefinitionPartAction.Request finalModelPartRequest = new 
PutTrainedModelDefinitionPartAction.Request( modelId, definition, totalParts - 1, @@ -115,7 +122,7 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat totalParts ); - client.execute(PutTrainedModelDefinitionPartAction.INSTANCE, r).actionGet(); + executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, finalModelPartRequest); logger.debug(format("finished importing model [%s] using [%d] parts", modelId, totalParts)); } @@ -124,14 +131,25 @@ private void uploadVocabulary() throws URISyntaxException { ModelLoaderUtils.resolvePackageLocation(config.getModelRepository(), config.getVocabularyFile()) ); - PutTrainedModelVocabularyAction.Request r2 = new PutTrainedModelVocabularyAction.Request( + PutTrainedModelVocabularyAction.Request request = new PutTrainedModelVocabularyAction.Request( modelId, vocabularyAndMerges.v1(), vocabularyAndMerges.v2(), List.of() ); - client.execute(PutTrainedModelVocabularyAction.INSTANCE, r2).actionGet(); + executeRequestIfNotCancelled(PutTrainedModelVocabularyAction.INSTANCE, request); + } + + private void executeRequestIfNotCancelled( + ActionType action, + Request request + ) { + if (task.isCancelled()) { + throw new TaskCancelledException(format("task cancelled with reason [%s]", task.getReasonCancelled())); + } + + client.execute(action, request).actionGet(); } private void writeDebugNotification(String modelId, String message) { diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java index 750979923d5b..9be81ae1bf9d 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java +++ 
b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackage.java @@ -17,19 +17,23 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.ml.action.AuditMlNotificationAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; -import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ModelPackageConfig; import org.elasticsearch.xpack.core.ml.packageloader.action.LoadTrainedModelPackageAction; import org.elasticsearch.xpack.core.ml.packageloader.action.LoadTrainedModelPackageAction.Request; import org.elasticsearch.xpack.ml.packageloader.MachineLearningPackageLoader; @@ -37,11 +41,14 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URISyntaxException; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static 
org.elasticsearch.xpack.core.ml.MlTasks.MODEL_IMPORT_TASK_ACTION; +import static org.elasticsearch.xpack.core.ml.MlTasks.MODEL_IMPORT_TASK_TYPE; public class TransportLoadTrainedModelPackage extends TransportMasterNodeAction { @@ -72,26 +79,60 @@ public TransportLoadTrainedModelPackage( this.client = new OriginSettingClient(client, ML_ORIGIN); } + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return null; + } + @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - String modelId = request.getModelId(); - ModelPackageConfig packageConfig = request.getModelPackageConfig(); - ModelImporter modelImporter = new ModelImporter(client, modelId, packageConfig); + CancellableTask downloadTask = createDownloadTask(request); + + try { + ParentTaskAssigningClient parentTaskAssigningClient = getParentTaskAssigningClient(downloadTask); - threadPool.executor(MachineLearningPackageLoader.UTILITY_THREAD_POOL_NAME) - .execute(() -> importModel(client, request, modelImporter, listener)); + ModelImporter modelImporter = new ModelImporter( + parentTaskAssigningClient, + request.getModelId(), + request.getModelPackageConfig(), + downloadTask + ); + + threadPool.executor(MachineLearningPackageLoader.UTILITY_THREAD_POOL_NAME) + .execute(() -> importModel(client, taskManager, request, modelImporter, listener, downloadTask)); + } catch (Exception e) { + taskManager.unregister(downloadTask); + throw e; + } if (request.isWaitForCompletion() == false) { listener.onResponse(AcknowledgedResponse.TRUE); } } + private ParentTaskAssigningClient getParentTaskAssigningClient(Task originTask) { + var parentTaskId = new TaskId(clusterService.localNode().getId(), originTask.getId()); + return new ParentTaskAssigningClient(client, parentTaskId); + } + /** * This is package scope so that we can test the logic directly. 
* This should only be called from the masterOperation method and the tests + * + * @param auditClient a client which should only be used to send audit notifications. This client cannot be associated with the passed + * in task, that way when the task is cancelled the notification requests can + * still be performed. If it is associated with the task (i.e. via ParentTaskAssigningClient), + * then the requests will throw a TaskCancelledException. */ - static void importModel(Client client, Request request, ModelImporter modelImporter, ActionListener listener) { + static void importModel( + Client auditClient, + TaskManager taskManager, + Request request, + ModelImporter modelImporter, + ActionListener listener, + Task task + ) { String modelId = request.getModelId(); final AtomicReference exceptionRef = new AtomicReference<>(); @@ -102,31 +143,58 @@ static void importModel(Client client, Request request, ModelImporter modelImpor final long totalRuntimeNanos = System.nanoTime() - relativeStartNanos; logAndWriteNotificationAtInfo( - client, + auditClient, modelId, format("finished model import after [%d] seconds", TimeUnit.NANOSECONDS.toSeconds(totalRuntimeNanos)) ); } catch (ElasticsearchException e) { - recordError(client, modelId, exceptionRef, e); + recordError(auditClient, modelId, exceptionRef, e); } catch (MalformedURLException e) { - recordError(client, modelId, "an invalid URL", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); + recordError(auditClient, modelId, "an invalid URL", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); } catch (URISyntaxException e) { - recordError(client, modelId, "an invalid URL syntax", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); + recordError(auditClient, modelId, "an invalid URL syntax", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); } catch (IOException e) { - recordError(client, modelId, "an IOException", exceptionRef, e, RestStatus.SERVICE_UNAVAILABLE); + recordError(auditClient, modelId, "an IOException", 
exceptionRef, e, RestStatus.SERVICE_UNAVAILABLE); } catch (Exception e) { - recordError(client, modelId, "an Exception", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); + recordError(auditClient, modelId, "an Exception", exceptionRef, e, RestStatus.INTERNAL_SERVER_ERROR); } finally { + taskManager.unregister(task); + if (request.isWaitForCompletion()) { if (exceptionRef.get() != null) { listener.onFailure(exceptionRef.get()); } else { listener.onResponse(AcknowledgedResponse.TRUE); } + } } } + private CancellableTask createDownloadTask(Request request) { + return (CancellableTask) taskManager.register(MODEL_IMPORT_TASK_TYPE, MODEL_IMPORT_TASK_ACTION, new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public void setRequestId(long requestId) { + request.setRequestId(requestId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public CancellableTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + }); + } + private static void recordError(Client client, String modelId, AtomicReference exceptionRef, Exception e) { logAndWriteNotificationAtError(client, modelId, e.toString()); exceptionRef.set(e); @@ -162,9 +230,4 @@ private static void writeNotification(Client client, String modelId, String mess ActionListener.noop() ); } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return null; - } } diff --git a/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java b/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java index 664fbf63a1f4..a4d7245acba6 100644 --- 
a/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java +++ b/x-pack/plugin/ml-package-loader/src/test/java/org/elasticsearch/xpack/ml/packageloader/action/TransportLoadTrainedModelPackageTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.AuditMlNotificationAction; import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ModelPackageConfig; @@ -22,6 +24,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URISyntaxException; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.core.Strings.format; import static org.hamcrest.core.Is.is; @@ -35,10 +38,19 @@ public class TransportLoadTrainedModelPackageTests extends ESTestCase { private static final String MODEL_IMPORT_FAILURE_MSG_FORMAT = "Model importing failed due to %s [%s]"; public void testSendsFinishedUploadNotification() { - ModelImporter uploader = mock(ModelImporter.class); - Client client = mock(Client.class); + var uploader = mock(ModelImporter.class); + var taskManager = mock(TaskManager.class); + var task = mock(Task.class); + var client = mock(Client.class); - TransportLoadTrainedModelPackage.importModel(client, createRequest(true), uploader, ActionListener.noop()); + TransportLoadTrainedModelPackage.importModel( + client, + taskManager, + createRequestWithWaiting(), + uploader, + ActionListener.noop(), + task + ); var notificationArg = ArgumentCaptor.forClass(AuditMlNotificationAction.Request.class); verify(client).execute(eq(AuditMlNotificationAction.INSTANCE), notificationArg.capture(), any()); @@ -80,20 +92,32 @@ public void 
testSendsErrorNotificationForException() throws URISyntaxException, } public void testCallsOnResponseWithAcknowledgedResponse() throws URISyntaxException, IOException { - Client client = mock(Client.class); + var client = mock(Client.class); + var taskManager = mock(TaskManager.class); + var task = mock(Task.class); ModelImporter uploader = createUploader(null); - @SuppressWarnings("unchecked") - var listener = (ActionListener) mock(ActionListener.class); - TransportLoadTrainedModelPackage.importModel(client, createRequest(true), uploader, listener); + var responseRef = new AtomicReference(); + var listener = ActionListener.wrap(responseRef::set, e -> fail("received an exception: " + e.getMessage())); - verify(listener).onResponse(AcknowledgedResponse.TRUE); + TransportLoadTrainedModelPackage.importModel(client, taskManager, createRequestWithWaiting(), uploader, listener, task); + assertThat(responseRef.get(), is(AcknowledgedResponse.TRUE)); } public void testDoesNotCallListenerWhenNotWaitingForCompletion() { var uploader = mock(ModelImporter.class); var client = mock(Client.class); - TransportLoadTrainedModelPackage.importModel(client, createRequest(false), uploader, ActionListener.running(ESTestCase::fail)); + var taskManager = mock(TaskManager.class); + var task = mock(Task.class); + + TransportLoadTrainedModelPackage.importModel( + client, + taskManager, + createRequestWithoutWaiting(), + uploader, + ActionListener.running(ESTestCase::fail), + task + ); } private void assertUploadCallsOnFailure(Exception exception, String message, RestStatus status) throws URISyntaxException, IOException { @@ -108,22 +132,28 @@ private void assertUploadCallsOnFailure(ElasticsearchStatusException exception, private void assertNotificationAndOnFailure(Exception thrownException, ElasticsearchStatusException onFailureException, String message) throws URISyntaxException, IOException { - Client client = mock(Client.class); + var client = mock(Client.class); + var taskManager = 
mock(TaskManager.class); + var task = mock(Task.class); ModelImporter uploader = createUploader(thrownException); - @SuppressWarnings("unchecked") - var listener = (ActionListener) mock(ActionListener.class); - TransportLoadTrainedModelPackage.importModel(client, createRequest(true), uploader, listener); + var failureRef = new AtomicReference(); + var listener = ActionListener.wrap( + (AcknowledgedResponse response) -> { fail("received a acknowledged response: " + response.toString()); }, + failureRef::set + ); + TransportLoadTrainedModelPackage.importModel(client, taskManager, createRequestWithWaiting(), uploader, listener, task); var notificationArg = ArgumentCaptor.forClass(AuditMlNotificationAction.Request.class); verify(client).execute(eq(AuditMlNotificationAction.INSTANCE), notificationArg.capture(), any()); assertThat(notificationArg.getValue().getMessage(), is(message)); - var listenerArg = ArgumentCaptor.forClass(ElasticsearchStatusException.class); - verify(listener).onFailure(listenerArg.capture()); - assertThat(listenerArg.getValue().toString(), is(onFailureException.toString())); - assertThat(listenerArg.getValue().status(), is(onFailureException.status())); - assertThat(listenerArg.getValue().getCause(), is(onFailureException.getCause())); + var receivedException = (ElasticsearchStatusException) failureRef.get(); + assertThat(receivedException.toString(), is(onFailureException.toString())); + assertThat(receivedException.status(), is(onFailureException.status())); + assertThat(receivedException.getCause(), is(onFailureException.getCause())); + + verify(taskManager).unregister(task); } private ModelImporter createUploader(Exception exception) throws URISyntaxException, IOException { @@ -135,7 +165,11 @@ private ModelImporter createUploader(Exception exception) throws URISyntaxExcept return uploader; } - private LoadTrainedModelPackageAction.Request createRequest(boolean waitForCompletion) { - return new LoadTrainedModelPackageAction.Request("id", 
mock(ModelPackageConfig.class), waitForCompletion); + private LoadTrainedModelPackageAction.Request createRequestWithWaiting() { + return new LoadTrainedModelPackageAction.Request("id", mock(ModelPackageConfig.class), true); + } + + private LoadTrainedModelPackageAction.Request createRequestWithoutWaiting() { + return new LoadTrainedModelPackageAction.Request("id", mock(ModelPackageConfig.class), false); } } From d2685926f31f2766197e2b07425f14c3e271d782 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Wed, 19 Jul 2023 14:26:16 +0200 Subject: [PATCH 16/61] Document dictionary parameter for Search Applications (#97501) * Document dictionary parameter for Search Applications * Address PR feedback --- .../apis/put-search-application.asciidoc | 91 ++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/docs/reference/search-application/apis/put-search-application.asciidoc b/docs/reference/search-application/apis/put-search-application.asciidoc index 9e77b2c98cd6..e4ed907be8b9 100644 --- a/docs/reference/search-application/apis/put-search-application.asciidoc +++ b/docs/reference/search-application/apis/put-search-application.asciidoc @@ -49,6 +49,21 @@ The <> associated with this search application. - The template may be modified with subsequent <> requests. - If no template is specified when creating a search application, or if a template is removed from a search application, we use the <> defined in the template examples as a default. - This template will be used by the <> API to execute searches. +- The template accepts an optional `dictionary` parameter which defines a https://json-schema.org[JSON schema] used for validating parameters sent to the <> API. + +.Properties of `