diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java index 39b1d711a9206..51497c99ba064 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java @@ -158,26 +158,27 @@ private Job.Builder createJob(String id, TimeValue bucketSpan, String function, private void writeData(Logger logger, String index, long numDocs, long start, long end) { int maxDelta = (int) (end - start - 1); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int i = 0; i < numDocs; i++) { - IndexRequest indexRequest = new IndexRequest(index); - long timestamp = start + randomIntBetween(0, maxDelta); - assert timestamp >= start && timestamp < end; - indexRequest.source("time", timestamp, "value", i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - if (bulkResponse.hasFailures()) { - int failures = 0; - for (BulkItemResponse itemResponse : bulkResponse) { - if (itemResponse.isFailed()) { - failures++; - logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } } + fail("Bulk response contained " + failures + " failures"); } - fail("Bulk response contained " + failures + " failures"); + logger.info("Indexed [{}] documents", numDocs); } - logger.info("Indexed [{}] documents", numDocs); } private Bucket getLatestFinalizedBucket(String jobId) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CategorizationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CategorizationIT.java index 6d4feb7b88798..8c6acd9491a33 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CategorizationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/CategorizationIT.java @@ -67,39 +67,40 @@ public void setUpData() { nowMillis = System.currentTimeMillis(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - IndexRequest indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(), "msg", "Node 1 started", "part", "nodes"); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(2).millis() + 1, - "msg", - "Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]", - "part", - "shutdowns" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(), "msg", "Node 2 started", "part", "nodes"); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(1).millis() + 1, - "msg", - "Failed to shutdown [error but this time completely different]", - "part", - "shutdowns" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source("time", nowMillis, "msg", "Node 3 started", "part", "nodes"); - bulkRequestBuilder.add(indexRequest); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + IndexRequest indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(), "msg", "Node 1 started", "part", "nodes"); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(2).millis() + 1, + "msg", + "Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]", + "part", + "shutdowns" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(), "msg", "Node 2 started", "part", "nodes"); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(1).millis() + 1, + "msg", + "Failed to shutdown [error but this time completely different]", + "part", + "shutdowns" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source("time", nowMillis, "msg", "Node 3 started", "part", "nodes"); + bulkRequestBuilder.add(indexRequest); - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); - assertThat(bulkResponse.hasFailures(), is(false)); + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + assertThat(bulkResponse.hasFailures(), is(false)); + } } @After @@ -439,79 +440,80 @@ public void testNumMatchesAndCategoryPreference() throws Exception { nowMillis = System.currentTimeMillis(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - IndexRequest indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(8).millis(), - "msg", - "2015-10-18 18:01:51,963 INFO [main] org.mortbay.log: jetty-6.1.26" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(7).millis(), - "msg", - "2015-10-18 18:01:52,728 INFO [main] org.mortbay.log: Started HttpServer2$SelectChannelConnectorWithSafeStartup@0.0.0.0:62267" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(6).millis(), - "msg", - "2015-10-18 18:01:53,400 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Registered webapp guice modules" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(5).millis(), - "msg", - "2015-10-18 18:01:53,447 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor: nodeBlacklistingEnabled:true" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(4).millis(), - "msg", - "2015-10-18 18:01:52,728 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Web app /mapreduce started at 62267" - ); - bulkRequestBuilder.add(indexRequest); - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(2).millis(), - "msg", - "2015-10-18 18:01:53,557 INFO [main] org.apache.hadoop.yarn.client.RMProxy: " - + "Connecting to ResourceManager at msra-sa-41/10.190.173.170:8030" - ); - bulkRequestBuilder.add(indexRequest); - - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis - TimeValue.timeValueHours(1).millis(), - "msg", - "2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: " - + "maxContainerCapability: " - ); - bulkRequestBuilder.add(indexRequest); - - indexRequest = new IndexRequest(index); - indexRequest.source( - "time", - nowMillis, - "msg", - "2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: " - + "yarn.client.max-cached-nodemanagers-proxies : 0" - ); - bulkRequestBuilder.add(indexRequest); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + IndexRequest indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(8).millis(), + "msg", + "2015-10-18 18:01:51,963 INFO [main] org.mortbay.log: jetty-6.1.26" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(7).millis(), + "msg", + "2015-10-18 18:01:52,728 INFO [main] org.mortbay.log: Started HttpServer2$SelectChannelConnectorWithSafeStartup@0.0.0.0:62267" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(6).millis(), + "msg", + "2015-10-18 18:01:53,400 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Registered webapp guice modules" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(5).millis(), + "msg", + "2015-10-18 18:01:53,447 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor: nodeBlacklistingEnabled:true" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(4).millis(), + "msg", + "2015-10-18 18:01:52,728 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Web app /mapreduce started at 62267" + ); + bulkRequestBuilder.add(indexRequest); + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(2).millis(), + "msg", + "2015-10-18 18:01:53,557 INFO [main] org.apache.hadoop.yarn.client.RMProxy: " + + "Connecting to ResourceManager at msra-sa-41/10.190.173.170:8030" + ); + bulkRequestBuilder.add(indexRequest); + + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis - TimeValue.timeValueHours(1).millis(), + "msg", + "2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: " + + "maxContainerCapability: " + ); + bulkRequestBuilder.add(indexRequest); + + indexRequest = new IndexRequest(index); + indexRequest.source( + "time", + nowMillis, + "msg", + "2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: " + + "yarn.client.max-cached-nodemanagers-proxies : 0" + ); + bulkRequestBuilder.add(indexRequest); - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertThat(bulkResponse.hasFailures(), is(false)); + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertThat(bulkResponse.hasFailures(), is(false)); + } Job.Builder job = newJobBuilder("categorization-with-preferred-categories", Collections.emptyList(), false); putJob(job); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationEvaluationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationEvaluationIT.java index c42f2b0fcda6f..1b60f238aa55e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationEvaluationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationEvaluationIT.java @@ -717,66 +717,68 @@ static void createAnimalsIndex(String indexName) { static void indexAnimalsData(String indexName) { List animalNames = List.of("dog", "cat", "mouse", "ant", "fox"); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < animalNames.size(); i++) { - for (int j = 0; j < animalNames.size(); j++) { - for (int k = 0; k < j + 1; k++) { - List topClasses = IntStream.range(0, 5).mapToObj(ix -> new HashMap() { - { - put("class_name", animalNames.get(ix)); - put("class_probability", 0.4 - 0.1 * ix); - } - }).collect(toList()); - bulkRequestBuilder.add( - new IndexRequest(indexName).source( - ANIMAL_NAME_KEYWORD_FIELD, - animalNames.get(i), - ANIMAL_NAME_PREDICTION_KEYWORD_FIELD, - animalNames.get((i + j) % animalNames.size()), - ANIMAL_NAME_PREDICTION_PROB_FIELD, - animalNames.get((i + j) % animalNames.size()), - NO_LEGS_KEYWORD_FIELD, - String.valueOf(i + 1), - NO_LEGS_INTEGER_FIELD, - i + 1, - NO_LEGS_PREDICTION_INTEGER_FIELD, - j + 1, - IS_PREDATOR_KEYWORD_FIELD, - String.valueOf(i % 2 == 0), - IS_PREDATOR_BOOLEAN_FIELD, - i % 2 == 0, - IS_PREDATOR_PREDICTION_BOOLEAN_FIELD, - (i + j) % 2 == 0, - IS_PREDATOR_PREDICTION_PROBABILITY_FIELD, - i % 2 == 0 ? 1.0 - 0.1 * i : 0.1 * i, - ML_TOP_CLASSES_FIELD, - topClasses - ) - ); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < animalNames.size(); i++) { + for (int j = 0; j < animalNames.size(); j++) { + for (int k = 0; k < j + 1; k++) { + List topClasses = IntStream.range(0, 5).mapToObj(ix -> new HashMap() { + { + put("class_name", animalNames.get(ix)); + put("class_probability", 0.4 - 0.1 * ix); + } + }).collect(toList()); + bulkRequestBuilder.add( + new IndexRequest(indexName).source( + ANIMAL_NAME_KEYWORD_FIELD, + animalNames.get(i), + ANIMAL_NAME_PREDICTION_KEYWORD_FIELD, + animalNames.get((i + j) % animalNames.size()), + ANIMAL_NAME_PREDICTION_PROB_FIELD, + animalNames.get((i + j) % animalNames.size()), + NO_LEGS_KEYWORD_FIELD, + String.valueOf(i + 1), + NO_LEGS_INTEGER_FIELD, + i + 1, + NO_LEGS_PREDICTION_INTEGER_FIELD, + j + 1, + IS_PREDATOR_KEYWORD_FIELD, + String.valueOf(i % 2 == 0), + IS_PREDATOR_BOOLEAN_FIELD, + i % 2 == 0, + IS_PREDATOR_PREDICTION_BOOLEAN_FIELD, + (i + j) % 2 == 0, + IS_PREDATOR_PREDICTION_PROBABILITY_FIELD, + i % 2 == 0 ? 1.0 - 0.1 * i : 0.1 * i, + ML_TOP_CLASSES_FIELD, + topClasses + ) + ); + } } } - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } private static void indexDistinctAnimals(String indexName, int distinctAnimalCount) { - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < distinctAnimalCount; i++) { - bulkRequestBuilder.add( - new IndexRequest(indexName).source( - ANIMAL_NAME_KEYWORD_FIELD, - "animal_" + i, - ANIMAL_NAME_PREDICTION_KEYWORD_FIELD, - randomAlphaOfLength(5) - ) - ); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < distinctAnimalCount; i++) { + bulkRequestBuilder.add( + new IndexRequest(indexName).source( + ANIMAL_NAME_KEYWORD_FIELD, + "animal_" + i, + ANIMAL_NAME_PREDICTION_KEYWORD_FIELD, + randomAlphaOfLength(5) + ) + ); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationHousePricingIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationHousePricingIT.java index 610a492ef078f..f7a60e1a77ec5 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationHousePricingIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationHousePricingIT.java @@ -1600,33 +1600,34 @@ public void testFeatureImportanceValues() throws Exception { } static void indexData(String sourceIndex) { - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (String row : DATA) { - String[] values = row.split(","); - List source = List.of( - "1stFlrSF", - Integer.valueOf(values[0]), - "CentralAir", - values[1], - "Electrical", - values[2], - "GarageArea", - Integer.valueOf(values[3]), - "GarageCars", - Integer.valueOf(values[4]), - "Heating", - values[5], - "Neighborhood", - values[6], - "YearBuilt", - Integer.valueOf(values[7]) - ); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (String row : DATA) { + String[] values = row.split(","); + List source = List.of( + "1stFlrSF", + Integer.valueOf(values[0]), + "CentralAir", + values[1], + "Electrical", + values[2], + "GarageArea", + Integer.valueOf(values[3]), + "GarageCars", + Integer.valueOf(values[4]), + "Heating", + values[5], + "Neighborhood", + values[6], + "YearBuilt", + Integer.valueOf(values[7]) + ); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index b610ac60dab00..220ab5f3b8a97 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -596,14 +596,15 @@ public void testDependentVariableCardinalityTooHighError() throws Exception { indexData(sourceIndex, 6, 5, KEYWORD_FIELD); // Index enough documents to have more classes than the allowed limit - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < Classification.MAX_DEPENDENT_VARIABLE_CARDINALITY - 1; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(KEYWORD_FIELD, "fox-" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < Classification.MAX_DEPENDENT_VARIABLE_CARDINALITY - 1; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(KEYWORD_FIELD, "fox-" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); @@ -619,14 +620,15 @@ public void testDependentVariableCardinalityTooHighButWithQueryMakesItWithinRang indexData(sourceIndex, 6, 5, KEYWORD_FIELD); // Index enough documents to have more classes than the allowed limit - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < Classification.MAX_DEPENDENT_VARIABLE_CARDINALITY - 1; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(KEYWORD_FIELD, "fox-" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < Classification.MAX_DEPENDENT_VARIABLE_CARDINALITY - 1; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(KEYWORD_FIELD, "fox-" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(KEYWORD_FIELD, KEYWORD_FIELD_VALUES)); @@ -1127,56 +1129,57 @@ static void createIndex(String index, boolean isDatastream) { } static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) { - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < numTrainingRows; i++) { - List source = List.of( - "@timestamp", - "2020-12-12", - BOOLEAN_FIELD, - BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()), - NUMERICAL_FIELD, - NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()), - DISCRETE_NUMERICAL_FIELD, - DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size()), - TEXT_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), - KEYWORD_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), - NESTED_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()) - ); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { - List source = new ArrayList<>(); - if (BOOLEAN_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()))); - } - if (NUMERICAL_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(NUMERICAL_FIELD, NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()))); - } - if (DISCRETE_NUMERICAL_FIELD.equals(dependentVariable) == false) { - source.addAll( - List.of(DISCRETE_NUMERICAL_FIELD, DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size())) + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < numTrainingRows; i++) { + List source = List.of( + "@timestamp", + "2020-12-12", + BOOLEAN_FIELD, + BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()), + NUMERICAL_FIELD, + NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()), + DISCRETE_NUMERICAL_FIELD, + DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size()), + TEXT_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), + KEYWORD_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), + NESTED_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()) ); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); } - if (TEXT_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); - } - if (KEYWORD_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { + List source = new ArrayList<>(); + if (BOOLEAN_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()))); + } + if (NUMERICAL_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(NUMERICAL_FIELD, NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()))); + } + if (DISCRETE_NUMERICAL_FIELD.equals(dependentVariable) == false) { + source.addAll( + List.of(DISCRETE_NUMERICAL_FIELD, DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size())) + ); + } + if (TEXT_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + if (KEYWORD_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + if (NESTED_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + source.addAll(List.of("@timestamp", "2020-12-12")); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); } - if (NESTED_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } - source.addAll(List.of("@timestamp", "2020-12-12")); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java index 9773a4d3b3d82..7115703087a33 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java @@ -116,16 +116,17 @@ private void testDfWithAggs(AggregatorFactories.Builder aggs, Detector.Builder d long aMinuteAgo = now - TimeValue.timeValueMinutes(1).millis(); long aMinuteLater = now + TimeValue.timeValueMinutes(1).millis(); long curTime = aMinuteAgo; - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - while (curTime < aMinuteLater) { - IndexRequest indexRequest = new IndexRequest(dfId); - indexRequest.source("time", curTime, "field", randomFrom("foo", "bar", "baz")); - bulkRequestBuilder.add(indexRequest); - curTime += TimeValue.timeValueSeconds(1).millis(); - } - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index docs: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + while (curTime < aMinuteLater) { + IndexRequest indexRequest = new IndexRequest(dfId); + indexRequest.source("time", curTime, "field", randomFrom("foo", "bar", "baz")); + bulkRequestBuilder.add(indexRequest); + curTime += TimeValue.timeValueSeconds(1).millis(); + } + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index docs: " + bulkResponse.buildFailureMessage()); + } } // And start datafeed in real-time mode diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index 2d527ac974723..226056b61f9af 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -220,26 +220,27 @@ private Job.Builder createJob(String id, TimeValue bucketSpan, String function, private void writeData(Logger logger, String index, long numDocs, long start, long end) { int maxDelta = (int) (end - start - 1); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int i = 0; i < numDocs; i++) { - IndexRequest indexRequest = new IndexRequest(index); - long timestamp = start + randomIntBetween(0, maxDelta); - assert timestamp >= start && timestamp < end; - indexRequest.source("time", timestamp, "value", i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - if (bulkResponse.hasFailures()) { - int failures = 0; - for (BulkItemResponse itemResponse : bulkResponse) { - if (itemResponse.isFailed()) { - failures++; - logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } } + fail("Bulk response contained " + failures + " failures"); } - fail("Bulk response contained " + failures + " failures"); + logger.info("Indexed [{}] documents", numDocs); } - logger.info("Indexed [{}] documents", numDocs); } private Bucket getLatestFinalizedBucket(String jobId) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index ecc601b0f1eae..ff0490e46bb7a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -70,26 +70,27 @@ public void testSourceQueryIsApplied() throws IOException { ) .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - for (int i = 0; i < 30; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source( - "numeric_1", - 1.0, - "numeric_2", - 2, - "categorical", - i % 2 == 0 ? "class_1" : "class_2", - "filtered_field", - i < 2 ? "bingo" : "rest" - ); // We tag bingo on the first two docs to ensure we have 2 classes - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 30; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source( + "numeric_1", + 1.0, + "numeric_2", + 2, + "categorical", + i % 2 == 0 ? "class_1" : "class_2", + "filtered_field", + i < 2 ? "bingo" : "rest" + ); // We tag bingo on the first two docs to ensure we have 2 classes + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_source_query_is_applied"; @@ -242,15 +243,16 @@ public void testRuntimeFields() { } }"""; client().admin().indices().prepareCreate(sourceIndex).setMapping(mapping).get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 10; i++) { - Object[] source = new Object[] { "mapped_field", i }; - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < 10; i++) { + Object[] source = new Object[] { "mapped_field", i }; + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } Map configRuntimeField = new HashMap<>(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java index 11ab23bf665bd..8c29c30d1a2ea 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -246,18 +246,19 @@ private void persistModelSnapshotDoc(String jobId, String snapshotId, Date times private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest(); - for (int i = 1; i <= numDocs; ++i) { - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id( - ModelState.documentId(jobId, snapshotId, i) - ) - // The exact contents of the model state doesn't matter - we are not going to try and restore it - .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) - .setRequireAlias(true); - bulkRequest.add(indexRequest); - } + try (BulkRequest bulkRequest = new BulkRequest()) { + for (int i = 1; i <= numDocs; ++i) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id( + ModelState.documentId(jobId, snapshotId, i) + ) + // The exact contents of the model state doesn't matter - we are not going to try and restore it + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) + .setRequireAlias(true); + bulkRequest.add(indexRequest); + } - BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); - assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotSearchIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotSearchIT.java index 9852517ff0231..2005c6d1fee0f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotSearchIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotSearchIT.java @@ -133,18 +133,19 @@ private void persistModelSnapshotDoc(String jobId, String snapshotId, Date times private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest(); - for (int i = 1; i <= numDocs; ++i) { - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id( - ModelState.documentId(jobId, snapshotId, i) - ) - // The exact contents of the model state doesn't matter - we are not going to try and restore it - .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) - .setRequireAlias(true); - bulkRequest.add(indexRequest); + try (BulkRequest bulkRequest = new BulkRequest()) { + for (int i = 1; i <= numDocs; ++i) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id( + ModelState.documentId(jobId, snapshotId, i) + ) + // The exact contents of the model state doesn't matter - we are not going to try and restore it + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) + .setRequireAlias(true); + bulkRequest.add(indexRequest); + } + + BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); } - - BulkResponse bulkResponse = client().execute(BulkAction.INSTANCE, bulkRequest).actionGet(); - assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java index bdc76e04ec4b5..fa3e828030727 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java @@ -38,33 +38,34 @@ public void testMissingFields() throws Exception { client().admin().indices().prepareCreate(sourceIndex).setMapping("numeric", "type=double", "categorical", "type=keyword").get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - // 5 docs with valid numeric value and missing categorical field (which should be ignored as it's not analyzed) - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric", 42.0); - bulkRequestBuilder.add(indexRequest); - } + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + // 5 docs with valid numeric value and missing categorical field (which should be ignored as it's not analyzed) + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric", 42.0); + bulkRequestBuilder.add(indexRequest); + } - // Add a doc with missing field - { - IndexRequest missingIndexRequest = new IndexRequest(sourceIndex); - missingIndexRequest.source("categorical", "foo"); - bulkRequestBuilder.add(missingIndexRequest); - } + // Add a doc with missing field + { + IndexRequest missingIndexRequest = new IndexRequest(sourceIndex); + missingIndexRequest.source("categorical", "foo"); + bulkRequestBuilder.add(missingIndexRequest); + } - // Add a doc with numeric being array which is also treated as missing - { - IndexRequest arrayIndexRequest = new IndexRequest(sourceIndex); - arrayIndexRequest.source("numeric", new double[] { 1.0, 2.0 }, "categorical", "foo"); - bulkRequestBuilder.add(arrayIndexRequest); - } + // Add a doc with numeric being array which is also treated as missing + { + IndexRequest arrayIndexRequest = new IndexRequest(sourceIndex); + arrayIndexRequest.source("numeric", new double[] { 1.0, 2.0 }, "categorical", "foo"); + bulkRequestBuilder.add(arrayIndexRequest); + } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_with_missing_fields"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionEvaluationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionEvaluationIT.java index dcf98ee9b92f7..da8c5ef90b138 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionEvaluationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionEvaluationIT.java @@ -142,13 +142,14 @@ private static void createHousesIndex(String indexName) { } private static void indexHousesData(String indexName) { - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 100; i++) { - bulkRequestBuilder.add(new IndexRequest(indexName).source(PRICE_FIELD, 1000, PRICE_PREDICTION_FIELD, 0)); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < 100; i++) { + bulkRequestBuilder.add(new IndexRequest(indexName).source(PRICE_FIELD, 1000, PRICE_PREDICTION_FIELD, 0)); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index ed7cfad8bf195..1ba80ced6890d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -574,15 +574,16 @@ public void testAliasFields() throws Exception { client().admin().indices().prepareCreate(sourceIndex).setMapping(mapping).get(); int totalDocCount = 300; - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < totalDocCount; i++) { - List source = List.of("field_1", i, "field_2", 2 * i); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < totalDocCount; i++) { + List source = List.of("field_1", i, "field_2", 2 * i); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } // Very infrequently this test may fail as the algorithm underestimates the @@ -917,36 +918,37 @@ static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainin client().admin().indices().prepareCreate(sourceIndex).setMapping(mapping).get(); } - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < numTrainingRows; i++) { - List source = List.of( - NUMERICAL_FEATURE_FIELD, - NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), - DISCRETE_NUMERICAL_FEATURE_FIELD, - DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), - DEPENDENT_VARIABLE_FIELD, - DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()), - "@timestamp", - Instant.now().toEpochMilli() - ); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { - List source = List.of( - NUMERICAL_FEATURE_FIELD, - NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), - DISCRETE_NUMERICAL_FEATURE_FIELD, - DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), - "@timestamp", - Instant.now().toEpochMilli() - ); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < numTrainingRows; i++) { + List source = List.of( + NUMERICAL_FEATURE_FIELD, + NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), + DISCRETE_NUMERICAL_FEATURE_FIELD, + DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), + DEPENDENT_VARIABLE_FIELD, + DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()), + "@timestamp", + Instant.now().toEpochMilli() + ); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { + List source = List.of( + NUMERICAL_FEATURE_FIELD, + NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), + DISCRETE_NUMERICAL_FEATURE_FIELD, + DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), + "@timestamp", + Instant.now().toEpochMilli() + ); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 2ab5ecb00aa00..8d073a233d5b0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -65,21 +65,22 @@ public void testOutlierDetectionWithFewDocuments() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=unsigned_long", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); - // We insert one odd value out of 5 for one feature - String docId = i == 0 ? "outlier" : "normal" + i; - indexRequest.id(docId); - indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1, "categorical_1", "foo_" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1, "categorical_1", "foo_" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_with_few_docs"; @@ -161,21 +162,22 @@ public void testPreview() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=unsigned_long", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); - // We insert one odd value out of 5 for one feature - String docId = i == 0 ? "outlier" : "normal" + i; - indexRequest.id(docId); - indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1, "categorical_1", "foo_" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1, "categorical_1", "foo_" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_preview"; @@ -201,62 +203,63 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - int docCount = randomIntBetween(1024, 2048); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); - } - - String id = "test_outlier_detection_with_enough_docs_to_scroll"; - DataFrameAnalyticsConfig config = buildAnalytics( - id, - sourceIndex, - sourceIndex + "-results", - "custom_ml", - new OutlierDetection.Builder().build() - ); - putAnalytics(config); - - assertIsStopped(id); - assertProgressIsZero(id); - - startAnalytics(id); - waitUntilAnalyticsIsStopped(id); - - // Check we've got all docs - assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), docCount); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - // Check they all have an outlier_score - assertHitCount( - prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true) - .setQuery(QueryBuilders.existsQuery("custom_ml.outlier_score")), - docCount - ); + int docCount = randomIntBetween(1024, 2048); + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } - assertProgressComplete(id); - assertStoredProgressHits(id, 1); - assertThatAuditMessagesMatch( - id, - "Created analytics with type [outlier_detection]", - "Estimated memory usage [", - "Starting analytics on node", - "Started analytics", - "Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", - "Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", - "Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", - "Started loading data", - "Started analyzing", - "Started writing results", - "Finished analysis" - ); + String id = "test_outlier_detection_with_enough_docs_to_scroll"; + DataFrameAnalyticsConfig config = buildAnalytics( + id, + sourceIndex, + sourceIndex + "-results", + "custom_ml", + new OutlierDetection.Builder().build() + ); + putAnalytics(config); + + assertIsStopped(id); + assertProgressIsZero(id); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + // Check we've got all docs + assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), docCount); + + // Check they all have an outlier_score + assertHitCount( + prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true) + .setQuery(QueryBuilders.existsQuery("custom_ml.outlier_score")), + docCount + ); + + assertProgressComplete(id); + assertStoredProgressHits(id, 1); + assertThatAuditMessagesMatch( + id, + "Created analytics with type [outlier_detection]", + "Estimated memory usage [", + "Starting analytics on node", + "Started analytics", + "Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", + "Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", + "Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis" + ); + } } public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Exception { @@ -274,27 +277,28 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex docValueLimitSetting.getIndexToSettings().values().iterator().next() ); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100; i++) { - StringBuilder source = new StringBuilder("{"); - for (int fieldCount = 0; fieldCount < docValueLimit + 1; fieldCount++) { - source.append("\"field_").append(fieldCount).append("\":").append(randomDouble()); - if (fieldCount < docValueLimit) { - source.append(","); + StringBuilder source = new StringBuilder("{"); + for (int fieldCount = 0; fieldCount < docValueLimit + 1; fieldCount++) { + source.append("\"field_").append(fieldCount).append("\":").append(randomDouble()); + if (fieldCount < docValueLimit) { + source.append(","); + } } - } - source.append("}"); + source.append("}"); - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source(source.toString(), XContentType.JSON); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source(source.toString(), XContentType.JSON); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_with_more_fields_than_docvalue_limit"; @@ -359,18 +363,19 @@ public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Excepti .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - int docCount = randomIntBetween(1024, 2048); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_stop_outlier_detection_with_enough_docs_to_scroll"; @@ -431,60 +436,61 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (String index : sourceIndex) { - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(index); - indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i); - bulkRequestBuilder.add(indexRequest); + for (String index : sourceIndex) { + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(index); + indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i); + bulkRequestBuilder.add(indexRequest); + } + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); - } - - String id = "test_outlier_detection_with_multiple_source_indices"; - DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder().setId(id) - .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null, null)) - .setDest(new DataFrameAnalyticsDest(destIndex, null)) - .setAnalysis(new OutlierDetection.Builder().build()) - .build(); - putAnalytics(config); - - assertIsStopped(id); - assertProgressIsZero(id); - - startAnalytics(id); - waitUntilAnalyticsIsStopped(id); - - // Check we've got all docs - assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), bulkRequestBuilder.numberOfActions()); - - // Check they all have an outlier_score - assertHitCount( - prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).setQuery(QueryBuilders.existsQuery("ml.outlier_score")), - bulkRequestBuilder.numberOfActions() - ); - assertProgressComplete(id); - assertStoredProgressHits(id, 1); - assertThatAuditMessagesMatch( - id, - "Created analytics with type [outlier_detection]", - "Estimated memory usage [", - "Starting analytics on node", - "Started analytics", - "Creating destination index [test-outlier-detection-with-multiple-source-indices-results]", - "Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", - "Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", - "Started loading data", - "Started analyzing", - "Started writing results", - "Finished analysis" - ); + String id = "test_outlier_detection_with_multiple_source_indices"; + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder().setId(id) + .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null, null)) + .setDest(new DataFrameAnalyticsDest(destIndex, null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .build(); + putAnalytics(config); + + assertIsStopped(id); + assertProgressIsZero(id); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + // Check we've got all docs + assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), bulkRequestBuilder.numberOfActions()); + + // Check they all have an outlier_score + assertHitCount( + prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).setQuery(QueryBuilders.existsQuery("ml.outlier_score")), + bulkRequestBuilder.numberOfActions() + ); + + assertProgressComplete(id); + assertStoredProgressHits(id, 1); + assertThatAuditMessagesMatch( + id, + "Created analytics with type [outlier_detection]", + "Estimated memory usage [", + "Starting analytics on node", + "Started analytics", + "Creating destination index [test-outlier-detection-with-multiple-source-indices-results]", + "Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", + "Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis" + ); + } } public void testOutlierDetectionWithPreExistingDestIndex() throws Exception { @@ -499,53 +505,54 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); - } - - String id = "test_outlier_detection_with_pre_existing_dest_index"; - DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, destIndex, null, new OutlierDetection.Builder().build()); - putAnalytics(config); - - assertIsStopped(id); - assertProgressIsZero(id); - - startAnalytics(id); - waitUntilAnalyticsIsStopped(id); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - // Check we've got all docs - assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), bulkRequestBuilder.numberOfActions()); - // Check they all have an outlier_score - assertHitCount( - prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).setQuery(QueryBuilders.existsQuery("ml.outlier_score")), - bulkRequestBuilder.numberOfActions() - ); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } - assertProgressComplete(id); - assertStoredProgressHits(id, 1); - assertThatAuditMessagesMatch( - id, - "Created analytics with type [outlier_detection]", - "Estimated memory usage [", - "Starting analytics on node", - "Started analytics", - "Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]", - "Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", - "Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", - "Started loading data", - "Started analyzing", - "Started writing results", - "Finished analysis" - ); + String id = "test_outlier_detection_with_pre_existing_dest_index"; + DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, destIndex, null, new OutlierDetection.Builder().build()); + putAnalytics(config); + + assertIsStopped(id); + assertProgressIsZero(id); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + // Check we've got all docs + assertHitCount(prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true), bulkRequestBuilder.numberOfActions()); + // Check they all have an outlier_score + assertHitCount( + prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).setQuery(QueryBuilders.existsQuery("ml.outlier_score")), + bulkRequestBuilder.numberOfActions() + ); + + assertProgressComplete(id); + assertStoredProgressHits(id, 1); + assertThatAuditMessagesMatch( + id, + "Created analytics with type [outlier_detection]", + "Estimated memory usage [", + "Starting analytics on node", + "Started analytics", + "Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]", + "Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", + "Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis" + ); + } } public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception { @@ -553,14 +560,15 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception indicesAdmin().prepareCreate(sourceIndex).setMapping("col_1", "type=double", "col_2", "type=float", "col_3", "type=keyword").get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 10000; i++) { // This number of rows should make memory usage estimate greater than 1MB - IndexRequest indexRequest = new IndexRequest(sourceIndex).id("doc_" + i).source("col_1", 1.0, "col_2", 1.0, "col_3", "str"); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < 10000; i++) { // This number of rows should make memory usage estimate greater than 1MB + IndexRequest indexRequest = new IndexRequest(sourceIndex).id("doc_" + i).source("col_1", 1.0, "col_2", 1.0, "col_3", "str"); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_model_memory_limit_lower_than_estimated_memory_usage"; @@ -586,12 +594,13 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws indicesAdmin().prepareCreate(sourceIndex).setMapping("col_1", "type=double", "col_2", "type=float", "col_3", "type=keyword").get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - IndexRequest indexRequest = new IndexRequest(sourceIndex).id("doc_1").source("col_1", 1.0, "col_2", 1.0, "col_3", "str"); - bulkRequestBuilder.add(indexRequest); - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + IndexRequest indexRequest = new IndexRequest(sourceIndex).id("doc_1").source("col_1", 1.0, "col_2", 1.0, "col_3", "str"); + bulkRequestBuilder.add(indexRequest); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_lazy_assign_model_memory_limit_too_high"; @@ -639,18 +648,19 @@ public void testOutlierDetectionStopAndRestart() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - int docCount = randomIntBetween(1024, 2048); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", randomAlphaOfLength(10)); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_stop_and_restart"; @@ -707,21 +717,22 @@ public void testOutlierDetectionWithCustomParams() throws Exception { .setMapping("numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword") .get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); - // We insert one odd value out of 5 for one feature - String docId = i == 0 ? "outlier" : "normal" + i; - indexRequest.id(docId); - indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1.0, "categorical_1", "foo_" + i); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1.0, "categorical_1", "foo_" + i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_with_custom_params"; @@ -818,21 +829,22 @@ public void testOutlierDetection_GivenIndexWithRuntimeFields() throws Exception client().admin().indices().prepareCreate(sourceIndex).setMapping(mappings).get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); - // We insert one odd value out of 5 for one feature - String docId = i == 0 ? "outlier" : "normal" + i; - indexRequest.id(docId); - indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_with_index_with_runtime_mappings"; @@ -919,21 +931,22 @@ public void testOutlierDetection_GivenSearchRuntimeMappings() throws Exception { client().admin().indices().prepareCreate(sourceIndex).setMapping(mappings).get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); - // We insert one odd value out of 5 for one feature - String docId = i == 0 ? "outlier" : "normal" + i; - indexRequest.id(docId); - indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + // We insert one odd value out of 5 for one feature + String docId = i == 0 ? "outlier" : "normal" + i; + indexRequest.id(docId); + indexRequest.source("numeric", i == 0 ? 100.0 : 1.0); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test_outlier_detection_index_with_search_runtime_fields"; @@ -1023,18 +1036,19 @@ public void testStart_GivenTimeout_Returns408() throws Exception { client().admin().indices().prepareCreate(sourceIndex).setMapping("numeric_1", "type=integer", "numeric_2", "type=integer").get(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 5; i++) { - IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.id(String.valueOf(i)); - indexRequest.source("numeric_1", randomInt(), "numeric_2", randomInt()); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.id(String.valueOf(i)); + indexRequest.source("numeric_1", randomInt(), "numeric_2", randomInt()); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } } String id = "test-timeout-returns-408";