Skip to content

Commit

Permalink
Fixing ML tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 6, 2023
1 parent 0d7495b commit f143de6
Show file tree
Hide file tree
Showing 14 changed files with 726 additions and 693 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,26 +158,27 @@ private Job.Builder createJob(String id, TimeValue bucketSpan, String function,

private void writeData(Logger logger, String index, long numDocs, long start, long end) {
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
}
}
fail("Bulk response contained " + failures + " failures");
}
fail("Bulk response contained " + failures + " failures");
logger.info("Indexed [{}] documents", numDocs);
}
logger.info("Indexed [{}] documents", numDocs);
}

private Bucket getLatestFinalizedBucket(String jobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,39 +67,40 @@ public void setUpData() {

nowMillis = System.currentTimeMillis();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(), "msg", "Node 1 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(2).millis() + 1,
"msg",
"Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]",
"part",
"shutdowns"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(), "msg", "Node 2 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(1).millis() + 1,
"msg",
"Failed to shutdown [error but this time completely different]",
"part",
"shutdowns"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis, "msg", "Node 3 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(), "msg", "Node 1 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(2).millis() + 1,
"msg",
"Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]",
"part",
"shutdowns"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(), "msg", "Node 2 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(1).millis() + 1,
"msg",
"Failed to shutdown [error but this time completely different]",
"part",
"shutdowns"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(DATA_INDEX);
indexRequest.source("time", nowMillis, "msg", "Node 3 started", "part", "nodes");
bulkRequestBuilder.add(indexRequest);

BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertThat(bulkResponse.hasFailures(), is(false));
BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
assertThat(bulkResponse.hasFailures(), is(false));
}
}

@After
Expand Down Expand Up @@ -439,79 +440,80 @@ public void testNumMatchesAndCategoryPreference() throws Exception {

nowMillis = System.currentTimeMillis();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(8).millis(),
"msg",
"2015-10-18 18:01:51,963 INFO [main] org.mortbay.log: jetty-6.1.26"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(7).millis(),
"msg",
"2015-10-18 18:01:52,728 INFO [main] org.mortbay.log: Started HttpServer2$SelectChannelConnectorWithSafeStartup@0.0.0.0:62267"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(6).millis(),
"msg",
"2015-10-18 18:01:53,400 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Registered webapp guice modules"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(5).millis(),
"msg",
"2015-10-18 18:01:53,447 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor: nodeBlacklistingEnabled:true"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(4).millis(),
"msg",
"2015-10-18 18:01:52,728 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Web app /mapreduce started at 62267"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(2).millis(),
"msg",
"2015-10-18 18:01:53,557 INFO [main] org.apache.hadoop.yarn.client.RMProxy: "
+ "Connecting to ResourceManager at msra-sa-41/10.190.173.170:8030"
);
bulkRequestBuilder.add(indexRequest);

indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(1).millis(),
"msg",
"2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: "
+ "maxContainerCapability: <memory:8192, vCores:32>"
);
bulkRequestBuilder.add(indexRequest);

indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis,
"msg",
"2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: "
+ "yarn.client.max-cached-nodemanagers-proxies : 0"
);
bulkRequestBuilder.add(indexRequest);
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(8).millis(),
"msg",
"2015-10-18 18:01:51,963 INFO [main] org.mortbay.log: jetty-6.1.26"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(7).millis(),
"msg",
"2015-10-18 18:01:52,728 INFO [main] org.mortbay.log: Started HttpServer2$SelectChannelConnectorWithSafeStartup@0.0.0.0:62267"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(6).millis(),
"msg",
"2015-10-18 18:01:53,400 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Registered webapp guice modules"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(5).millis(),
"msg",
"2015-10-18 18:01:53,447 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor: nodeBlacklistingEnabled:true"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(4).millis(),
"msg",
"2015-10-18 18:01:52,728 INFO [main] org.apache.hadoop.yarn.webapp.WebApps: Web app /mapreduce started at 62267"
);
bulkRequestBuilder.add(indexRequest);
indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(2).millis(),
"msg",
"2015-10-18 18:01:53,557 INFO [main] org.apache.hadoop.yarn.client.RMProxy: "
+ "Connecting to ResourceManager at msra-sa-41/10.190.173.170:8030"
);
bulkRequestBuilder.add(indexRequest);

indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis - TimeValue.timeValueHours(1).millis(),
"msg",
"2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: "
+ "maxContainerCapability: <memory:8192, vCores:32>"
);
bulkRequestBuilder.add(indexRequest);

indexRequest = new IndexRequest(index);
indexRequest.source(
"time",
nowMillis,
"msg",
"2015-10-18 18:01:53,713 INFO [main] org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy: "
+ "yarn.client.max-cached-nodemanagers-proxies : 0"
);
bulkRequestBuilder.add(indexRequest);

BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertThat(bulkResponse.hasFailures(), is(false));
BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertThat(bulkResponse.hasFailures(), is(false));
}

Job.Builder job = newJobBuilder("categorization-with-preferred-categories", Collections.emptyList(), false);
putJob(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,66 +717,68 @@ static void createAnimalsIndex(String indexName) {

static void indexAnimalsData(String indexName) {
List<String> animalNames = List.of("dog", "cat", "mouse", "ant", "fox");
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < animalNames.size(); i++) {
for (int j = 0; j < animalNames.size(); j++) {
for (int k = 0; k < j + 1; k++) {
List<?> topClasses = IntStream.range(0, 5).mapToObj(ix -> new HashMap<String, Object>() {
{
put("class_name", animalNames.get(ix));
put("class_probability", 0.4 - 0.1 * ix);
}
}).collect(toList());
bulkRequestBuilder.add(
new IndexRequest(indexName).source(
ANIMAL_NAME_KEYWORD_FIELD,
animalNames.get(i),
ANIMAL_NAME_PREDICTION_KEYWORD_FIELD,
animalNames.get((i + j) % animalNames.size()),
ANIMAL_NAME_PREDICTION_PROB_FIELD,
animalNames.get((i + j) % animalNames.size()),
NO_LEGS_KEYWORD_FIELD,
String.valueOf(i + 1),
NO_LEGS_INTEGER_FIELD,
i + 1,
NO_LEGS_PREDICTION_INTEGER_FIELD,
j + 1,
IS_PREDATOR_KEYWORD_FIELD,
String.valueOf(i % 2 == 0),
IS_PREDATOR_BOOLEAN_FIELD,
i % 2 == 0,
IS_PREDATOR_PREDICTION_BOOLEAN_FIELD,
(i + j) % 2 == 0,
IS_PREDATOR_PREDICTION_PROBABILITY_FIELD,
i % 2 == 0 ? 1.0 - 0.1 * i : 0.1 * i,
ML_TOP_CLASSES_FIELD,
topClasses
)
);
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
for (int i = 0; i < animalNames.size(); i++) {
for (int j = 0; j < animalNames.size(); j++) {
for (int k = 0; k < j + 1; k++) {
List<?> topClasses = IntStream.range(0, 5).mapToObj(ix -> new HashMap<String, Object>() {
{
put("class_name", animalNames.get(ix));
put("class_probability", 0.4 - 0.1 * ix);
}
}).collect(toList());
bulkRequestBuilder.add(
new IndexRequest(indexName).source(
ANIMAL_NAME_KEYWORD_FIELD,
animalNames.get(i),
ANIMAL_NAME_PREDICTION_KEYWORD_FIELD,
animalNames.get((i + j) % animalNames.size()),
ANIMAL_NAME_PREDICTION_PROB_FIELD,
animalNames.get((i + j) % animalNames.size()),
NO_LEGS_KEYWORD_FIELD,
String.valueOf(i + 1),
NO_LEGS_INTEGER_FIELD,
i + 1,
NO_LEGS_PREDICTION_INTEGER_FIELD,
j + 1,
IS_PREDATOR_KEYWORD_FIELD,
String.valueOf(i % 2 == 0),
IS_PREDATOR_BOOLEAN_FIELD,
i % 2 == 0,
IS_PREDATOR_PREDICTION_BOOLEAN_FIELD,
(i + j) % 2 == 0,
IS_PREDATOR_PREDICTION_PROBABILITY_FIELD,
i % 2 == 0 ? 1.0 - 0.1 * i : 0.1 * i,
ML_TOP_CLASSES_FIELD,
topClasses
)
);
}
}
}
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
}

private static void indexDistinctAnimals(String indexName, int distinctAnimalCount) {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < distinctAnimalCount; i++) {
bulkRequestBuilder.add(
new IndexRequest(indexName).source(
ANIMAL_NAME_KEYWORD_FIELD,
"animal_" + i,
ANIMAL_NAME_PREDICTION_KEYWORD_FIELD,
randomAlphaOfLength(5)
)
);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
for (int i = 0; i < distinctAnimalCount; i++) {
bulkRequestBuilder.add(
new IndexRequest(indexName).source(
ANIMAL_NAME_KEYWORD_FIELD,
"animal_" + i,
ANIMAL_NAME_PREDICTION_KEYWORD_FIELD,
randomAlphaOfLength(5)
)
);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
}

Expand Down
Loading

0 comments on commit f143de6

Please sign in to comment.