Optimize composite aggregation based on index sorting. #48130


howardhuanghua
Contributor

Issue

Currently, composite aggregation can be optimized if the leading source field (the first field in sources) has terms (inverted index) or point values (BKD tree). However, several strict limitations make the optimization hard to trigger. They include:

  1. The query must be MatchAllDocsQuery() in most cases.
  2. The segment must have no deletions.
  3. The sort order must not be reversed.

Related code blocks:

if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
        checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
    return null;
}

if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
        fieldType instanceof StringFieldType == false ||
        (query != null && query.getClass() != MatchAllDocsQuery.class)) {
    return null;
}

A composite aggregation request with a complex query can rarely benefit from the current optimization.

Solution

This PR introduces an alternative optimization for composite aggregation based on index sorting. If the leading source field (sources[0]) is also the leading index sort field (the first field in the index sort configuration), we can get the matching startDocId from the terms or point values and use it to skip non-competitive docs during collection. In addition, we can terminate the search early once the maximum required result size is reached. The relations between index sorting order and query order are summarized here:

| Index Sorting Order | Query Order | StartDocId Filter | Early Terminate | Early Terminate Condition |
|---|---|---|---|---|
| ASC | ASC | Yes | Yes | targetSize >= maxSize && current sources[0] value > candidate queue top value |
| DESC | DESC | Yes | Yes | targetSize >= maxSize && current sources[0] value < candidate queue top value |
| ASC | DESC | No | Yes | targetSize >= maxSize && current sources[0] value > afterKey[0] value |
| DESC | ASC | No | Yes | targetSize >= maxSize && current sources[0] value < afterKey[0] value |

There are two scenarios:

  1. If index sorting and the query request have the same order, both the startDocId filter and early termination can be used. In this case, once the max size is reached and the current leading source field value is greater (ASC) or less (DESC) than the top value in the candidate queue, the segment collection loop can be terminated early.
  2. If index sorting and the query request have different orders, the startDocId filter cannot be used, because the doc id iterator cannot be traversed backwards. In this case, once the current leading source field value passes (greater or less, depending on the orders) the first after key value specified in the search request, the segment collection loop can still be terminated early.
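The two scenarios and the table above can be condensed into a single predicate. The following is only a minimal sketch, not the code in this PR: the class, enum and parameter names are hypothetical, leading values are modeled as plain `long`s, and returning `false` when no after key is supplied in the mismatched-order case is an assumption.

```java
/** Hypothetical illustration of the early-termination table; not the PR's implementation. */
final class EarlyTerminationSketch {
    enum Order { ASC, DESC }

    /**
     * Decides whether the segment collection loop may stop.
     *
     * @param indexOrder sort order of the leading index sort field
     * @param queryOrder sort order requested for sources[0]
     * @param queueFull  true when targetSize >= maxSize
     * @param current    leading source value of the doc being collected
     * @param queueTop   top value of the candidate queue (same-order case)
     * @param afterKey   afterKey[0] from the request, or null if absent (assumption)
     */
    static boolean canTerminate(Order indexOrder, Order queryOrder, boolean queueFull,
                                long current, long queueTop, Long afterKey) {
        if (!queueFull) {
            return false;
        }
        if (indexOrder == queryOrder) {
            // Same order: compare against the top of the candidate queue.
            return indexOrder == Order.ASC ? current > queueTop : current < queueTop;
        }
        if (afterKey == null) {
            // Assumption: without an after key there is nothing to compare against.
            return false;
        }
        // Different order: compare against the first after key value.
        return indexOrder == Order.ASC ? current > afterKey : current < afterKey;
    }
}
```

For example, with an ASC index sort, an ASC query, a full queue and a queue top of 10, a doc whose leading value is 11 allows termination, while a doc with value 10 does not.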

This optimization does not restrict the query type, so it extends the current optimization to more search scenarios.

Testing

We verified composite aggregation with a range query: search is 3-7 times faster than before.

Test info:
Node number: 3
Each node cpu and memory: 12 Processors, 30GB Heap
Index shard count: 6
Replica number: 1
Index store size: 123.3gb
Index doc count: 1,886,400,000

Index settings:

{
  "monitor_index": {
    "settings": {
      "index": {
        "refresh_interval": "30s",
        "translog": {
          "sync_interval": "10s",
          "durability": "async"
        },
        "sort": {
          "field": [
            "ip",
            "name",
            "device"
          ],
          "order": [
            "asc",
            "asc",
            "asc"
          ]
        },
        "number_of_replicas": "1",
        "number_of_shards": "6"
      }
    }
  }
}

Search request sample:

GET monitor_index/_search
{
  "size": 0, 
  "query": {
    "bool": {
      "filter": [ {
        "range": {
          "timestamp": {
            "gte": "2019-01-01 00:00:00",
            "lte": "2019-01-06 00:00:00",
            "format": "yyyy-MM-dd HH:mm:ss",
            "time_zone": "+08:00"
          }
        }
      }]
    }
  },
  "aggs": {
    "NAME": {
      "composite": {
        "size": 100,
        "sources": [
          {"ip": {"terms": {"field": "ip"  } }},
          {"name": {"terms": {"field": "name"  } }},
          {"device": {"terms": {"field": "device"  } }}
        ]
      }
    }
  }
}

Test result:
Search time cost (ms) comparison:

| Query Range | Before Optimization | After Optimization | Improved by N times |
|---|---|---|---|
| 1 day | 2375 | 767 | 3 |
| 5 days | 5550 | 807 | 7 |
| 10 days | 15364 | 2746 | 6 |
| 20 days | 31265 | 4531 | 7 |
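The "Improved by N times" column appears to be the before/after ratio rounded to the nearest integer. A small sketch that recomputes it from the timings reported above (the class and method names are made up for illustration):

```java
/** Recomputes the speedup column from the reported timings; names are illustrative only. */
final class SpeedupSketch {
    /** Ratio of the time before the optimization to the time after it. */
    static double speedup(long beforeMs, long afterMs) {
        return (double) beforeMs / afterMs;
    }

    public static void main(String[] args) {
        String[] ranges = {"1 day", "5 days", "10 days", "20 days"};
        long[][] timingsMs = {{2375, 767}, {5550, 807}, {15364, 2746}, {31265, 4531}};
        for (int i = 0; i < ranges.length; i++) {
            double ratio = speedup(timingsMs[i][0], timingsMs[i][1]);
            System.out.printf("%s: %.1fx (rounded: %d)%n", ranges[i], ratio, Math.round(ratio));
        }
    }
}
```

The unrounded ratios are roughly 3.1, 6.9, 5.6 and 6.9, which is consistent with the 3-7x range quoted in the description.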

@howardhuanghua
Contributor Author

Hi @jimczi , would you please help to check this PR? Thanks a lot.

@astefan astefan added the :Analytics/Aggregations Aggregations label Oct 16, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)

Contributor

@jimczi jimczi left a comment


Hi @howardhuanghua, it's great that you're trying to tackle this. I have some difficulty understanding the logic and all the edge cases, but that's inherent to this aggregation. However, I wonder why you use the inverted lists or the bkd-tree when index sorting is applicable. In this case it should be possible to collect the documents like a normal query and just early terminate when we reach a bucket that is after the bottom in the queue. I started something similar some time ago but never had time to finish it completely. It's appealing because it can early terminate any composite aggregation that uses leading fields that are compatible with the index sort, even all fields if that's possible.
It has some restrictions too:

  • can only work on single-valued fields that set missingBucket to false (I think we could handle missingBucket too but I didn't spend too much time on it yet).

Overall I think this approach could be faster than using the inverted lists or the bkd-tree for numerics, since it can be applied to several dimensions if they are compatible. So if you have a composite aggregation with three sources like ip, user and device that are also used in the index sorting, the new optimization could early terminate as soon as we see the Nth composite bucket. If the index is sorted by the two leading fields only, the aggregation can early terminate as soon as we reach the bottom device bucket. And that works on any query ;).
Here's the work in progress that I have:
https://github.com/elastic/elasticsearch/compare/master...jimczi:composite_sorted_index?expand=1
I think it's pretty advanced already but it requires some additional tests and a real benchmark. Since you have one that is ready and probably more realistic than what I can come up with, would you be ok to run your benchmark with the changes exposed in this branch?

@howardhuanghua
Contributor Author

Hi @jimczi , thanks for your comment. For early termination, we don't need inverted lists or bkd-tree, the early termination only relies on index sorting (if the leading source field name matches the first index sorting field name). It's just the logic you have described -- "In this case it should be possible to collect the document like a normal query and just early terminate when we reached a bucket that is after the bottom in the queue."

We only use inverted lists or bkd-tree to get the first matched doc (start doc id) on leading source, and start to collect doc from this start doc id if the leading source field and index sorting field have the same order.
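To illustrate the idea of a start doc id, here is a minimal sketch under simplifying assumptions. It does not use the inverted lists or the bkd-tree as the PR does; instead it models a segment sorted ascending on the leading field as a plain `long[]` indexed by doc id (a hypothetical stand-in) and binary-searches it. Docs whose leading value equals the after value are kept, since their remaining source values may still sort after the after key.

```java
/** Hypothetical stand-in for the start doc id lookup; the PR uses terms / points instead. */
final class StartDocIdSketch {
    /**
     * Returns the first doc id whose leading value is >= afterLeadingValue,
     * or the array length when no such doc exists.
     *
     * @param leadingValuesByDocId leading field values in doc id order,
     *                             ascending because of the index sort (assumption)
     * @param afterLeadingValue    value of afterKey[0] from the request
     */
    static int startDocId(long[] leadingValuesByDocId, long afterLeadingValue) {
        int lo = 0;
        int hi = leadingValuesByDocId.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (leadingValuesByDocId[mid] < afterLeadingValue) {
                lo = mid + 1; // everything up to mid sorts before the after key
            } else {
                hi = mid;     // mid may be the first competitive doc
            }
        }
        return lo;
    }
}
```

Collection would then begin at the returned doc id and skip every doc before it.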

Currently we only consider the first index sorting field and the first sources field. I will check the details of your branch and give you more feedback later.

BTW, do you have suggestions on benchmark tools? We have used Rally before, but it doesn't seem to contain composite aggregation items.

@jimczi
Contributor

jimczi commented Oct 21, 2019

> We only use inverted lists or bkd-tree to get the first matched doc (start doc id) on leading source, and start to collect doc from this start doc id if the leading source field and index sorting field have the same order.

Yep that's the part I understand and my pr is very similar. The difference is that it doesn't need to compute this directly but uses the SearchAfterSortedDocQuery to skip documents if after is set and it also uses the priority queue to detect when early termination is possible.
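As a rough sketch of how a size-bounded queue can detect the termination point when documents arrive in index-sort order, consider the following. It is a simplification, not the code from either branch: composite keys are reduced to a single `long`, a `TreeMap` stands in for the priority queue, and the input array represents the leading key of each document that matched the query, in doc id order. All names are hypothetical.

```java
import java.util.TreeMap;

/** Simplified model of queue-based early termination on an index-sorted segment. */
final class SortedCollectSketch {
    /**
     * Collects at most {@code size} buckets (key -> doc count) and stops as soon
     * as a key sorts after the bottom of a full queue.
     *
     * @param keysInIndexOrder bucket key of each matching doc, ascending (assumption)
     * @param size             requested number of composite buckets
     */
    static TreeMap<Long, Integer> collect(long[] keysInIndexOrder, int size) {
        TreeMap<Long, Integer> queue = new TreeMap<>();
        if (size <= 0) {
            return queue;
        }
        for (long key : keysInIndexOrder) {
            if (queue.size() >= size && key > queue.lastKey()) {
                // The queue is full and, because of the index sort, every
                // remaining doc also sorts after the bottom bucket.
                break;
            }
            queue.merge(key, 1, Integer::sum);
        }
        return queue;
    }
}
```

Because the check only looks at the incoming key and the bottom of the queue, it does not depend on which query produced the matching documents.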

> BTW, do you have suggestions on benchmark tools? We have used Rally before, but it doesn't seem to contain composite aggregation items.

We don't benchmark composite aggregation in our nightly benchmark, but I don't see a reason why we wouldn't use Rally for this. You can send any type of query to Rally, so a composite aggregation should be testable with this tool. Are you saying that you cannot use Rally at the moment, or that we don't have an example of how to set up a query with composite aggregation to benchmark in Rally?

@howardhuanghua
Contributor Author

Thank you @jimczi, I have checked your PR. You build an index sort prefix to use as many index sort fields as possible, and also use a search-after query to skip documents directly. It seems that if the index sorting order and the source field order mismatch, it cannot early terminate the collection loop?

The mismatch order case would be handled in my PR. Let me try to use some pictures to explain early termination logic of my PR:
The same order case: suppose index sorting is configured on field F, and the composite aggregation request has source field F plus another source field xxx. We only consider the leading source field F. The following picture shows the early termination point in the collection loop:
[image: early termination point in the collection loop, same order case]

The different order case:
[image: early termination point in the collection loop, different order case]

Hope you could understand the early termination logic in my PR more clearly. :)
As for the benchmark, we have used Rally several times, but we haven't set up a customized query to benchmark in Rally before. I will investigate; if you have some examples, please share them with me. Thank you.

BTW, would you please share your future plan about this optimization? Is there anything I can do to help?

@jimczi
Contributor

jimczi commented Oct 21, 2019

> Hope you could understand the early termination logic in my PR more clearly. :)

No worries, that's inherent to the complex logic of this aggregation. Your PR is good and I really appreciate all the effort to explain the changes, so thanks!

> It seems if index sorting order and source field order mismatch, it could not early terminate collection loop?

That's correct, although we could apply the same kind of logic to my PR to handle the case where the leading source does not match the order of the index sort. I revived my PR mainly because it allows early termination even if you have a query other than match_all. That's the main achievement, I think. I also couldn't find where you handle the case where the order doesn't match. You seem to use this test to check if the optimization is applicable, but this seems to handle only the case where both orders match?

> BTW, would you please share your future plan about this optimization? Is there anything I can do to help?

I guess it depends on what we think is the most important. IMO using index sort should allow to early terminate when you have a query other than match_all and I was hoping that your benchmark would also tell the difference in performance between the two approaches.

Contributor

@jimczi jimczi left a comment


I reviewed the code more carefully and I understand the logic now ;). I think it's great that we can also early terminate when the sort order in the leading source is different. However I wonder if we could merge the two approaches (https://github.com/elastic/elasticsearch/compare/master...jimczi:composite_sorted_index?expand=1 and this pr) in order to provide an optimization that can use leading sources to early terminate rather than only the first one. I can try to adapt your pr to merge my changes or I can help you to do it, whatever suits you. What do you think ?

@howardhuanghua
Contributor Author

Thank you @jimczi, I appreciate that you've already understood the logic. Yesterday I enhanced the unit tests for the different order case, and today I am working on using Rally to benchmark the overall changes (adding a customized composite aggregation search to the geonames track). I hope that we can contribute more to Elastic, and I would be very pleased if you could merge your changes into my PR. :)

jimczi added a commit that referenced this pull request Dec 17, 2019
Co-authored-by: Daniel Huang <danielhuang@tencent.com>

This is a spinoff of #48130 that generalizes the proposal to allow early termination with the composite aggregation when leading sources match a prefix or the entire index sort specification.
In such case the composite aggregation can use the index sort natural order to early terminate the collection when it reaches a composite key that is greater than the bottom of the queue.
The optimization is also applicable when a query other than match_all is provided. However the optimization is deactivated for sources that match the index sort in the following cases:
  * Multi-valued source, in such case early termination is not possible.
  * missing_bucket is set to true
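
The prefix rule described in this commit message can be sketched roughly as follows. This is a hypothetical simplification rather than the merged code: the `Source` class and `sortPrefixLength` method are invented for illustration, and the comparison of sort orders is left out entirely.

```java
import java.util.List;

/** Hypothetical sketch of matching leading sources against the index sort fields. */
final class SortPrefixSketch {
    /** Minimal description of one composite source (illustrative only). */
    static final class Source {
        final String field;
        final boolean multiValued;
        final boolean missingBucket;

        Source(String field, boolean multiValued, boolean missingBucket) {
            this.field = field;
            this.multiValued = multiValued;
            this.missingBucket = missingBucket;
        }
    }

    /**
     * Counts how many leading sources line up with the index sort fields.
     * Matching stops at the first source that uses a different field,
     * is multi-valued, or sets missing_bucket to true.
     */
    static int sortPrefixLength(List<Source> sources, List<String> indexSortFields) {
        int limit = Math.min(sources.size(), indexSortFields.size());
        int length = 0;
        while (length < limit) {
            Source source = sources.get(length);
            boolean sameField = source.field.equals(indexSortFields.get(length));
            if (!sameField || source.multiValued || source.missingBucket) {
                break;
            }
            length++;
        }
        return length;
    }
}
```

With sources ip, name and device and an index sorted on ip and name, this sketch yields a prefix of length 2; a length of 0 would mean the optimization is not applicable.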
jimczi added a commit that referenced this pull request Dec 20, 2019
@jimczi
Contributor

jimczi commented Jan 20, 2020

Superseded by #48399 merged in 7.6

@jimczi jimczi closed this Jan 20, 2020
SivagurunathanV pushed a commit to SivagurunathanV/elasticsearch that referenced this pull request Jan 23, 2020