From 1cded6523d738ed7536579e2634347fcced285a4 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 6 Jun 2024 09:56:41 -0700 Subject: [PATCH] Fix double invocation of postCollection when MultiBucketCollector is present (#14015) Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../bucket/terms/StringTermsIT.java | 68 +++++++++++++++++++ .../BucketCollectorProcessor.java | 4 +- .../bucket/BestBucketsDeferringCollector.java | 2 + 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 597e0ae31b054..2e150edd0a41b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Painless: ensure type "UnmodifiableMap" for params ([#13885](https://github.com/opensearch-project/OpenSearch/pull/13885)) - Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903)) - Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911)) +- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java index edf9cd432dda2..f5d018b2ef491 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java @@ -42,10 +42,12 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; +import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.AggregationExecutionException; import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.filter.Filter; +import org.opensearch.search.aggregations.bucket.filter.InternalFilters; import org.opensearch.search.aggregations.bucket.terms.Terms.Bucket; import org.opensearch.search.aggregations.metrics.Avg; import org.opensearch.search.aggregations.metrics.ExtendedStats; @@ -999,6 +1001,72 @@ public void testOtherDocCount() { testOtherDocCount(SINGLE_VALUED_FIELD_NAME, MULTI_VALUED_FIELD_NAME); } + public void testDeferredSubAggs() { + // Tests subAgg doc count is the same with different collection modes and additional top level aggs + SearchResponse r1 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .addAggregation(AggregationBuilders.min("min").field("constant")) + .get(); + + SearchResponse r2 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .addAggregation(AggregationBuilders.min("min").field("constant")) + .get(); + + SearchResponse r3 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .get(); + + SearchResponse r4 = client().prepareSearch("idx") + .setSize(0) + .addAggregation( + terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .field("s_value") + .size(2) + .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery())) + ) + .get(); + + assertNotNull(r1.getAggregations().get("terms1")); + assertNotNull(r2.getAggregations().get("terms1")); + assertNotNull(r3.getAggregations().get("terms1")); + assertNotNull(r4.getAggregations().get("terms1")); + + Terms terms = r1.getAggregations().get("terms1"); + Bucket b1 = terms.getBucketByKey("val0"); + InternalFilters f1 = b1.getAggregations().get("filter"); + long docCount1 = f1.getBuckets().get(0).getDocCount(); + Bucket b2 = terms.getBucketByKey("val1"); + InternalFilters f2 = b2.getAggregations().get("filter"); + long docCount2 = f1.getBuckets().get(0).getDocCount(); + + for (SearchResponse response : new SearchResponse[] { r2, r3, r4 }) { + terms = response.getAggregations().get("terms1"); + f1 = terms.getBucketByKey(b1.getKeyAsString()).getAggregations().get("filter"); + f2 = terms.getBucketByKey(b2.getKeyAsString()).getAggregations().get("filter"); + assertEquals(docCount1, f1.getBuckets().get(0).getDocCount()); + assertEquals(docCount2, f2.getBuckets().get(0).getDocCount()); + } + } + /** * Make sure that a request using a deterministic script or not using a script get cached. * Ensure requests using nondeterministic scripts do not get cached. diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java index df05ce3f5c049..32c243cc12aa6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -71,10 +71,10 @@ public void processPostCollection(Collector collectorTree) throws IOException { collectors.offer(innerCollector); } } else if (currentCollector instanceof BucketCollector) { - ((BucketCollector) currentCollector).postCollection(); - // Perform build aggregation during post collection if (currentCollector instanceof Aggregator) { + // Do not perform postCollection for MultiBucketCollector as we are unwrapping that below + ((BucketCollector) currentCollector).postCollection(); ((Aggregator) currentCollector).buildTopLevel(); } else if (currentCollector instanceof MultiBucketCollector) { for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java index 223be3ba2d1ae..69d0fcd6c96f2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -124,6 +124,7 @@ private void finishLeaf() { if (context != null) { assert docDeltasBuilder != null && bucketsBuilder != null; entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build())); + context = null; } } @@ -161,6 +162,7 @@ public void preCollection() throws IOException { @Override public void postCollection() throws IOException { + assert searchContext.searcher().getLeafContexts().isEmpty() || finished != true; finishLeaf(); finished = true; }