diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index aa875772600a5..216627f996cad 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -127,41 +127,50 @@ public void addInput(Page page) { DocVector docVector = page.getBlock(docChannel).asVector(); final int shardIndex = docVector.shards().getInt(0); final var source = sources.get(shardIndex); - if (docVector.singleSegmentNonDecreasing() && source.source() instanceof ValuesSource.Bytes.WithOrdinals withOrdinals) { - final IntVector segmentIndexVector = docVector.segments(); - assert segmentIndexVector.isConstant(); - final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent( - new SegmentID(shardIndex, segmentIndexVector.getInt(0)), - k -> { - try { - final LeafReaderContext leafReaderContext = source.reader().leaves().get(k.segmentIndex); - return new OrdinalSegmentAggregator( - driverContext.blockFactory(), - this::createGroupingAggregators, - withOrdinals, - leafReaderContext, - bigArrays - ); - } catch (IOException e) { - throw new UncheckedIOException(e); + boolean pagePassed = false; + try { + if (docVector.singleSegmentNonDecreasing() && source.source() instanceof ValuesSource.Bytes.WithOrdinals withOrdinals) { + final IntVector segmentIndexVector = docVector.segments(); + assert segmentIndexVector.isConstant(); + final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent( + new SegmentID(shardIndex, segmentIndexVector.getInt(0)), + k -> { + try { + final LeafReaderContext leafReaderContext = source.reader().leaves().get(k.segmentIndex); + return new OrdinalSegmentAggregator( + driverContext.blockFactory(), + this::createGroupingAggregators, + withOrdinals, + leafReaderContext, + bigArrays + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - } - ); - ordinalAggregator.addInput(docVector.docs(), page); - } else { - if (valuesAggregator == null) { - int channelIndex = page.getBlockCount(); // extractor will append a new block at the end - valuesAggregator = new ValuesAggregator( - sources, - docChannel, - groupingField, - channelIndex, - aggregatorFactories, - maxPageSize, - driverContext ); + pagePassed = true; + ordinalAggregator.addInput(docVector.docs(), page); + } else { + if (valuesAggregator == null) { + int channelIndex = page.getBlockCount(); // extractor will append a new block at the end + valuesAggregator = new ValuesAggregator( + sources, + docChannel, + groupingField, + channelIndex, + aggregatorFactories, + maxPageSize, + driverContext + ); + } + pagePassed = true; + valuesAggregator.addInput(page); + } + } finally { + if (pagePassed == false) { + Releasables.closeExpectNoException(page::releaseBlocks); } - valuesAggregator.addInput(page); } } @@ -392,9 +401,18 @@ boolean seenNulls() { @Override public BitArray seenGroupIds(BigArrays bigArrays) { - BitArray seen = new BitArray(0, bigArrays); - seen.or(visitedOrds); - return seen; + final BitArray seen = new BitArray(0, bigArrays); + boolean success = false; + try { + // the or method can grow the `seen` bits + seen.or(visitedOrds); + success = true; + return seen; + } finally { + if (success == false) { + Releasables.close(seen); + } + } } @Override diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index a47b02c49fb20..73e72491c95a0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -7,9 +7,10 @@ package org.elasticsearch.xpack.esql.action; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -18,17 +19,18 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100147") public class EsqlActionBreakerIT extends EsqlActionIT { public static class InternalTransportSettingPlugin extends Plugin { @@ -72,35 +74,34 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { private void setRequestCircuitBreakerLimit(ByteSizeValue limit) { if (limit != null) { - clusterAdmin().prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build() - ) - .get(); + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), limit).build() + ) + ); } else { - clusterAdmin().prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build() - ) - .get(); + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey()).build() + ) + ); } } @Override - protected EsqlQueryResponse run(String esqlCommands) { + protected EsqlQueryResponse run(EsqlQueryRequest request) { + setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048))); try { - setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(128, 2048))); - try { - return super.run(esqlCommands); - } catch (Exception e) { - logger.info("request failed", e); - ensureBlocksReleased(); - } - setRequestCircuitBreakerLimit(ByteSizeValue.ofMb(64)); - return super.run(esqlCommands); + return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info("request failed", e); + ensureBlocksReleased(); } finally { setRequestCircuitBreakerLimit(null); } + return super.run(request); } /** @@ -108,23 +109,32 @@ protected EsqlQueryResponse run(String esqlCommands) { * unreasonably small breaker and tripping it. */ public void testBreaker() { - setRequestCircuitBreakerLimit(ByteSizeValue.ofKb(1)); + client().admin() + .indices() + .prepareCreate("test_breaker") + .setMapping("foo", "type=keyword", "bar", "type=keyword") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put("index.routing.rebalance.enable", "none")) + .get(); + int numDocs = between(1000, 5000); + for (int i = 0; i < numDocs; i++) { + DocWriteResponse response = client().prepareIndex("test_breaker") + .setId(Integer.toString(i)) + .setSource("foo", "foo-" + i, "bar", "bar-" + (i * 2)) + .get(); + assertThat(Strings.toString(response), response.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + } + client().admin().indices().prepareRefresh("test_breaker").get(); + ensureYellow("test_breaker"); + setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 512))); try { - for (int i = 0; i < 5000; i++) { - DocWriteResponse response = client().prepareIndex("test") - .setId(Integer.toString(i)) - .setSource("foo", i, "bar", i * 2) - .get(); - if (response.getResult() != DocWriteResponse.Result.CREATED) { - fail("failure: " + response); + final ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> { + var request = new EsqlQueryRequest(); + request.query("from test_breaker | stats count_distinct(foo) by bar"); + request.pragmas(randomPragmas()); + try (var ignored = client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES)) { + } - } - client().admin().indices().prepareRefresh("test").get(); - ensureYellow("test"); - ElasticsearchException e = expectThrows( - ElasticsearchException.class, - () -> super.run("from test | stats avg(foo) by bar", QueryPragmas.EMPTY).close() - ); + }); logger.info("expected error", e); if (e instanceof CircuitBreakingException) { // The failure occurred before starting the drivers