Re-enable EsqlActionBreakerIT (elastic#100698) (elastic#100742)
dnhatn authored Oct 12, 2023
1 parent 29bd9d6 commit 67acd8b
Showing 2 changed files with 101 additions and 73 deletions.
@@ -127,41 +127,50 @@ public void addInput(Page page) {
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
final int shardIndex = docVector.shards().getInt(0);
final var source = sources.get(shardIndex);
if (docVector.singleSegmentNonDecreasing() && source.source() instanceof ValuesSource.Bytes.WithOrdinals withOrdinals) {
final IntVector segmentIndexVector = docVector.segments();
assert segmentIndexVector.isConstant();
final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent(
new SegmentID(shardIndex, segmentIndexVector.getInt(0)),
k -> {
try {
final LeafReaderContext leafReaderContext = source.reader().leaves().get(k.segmentIndex);
return new OrdinalSegmentAggregator(
driverContext.blockFactory(),
this::createGroupingAggregators,
withOrdinals,
leafReaderContext,
bigArrays
);
} catch (IOException e) {
throw new UncheckedIOException(e);
boolean pagePassed = false;
try {
if (docVector.singleSegmentNonDecreasing() && source.source() instanceof ValuesSource.Bytes.WithOrdinals withOrdinals) {
final IntVector segmentIndexVector = docVector.segments();
assert segmentIndexVector.isConstant();
final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent(
new SegmentID(shardIndex, segmentIndexVector.getInt(0)),
k -> {
try {
final LeafReaderContext leafReaderContext = source.reader().leaves().get(k.segmentIndex);
return new OrdinalSegmentAggregator(
driverContext.blockFactory(),
this::createGroupingAggregators,
withOrdinals,
leafReaderContext,
bigArrays
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
);
ordinalAggregator.addInput(docVector.docs(), page);
} else {
if (valuesAggregator == null) {
int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
valuesAggregator = new ValuesAggregator(
sources,
docChannel,
groupingField,
channelIndex,
aggregatorFactories,
maxPageSize,
driverContext
);
pagePassed = true;
ordinalAggregator.addInput(docVector.docs(), page);
} else {
if (valuesAggregator == null) {
int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
valuesAggregator = new ValuesAggregator(
sources,
docChannel,
groupingField,
channelIndex,
aggregatorFactories,
maxPageSize,
driverContext
);
}
pagePassed = true;
valuesAggregator.addInput(page);
}
} finally {
if (pagePassed == false) {
Releasables.closeExpectNoException(page::releaseBlocks);
}
valuesAggregator.addInput(page);
}
}

@@ -392,9 +401,18 @@ boolean seenNulls() {

@Override
public BitArray seenGroupIds(BigArrays bigArrays) {
BitArray seen = new BitArray(0, bigArrays);
seen.or(visitedOrds);
return seen;
final BitArray seen = new BitArray(0, bigArrays);
boolean success = false;
try {
// the or method can grow the `seen` bits
seen.or(visitedOrds);
success = true;
return seen;
} finally {
if (success == false) {
Releasables.close(seen);
}
}
}

@Override
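Both hunks in this file apply the same release-on-failure discipline: a resource (the incoming Page, or the freshly allocated BitArray) is only retained once it has been handed off or returned successfully, and every failure path releases it so circuit-breaker accounting stays balanced. Below is a minimal standalone sketch of that pattern; it uses a plain Closeable and a hypothetical Downstream interface rather than the real Page, BitArray, and Releasables APIs.

import java.io.Closeable;
import java.io.IOException;

final class HandOffSketch {
    // Hypothetical consumer standing in for ordinalAggregator / valuesAggregator.
    interface Downstream {
        void accept(Closeable resource) throws IOException;
    }

    static void addInput(Closeable resource, Downstream downstream) throws IOException {
        boolean passed = false;
        try {
            downstream.accept(resource); // may throw, e.g. when a breaker trips mid-way
            passed = true;               // ownership transferred; the downstream releases it later
        } finally {
            if (passed == false) {
                resource.close();        // failure path: release here so nothing leaks
            }
        }
    }
}

The seenGroupIds change is the same idea applied to an allocation: the new BitArray is closed unless or(visitedOrds) completed and the array was handed back to the caller.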
@@ -7,9 +7,10 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -18,17 +19,18 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100147")
public class EsqlActionBreakerIT extends EsqlActionIT {

public static class InternalTransportSettingPlugin extends Plugin {
@@ -72,59 +74,67 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {

private void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
if (limit != null) {
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build()
)
.get();
assertAcked(
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build()
)
);
} else {
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build()
)
.get();
assertAcked(
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build()
)
);
}
}

@Override
protected EsqlQueryResponse run(String esqlCommands) {
protected EsqlQueryResponse run(EsqlQueryRequest request) {
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048)));
try {
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(128, 2048)));
try {
return super.run(esqlCommands);
} catch (Exception e) {
logger.info("request failed", e);
ensureBlocksReleased();
}
setRequestCircuitBreakerLimit(ByteSizeValue.ofMb(64));
return super.run(esqlCommands);
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.info("request failed", e);
ensureBlocksReleased();
} finally {
setRequestCircuitBreakerLimit(null);
}
return super.run(request);
}

/**
* Makes sure that the circuit breaker is "plugged in" to ESQL by configuring an
* unreasonably small breaker and tripping it.
*/
public void testBreaker() {
setRequestCircuitBreakerLimit(ByteSizeValue.ofKb(1));
client().admin()
.indices()
.prepareCreate("test_breaker")
.setMapping("foo", "type=keyword", "bar", "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put("index.routing.rebalance.enable", "none"))
.get();
int numDocs = between(1000, 5000);
for (int i = 0; i < numDocs; i++) {
DocWriteResponse response = client().prepareIndex("test_breaker")
.setId(Integer.toString(i))
.setSource("foo", "foo-" + i, "bar", "bar-" + (i * 2))
.get();
assertThat(Strings.toString(response), response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
}
client().admin().indices().prepareRefresh("test_breaker").get();
ensureYellow("test_breaker");
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 512)));
try {
for (int i = 0; i < 5000; i++) {
DocWriteResponse response = client().prepareIndex("test")
.setId(Integer.toString(i))
.setSource("foo", i, "bar", i * 2)
.get();
if (response.getResult() != DocWriteResponse.Result.CREATED) {
fail("failure: " + response);
final ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> {
var request = new EsqlQueryRequest();
request.query("from test_breaker | stats count_distinct(foo) by bar");
request.pragmas(randomPragmas());
try (var ignored = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES)) {

}
}
client().admin().indices().prepareRefresh("test").get();
ensureYellow("test");
ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> super.run("from test | stats avg(foo) by bar", QueryPragmas.EMPTY).close()
);
});
logger.info("expected error", e);
if (e instanceof CircuitBreakingException) {
// The failure occurred before starting the drivers
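The shape of testBreaker, up to where the diff is truncated, is: index a batch of documents, shrink the request breaker to a few hundred bytes, run the query expecting it to fail, and restore the default limit afterwards; the run(EsqlQueryRequest) override above follows the same discipline by resetting the breaker in its finally block. A hedged, self-contained sketch of that shape follows; the names are placeholders, not the Elasticsearch test API.

import java.util.function.Supplier;

final class BreakerTestSketch {
    // Placeholder for the breaker-tripping failure the real test expects
    // (an ElasticsearchException, possibly a CircuitBreakingException).
    static class BreakerTrippedException extends RuntimeException {}

    static void assertBreakerTrips(Runnable setTinyLimit, Runnable restoreLimit, Supplier<AutoCloseable> runQuery) {
        setTinyLimit.run();
        try {
            try (AutoCloseable ignored = runQuery.get()) {
                throw new AssertionError("expected the circuit breaker to trip");
            } catch (BreakerTrippedException expected) {
                // success path: the query failed because the tiny limit was hit
            } catch (Exception e) {
                throw new AssertionError("query failed, but not with a breaker trip", e);
            }
        } finally {
            restoreLimit.run(); // never leave the tiny limit in place for later tests
        }
    }
}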
