diff --git a/eventdata/challenges/bulk-size-evaluation.json b/eventdata/challenges/bulk-size-evaluation.json index c48101cbf508a..1e2346f65030f 100644 --- a/eventdata/challenges/bulk-size-evaluation.json +++ b/eventdata/challenges/bulk-size-evaluation.json @@ -1,8 +1,12 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} +{% set p_replica_count = (replica_count | default(0)) %} +{% set p_shard_count = (shard_count | default(2)) %} + { "name": "bulk-size-evaluation", "description": "Indexes with different bulk sizes. IDs are autogenerated by Elasticsearch, meaning there are no conflicts.", "meta": { - "client_count": {{ clients }}, + "client_count": {{ p_bulk_indexing_clients }}, "benchmark_type": "indexing_bulksize" }, "schedule": [ @@ -13,8 +17,8 @@ "operation": { "operation-type": "create-index", "settings": { - "index.number_of_replicas": {{ replica_count }}, - "index.number_of_shards": {{ shard_count }} + "index.number_of_replicas": {{ p_replica_count }}, + "index.number_of_shards": {{ p_shard_count }} } } }, @@ -22,55 +26,55 @@ "operation": "index-append-125", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-250", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-500", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-1000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-2000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-5000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-10000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-20000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, { "operation": "index-append-50000", "warmup-time-period": 0, "time-period": 600, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} } ] } \ No newline at end of file diff --git a/eventdata/challenges/combined-indexing-and-querying.json b/eventdata/challenges/combined-indexing-and-querying.json index 996d97e01cc45..5bca39acf34aa 100644 --- a/eventdata/challenges/combined-indexing-and-querying.json +++ b/eventdata/challenges/combined-indexing-and-querying.json @@ -1,3 +1,9 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} +{% set p_client_count = (8 + (bulk_indexing_clients | default(8))) %} +{% set p_rate_limit_duration_secs = (rate_limit_duration_secs | default(1200)) %} +{% set p_rate_limit_step = (rate_limit_step | default(2)) %} +{% set p_rate_limit_max = (rate_limit_max | default(32)) %} + { "name": "combined-indexing-and-querying", "description": "This challenge simulates a set of Kibana queries against historical data (elasticlogs_q-* indices) as well as against the most recent data currently being indexed. It combined this with rate-limited indexing at varying levels. 
It assumes one of the challenges creating elasticlogs_q-* indices has been run.", @@ -10,7 +16,10 @@ "operation": "deleteindex_elasticlogs_i-*" }, { - "operation": "fieldstats_elasticlogs_q-*_ELASTICLOGS" + "operation": "fieldstats_elasticlogs_q-*_ELASTICLOGS", + "warmup-iterations": {{ p_client_count }}, + "iterations": {{ p_client_count }}, + "clients": {{ p_client_count }} }, { "operation": "relative-kibana-content_issues-dashboard_75%", @@ -24,7 +33,7 @@ "operation": "relative-kibana-content_issues-dashboard_50%", "target-interval": 60, "warmup-time-period": 0, - "time-period": {{ rate_limit_duration_secs }}, + "time-period": {{ p_rate_limit_duration_secs }}, "meta": { "target_indexing_rate": 0, "query_type": "historic" @@ -38,24 +47,25 @@ }, {# Add some data to index so it does not start empty #} { - "operation": "index-append-1000-elasticlogs_i_write", - "warmup-time-period": {{ rate_limit_duration_secs }}, - "time-period": {{ rate_limit_duration_secs }}, + "operation": "index-append-1000-elasticlogs_i_write", + "time-period": {{ p_rate_limit_duration_secs }}, "target-throughput": 10, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} }, - {% for ops in rate_limit_ops %} + {% for ops in range(p_rate_limit_step, p_rate_limit_max, p_rate_limit_step) %} + + {% set rate = ops * 1000 %} { "parallel": { "warmup-time-period": 0, - "time-period": {{ rate_limit_duration_secs }}, + "time-period": {{ p_rate_limit_duration_secs }}, "tasks": [ { "name": "index-append-1000-elasticlogs_i_write-{{rate}}", "operation": "index-append-1000-elasticlogs_i_write", "target-throughput": {{ ops }}, - "clients": {{ clients }}, + "clients": {{ p_bulk_indexing_clients }}, "meta": { "target_indexing_rate": {{ rate }} } diff --git a/eventdata/challenges/elasticlogs-1bn-load.json b/eventdata/challenges/elasticlogs-1bn-load.json index db53e98937835..2386a6a90c8c0 100644 --- a/eventdata/challenges/elasticlogs-1bn-load.json +++ b/eventdata/challenges/elasticlogs-1bn-load.json @@ -1,8 +1,10 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} + { "name": "elasticlogs-1bn-load", - "description": "Indexes 1bn documents (20 batches of 50M) into elasticlogs_q-* indices. IDs are autogenerated by Elasticsearch, meaning there are no conflicts.", + "description": "Indexes 1bn documents into elasticlogs_q-* indices. 
IDs are autogenerated by Elasticsearch, meaning there are no conflicts.", "meta": { - "client_count": {{ clients }}, + "client_count": {{ p_bulk_indexing_clients }}, "benchmark_type": "indexing" }, "schedule": [ @@ -12,36 +14,28 @@ "warmup-iterations": 0, "iterations": 1 }, - {% for n in range(19) %} { - "name": "index-append-1000-elasticlogs_q_write-{{n}}", - "operation": "index-append-1000-elasticlogs_q_write", - "iterations": 50000, - "clients": {{ clients }} - }, - { - "name": "rollover_elasticlogs_q_write_10M-{{n}}", - "operation": "rollover_elasticlogs_q_write_10M", - "iterations": 1 - }, - { - "name": "node_storage-{{n}}", - "operation": "node_storage", - "iterations": 1 - }, - {% endfor %} - { - "operation": "index-append-1000-elasticlogs_q_write", - "iterations": 50000, - "clients": {{ clients }} - }, - { - "operation": "rollover_elasticlogs_q_write_10M", - "iterations": 1 + "parallel": { + "completed-by": "index-append-1000-elasticlogs_q_write", + "tasks": [ + { + "operation": "index-append-1000-elasticlogs_q_write", + "iterations": 1000000, + "clients": {{ p_bulk_indexing_clients }} + }, + { + "operation": "rollover_elasticlogs_q_write_100M", + "clients": 1, + "warmup-iterations": 100000, + "iterations": 100000, + "target-interval": 30 + } + ] + } }, { "operation": "node_storage", "iterations": 1 } ] -} \ No newline at end of file +} diff --git a/eventdata/challenges/elasticlogs-querying.json b/eventdata/challenges/elasticlogs-querying.json index a590c6f8c17b8..dc6dacee359dd 100644 --- a/eventdata/challenges/elasticlogs-querying.json +++ b/eventdata/challenges/elasticlogs-querying.json @@ -8,7 +8,8 @@ "schedule": [ { "operation": "fieldstats_elasticlogs_q-*_ELASTICLOGS", - "iterations": 1 + "iterations": 1, + "clients": 3 }, { "parallel": { diff --git a/eventdata/challenges/generate-historic-data.json b/eventdata/challenges/generate-historic-data.json index e5a8c93aa4ade..27c875d85950e 100644 --- a/eventdata/challenges/generate-historic-data.json +++ b/eventdata/challenges/generate-historic-data.json @@ -1,8 +1,10 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} + { "name": "generate-historic-data", "description": "Example of how to index 5 days worth of data at 6x speed (20 hours). IDs are autogenerated by Elasticsearch, meaning there are no conflicts.", "meta": { - "client_count": 8, + "client_count": {{ p_bulk_indexing_clients }}, "benchmark_type": "generate-historic-data" }, "schedule": [ @@ -17,7 +19,7 @@ "target-throughput": 19, "warmup-time-period": 0, "time-period": 72000, - "clients": 8 + "clients": {{ p_bulk_indexing_clients }} } ] } \ No newline at end of file diff --git a/eventdata/challenges/shard-size-on-disk.json b/eventdata/challenges/shard-size-on-disk.json index 517a370742952..4d6c1a372157f 100644 --- a/eventdata/challenges/shard-size-on-disk.json +++ b/eventdata/challenges/shard-size-on-disk.json @@ -1,3 +1,5 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} + { "name": "shard-size-on-disk", "description": "Indexes sets of 2M events into Elasticsearch, followed by index statistics in order to track how index size depends on event count. IDs are autogenerated by Elasticsearch, meaning there are no conflicts. 
This process is repeatedly run until 100M events have been indexed into the shard.", @@ -24,7 +26,7 @@ "name": "index-append-1000-shard-sizing-{{n}}", "operation": "index-append-1000-shard-sizing", "iterations": 2000, - "clients": 8, + "clients": {{ p_bulk_indexing_clients }}, "meta": { "iteration_number": {{ n }} } @@ -41,7 +43,7 @@ { "operation": "index-append-1000-shard-sizing", "iterations": 2000, - "clients": 8, + "clients": {{ p_bulk_indexing_clients }}, "meta": { "iteration_number": {{ 50 }} } diff --git a/eventdata/challenges/shard-sizing.json b/eventdata/challenges/shard-sizing.json index 7e571e112cc15..ff73df41df4ca 100644 --- a/eventdata/challenges/shard-sizing.json +++ b/eventdata/challenges/shard-sizing.json @@ -1,3 +1,7 @@ +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} +{% set p_shard_sizing_iterations = (shard_sizing_iterations | default(25)) %} +{% set p_shard_sizing_queries = (shard_sizing_queries | default(20)) %} + { "name": "shard-sizing", "description": "Indexes sets of 2M events into Elasticsearch, followed by index statistics and simulated Kibana queries. IDs are autogenerated by Elasticsearch, meaning there are no conflicts. This process is repeatedly run until 50M events have been indexed into the shard. This allows query latency to be evaluated as a function of shard size.", @@ -19,12 +23,12 @@ } } }, - {% for n in range(1,shard_sizing_iterations) %} + {% for n in range(1, p_shard_sizing_iterations) %} { "name": "index-append-1000-shard-sizing-iteration-{{n}}", "operation": "index-append-1000-shard-sizing", "iterations": 2000, - "clients": 8, + "clients": {{ p_bulk_indexing_clients }}, "meta": { "iteration_number": {{ n }} } @@ -40,7 +44,7 @@ { "name": "kibana-traffic-shard-sizing-50%-iteration-{{n}}", "operation": "kibana-traffic-shard-sizing-50%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { "iteration_number": {{ n }} } @@ -48,7 +52,7 @@ { "name": "kibana-traffic-shard-sizing-90%-iteration-{{n}}", "operation": "kibana-traffic-shard-sizing-90%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { "iteration_number": {{ n }} } @@ -56,7 +60,7 @@ { "name": "kibana-content_issues-shard-sizing-50%-iteration-{{n}}", "operation": "kibana-content_issues-shard-sizing-50%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { "iteration_number": {{ n }} } @@ -64,7 +68,7 @@ { "name": "kibana-content_issues-shard-sizing-90%-iteration-{{n}}", "operation": "kibana-content_issues-shard-sizing-90%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { "iteration_number": {{ n }} } @@ -73,44 +77,44 @@ { "operation": "index-append-1000-shard-sizing", "iterations": 2000, - "clients": 8, + "clients": {{ p_bulk_indexing_clients }}, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } }, { "operation": "indicesstats_elasticlogs", "iterations": 1, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } }, { "operation": "kibana-traffic-shard-sizing-50%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } }, { "operation": "kibana-traffic-shard-sizing-90%", - "iterations": {{ 
shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } }, { "operation": "kibana-content_issues-shard-sizing-50%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } }, { "operation": "kibana-content_issues-shard-sizing-90%", - "iterations": {{ shard_sizing_queries }}, + "iterations": {{ p_shard_sizing_queries }}, "meta": { - "iteration_number": {{ shard_sizing_iterations }} + "iteration_number": {{ p_shard_sizing_iterations }} } } ] diff --git a/eventdata/operations/indexing.json b/eventdata/operations/indexing.json index 18a8525ff4789..495328adddb4b 100644 --- a/eventdata/operations/indexing.json +++ b/eventdata/operations/indexing.json @@ -85,8 +85,8 @@ "settings": { "index.refresh_interval": "5s", "index.codec": "best_compression", - "index.number_of_replicas": {{ replica_count }}, - "index.number_of_shards": {{ shard_count }} + "index.number_of_replicas": {{ replica_count | default(0) }}, + "index.number_of_shards": {{ shard_count | default(2) }} }, "mappings": {% include "mappings.json" %} @@ -105,8 +105,8 @@ "settings": { "index.refresh_interval": "5s", "index.codec": "best_compression", - "index.number_of_replicas": {{ replica_count }}, - "index.number_of_shards": {{ shard_count }} + "index.number_of_replicas": {{ replica_count | default(0) }}, + "index.number_of_shards": {{ shard_count | default(2) }} }, "mappings": {% include "mappings.json" %} @@ -185,4 +185,9 @@ "name": "deleteindex_elasticlogs_i-*", "operation-type": "delete-index", "index": "elasticlogs_i-*" +}, +{ + "name": "deleteindex_elasticlogs_q-*", + "operation-type": "delete-index", + "index": "elasticlogs_q-*" } \ No newline at end of file diff --git a/eventdata/parameter_sources/elasticlogs_bulk_source.py b/eventdata/parameter_sources/elasticlogs_bulk_source.py index 3ce60dbe8917b..5ec52582ab7d9 100755 --- a/eventdata/parameter_sources/elasticlogs_bulk_source.py +++ b/eventdata/parameter_sources/elasticlogs_bulk_source.py @@ -1,7 +1,8 @@ import logging +import random from eventdata.parameter_sources.randomevent import RandomEvent -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") class ElasticlogsBulkSource: @@ -48,6 +49,7 @@ class ElasticlogsBulkSource: def __init__(self, track, params, **kwargs): self._indices = track.indices self._params = params + self._params = params self._randomevent = RandomEvent(params) self._bulk_size = 1000 @@ -72,6 +74,8 @@ def __init__(self, track, params, **kwargs): self._params['type'] = t if isinstance(t, str) else t.name def partition(self, partition_index, total_partitions): + seed = partition_index * self._params["seed"] if "seed" in self._params else None + random.seed(seed) return self def size(self): @@ -85,7 +89,7 @@ def params(self): bulk_array.append('{"index": {"_index": "%s", "_type": "%s"}}"' % (idx, typ)) bulk_array.append(evt) - response = { "body": "\n".join(bulk_array), "action_metadata_present": True, "bulk-size": self._bulk_size } + response = { "body": "\n".join(bulk_array), "action-metadata-present": True, "bulk-size": self._bulk_size } if "pipeline" in self._params.keys(): response["pipeline"] = self._params["pipeline"] diff --git a/eventdata/parameter_sources/elasticlogs_kibana_source.py 
b/eventdata/parameter_sources/elasticlogs_kibana_source.py index ab62df548cdff..aa931af6cddee 100755 --- a/eventdata/parameter_sources/elasticlogs_kibana_source.py +++ b/eventdata/parameter_sources/elasticlogs_kibana_source.py @@ -1,3 +1,4 @@ +from eventdata.utils import fieldstats as fs import math import re import json @@ -8,7 +9,7 @@ import datetime import time -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") available_dashboards = ['traffic', 'content_issues'] @@ -36,18 +37,17 @@ class ElasticlogsKibanaSource: "dashboard" - String indicating which dashboard to simulate. Defaults to 'traffic'. "query_string" - String or list of strings indicating query parameters to randomize during benchmarking. Defaults to "*", If a list has been specified, a random value will be selected. - "index_pattern" - String opr list of strings representing the index pattern to query. Defaults to 'elasticlogs-*'. If a list has - been specified, a random value will be selected. - "fieldstats_id". - fieldstats_id to base relative window definitions on. (not mandatory) + "index_pattern" - String or list of strings representing the index pattern to query. Defaults to 'elasticlogs-*'. If a list has + been specified, a random value will be selected. "window_end" - Specification of aggregation window end or period within which it should end. If one single value is specified, that will be used to anchor the window. If two values are given in a comma separated list, the end of the window will be randomized within this interval. Values can be either absolute or relative: 'now' - Always evaluated to the current timestamp. This is the default value. 'now-1h' - Offset to the current timestamp. Consists of a number and either m (minutes), h (hours) or d (days). '2016-12-20 20:12:32' - Exact timestamp. - 'START' - If an fieldstats_id has been provided, 'START' can be used to reference the start of this interval. - 'END' - If an fieldstats_id has been provided, 'END' can be used to reference the end of this interval. - 'END-40%' - When an interval has been specified through an fieldstats_id, it is possible to express a volume + 'START' - If fieldstats has been run for the index pattern and `@timestamp` field, 'START' can be used to reference the start of this interval. + 'END' - If fieldstats has been run for the index pattern and `@timestamp` field, 'END' can be used to reference the end of this interval. + 'END-40%' - When an interval has been specified based on fieldstats, it is possible to express a volume relative to the size of the interval as a percentage. If we assume the interval covers 10 hours, 'END-40%' represents the timestamp 4 hours (40% of the 10 hour interval) before the END timestamp. "window_length" - String indicating length of the time window to aggregate across. Values can be either absolute @@ -57,6 +57,7 @@ class ElasticlogsKibanaSource: """ def __init__(self, track, params, **kwargs): self._params = params + self._indices = track.indices self._index_pattern = 'elasticlogs-*' self._query_string_list = ['*'] self._dashboard = 'traffic' @@ -75,16 +76,12 @@ def __init__(self, track, params, **kwargs): else: logger.info("[kibana] Illegal dashboard configured ({}). 
Using default dashboard instead.".format(params['dashboard'])) - if 'fieldstats_id' in params.keys(): - file_name = "{}/.rally/temp/{}.json".format(os.environ['HOME'], params['fieldstats_id']) - if os.path.isfile(file_name): - filedata = open(file_name, 'r').read() - data = json.loads(filedata) - self._fieldstats_start_ms = data['ts_min_ms'] - self._fieldstats_end_ms = data['ts_max_ms'] - self._fieldstats_provided = True - else: - raise ConfigurationError('fieldstats_id does not correspond to exiasting file.') + key = "{}_@timestamp".format(self._index_pattern); + if key in fs.global_fieldstats.keys(): + stats = fs.global_fieldstats[key]; + self._fieldstats_start_ms = stats['min'] + self._fieldstats_end_ms = stats['max'] + self._fieldstats_provided = True else: self._fieldstats_provided = False @@ -114,9 +111,9 @@ val = int(math.fabs(float(m2.group(1)) / 100.0) * (self._fieldstats_end_ms - self._fieldstats_start_ms)) self._window_duration_ms = val else: - raise ConfigurationError('Invalid window_length as a percentage ({}) may only be used when fieldstats_id provided.'.format(wli)) + raise ConfigurationError('Invalid window_length as a percentage ({}) may only be used when fieldstats have been provided.'.format(params['window_length'])) else: - raise ConfigurationError('Invalid window_length parameter supplied: {}.'.format(wli)) + raise ConfigurationError('Invalid window_length parameter supplied: {}.'.format(params['window_length'])) # Interpret window specification(s) if 'window_end' in params.keys(): @@ -125,6 +122,8 @@ self._window_end = [{'type': 'relative', 'offset_ms': 0}] def partition(self, partition_index, total_partitions): + seed = partition_index * self._params["seed"] if "seed" in self._params else None + random.seed(seed) return self def size(self): diff --git a/eventdata/parameter_sources/interval_query_source.py b/eventdata/parameter_sources/interval_query_source.py new file mode 100644 index 0000000000000..1a61f0ace7f9a --- /dev/null +++ b/eventdata/parameter_sources/interval_query_source.py @@ -0,0 +1,136 @@ +from eventdata.utils import fieldstats as fs +import json +import logging +import random +import copy + +logger = logging.getLogger("track.eventdata") + +class Error(Exception): + """Base class for exceptions in this module.""" + pass + +class ConfigurationError(Error): + """Exception raised for parameter errors. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message): + self.message = message + +class ParameterSourceError(Error): + """Exception raised for runtime errors. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message): + self.message = message + +class IntervalQuerySource: + """ + Adds range filter based on input from fieldstats_runner. + + It expects the parameter hash to contain the following keys: + + + "body" - Query body the range filter should be added to. [Mandatory] + "index_pattern" - String representing the index pattern to query. This is used to look up correct field statistics. Defaults to 'filebeat-*'. + "type" - Type to query within the index. Defaults to '*'. + "fieldname" - Name of the field to filter on. Defaults to '@timestamp'. + "min_interval_size_pct" - Minimal percentage of the range to filter on. Parameter must be in the range 0 < min_interval_size_pct < 100. [Mandatory] + "max_interval_size_pct" - Maximum percentage of the range to filter on. 
Parameter must be in the range min_interval_size_pct < max_interval_size_pct <= 100. [Mandatory] + "cache" - Boolean indicating whether request cache is to be used. Defaults to 'False'. + + This parameter source will take the query supplied through the 'body' parameter and add a range filter to it. This will be based on the field specified + through the 'fieldname' parameter. The upper end of the range (range_max) will be set to the maximum value (fieldname_max) detected for the field through the fieldstats_runner. + The lower end of the range (range_min) will be set based on the following calculation: + + range = fieldname_max - fieldname_min + interval_size_pct = min_interval_size_pct + random(max_interval_size_pct - min_interval_size_pct) + range_min = range_max - (range * interval_size_pct / 100) + + """ + def __init__(self, track, params, **kwargs): + self._index_pattern = "filebeat-*"; + self._fieldname = "@timestamp"; + self._type = "*"; + self._cache = False; + + random.seed(); + if 'cache' in params.keys(): + self._cache = params['cache'] + + if 'index_pattern' in params.keys(): + self._index_pattern = params['index_pattern'] + + if 'type' in params.keys(): + self._type = params['type'] + + if 'fieldname' in params.keys(): + self._fieldname = params['fieldname'] + + if 'body' in params.keys(): + self._body = params['body'] + + # Verify that body contains `query.bool.must` key. + if not params['body'] or not params['body']['query'] or not params['body']['query']['bool'] or ('must' not in params['body']['query']['bool'].keys()): + raise ConfigurationError("Parameter 'body' must contain `query.bool.must` key."); + else: + raise ConfigurationError("Parameter 'body' must be specified."); + + if 'min_interval_size_pct' in params.keys(): + self._min_interval_size_pct = float(params['min_interval_size_pct']); + if self._min_interval_size_pct <= 0: + raise ConfigurationError("Parameter 'min_interval_size_pct' must be > 0."); + if self._min_interval_size_pct >= 100: + raise ConfigurationError("Parameter 'min_interval_size_pct' must be < 100."); + else: + raise ConfigurationError("Parameter 'min_interval_size_pct' must be specified."); + + if 'max_interval_size_pct' in params.keys(): + self._max_interval_size_pct = float(params['max_interval_size_pct']); + if self._max_interval_size_pct <= self._min_interval_size_pct: + raise ConfigurationError("Parameter 'max_interval_size_pct' must be > 'min_interval_size_pct'."); + if self._max_interval_size_pct > 100: + raise ConfigurationError("Parameter 'max_interval_size_pct' must be <= 100."); + else: + raise ConfigurationError("Parameter 'max_interval_size_pct' must be specified."); + + def partition(self, partition_index, total_partitions): + return self + + def size(self): + return 1 + + def params(self): + key = "{}_{}".format(self._index_pattern, self._fieldname); + if key in fs.global_fieldstats.keys(): + stats = fs.global_fieldstats[key]; + else: + raise ParameterSourceError("No statistics found for field `{}` in index pattern `{}`.".format(self._fieldname, self._index_pattern)); + + range_max = stats['max']; + range_min_upper = int(stats['max'] - ((stats['max'] - stats['min']) * self._min_interval_size_pct / 100.0)); + delta = (stats['max'] - stats['min']) * ((self._max_interval_size_pct - self._min_interval_size_pct) / 100.0); + range_min = range_min_upper - int(delta * random.random()); + body = copy.deepcopy(self._body); + + # Rewrite query to include new range filter based on range_min and range_max. 
+ # This assumes that the body contains `query.bool.must` key. + if not isinstance(body['query']['bool']['must'], list): + body['query']['bool']['must'] = [ body['query']['bool']['must'] ]; + + range_clause = {} + range_clause['range'] = {} + range_clause['range'][self._fieldname] = { 'gte': range_min, 'lte': range_max, 'format': 'epoch_millis'} + body['query']['bool']['must'].append(range_clause); + + logger.info("[interval_query] Interval generated for field `{}`: Min: {} [{}, {}], Max: {}".format(self._fieldname, range_min, stats['min'], range_min_upper, range_max)); + + request = { 'index': self._index_pattern, 'type': self._type, 'body': body, 'cache': self._cache}; + + return request; diff --git a/eventdata/parameter_sources/sample_based_bulk_source.py b/eventdata/parameter_sources/sample_based_bulk_source.py index 7da3fc6a42315..1f5e37e2c57df 100755 --- a/eventdata/parameter_sources/sample_based_bulk_source.py +++ b/eventdata/parameter_sources/sample_based_bulk_source.py @@ -11,7 +11,7 @@ import logging -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") class Error(Exception): """Base class for exceptions in this module.""" @@ -168,6 +168,8 @@ def __init__(self, track, params, **kwargs): self._timestamp_generator = TimestampStructGenerator.StartingPoint(sp) def partition(self, partition_index, total_partitions): + seed = partition_index * self._params["seed"] if "seed" in self._params else None + random.seed(seed) return self def size(self): @@ -181,7 +183,7 @@ def params(self): bulk_array.append({'index': {'_index': idx, '_type': typ}}) bulk_array.append(evt) - response = { "body": bulk_array, "action_metadata_present": True, "bulk-size": self._bulk_size } + response = { "body": bulk_array, "action-metadata-present": True, "bulk-size": self._bulk_size } if "pipeline" in self._params.keys(): response["pipeline"] = params["pipeline"] diff --git a/eventdata/runners/createindex_runner.py b/eventdata/runners/createindex_runner.py index dd0c3f0c9a464..6578b32222f5d 100755 --- a/eventdata/runners/createindex_runner.py +++ b/eventdata/runners/createindex_runner.py @@ -2,7 +2,7 @@ import json import elasticsearch -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") def createindex(es, params): """ diff --git a/eventdata/runners/fieldstats_runner.py b/eventdata/runners/fieldstats_runner.py index 7c2e4272d516a..19b1d1fd98550 100755 --- a/eventdata/runners/fieldstats_runner.py +++ b/eventdata/runners/fieldstats_runner.py @@ -1,13 +1,7 @@ -import datetime -import os -import json -import sys - +from eventdata.utils import fieldstats as fs import logging -logger = logging.getLogger("track.elasticlogs") - -epoch = datetime.datetime.utcfromtimestamp(0) +logger = logging.getLogger("track.eventdata") class Error(Exception): """Base class for exceptions in this module.""" @@ -15,7 +9,6 @@ class Error(Exception): class ParameterError(Error): """Exception raised for parameter errors. 
- Attributes: message -- explanation of the error """ @@ -23,60 +16,26 @@ class ParameterError(Error): def __init__(self, message): self.message = message -def __perform_field_stats_lookup(es, index_pattern, field): - result = es.field_stats(index=index_pattern, fields=field) - min_ts = sys.maxsize - max_ts = 0 - - for idx in result['indices'].keys(): - if result['indices'][idx]['fields'][field]['min_value'] < min_ts: - min_ts = result['indices'][idx]['fields'][field]['min_value'] - - if result['indices'][idx]['fields'][field]['max_value'] > max_ts: - max_ts = result['indices'][idx]['fields'][field]['max_value'] - - return min_ts, max_ts - -def __write_to_file(id, data): - dir_name = "{}/.rally/temp".format(os.environ['HOME']) - file_name = "{}/.rally/temp/{}.json".format(os.environ['HOME'], id) - - if not os.path.exists(dir_name): - os.makedirs(dir_name) - - file = open(file_name, 'w+') - - file.write(data) - def fieldstats(es, params): """ - Creates a file with variables in the Rally temporary directory, ~/.rally/temp. - Populates a set of environment variables with statistics around the data matching an index pattern - for use with the Kibana parameter source. Window definition and length can be defined relative the - size of the time period covered by the index pattern. - + Looks up minimum and maximum values for a specified field for an index pattern and stores + this information in a global variable that can be accessed by other components of the track. It expects the parameter hash to contain the following keys: - "fieldstats_id" - String prefix representing this set of variables. Defaults to "ELASTICLOGS". - "index_pattern" - Index pattern statistics are retrieved for. Defaults to "elasticlogs-*". - "timestamp_field" - Timestamp field to extract field stats for. Defaults to @timestamp. - - Based on this the field stats API is called for the index pattern and the highest and lowest timestamp covered - by the index patterns for the @timestamp field is retrieved. A JSON document with the field "index_pattern", - "ts_min_ms" and "ts_max_ms" will be written to a file named .json in the Rally temporary directory. + "index_pattern" - Index pattern statistics are retrieved for. Defaults to "elasticlogs-*". + "fieldname" - Field to extract statistics for. Defaults to "@timestamp". """ - if 'fieldstats_id' not in params: - params['fieldstats_id'] = 'ELASTICLOGS' - if 'index_pattern' not in params: params['index_pattern'] = 'elasticlogs-*' - if 'timestamp_field' not in params: - params['timestamp_field'] = '@timestamp' - - min_ts, max_ts = __perform_field_stats_lookup(es, params['index_pattern'], params['timestamp_field']) + if 'fieldname' not in params: + params['fieldname'] = '@timestamp' - result = {'ts_min_ms': min_ts, 'ts_max_ms': max_ts} + result = es.search(index=params['index_pattern'], body={"query": {"match_all": {}},"aggs" : {"maxval" : { "max" : { "field" : params['fieldname'] } },"minval" : { "min" : { "field" : params['fieldname'] } }},"size":0}) - __write_to_file(params['fieldstats_id'], json.dumps(result)) + if result['hits']['total'] > 0: + key = "{}_{}".format(params['index_pattern'], params['fieldname']); + fs.global_fieldstats[key] = {'max': int(result['aggregations']['maxval']['value']), 'min': int(result['aggregations']['minval']['value'])}; - return 1, "ops" + logger.info("[fieldstats] Identified statistics for field '{}' in '{}'. 
Min: {}, Max: {}".format(params['fieldname'], params['index_pattern'], int(result['aggregations']['minval']['value']), int(result['aggregations']['maxval']['value']))) + else: + raise ParameterError("No matching data found for field '{}' in pattern '{}'.".format(params['fieldname'], params['index_pattern'])); diff --git a/eventdata/runners/indicesstats_runner.py b/eventdata/runners/indicesstats_runner.py index 70c194abd701a..d022b3d9bd275 100755 --- a/eventdata/runners/indicesstats_runner.py +++ b/eventdata/runners/indicesstats_runner.py @@ -3,7 +3,7 @@ import logging -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") def indicesstats(es, params): """ diff --git a/eventdata/runners/kibana_runner.py b/eventdata/runners/kibana_runner.py index ae9d4e2952386..b64995730ccc9 100755 --- a/eventdata/runners/kibana_runner.py +++ b/eventdata/runners/kibana_runner.py @@ -4,56 +4,7 @@ import logging -logger = logging.getLogger("track.elasticlogs") - -def __find_time_interval(query): - interval_found = False - ts_min = 0 - ts_max = 0 - ts_format = "" - field = "" - - if 'query' in query and 'bool' in query['query'] and 'must' in query['query']['bool']: - query_clauses = query['query']['bool']['must'] - for clause in query_clauses: - if 'range' in clause.keys(): - range_clause = clause['range'] - for key in range_clause.keys(): - keys = range_clause[key].keys() - if 'lte' in keys and 'gte' in keys and 'format' in keys: - field = key - ts_min = range_clause[key]['gte'] - ts_max = range_clause[key]['lte'] - ts_format = range_clause[key]['format'] - interval_found = True - - return interval_found, field, ts_min, ts_max, ts_format - -def __index_wildcard(index_spec): - if isinstance(index_spec['index'], str): - if '*' in index_spec['index']: - return True, index_spec['index'] - else: - return False, "" - elif isinstance(index_spec['index'], list) and len(index_spec['index']) == 1: - if '*' in index_spec['index'][0]: - return True, index_spec['index'][0] - else: - return False, "" - -def __perform_field_stats_lookup(es, index_pattern, field, min_val, max_val, fmt): - req_body = { 'fields': [field], 'index_constraints': {}} - req_body['index_constraints'][field] = {'max_value': {'gte': min_val, 'format': fmt}, 'min_value': {'lte': max_val, 'format': fmt}} - result = es.field_stats(index=index_pattern, level='indices', body=req_body) - indices_list = list(result['indices'].keys()) - - if indices_list == None: - return [index_pattern] - else: - return indices_list - -def __get_ms_timestamp(): - return int(round(time.time() * 1000)) +logger = logging.getLogger("track.eventdata") def kibana(es, params): """ @@ -81,39 +32,13 @@ def kibana(es, params): response['weight'] = 1 response['unit'] = "ops" response['visualisation_count'] = visualisations - - try: - field_stat_start = __get_ms_timestamp() - - # Loops through visualisations and calls field stats API for each one without caching, which is what - # Kibana currently does - cache = {} - - for i in range(0,len(request),2): - pattern_found, pattern = __index_wildcard(request[i]) - - if pattern_found: - interval_found, field, ts_min, ts_max, ts_fmt = __find_time_interval(request[i + 1]) - key = "{}-{}-{}".format(pattern, ts_min, ts_max) - - if key in list(cache.keys()): - request[i]['index'] = cache[key] - else: - request[i]['index'] = __perform_field_stats_lookup(es, pattern, field, ts_min, ts_max, ts_fmt) - cache[key] = request[i]['index'] - - field_stat_duration = int(__get_ms_timestamp() - field_stat_start) - 
response['field_stats_duration_ms'] = field_stat_duration - - except elasticsearch.TransportError as e: - logger.info("[kibana_runner] Error looking up field stats: {}".format(e)) if logger.isEnabledFor(logging.DEBUG): - logger.debug("[kibana_runner] Updated request: {}".format(request)) + logger.debug("[kibana_runner] request: {}".format(request)) result = es.msearch(body = request) if logger.isEnabledFor(logging.DEBUG): - logger.debug("[kibana_runner] response: {}".format(response)) + logger.debug("[kibana_runner] result: {}".format(result)) return response diff --git a/eventdata/runners/nodestorage_runner.py b/eventdata/runners/nodestorage_runner.py index a6cd8ea7d5f67..a3015e583adb0 100755 --- a/eventdata/runners/nodestorage_runner.py +++ b/eventdata/runners/nodestorage_runner.py @@ -3,7 +3,7 @@ import logging -logger = logging.getLogger("track.elasticlogs") +logger = logging.getLogger("track.eventdata") BYTES_PER_TB = 1024 * 1024 * 1024 * 1024 diff --git a/eventdata/track.json b/eventdata/track.json index 701f0bdaff216..f2a7c1e1db47e 100644 --- a/eventdata/track.json +++ b/eventdata/track.json @@ -1,23 +1,7 @@ -{# Define variables to use throughout the template #} - -{# Maximum number of indexing threads to use #} -{% set clients = 20 %} - -{# Rate limit steps (ops) for combined indexing and querying #} -{% set rate_limit_ops = [2,4,6,8,10,12,14,16,18,20,22,24,26,28,30] %} - -{# Duration of each rate limited indexing and querying benchmark in seconds #} -{% set rate_limit_duration_secs = 1200 %} - -{# Number of primary shards and replicas to index into #} -{% set shard_count = 2 %} -{% set replica_count = 0 %} - -{% set shard_sizing_iterations = 25 %} -{% set shard_sizing_queries = 20 %} - {% import "rally.helpers" as rally with context %} +{% set p_bulk_indexing_clients = (bulk_indexing_clients | default(8)) %} + { "version": 2, "description": "Track for simulating different aspects of event-based use cases.", @@ -39,7 +23,7 @@ "default": true, "description": "Runs indexing with a batch size of 1000 for 20 minutes. 
IDs are autogenerated by Elasticsearch, meaning there are no conflicts.", "meta": { - "client_count": {{ clients }} + "client_count": {{ p_bulk_indexing_clients }} }, "schedule": [ { @@ -49,8 +33,8 @@ "operation": { "operation-type": "create-index", "settings": { - "index.number_of_replicas": {{ replica_count }}, - "index.number_of_shards": {{ shard_count }} + "index.number_of_replicas": {{ replica_count | default(0) }}, + "index.number_of_shards": {{ shard_count | default(2) }} } } }, @@ -58,7 +42,7 @@ "operation": "index-append-1000", "warmup-time-period": 0, "time-period": 1200, - "clients": {{ clients }} + "clients": {{ p_bulk_indexing_clients }} } ] }, diff --git a/eventdata/track.py b/eventdata/track.py index 662534ebb7d50..73950f4bef6b3 100644 --- a/eventdata/track.py +++ b/eventdata/track.py @@ -1,6 +1,7 @@ from eventdata.parameter_sources.elasticlogs_bulk_source import ElasticlogsBulkSource from eventdata.parameter_sources.elasticlogs_kibana_source import ElasticlogsKibanaSource from eventdata.parameter_sources.sample_based_bulk_source import SampleBasedBulkSource +from eventdata.parameter_sources.interval_query_source import IntervalQuerySource from eventdata.runners import rollover_runner from eventdata.runners import createindex_runner from eventdata.runners import kibana_runner @@ -13,6 +14,7 @@ def register(registry): registry.register_param_source("elasticlogs_bulk", ElasticlogsBulkSource) registry.register_param_source("elasticlogs_kibana", ElasticlogsKibanaSource) registry.register_param_source("sample_based_bulk", SampleBasedBulkSource) + registry.register_param_source("interval_query", IntervalQuerySource) registry.register_runner("rollover", rollover_runner.rollover) registry.register_runner("createindex", createindex_runner.createindex) registry.register_runner("kibana", kibana_runner.kibana) diff --git a/eventdata/utils/fieldstats.py b/eventdata/utils/fieldstats.py new file mode 100644 index 0000000000000..8af7442a86883 --- /dev/null +++ b/eventdata/utils/fieldstats.py @@ -0,0 +1,2 @@ +global_fieldstats = {} +
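Note on the new template parameters: every formerly hard-coded Jinja variable in the challenge files now falls back to a default when not supplied (bulk_indexing_clients=8, shard_count=2, replica_count=0, rate_limit_duration_secs=1200, rate_limit_step=2, rate_limit_max=32, shard_sizing_iterations=25, shard_sizing_queries=20), so the track runs out of the box and individual values can be overridden per invocation through Rally's track parameters, e.g. --track-params="bulk_indexing_clients:16,shard_count:3". The defaults listed here come from the | default(...) filters above; the exact command-line spelling depends on the Rally version in use.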
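For reviewers who want to sanity-check the new interval_query parameter source, the following standalone Python sketch mirrors the window selection performed by IntervalQuerySource.params(): the window always ends at the field maximum recorded by the fieldstats runner and covers between min_interval_size_pct and max_interval_size_pct of the full min/max range. The function name and the epoch-millisecond timestamps are illustrative only and are not part of the track.

import random

# Illustrative sketch, not part of the patch: stats_min/stats_max stand in for the
# values the fieldstats runner stores in eventdata.utils.fieldstats.global_fieldstats.
def pick_interval(stats_min, stats_max, min_pct, max_pct):
    range_max = stats_max
    # Narrowest allowed window: min_pct percent of the full range, ending at the maximum.
    range_min_upper = int(stats_max - (stats_max - stats_min) * min_pct / 100.0)
    # Widen the window by a random share of the allowed extra (max_pct - min_pct) percent.
    delta = (stats_max - stats_min) * (max_pct - min_pct) / 100.0
    range_min = range_min_upper - int(delta * random.random())
    return range_min, range_max

gte, lte = pick_interval(1500000000000, 1500086400000, min_pct=10, max_pct=50)
print({"range": {"@timestamp": {"gte": gte, "lte": lte, "format": "epoch_millis"}}})

Run against these example values, the script prints a range clause of the same shape as the one the parameter source appends to query.bool.must.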