Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 1, 2023
1 parent 2b42354 commit 249220b
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,16 @@ private void testCase(
// Not all test cases use the dest index but those that do require that it be on the node will small thread pools
indicesAdmin().prepareCreate("dest").setSettings(indexSettings).get();
// Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
BulkRequestBuilder bulk = client().prepareBulk();
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(prepareIndex("source").setSource("foo", "bar " + i));
}
try (BulkRequestBuilder bulk = client().prepareBulk()) {
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(prepareIndex("source").setSource("foo", "bar " + i));
}

Retry retry = new Retry(BackoffPolicy.exponentialBackoff(), client().threadPool());
BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request()).actionGet();
assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
indicesAdmin().prepareRefresh("source").get();
Retry retry = new Retry(BackoffPolicy.exponentialBackoff(), client().threadPool());
BulkResponse initialBulkResponse = retry.withBackoff(client()::bulk, bulk.request()).actionGet();
assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
indicesAdmin().prepareRefresh("source").get();
}

AbstractBulkByScrollRequestBuilder<?, ?> builder = request.apply(internalCluster().masterClient());
// Make sure we use more than one batch so we have to scroll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public static void afterClass() {

public void testDefaultConstructor() throws Exception {
BulkProcessorFactory factory = new BulkProcessorFactory(mock(Client.class), mock(AnalyticsEventIngestConfig.class));
assertThat(factory.create(), instanceOf(BulkProcessor2.class));
try (BulkProcessor2 bulkProcessor = factory.create()) {
assertThat(bulkProcessor, instanceOf(BulkProcessor2.class));
}
}

public void testConfigValueAreUsed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,12 @@ public void testFromStatsGroupingByDate() {

public void testFromGroupingByNumericFieldWithNulls() {
for (int i = 0; i < 5; i++) {
client().prepareBulk()
.add(new IndexRequest("test").id("no_count_old_" + i).source("data", between(1, 2), "data_d", 1d))
.add(new IndexRequest("test").id("no_count_new_" + i).source("data", 99, "data_d", 1d))
.add(new IndexRequest("test").id("no_data_" + i).source("count", 12, "count_d", 12d))
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(new IndexRequest("test").id("no_count_old_" + i).source("data", between(1, 2), "data_d", 1d))
.add(new IndexRequest("test").id("no_count_new_" + i).source("data", 99, "data_d", 1d))
.add(new IndexRequest("test").id("no_data_" + i).source("count", 12, "count_d", 12d))
.get();
}
if (randomBoolean()) {
client().admin().indices().prepareRefresh("test").get();
}
Expand Down Expand Up @@ -266,11 +267,14 @@ record Group(String color, double avg) {

public void testFromStatsGroupingByKeywordWithNulls() {
for (int i = 0; i < 5; i++) {
client().prepareBulk()
.add(new IndexRequest("test").id("no_color_" + i).source("data", 12, "count", 120, "data_d", 2d, "count_d", 120d))
.add(new IndexRequest("test").id("no_count_red_" + i).source("data", 2, "data_d", 2d, "color", "red"))
.add(new IndexRequest("test").id("no_count_yellow_" + i).source("data", 2, "data_d", 2d, "color", "yellow"))
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(
new IndexRequest("test").id("no_color_" + i).source("data", 12, "count", 120, "data_d", 2d, "count_d", 120d)
)
.add(new IndexRequest("test").id("no_count_red_" + i).source("data", 2, "data_d", 2d, "color", "red"))
.add(new IndexRequest("test").id("no_count_yellow_" + i).source("data", 2, "data_d", 2d, "color", "yellow"))
.get();
}
if (randomBoolean()) {
client().admin().indices().prepareRefresh("test").get();
}
Expand Down Expand Up @@ -584,7 +588,9 @@ public void testStringLength() {

public void testFilterWithNullAndEvalFromIndex() {
// append entry, with an absent count, to the index
client().prepareBulk().add(new IndexRequest("test").id("no_count").source("data", 12, "data_d", 2d, "color", "red")).get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(new IndexRequest("test").id("no_count").source("data", 12, "data_d", 2d, "color", "red")).get();
}

client().admin().indices().prepareRefresh("test").get();
// sanity
Expand Down Expand Up @@ -731,7 +737,9 @@ public void testRefreshSearchIdleShards() throws Exception {
long sum = 0;
for (int i = 0; i < numDocs; i++) {
long value = randomLongBetween(1, 1000);
client().prepareBulk().add(new IndexRequest(indexName).id("doc-" + i).source("data", 1, "value", value)).get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(new IndexRequest(indexName).id("doc-" + i).source("data", 1, "value", value)).get();
}
sum += value;
}
totalValues.set(sum);
Expand Down Expand Up @@ -898,14 +906,15 @@ public void testIndexPatterns() throws Exception {
.setMapping("data", "type=long", "count", "type=long")
);
ensureYellow(indexName);
client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest(indexName).id("1").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("2").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("3").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("4").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("5").source("data", ++i, "count", i * 1000))
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest(indexName).id("1").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("2").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("3").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("4").source("data", ++i, "count", i * 1000))
.add(new IndexRequest(indexName).id("5").source("data", ++i, "count", i * 1000))
.get();
}
}

try (var results = run("from test_index_patterns* | stats count(data), sum(count)")) {
Expand Down Expand Up @@ -958,24 +967,25 @@ public void testOverlappingIndexPatterns() throws Exception {
.setMapping("field", "type=long")
);
ensureYellow("test_overlapping_index_patterns_1");
client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest("test_overlapping_index_patterns_1").id("1").source("field", 10))
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest("test_overlapping_index_patterns_1").id("1").source("field", 10))
.get();

assertAcked(
client().admin()
.indices()
.prepareCreate("test_overlapping_index_patterns_2")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("field", "type=keyword")
);
assertAcked(
client().admin()
.indices()
.prepareCreate("test_overlapping_index_patterns_2")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
.setMapping("field", "type=keyword")
);
}
ensureYellow("test_overlapping_index_patterns_2");
client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest("test_overlapping_index_patterns_2").id("1").source("field", "foo"))
.get();

try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest("test_overlapping_index_patterns_2").id("1").source("field", "foo"))
.get();
}
expectThrows(VerificationException.class, () -> run("from test_overlapping_index_patterns_* | sort field"));
}

Expand Down Expand Up @@ -1035,11 +1045,12 @@ public void testTopNPushedToLucene() {
var yellowNullCountDocId = "yellow_null_count_" + i;
var yellowNullDataDocId = "yellow_null_data_" + i;

client().prepareBulk()
.add(new IndexRequest("test").id(yellowDocId).source("data", i, "count", i * 10, "color", "yellow"))
.add(new IndexRequest("test").id(yellowNullCountDocId).source("data", i, "color", "yellow"))
.add(new IndexRequest("test").id(yellowNullDataDocId).source("count", i * 10, "color", "yellow"))
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(new IndexRequest("test").id(yellowDocId).source("data", i, "count", i * 10, "color", "yellow"))
.add(new IndexRequest("test").id(yellowNullCountDocId).source("data", i, "color", "yellow"))
.add(new IndexRequest("test").id(yellowNullDataDocId).source("count", i * 10, "color", "yellow"))
.get();
}
if (randomBoolean()) {
client().admin().indices().prepareRefresh("test").get();
}
Expand Down Expand Up @@ -1380,27 +1391,28 @@ private void createNestedMappingIndex(String indexName) throws IOException {

private int indexDocsIntoNestedMappingIndex(String indexName, int docsCount) throws IOException {
int countValuesGreaterThanFifty = 0;
BulkRequestBuilder bulkBuilder = client().prepareBulk();
for (int j = 0; j < docsCount; j++) {
XContentBuilder builder = JsonXContent.contentBuilder();
int randomValue = randomIntBetween(0, 100);
countValuesGreaterThanFifty += randomValue >= 50 ? 1 : 0;
builder.startObject();
{
builder.field("data", randomValue);
builder.startArray("nested");
try (BulkRequestBuilder bulkBuilder = client().prepareBulk()) {
for (int j = 0; j < docsCount; j++) {
XContentBuilder builder = JsonXContent.contentBuilder();
int randomValue = randomIntBetween(0, 100);
countValuesGreaterThanFifty += randomValue >= 50 ? 1 : 0;
builder.startObject();
{
for (int k = 0, max = randomIntBetween(1, 5); k < max; k++) {
// nested values are all greater than any non-nested values found in the "data" long field
builder.startObject().field("foo", randomIntBetween(1000, 10000)).endObject();
builder.field("data", randomValue);
builder.startArray("nested");
{
for (int k = 0, max = randomIntBetween(1, 5); k < max; k++) {
// nested values are all greater than any non-nested values found in the "data" long field
builder.startObject().field("foo", randomIntBetween(1000, 10000)).endObject();
}
}
builder.endArray();
}
builder.endArray();
builder.endObject();
bulkBuilder.add(new IndexRequest(indexName).id(Integer.toString(j)).source(builder));
}
builder.endObject();
bulkBuilder.add(new IndexRequest(indexName).id(Integer.toString(j)).source(builder));
bulkBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
bulkBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow(indexName);

return countValuesGreaterThanFifty;
Expand Down Expand Up @@ -1454,25 +1466,26 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting
);
long timestamp = epoch;
for (int i = 0; i < 10; i++) {
client().prepareBulk()
.add(
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.add(
new IndexRequest(indexName).id("1" + i)
.source("data", 1, "count", 40, "data_d", 1d, "count_d", 40d, "time", timestamp++, "color", "red")
)
.add(
new IndexRequest(indexName).id("2" + i)
.source("data", 2, "count", 42, "data_d", 2d, "count_d", 42d, "time", timestamp++, "color", "blue")
)
.add(
new IndexRequest(indexName).id("3" + i)
.source("data", 1, "count", 44, "data_d", 1d, "count_d", 44d, "time", timestamp++, "color", "green")
)
.add(
new IndexRequest(indexName).id("4" + i)
.source("data", 2, "count", 46, "data_d", 2d, "count_d", 46d, "time", timestamp++, "color", "red")
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
.add(
new IndexRequest(indexName).id("2" + i)
.source("data", 2, "count", 42, "data_d", 2d, "count_d", 42d, "time", timestamp++, "color", "blue")
)
.add(
new IndexRequest(indexName).id("3" + i)
.source("data", 1, "count", 44, "data_d", 1d, "count_d", 44d, "time", timestamp++, "color", "green")
)
.add(
new IndexRequest(indexName).id("4" + i)
.source("data", 2, "count", 46, "data_d", 2d, "count_d", 46d, "time", timestamp++, "color", "red")
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}
}
ensureYellow(indexName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ private void createIndexWithConstRuntimeField(String type) throws InterruptedExc
mapping.endObject();
client().admin().indices().prepareCreate("test").setMapping(mapping.endObject()).get();

BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < SIZE; i++) {
bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
try (BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
for (int i = 0; i < SIZE; i++) {
bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
}
bulk.get();
}
bulk.get();
}

public static class TestRuntimeFieldPlugin extends Plugin implements ScriptPlugin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ public void setupIndex() throws IOException {
.setMapping(mapping.endObject())
.get();

BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < NUM_DOCS; i++) {
bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
try (BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
for (int i = 0; i < NUM_DOCS; i++) {
bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
}
bulk.get();
}
bulk.get();
/*
* forceMerge so we can be sure that we don't bump into tiny
* segments that finish super quickly and cause us to report strange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ public void testConcurrentQueries() throws Exception {
)
.setMapping("user", "type=keyword", "tags", "type=keyword")
.get();
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int numDocs = between(5, 10);
for (int d = 0; d < numDocs; d++) {
String user = randomFrom("u1", "u2", "u3");
String tag = randomFrom("java", "elasticsearch", "lucene");
bulk.add(new IndexRequest().source(Map.of("user", user, "tags", tag)));
try (BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) {
int numDocs = between(5, 10);
for (int d = 0; d < numDocs; d++) {
String user = randomFrom("u1", "u2", "u3");
String tag = randomFrom("java", "elasticsearch", "lucene");
bulk.add(new IndexRequest().source(Map.of("user", user, "tags", tag)));
}
bulk.get();
}
bulk.get();
}
int numQueries = between(10, 20);
Thread[] threads = new Thread[numQueries];
Expand Down
Loading

0 comments on commit 249220b

Please sign in to comment.