From 5244077d5e8b389313cb6558027b1a5e197bf66c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 1 Dec 2023 12:53:32 -0600 Subject: [PATCH] fixing tests --- ...rollDocumentsAfterConflictsIntegTests.java | 79 +++++++++--------- .../action/bulk/BulkRequest.java | 1 + .../inference/registry/ModelRegistry.java | 7 +- .../ml/integration/JobResultsProviderIT.java | 81 ++++++++++--------- .../security/authc/TokenServiceTests.java | 11 +-- 5 files changed, 88 insertions(+), 91 deletions(-) diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java index 7dad062ab3bca..a90ba48406fcd 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java @@ -213,55 +213,56 @@ public void testDeleteByQuery() throws Exception { docsModifiedConcurrently.addAll(randomSubsetOf(finalConflictingOps, originalDocs)); } ); - BulkRequest conflictingUpdatesBulkRequest = new BulkRequest(); - for (SearchHit searchHit : docsModifiedConcurrently) { - if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) { - conflictingOps--; + try (BulkRequest conflictingUpdatesBulkRequest = new BulkRequest()) { + for (SearchHit searchHit : docsModifiedConcurrently) { + if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) { + conflictingOps--; + } + conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency)); } - conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency)); - } - // The bulk request is enqueued before the update by query - // Since #77731 TransportBulkAction is dispatched into the Write thread pool, - // this test makes use of a deterministic task order in the data node write - // thread pool. To ensure that ordering, execute the TransportBulkAction - // in a coordinator node preventing that additional tasks are scheduled into - // the data node write thread pool. - final ActionFuture bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest); + // The bulk request is enqueued before the update by query + // Since #77731 TransportBulkAction is dispatched into the Write thread pool, + // this test makes use of a deterministic task order in the data node write + // thread pool. To ensure that ordering, execute the TransportBulkAction + // in a coordinator node preventing that additional tasks are scheduled into + // the data node write thread pool. + final ActionFuture bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest); - // Ensure that the concurrent writes are enqueued before the update by query request is sent - assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1))); + // Ensure that the concurrent writes are enqueued before the update by query request is sent + assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1))); - requestBuilder.source(sourceIndex).maxDocs(maxDocs).abortOnVersionConflict(false); + requestBuilder.source(sourceIndex).maxDocs(maxDocs).abortOnVersionConflict(false); - if (scriptEnabled) { - final Script script = new Script(ScriptType.INLINE, SCRIPT_LANG, NOOP_GENERATOR, Collections.emptyMap()); - ((AbstractBulkIndexByScrollRequestBuilder) requestBuilder).script(script); - } + if (scriptEnabled) { + final Script script = new Script(ScriptType.INLINE, SCRIPT_LANG, NOOP_GENERATOR, Collections.emptyMap()); + ((AbstractBulkIndexByScrollRequestBuilder) requestBuilder).script(script); + } - final SearchRequestBuilder source = requestBuilder.source(); - source.setSize(scrollSize); - source.addSort(SORTING_FIELD, SortOrder.DESC); - source.setQuery(QueryBuilders.matchAllQuery()); - final ActionFuture updateByQueryResponse = requestBuilder.execute(); + final SearchRequestBuilder source = requestBuilder.source(); + source.setSize(scrollSize); + source.addSort(SORTING_FIELD, SortOrder.DESC); + source.setQuery(QueryBuilders.matchAllQuery()); + final ActionFuture updateByQueryResponse = requestBuilder.execute(); - assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2))); + assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2))); - // Allow tasks from the write thread to make progress - latch.countDown(); + // Allow tasks from the write thread to make progress + latch.countDown(); - final BulkResponse bulkItemResponses = bulkFuture.actionGet(); - for (BulkItemResponse bulkItemResponse : bulkItemResponses) { - assertThat(Strings.toString(bulkItemResponses), bulkItemResponse.isFailed(), is(false)); - } + final BulkResponse bulkItemResponses = bulkFuture.actionGet(); + for (BulkItemResponse bulkItemResponse : bulkItemResponses) { + assertThat(Strings.toString(bulkItemResponses), bulkItemResponse.isFailed(), is(false)); + } - final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet(); - assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps)); - // When scripts are enabled, the first maxDocs are a NoOp - final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs; - int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs); - assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0))); - resultConsumer.accept(bulkByScrollResponse, successfulOps); + final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet(); + assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps)); + // When scripts are enabled, the first maxDocs are a NoOp + final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs; + int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs); + assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0))); + resultConsumer.accept(bulkByScrollResponse, successfulOps); + } } private void createIndexWithSingleShard(String index) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 1fbe482fbabaa..30286a500f5e9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -491,6 +491,7 @@ public boolean hasReferences() { @Override public void close() { + if (true) new RuntimeException("who did this??").printStackTrace(); decRef(); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java index 05c664f7ceeea..986d0601c08f5 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/ModelRegistry.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -276,11 +277,11 @@ public void storeModel(Model model, ActionListener listener) { false ); - client.prepareBulk() - .add(configRequest) + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + bulkRequestBuilder.add(configRequest) .add(secretsRequest) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .execute(bulkResponseActionListener); + .execute(ActionListener.releaseAfter(bulkResponseActionListener, bulkRequestBuilder)); } private static ActionListener getStoreModelListener(Model model, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 891779e28439b..c3f10353c37f7 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -1026,22 +1026,23 @@ private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List events) throws IOException { - BulkRequestBuilder bulkRequest = client().prepareBulk(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - for (ScheduledEvent event : events) { - IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()); - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams( - Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") - ); - indexRequest.source(event.toXContent(builder, params)); - bulkRequest.add(indexRequest); + try (BulkRequestBuilder bulkRequest = client().prepareBulk()) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (ScheduledEvent event : events) { + IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") + ); + indexRequest.source(event.toXContent(builder, params)); + bulkRequest.add(indexRequest); + } + } + BulkResponse response = bulkRequest.get(); + if (response.hasFailures()) { + throw new IllegalStateException(Strings.toString(response)); } - } - BulkResponse response = bulkRequest.get(); - if (response.hasFailures()) { - throw new IllegalStateException(Strings.toString(response)); } } @@ -1051,20 +1052,21 @@ private void indexDataCounts(DataCounts counts, String jobId) throws Interrupted } private void indexFilters(List filters) throws IOException { - BulkRequestBuilder bulkRequest = client().prepareBulk(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - for (MlFilter filter : filters) { - IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(filter.documentId()); - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams( - Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") - ); - indexRequest.source(filter.toXContent(builder, params)); - bulkRequest.add(indexRequest); + try (BulkRequestBuilder bulkRequest = client().prepareBulk()) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (MlFilter filter : filters) { + IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(filter.documentId()); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") + ); + indexRequest.source(filter.toXContent(builder, params)); + bulkRequest.add(indexRequest); + } } + bulkRequest.get(); } - bulkRequest.get(); } private void indexModelSizeStats(ModelSizeStats modelSizeStats) { @@ -1101,19 +1103,20 @@ private void indexQuantiles(Quantiles quantiles) { } private void indexCalendars(List calendars) throws IOException { - BulkRequestBuilder bulkRequest = client().prepareBulk(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - for (Calendar calendar : calendars) { - IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(calendar.documentId()); - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams( - Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") - ); - indexRequest.source(calendar.toXContent(builder, params)); - bulkRequest.add(indexRequest); + try (BulkRequestBuilder bulkRequest = client().prepareBulk()) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (Calendar calendar : calendars) { + IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(calendar.documentId()); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true") + ); + indexRequest.source(calendar.toXContent(builder, params)); + bulkRequest.add(indexRequest); + } } + bulkRequest.get(); } - bulkRequest.get(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java index 455a10eace64c..69925b5640b36 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java @@ -152,7 +152,6 @@ public class TokenServiceTests extends ESTestCase { .build(); private MockLicenseState licenseState; private SecurityContext securityContext; - private BulkRequestBuilder bulkRequestBuilder; @Before public void setupClient() { @@ -165,8 +164,7 @@ public void setupClient() { return builder; }).when(client).prepareGet(anyString(), anyString()); when(client.prepareIndex(any(String.class))).thenReturn(new IndexRequestBuilder(client)); - bulkRequestBuilder = new BulkRequestBuilder(client); // closed in the test's cleanup() method - when(client.prepareBulk()).thenReturn(bulkRequestBuilder); + when(client.prepareBulk()).thenAnswer(invocation -> new BulkRequestBuilder(client)); when(client.prepareUpdate(any(String.class), any(String.class))).thenAnswer(inv -> { final String index = (String) inv.getArguments()[0]; final String id = (String) inv.getArguments()[1]; @@ -235,13 +233,6 @@ public void setupClient() { } } - @After - public void cleanup() { - if (bulkRequestBuilder != null) { - bulkRequestBuilder.close(); - } - } - private static DiscoveryNode addAnother7071DataNode(ClusterService clusterService) { Version version; TransportVersion transportVersion;