
Concurrent Segment Search Overview and Aggs HLD #6798

Closed · 11 of 13 tasks
sohami opened this issue Mar 22, 2023 · 8 comments
Labels
discuss (Issues intended to help drive brainstorming and decision making), enhancement (Enhancement or improvement to existing feature or request), Search:Performance

Comments

@sohami
Collaborator

sohami commented Mar 22, 2023

Is your feature request related to a problem? Please describe.
The concurrent search plugin was added as a sandbox plugin, with some pending work tracked here. This issue provides an overview of the current functionality and high-level design options for supporting aggregations, along with other open items that should be considered.

Describe the solution you'd like

Background:

Today in OpenSearch, indexing and search requests are powered by the Lucene library. An OpenSearch index consists of multiple shards, and each of these shards represents a Lucene index. Each shard, or Lucene index, can have multiple segments in it. When a search request is served at the index level, it is scattered to all of the index's shards (with an upper limit at the per-node level), and the responses from all the shards are gathered, merged, and returned to the user as the final response. On each shard, the search request is executed sequentially over the segments in that shard. Lucene supports concurrent search over multiple segments, which was added to OpenSearch as an experimental sandbox plugin. Currently, the concurrent search plugin doesn't support aggs and has a few other gaps, which are captured below; separate issues will eventually be created for them.

Overview of Sequential + Concurrent Segment Search:

In OpenSearch, for each search request, the shard-level query execution looks like the diagram below for sequential query execution. The coordinator-level search request can be performed in multiple phases: (i) CanMatch (optional) - a phase to pre-filter shards based on time range, (ii) DFS (optional) - improves the relevance of document scores by using global term frequency and document frequency instead of local shard-level info, (iii) Query - the actual search happens on each shard as part of this phase, where only docIds are returned along with scores, and (iv) Fetch - the requested fields for each document are retrieved after the query phase. Among all the phases, the most expensive is the query phase, where all the scoring and document collection take place across the segments. This is the phase that uses the Lucene search API and where concurrency is introduced to search across the different segments in the shard (or Lucene index).

Current query phase execution consists of the operations below, all of which are performed on the single search thread executing that phase.

[Figure: QueryPhase]

Example for a simple query:

[Figure: QueryExample]

With concurrent search support, the query execution plan will look like the diagram below. A segment slice is a group of segments assigned to a concurrent searcher thread, which performs the search/collection over those assigned segments. Depending on the document count and segment count, the IndexSearcher partitions all the segments into multiple slices. Lucene uses 2 parameters, MAX_SEGMENTS_PER_SLICE (default 5) and MAX_DOCS_PER_SLICE (default 250K), to create the slice groups. While the concurrent searcher threads perform the search on their assigned slices, the main search thread also gets the last slice assigned to it for execution. Upon completion, it calls reduce on the CollectorManager tree with the list of collector trees created per slice. The reduce mechanism merges all the collected documents and creates the final document list, which is used to populate the query result at the end. Note: today, if any aggregation operation is present in the search request, the sequential path is executed instead of the concurrent search path.

Concurrent segment search flow:
[Figure: ConcurrentSearch]
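As a concrete reference for the flow above, here is a minimal sketch of concurrent segment search at the Lucene level, assuming a recent Lucene version. The thread pool size and hit count are illustrative; OpenSearch wires in its own executor and slice parameters rather than this standalone setup:

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollector;

final class ConcurrentSearchSketch {
    static TopDocs topHits(DirectoryReader reader) throws IOException {
        // Passing an executor to IndexSearcher enables concurrent segment search:
        // the searcher groups segments into slices (bounded by
        // MAX_SEGMENTS_PER_SLICE / MAX_DOCS_PER_SLICE) and runs one collector
        // tree per slice on the executor threads (executor shutdown omitted for brevity).
        ExecutorService executor = Executors.newFixedThreadPool(4);
        IndexSearcher searcher = new IndexSearcher(reader, executor);

        // Lucene-native operators ship with CollectorManager implementations;
        // the manager's reduce merges the per-slice top hits into one TopDocs.
        return searcher.search(
            new MatchAllDocsQuery(),
            TopScoreDocCollector.createSharedManager(10, null, Integer.MAX_VALUE));
    }
}
```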

Design Concurrent Search for Aggs:

To support the concurrent search model, the operators or collectors need to implement the CollectorManager interface provided by Lucene. Operators native to Lucene, such as TopDocs, TopFields, and TopScoreDocs, already provide a mechanism to create the corresponding CollectorManager and to perform the reduce across the per-slice collectors in the reduce phase. OpenSearch has to utilize those that are inherently supported and implement CollectorManagers for the others, such as EarlyTerminatingCollector, which are not native to Lucene. Similarly, the aggregation operators are not native to Lucene at all; they are implemented only in the OpenSearch application layer using the Lucene Collector interface.

Properties of Aggregators:

  • All the Aggregator operators in OpenSearch derive from the BucketCollector class, which implements Lucene's Collector interface (a minimal collector sketch follows this list). BucketCollector has 2 major types: 1) DeferringBucketCollector and 2) Aggregator, where Aggregator is further split into 3 categories: a) MetricsAggregator, b) BucketAggregators, and c) NonCollectingAggregator (for fields with no mapping).
  • The aggregator tree can be nested with parent-child relationships. For example, a nested bucket aggregator where the top level collects all the docs across, say, 5 buckets, and then within each bucket a child aggregator collects 5 sub-buckets, and so on.
  • Global aggregators are executed after all the other aggregators during the search phase. They work on all the documents instead of only the documents matching the search criterion.
  • After the aggregator collectors (both global and non-global) collect the documents during the query/search phase, an InternalAggregation object is created for the aggregation tree; this is the intermediate data structure that holds the aggregation results. This result is returned to the coordinator, which performs the final merge/reduce of results from all the shards.
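To make the Collector-based shape concrete, below is a minimal, hypothetical metric "aggregator" written directly against Lucene's Collector interface. Real OpenSearch aggregators extend BucketCollector and keep far richer, BigArrays-backed state, so this is only an illustration of where per-slice state lives; the class and field name are made up:

```java
import java.io.IOException;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;

// Hypothetical metric aggregator: sums a numeric doc-values field over every
// matching document. Real aggregators extend BucketCollector and write into
// BigArrays-backed state keyed by bucket ordinal.
final class SumCollector implements Collector {
    private final String field;
    long sum; // the collector's internal state; one instance per slice under concurrent search

    SumCollector(String field) { this.field = field; }

    @Override
    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
        final NumericDocValues values = DocValues.getNumeric(context.reader(), field);
        return new LeafCollector() {
            @Override public void setScorer(org.apache.lucene.search.Scorable scorer) {}
            @Override public void collect(int doc) throws IOException {
                if (values.advanceExact(doc)) {
                    sum += values.longValue(); // accumulate into this slice's state
                }
            }
        };
    }

    @Override
    public ScoreMode scoreMode() { return ScoreMode.COMPLETE_NO_SCORES; }
}
```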

For supporting concurrency across all aggregators, we can use one of the following options; the recommended one is Option 3.

Option 1:

  • Provide a CollectorManager for each aggregator, or some mechanism of a common CollectorManager implementation that takes care of creating the different Aggregator collectors (or BucketCollectors) per slice. The collectors of each slice need to have their own state (or internal data structure), which is populated with the documents collected from that slice (see the sketch after the pros/cons below).
  • We need a mechanism for the different Aggregators to reduce the collected documents across slices in the reduce phase. In the reduce phase, the results for each aggregator operator in the aggregation tree need to be merged across all the slices. This can be tricky with nested aggregation operators and depends on the implementation of each aggregation's internal state, i.e., the data structure used to collect the documents.
  • As part of the reduce phase, the final state of all collected results/buckets in the aggregator tree needs to be created in the form of a single reduced aggregator tree, similar to what it would look like when executed sequentially.
  • After the reduce phase, all the other operations on the Aggregators (or BucketCollectors) can remain as is, i.e., creating the intermediate InternalAggregation objects to serialize and send to the coordinator.
  • Global aggregators, which are executed separately later with a matchAll query, will need different plumbing, as they run after the Lucene search phase execution described above. This is because global aggregators work on all the documents, whereas the others work on the results returned by the query (when there is a filter like a range or term match, etc.).

Pros:

  • No changes in coordinator layer
  • No changes in serialization protocol between coordinator and shards
  • Scalable: Collection and reduce happen concurrently at the shard level, where each shard performs the operation for its own set of collectors. Reduce happens in 2 phases: one at the shard level and then a final one at the coordinator level (the latter is the same as today).

Cons:

  • There are 50 aggregator operators, and depending on the differences in each aggregator's state, we will need to understand and update each operator to implement the reduce phase. This can be both time-consuming and error-prone.
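To make the Option 1 mechanics concrete, here is a minimal sketch reusing the hypothetical SumCollector from the aggregator properties section: one collector (and one copy of the state) per slice, merged inside CollectorManager::reduce during the Lucene search call. The merge is trivial for a sum; for the 50 real aggregator operators, reduce would have to merge arbitrary per-slice bucket state, which is the effort called out in the cons above:

```java
import java.io.IOException;
import java.util.Collection;

import org.apache.lucene.search.CollectorManager;

// Hypothetical CollectorManager for Option 1: one SumCollector per slice,
// merged inside CollectorManager::reduce during the Lucene search call.
final class SumCollectorManager implements CollectorManager<SumCollector, Long> {
    private final String field;

    SumCollectorManager(String field) { this.field = field; }

    @Override
    public SumCollector newCollector() {
        return new SumCollector(field); // fresh per-slice state, no sharing across threads
    }

    @Override
    public Long reduce(Collection<SumCollector> collectors) throws IOException {
        long total = 0;
        for (SumCollector c : collectors) {
            total += c.sum; // merge each slice's state into the final result
        }
        return total;
    }
}
```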

Option 2:

  • Have a CollectorManager for Aggregator (same as above).
  • Don't perform CollectorManager::reduce for the aggregation collectors on the shards; instead, create the intermediate InternalAggregation data structures for the list of aggregation collectors (one per slice) at each level of the aggregation tree, as compared to a single aggregation collector today.
  • Update the coordinator to handle a list of InternalAggregation objects per aggregator operator in the tree (instead of 1) from each shard and perform the merge/reduce on these InternalAggregation objects. The coordinator already performs this across shards: for simplicity, think of it as currently merging a list of InternalAggregation with 1 object per shard; now it needs to merge a list of lists of InternalAggregation with 1 list of objects per shard. This holds for each level of the aggregation tree and for all the different aggregations in the request.

Pros:

  • Uses the existing mechanism to reduce the aggregations' intermediate results at the coordinator layer instead of implementing reduce in each aggregation's collector.
  • Simpler and faster to implement compared to Option 1.

Cons:

  • Not scalable: we increase the number of InternalAggregation objects that need to be reduced at the coordinator by shard count x slice count per shard. So in cases where we want high parallelism (i.e., a high slice count), the coordinator will become the bottleneck, because the reduce happens only at that level instead of in 2 phases. This can also increase memory and CPU consumption on the coordinator, since the number of entries from each shard grows and there can be duplicate entries for the same bucket (e.g., histogram buckets) from each slice. To keep memory in check, we can lower the batch limit of the buffered aggregations (default is DEFAULT_BATCHED_REDUCE_SIZE = 512) so that the reduce happens more frequently, but that will still consume CPU cycles.

Option 3 (Recommended):

  • Do a hybrid of Option 1 and Option 2, where the collect phase remains the same and the reduce on the aggregation collectors does not happen as part of CollectorManager::reduce in the Lucene search phase (see the sketch after the pros/cons below).
  • After all collection is performed, the intermediate InternalAggregation data structures are created. The first level of merge/reduce happens at the shard level, merging the InternalAggregation objects across the slices on the shard. The shard then sends the merged output to the coordinator.
  • The coordinator again performs the reduce for the results across shards, which is the same as the current behavior.

Pros:

  • All the pros of Option 1 and Option 2:
    • No changes in the coordinator layer
    • No changes in the serialization protocol between coordinator and shards
    • Faster to implement as well, by reusing the existing merge/reduce mechanism
  • Scalable: collection happens concurrently at the shard level, where each shard performs the operation for its own set of collectors. Reduce happens in 2 phases: a) at the shard level and b) at the coordinator level (the latter is the same as today), which does not add extra work or use more resources on the coordinator compared to Option 2.

Cons:

  • May need to handle the profile scenario differently here, as the reduce phase happens outside the Lucene search API call.
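To make the Option 3 flow concrete, here is a sketch again based on the hypothetical SumCollector: the CollectorManager's reduce becomes a pass-through, and the shard merges the per-slice state after the Lucene search returns. In OpenSearch, this shard-level step is where the per-slice InternalAggregation objects would be reduced before being sent to the coordinator:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;

final class Option3Sketch {
    // Pass-through "reduce": the per-slice collectors are handed back untouched,
    // deferring the real merge to the shard level, outside the Lucene search call.
    static final class NoopReduceSumManager implements CollectorManager<SumCollector, List<SumCollector>> {
        private final String field;

        NoopReduceSumManager(String field) { this.field = field; }

        @Override
        public SumCollector newCollector() { return new SumCollector(field); }

        @Override
        public List<SumCollector> reduce(Collection<SumCollector> collectors) {
            return new ArrayList<>(collectors);
        }
    }

    static long searchThenShardLevelReduce(IndexSearcher searcher, Query query, String field) throws IOException {
        // Lucene search phase: concurrent collection per slice, no agg merge yet.
        List<SumCollector> perSlice = searcher.search(query, new NoopReduceSumManager(field));

        // Shard-level reduce (phase 1 of 2). In OpenSearch this is where the
        // per-slice InternalAggregation objects would be merged in QueryPhase
        // before the result is sent to the coordinator, which then performs
        // phase 2 of the reduce across shards exactly as it does today.
        long shardTotal = 0;
        for (SumCollector c : perSlice) {
            shardTotal += c.sum;
        }
        return shardTotal;
    }
}
```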

Option 4:

  • Have a CollectorManager for Aggregator (same as above).
  • Instead of performing the reduce operation on each aggregator collector, make the internal data structures that need to be shared across slices thread-safe (see the sketch after the cons below).

Pros:

  • Can have a smaller memory footprint, at the cost of lock contention when multiple slice threads try to update the same shared data structure.

Cons:

  • Will be complex to implement while ensuring no new bugs creep in. Most of the internal structures are implemented in OpenSearch and do not use generic data structures that could simply be swapped for their concurrent versions. With complex nested aggregation trees, this becomes difficult to understand and handle in all cases. For example, buckets in histograms are implemented as a kind of hash table on top of BigArrays.
  • We may not see any benefit from concurrent aggregation execution because of the locking on the shared data structure.
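For contrast, here is a sketch of the Option 4 shape with the same hypothetical sum example: a single state object shared by every slice, made thread-safe with a java.util.concurrent primitive. This is trivial for a sum, but most real aggregator state is a custom BigArrays-backed structure with no off-the-shelf concurrent replacement, which is exactly the complexity noted above:

```java
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.LongAdder;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;

// Option 4 shape: all slices write into one shared, thread-safe accumulator,
// so CollectorManager::reduce has nothing left to merge.
final class SharedStateSumManager implements CollectorManager<Collector, Long> {
    private final String field;
    private final LongAdder shared = new LongAdder(); // written to by all slice threads

    SharedStateSumManager(String field) { this.field = field; }

    @Override
    public Collector newCollector() {
        return new Collector() {
            @Override
            public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
                final NumericDocValues values = DocValues.getNumeric(context.reader(), field);
                return new LeafCollector() {
                    @Override public void setScorer(org.apache.lucene.search.Scorable scorer) {}
                    @Override public void collect(int doc) throws IOException {
                        if (values.advanceExact(doc)) {
                            shared.add(values.longValue()); // thread-safe shared write
                        }
                    }
                };
            }

            @Override
            public ScoreMode scoreMode() { return ScoreMode.COMPLETE_NO_SCORES; }
        };
    }

    @Override
    public Long reduce(Collection<Collector> collectors) {
        return shared.sum(); // nothing to merge; the state was shared all along
    }
}
```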

Concurrent segment search with Aggs:
[Figure: Concurrent Search with Aggs]

Different Agg operators supported in OpenSearch:
[Figure: Aggregation Operators]

Describe alternatives you've considered
All the options are listed above

Additional context
Different tasks that will be done incrementally, in addition to what is called out in the META issue:

  • Understanding concurrent search implementation and current Aggregator support in OpenSearch
  • High Level Design Option for supporting Aggregation with concurrent segment search
  • Prototype for recommended option
  • Implementation for recommended approach without profile support
  • Profile support for aggregation with concurrent search
  • Implementing Global Aggregation executed post query phase in concurrent manner
  • Support to dynamically enable/disable the concurrent search path
  • Add mechanism to change slice calculation by dynamically updating max documents and segment count considered for each slice
  • Enable all the search ITs/UTs with and without concurrent segment search
  • OSB Benchmark to be performed with concurrent search enabled
  • Explore any new metrics that can be added for concurrent segment search, like threadpool stats, number of searches using the concurrent vs. sequential path, etc.
  • Explore changes needed for Task Resource tracking framework and search back pressure with concurrent segment search
  • Some future work can be considered as well: i) dynamically choose to perform concurrent search on requests based on resource availability on a node.
@sohami added the enhancement and untriaged labels on Mar 22, 2023
@navneet1v
Contributor

@sohami The proposal looks awesome. Just a couple of comments:

  1. I cannot see a task that says aggregations implemented by plugins also require a change. Please add that. Example: https://github.com/opensearch-project/geospatial/blob/main/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java#L132

@reta
Collaborator

reta commented Mar 24, 2023

@sohami thank you for putting this one together, quite comprehensive

I am confused a bit by Option 3, specifically:

Do hybrid of Option 1 and Option 2 where collect phase remain same and reduce doesn’t happen at collector levels in lucene search phase.

AFAIK reduce happens at the CollectorManager level, not the collector level (or I misunderstood what reduce at the collector means). Maybe you could clearly state what should happen in this case and mention that this is the hybrid of Option 1 and Option 2.

@sohami
Collaborator Author

sohami commented Mar 24, 2023

@navneet1v Thanks for sharing the plugin. The good thing about Option 3 is that any agg implementation will have a corresponding result reader that extends the InternalAggregation class and has a corresponding reduce method. With this, there won't be any separate change needed for different agg implementations or plugins. For GeospatialPlugin, the reader GeoHexGrid extends InternalMultiBucketAggregation, so it should be fine. Let me know if you see any other concern here.

@sohami
Collaborator Author

sohami commented Mar 24, 2023

@reta I will clarify above. You are right, the reduce is managed by the CollectorManager across the collector trees of all the slices, and this reduce phase happens inside the Lucene search API. What I meant is that for the aggs collectors in the collector tree, the reduce performed by the CollectorManager will be a no-op. Later, in QueryPhase, when the InternalAggregation objects are created, that's where we will perform the reduce across all the different agg collector instances. So the 1st phase of the reduce on the aggregation collectors still happens at the shard level, followed by the reduce at the coordinator level.

@reta
Collaborator

reta commented Mar 24, 2023

Gotcha, thanks @sohami

What I meant to say there is for aggs collector in the collector tree, the reduce performed by CollectorManager will be no-op.

So that basically means we would have to bear a larger (potentially significantly larger) state from stage to stage? I do understand the reduce stage would be specific per aggregation (that would need some work), but it looks to me to be the right thing to do: implement the proper reduce phase in the CollectorManager without cutting corners (just an opinion).

@sohami
Collaborator Author

sohami commented Mar 24, 2023

So that basically means we would have to bear a larger (potentially, significantly larger) state from stage to stage?

We are not moving it between different stages; it is done in the query phase itself, right after the Lucene search returns. The aggregation collector state is used today to perform postCollection on the collected docs (for sub-aggs) and then to build the intermediate results. These steps are done after the Lucene search phase, and the intermediate result is later sent to the coordinator. With concurrent segment search, we will basically perform a reduce on the agg collector state as part of that result creation.

I am working on the PoC and will share it. That will probably clarify things further.

@anasalkouz added the discuss label and removed the untriaged label on Mar 28, 2023
@sohami
Collaborator Author

sohami commented Apr 10, 2023

[Update]: I have made PoC changes on a branch in my repository here. There are a few test failures, and I also found a few bugs in the existing search path. I will create separate issues for the bugs, followed by PRs to fix them. I will also look into the test failures to understand whether there is any gap in the approach.

Bukhtawar added a commit that referenced this issue Jul 23, 2023
I have nominated, and the maintainers have agreed, to invite Sorabh Hamirwasia (@sohami) to be a co-maintainer. Sorabh has kindly accepted.

Sorabh has led the design and implementation of multiple features like query cancellation, concurrent segment search for aggregations and snapshot inter-operability w/remote storage in OpenSearch. Some significant issues and PRs authored by Sorabh for this effort are as follows:

Feature proposals
Concurrent segment search for aggregations : #6798
Searchable Remote Index : #2900

Implementations
Concurrent segment search for aggregations: #7514
Lucene changes to leaf slices for concurrent search: apache/lucene#12374
Moving concurrent search to core : #7203
Query cancellation support : #986
In total, Sorabh has authored 14 PRs going back to Aug 2021. He also frequently reviews contributions from others and has reviewed nearly 20 PRs in the same time frame. I believe Sorabh will be a valuable addition as a maintainer of OpenSearch and will continue to contribute to the success of the project going forward.

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
@sohami
Copy link
Collaborator Author

sohami commented Jun 3, 2024

Closing this issue as Concurrent Search is GA and other follow-up items will be tracked in their respective issues.

@sohami sohami closed this as completed Jun 3, 2024