Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 1, 2023
1 parent c454aff commit 5244077
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,55 +213,56 @@ public void testDeleteByQuery() throws Exception {
docsModifiedConcurrently.addAll(randomSubsetOf(finalConflictingOps, originalDocs));
}
);
BulkRequest conflictingUpdatesBulkRequest = new BulkRequest();
for (SearchHit searchHit : docsModifiedConcurrently) {
if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {
conflictingOps--;
try (BulkRequest conflictingUpdatesBulkRequest = new BulkRequest()) {
for (SearchHit searchHit : docsModifiedConcurrently) {
if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {
conflictingOps--;
}
conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency));
}
conflictingUpdatesBulkRequest.add(createUpdatedIndexRequest(searchHit, targetIndex, useOptimisticConcurrency));
}

// The bulk request is enqueued before the update by query
// Since #77731 TransportBulkAction is dispatched into the Write thread pool,
// this test makes use of a deterministic task order in the data node write
// thread pool. To ensure that ordering, execute the TransportBulkAction
// in a coordinator node preventing that additional tasks are scheduled into
// the data node write thread pool.
final ActionFuture<BulkResponse> bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest);
// The bulk request is enqueued before the update by query
// Since #77731 TransportBulkAction is dispatched into the Write thread pool,
// this test makes use of a deterministic task order in the data node write
// thread pool. To ensure that ordering, execute the TransportBulkAction
// in a coordinator node preventing that additional tasks are scheduled into
// the data node write thread pool.
final ActionFuture<BulkResponse> bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest);

// Ensure that the concurrent writes are enqueued before the update by query request is sent
assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1)));
// Ensure that the concurrent writes are enqueued before the update by query request is sent
assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1)));

requestBuilder.source(sourceIndex).maxDocs(maxDocs).abortOnVersionConflict(false);
requestBuilder.source(sourceIndex).maxDocs(maxDocs).abortOnVersionConflict(false);

if (scriptEnabled) {
final Script script = new Script(ScriptType.INLINE, SCRIPT_LANG, NOOP_GENERATOR, Collections.emptyMap());
((AbstractBulkIndexByScrollRequestBuilder) requestBuilder).script(script);
}
if (scriptEnabled) {
final Script script = new Script(ScriptType.INLINE, SCRIPT_LANG, NOOP_GENERATOR, Collections.emptyMap());
((AbstractBulkIndexByScrollRequestBuilder) requestBuilder).script(script);
}

final SearchRequestBuilder source = requestBuilder.source();
source.setSize(scrollSize);
source.addSort(SORTING_FIELD, SortOrder.DESC);
source.setQuery(QueryBuilders.matchAllQuery());
final ActionFuture<BulkByScrollResponse> updateByQueryResponse = requestBuilder.execute();
final SearchRequestBuilder source = requestBuilder.source();
source.setSize(scrollSize);
source.addSort(SORTING_FIELD, SortOrder.DESC);
source.setQuery(QueryBuilders.matchAllQuery());
final ActionFuture<BulkByScrollResponse> updateByQueryResponse = requestBuilder.execute();

assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2)));
assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2)));

// Allow tasks from the write thread to make progress
latch.countDown();
// Allow tasks from the write thread to make progress
latch.countDown();

final BulkResponse bulkItemResponses = bulkFuture.actionGet();
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(Strings.toString(bulkItemResponses), bulkItemResponse.isFailed(), is(false));
}
final BulkResponse bulkItemResponses = bulkFuture.actionGet();
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(Strings.toString(bulkItemResponses), bulkItemResponse.isFailed(), is(false));
}

final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet();
assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps));
// When scripts are enabled, the first maxDocs are a NoOp
final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs;
int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs);
assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0)));
resultConsumer.accept(bulkByScrollResponse, successfulOps);
final BulkByScrollResponse bulkByScrollResponse = updateByQueryResponse.actionGet();
assertThat(bulkByScrollResponse.getVersionConflicts(), lessThanOrEqualTo((long) conflictingOps));
// When scripts are enabled, the first maxDocs are a NoOp
final int candidateOps = scriptEnabled ? numDocs - maxDocs : numDocs;
int successfulOps = Math.min(candidateOps - conflictingOps, maxDocs);
assertThat(bulkByScrollResponse.getNoops(), is((long) (scriptEnabled ? maxDocs : 0)));
resultConsumer.accept(bulkByScrollResponse, successfulOps);
}
}

private void createIndexWithSingleShard(String index) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public boolean hasReferences() {

@Override
public void close() {
    // Closing simply releases this holder's reference; the underlying
    // resource is freed when the reference count reaches zero.
    // (Removed a leftover debug statement that unconditionally printed a
    // "who did this??" stack trace on every close — dead `if (true)` guard,
    // clearly not meant to be committed.)
    decRef();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -276,11 +277,11 @@ public void storeModel(Model model, ActionListener<Boolean> listener) {
false
);

client.prepareBulk()
.add(configRequest)
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
bulkRequestBuilder.add(configRequest)
.add(secretsRequest)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(bulkResponseActionListener);
.execute(ActionListener.releaseAfter(bulkResponseActionListener, bulkRequestBuilder));
}

private static ActionListener<BulkResponse> getStoreModelListener(Model model, ActionListener<Boolean> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,22 +1026,23 @@ private AnalysisConfig.Builder createAnalysisConfig(String byFieldName, List<Str
}

private void indexScheduledEvents(List<ScheduledEvent> events) throws IOException {
BulkRequestBuilder bulkRequest = client().prepareBulk();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (ScheduledEvent event : events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(event.toXContent(builder, params));
bulkRequest.add(indexRequest);
try (BulkRequestBuilder bulkRequest = client().prepareBulk()) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (ScheduledEvent event : events) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(event.toXContent(builder, params));
bulkRequest.add(indexRequest);
}
}
BulkResponse response = bulkRequest.get();
if (response.hasFailures()) {
throw new IllegalStateException(Strings.toString(response));
}
}
BulkResponse response = bulkRequest.get();
if (response.hasFailures()) {
throw new IllegalStateException(Strings.toString(response));
}
}

Expand All @@ -1051,20 +1052,21 @@ private void indexDataCounts(DataCounts counts, String jobId) throws Interrupted
}

private void indexFilters(List<MlFilter> filters) throws IOException {
BulkRequestBuilder bulkRequest = client().prepareBulk();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (MlFilter filter : filters) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(filter.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(filter.toXContent(builder, params));
bulkRequest.add(indexRequest);
try (BulkRequestBuilder bulkRequest = client().prepareBulk()) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (MlFilter filter : filters) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(filter.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(filter.toXContent(builder, params));
bulkRequest.add(indexRequest);
}
}
bulkRequest.get();
}
bulkRequest.get();
}

private void indexModelSizeStats(ModelSizeStats modelSizeStats) {
Expand Down Expand Up @@ -1101,19 +1103,20 @@ private void indexQuantiles(Quantiles quantiles) {
}

private void indexCalendars(List<Calendar> calendars) throws IOException {
BulkRequestBuilder bulkRequest = client().prepareBulk();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (Calendar calendar : calendars) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(calendar.toXContent(builder, params));
bulkRequest.add(indexRequest);
try (BulkRequestBuilder bulkRequest = client().prepareBulk()) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (Calendar calendar : calendars) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.indexName()).id(calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")
);
indexRequest.source(calendar.toXContent(builder, params));
bulkRequest.add(indexRequest);
}
}
bulkRequest.get();
}
bulkRequest.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public class TokenServiceTests extends ESTestCase {
.build();
private MockLicenseState licenseState;
private SecurityContext securityContext;
private BulkRequestBuilder bulkRequestBuilder;

@Before
public void setupClient() {
Expand All @@ -165,8 +164,7 @@ public void setupClient() {
return builder;
}).when(client).prepareGet(anyString(), anyString());
when(client.prepareIndex(any(String.class))).thenReturn(new IndexRequestBuilder(client));
bulkRequestBuilder = new BulkRequestBuilder(client); // closed in the test's cleanup() method
when(client.prepareBulk()).thenReturn(bulkRequestBuilder);
when(client.prepareBulk()).thenAnswer(invocation -> new BulkRequestBuilder(client));
when(client.prepareUpdate(any(String.class), any(String.class))).thenAnswer(inv -> {
final String index = (String) inv.getArguments()[0];
final String id = (String) inv.getArguments()[1];
Expand Down Expand Up @@ -235,13 +233,6 @@ public void setupClient() {
}
}

/** Releases the shared bulk-request builder created in setup, if any. */
@After
public void cleanup() {
    final BulkRequestBuilder builder = bulkRequestBuilder;
    if (builder != null) {
        builder.close();
    }
}

private static DiscoveryNode addAnother7071DataNode(ClusterService clusterService) {
Version version;
TransportVersion transportVersion;
Expand Down

0 comments on commit 5244077

Please sign in to comment.