Added a way to read directories on ES from a different index (#543)
* Added a way to read directories on ES from a different index
* incremented minor version
rbizos authored Mar 11, 2020
1 parent 837450f commit 921fe2e
Showing 3 changed files with 189 additions and 62 deletions.
84 changes: 83 additions & 1 deletion ELASTICSEARCH_DESIGN.md
@@ -256,4 +256,86 @@ driver will use the latest version of the metric metadata.

## Data

_Unsupported for now._

## Directory index

Another way to handle directory search is to add a separate index for directories, with the following mapping:
```json
{
  "_doc": {
    "properties": {
      "depth": {
        "type": "long"
      },
      "name": {
        "type": "keyword",
        "ignore_above": 1024
      },
      "uuid": {
        "type": "keyword"
      },
      "parent": {
        "type": "keyword"
      }
    },
    "dynamic_templates": [
      {
        "strings_as_keywords": {
          "match": "p*",
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword",
            "ignore_above": 256,
            "ignore_malformed": true
          }
        }
      }
    ]
  }
}
```
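
For illustration, a directory such as `root.a.prod.b` could be stored under this mapping roughly as follows (a hypothetical document; the exact `depth` convention, `.` separator, and `uuid` value are assumptions based on the driver code):

```json
{
  "name": "root.a.prod.b",
  "parent": "root.a.prod",
  "depth": 3,
  "uuid": "00000000-0000-0000-0000-000000000000",
  "p0": "root",
  "p1": "a",
  "p2": "prod",
  "p3": "b"
}
```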
The current version of biggraphite only supports reading from such an index.

When using it, the search for a directory would look like this:

```json
# root.*.prod.*
GET biggraphite_directories*/_search
{
  "size": "0",
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "p0": "root"
          }
        },
        {
          "term": {
            "p2": "prod"
          }
        },
        {
          "term": {
            "depth": "3"
          }
        }
      ]
    }
  },
  "aggs": {
    "distinct_dirs": {
      "terms": {
        "field": "p3",
        "size": 10000,
        "min_doc_count": 1
      }
    }
  }
}
```
This narrows the search, especially if the directory is close to the root. We still perform the aggregation to prevent duplicates across the different indices that may contain the directory.
If the directory's parent is not a glob, we can optimize further by searching on the `parent` field (see [Tune for search speed: search as few fields as possible](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/tune-for-search-speed.html#_search_as_few_fields_as_possible)).
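
For example, a glob like `root.a.prod.*` has a fully defined parent, so the query could be reduced to a single term filter. A sketch, assuming the same index naming and `.` separator as above:

```json
# root.a.prod.*
GET biggraphite_directories*/_search
{
  "size": "0",
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "parent": "root.a.prod"
          }
        }
      ]
    }
  },
  "aggs": {
    "distinct_dirs": {
      "terms": {
        "field": "p3",
        "size": 10000,
        "min_doc_count": 1
      }
    }
  }
}
```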
165 changes: 105 additions & 60 deletions biggraphite/drivers/elasticsearch.py
@@ -35,63 +35,53 @@
from biggraphite import metric as bg_metric
from biggraphite.drivers import _utils
from biggraphite.drivers import ttls
from biggraphite.drivers.cassandra_common import DIRECTORY_SEPARATOR

READ_ON = prometheus_client.Counter(
"bg_elasticsearch_read_on",
"Number of updates on the metric field read_on"
"bg_elasticsearch_read_on", "Number of updates on the metric field read_on"
)
UPDATED_ON = prometheus_client.Counter(
"bg_elasticsearch_updated_on",
"Number of updates on the metric field updated_on"
"bg_elasticsearch_updated_on", "Number of updates on the metric field updated_on"
)

CLEAN_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_clean_latency_seconds",
"clean latency in seconds"
"bg_elasticsearch_clean_latency_seconds", "clean latency in seconds"
)
CREATE_METRIC_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_create_metric_latency_seconds",
"create metric latency in seconds"
"bg_elasticsearch_create_metric_latency_seconds", "create metric latency in seconds"
)
DELETE_DIRECTORY_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_delete_directory_latency_seconds",
"delete directory latency in seconds"
"delete directory latency in seconds",
)
DELETE_METRIC_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_delete_metric_latency_seconds",
"delete metric latency in seconds"
"bg_elasticsearch_delete_metric_latency_seconds", "delete metric latency in seconds"
)
DROP_ALL_METRICS_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_drop_all_metrics_latency_seconds",
"drop all metrics latency in seconds",
)
FLUSH_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_flush_latency_seconds",
"flush latency in seconds"
"bg_elasticsearch_flush_latency_seconds", "flush latency in seconds"
)
GET_INDEX_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_get_index_latency_seconds",
"get index latency in seconds"
"bg_elasticsearch_get_index_latency_seconds", "get index latency in seconds"
)
GLOB_DIRECTORY_NAMES_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_glob_directory_names_latency_seconds",
"glob directory names latency in seconds"
"glob directory names latency in seconds",
)
GLOB_METRICS_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_glob_metrics_latency_seconds",
"glob metrics latency in seconds"
"bg_elasticsearch_glob_metrics_latency_seconds", "glob metrics latency in seconds"
)
METRIC_STATS_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_metric_stats_latency_seconds",
"metric stats latency in seconds"
"bg_elasticsearch_metric_stats_latency_seconds", "metric stats latency in seconds"
)
READ_METRIC_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_read_metric_latency_seconds",
"read metric latency in seconds"
"bg_elasticsearch_read_metric_latency_seconds", "read metric latency in seconds"
)
UPDATE_METRIC_LATENCY = prometheus_client.Summary(
"bg_elasticsearch_update_metric_latency_seconds",
"update metric latency in seconds"
"bg_elasticsearch_update_metric_latency_seconds", "update metric latency in seconds"
)

log = logging.getLogger(__name__)
@@ -109,7 +99,8 @@

INDEX_DOC_TYPE = "_doc"

DEFAULT_METRIC_INDEX = "biggraphite_metrics"
DEFAULT_DIRECTORY_INDEX = "biggraphite_directories"
DEFAULT_INDEX_SUFFIX = "_%Y-%m-%d"
DEFAULT_HOSTS = ["127.0.0.1"]
DEFAULT_PORT = 9200
@@ -128,6 +119,7 @@
"username": str,
"password": str,
"index": str,
"directory_index": str,
"index_suffix": str,
"hosts": _utils.list_from_str,
"port": int,
@@ -136,6 +128,7 @@
"updated_on_ttl_sec": int,
"read_on_ttl_sec": int,
"read_on_sampling_rate": float,
"directory_index_enabled": bool,
}


@@ -145,23 +138,33 @@ def add_argparse_arguments(parser):
"--elasticsearch_index",
metavar="NAME",
help="elasticsearch index.",
        default=DEFAULT_METRIC_INDEX,
)
parser.add_argument(
"--elasticsearch_directory_index",
metavar="NAME",
help="elasticsearch directory index prefix. Needs --using-directory-index",
default=DEFAULT_DIRECTORY_INDEX,
)

parser.add_argument(
"--elasticsearch_directory_index_enabled",
help="use secondary index for directories",
action="store_true",
default=False,
)

parser.add_argument(
"--elasticsearch_index_suffix",
metavar="NAME",
help="elasticsearch index suffix. Supports strftime format.",
default=DEFAULT_INDEX_SUFFIX,
)
parser.add_argument(
"--elasticsearch_username",
help="elasticsearch username.",
default=''
"--elasticsearch_username", help="elasticsearch username.", default=""
)
parser.add_argument(
"--elasticsearch_password",
help="elasticsearch password.",
default=''
"--elasticsearch_password", help="elasticsearch password.", default=""
)
parser.add_argument(
"--elasticsearch_hosts",
@@ -330,21 +333,25 @@ def __init__(
self,
hosts=DEFAULT_HOSTS,
port=DEFAULT_PORT,
        index=DEFAULT_METRIC_INDEX,
index_suffix=DEFAULT_INDEX_SUFFIX,
directory_index=DEFAULT_DIRECTORY_INDEX,
username=None,
password=None,
timeout=DEFAULT_TIMEOUT,
updated_on_ttl_sec=ttls.DEFAULT_UPDATED_ON_TTL_SEC,
read_on_ttl_sec=ttls.DEFAULT_READ_ON_TTL_SEC,
read_on_sampling_rate=DEFAULT_READ_ON_SAMPLING_RATE,
schema_path=DEFAULT_ES_SCHEMA_PATH,
directory_index_enabled=False,
):
"""Create a new ElasticSearchAccessor."""
super(_ElasticSearchAccessor, self).__init__("ElasticSearch")
self._hosts = list(hosts)
self._port = port
self._index_prefix = index
self._directory_index_enabled = directory_index_enabled
self._directory_index = directory_index
self._index_suffix = index_suffix
self._username = username or DEFAULT_USERNAME
self._password = password or DEFAULT_PASSWORD
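
For orientation, a minimal sketch of wiring the new options together (hypothetical values; `connect()` is assumed from the accessor base API and is not part of this diff):

```python
# Minimal sketch, assuming a local Elasticsearch cluster and the default
# index names; not part of this commit.
from biggraphite.drivers.elasticsearch import _ElasticSearchAccessor

accessor = _ElasticSearchAccessor(
    hosts=["127.0.0.1"],
    port=9200,
    index="biggraphite_metrics",
    directory_index="biggraphite_directories",
    directory_index_enabled=True,
)
accessor.connect()  # assumed from the accessor base API
# Directory globs are now served by the dedicated directory index.
print(accessor.glob_directory_names("root.*.prod.*"))
```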
@@ -577,6 +584,50 @@ def _search_metrics_from_components(self, glob, components, search=None):
search = search.filter(filter_type, **{"p%d" % i: value})
return False, search

    def _search_directory_from_components(self, glob, components, search=None):
        """Assemble a query to search directory names.

        Raises:
            InvalidArgumentError: If the components include a globstar.
        """
# TODO (r.bizos) add unittest with directory index
glob_depth = _get_depth_from_components(components)
if search is None:
search = self._create_search_query()
if components.count(bg_glob.Globstar()):
raise InvalidArgumentError("Directory glob does not handle globstar")

if self._directory_index_enabled and self.__glob_parser.is_fully_defined(
components
):
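            # Fully defined directory name: match it directly on the name field.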
search = search.filter("term", name=".".join([c[0] for c in components]))

elif self._directory_index_enabled and self.__glob_parser.is_fully_defined(
components[:-1]
):
# fully defined parent, only usable with directory index
search = search.filter(
"term", parent=DIRECTORY_SEPARATOR.join([c[0] for c in components[:-1]])
)

else:
if self._directory_index_enabled:
                # With a dedicated directory index we can filter on the exact
                # depth; the aggregation below prevents duplicates across indices.
search = search.filter("term", depth=glob_depth)
else:
# Use (glob_depth + 1) to filter only directories and
# exclude metrics whose depth is glob_depth.
search = search.filter("range", depth={"gte": glob_depth + 1})
components = components + [[bg_glob.AnySequence()]]
_, search = self._search_metrics_from_components(glob, components, search)

        search = search.extra(from_=0, size=0)  # Do not return metrics or directories.
search.aggs.bucket(
"distinct_dirs", "terms", field="p%d" % glob_depth, size=MAX_QUERY_SIZE
)
return search

@GLOB_METRICS_LATENCY.time()
@tracing.trace
def glob_metrics(self, glob, start_time=None, end_time=None):
@@ -605,8 +656,10 @@ def glob_metrics(self, glob, start_time=None, end_time=None):
log.debug(json.dumps(search.to_dict(), default=str))

documents = search.execute()
        results = [
            self._document_to_metric(document)
            for document in self._deduplicate_documents(documents)
        ]
results.sort(key=lambda metric: metric.name)

return results
@@ -644,26 +697,12 @@ def glob_directory_names(self, glob, start_time=None, end_time=None):
return []

components = self.__glob_parser.parse(glob)
search = self._create_directory_search_query(start_time, end_time)

try:
search = self._search_directory_from_components(glob, components, search)
except InvalidArgumentError:
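            # Globstar directory globs are not supported; return no directories.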
return []
log.debug(json.dumps(search.to_dict(), default=str))
response = search.execute()

@@ -673,8 +712,9 @@
if "distinct_dirs" not in response.aggregations:
            # This happens when there is no index matching the query.
return []

buckets = response.aggregations.distinct_dirs.buckets
        if len(components) == 1:
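            # Single-component glob: bucket keys are already full directory names.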
results = [b.key for b in buckets]
else:
glob_base = glob.rsplit(".", 1)[0]
@@ -958,14 +998,19 @@ def __update_document(self, data, index, document_id):
ignore=[404, 409],
)

def _create_directory_search_query(
self, start_time=None, end_time=None, index=None
):
if self._directory_index_enabled and not index:
            # Swap the metric index for the directory index.
            index = "%s*" % self._directory_index
            # Time filtering is not implemented for the directory index.
            end_time = start_time = None
return self._create_search_query(start_time, end_time, index)

def _create_search_query(self, start_time=None, end_time=None, index=None):
if not index:
index = "%s*" % self._index_prefix
        search = elasticsearch_dsl.Search().using(self.client).index(index)

if start_time is not None:
# `updated_on` field has a delay before it is updated, so a metric can still be active