diff --git a/docs/adding_benchmarks.rst b/docs/adding_benchmarks.rst index 3265648fd..55571248c 100644 --- a/docs/adding_benchmarks.rst +++ b/docs/adding_benchmarks.rst @@ -80,77 +80,85 @@ Ensure to create a file called "README.txt" which can contain more information a Upload all three files to a place where it is publicly available. We choose ``http://benchmarks.elastic.co/corpora/geonames`` for this example. For initial local testing you can also place all files in the data directory, which is located below the root directory you specified when initially configuring Rally. Let's say you specified ``/Users/daniel/benchmarks`` as root directory. Then you have to place the data for a track with the name "geonames" in ``/Users/daniel/benchmarks/data/geonames`` so Rally can pick it up. Additionally, you have to specify the ``--offline`` option when running Rally so it does not try to download any benchmark data. -Finally, add a new Python source file in Rally's project directory. By convention, the file should be called "$BENCHMARK_NAME_track.py", so for our example the file is called "geonames_track.py". It is placed in "esrally/track/". :: +Finally, add a new JSON file in Rally's project directory. By convention, the file should be called "$BENCHMARK_NAME.json", so for our example the file is called "geonames.json". It is placed in "esrally/track/". 
:: + + { + "meta": { + "short-description": "Standard benchmark in Rally (8.6M POIs from Geonames)", + "description": "This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk request against Elasticsearch", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames" + }, + "indices": [ + { + "name": "geonames", + "types": [ + { + "name": "type", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 8647880, + "compressed-bytes": 197857614, + "uncompressed-bytes": 2790927196 + } + ] + } + ], + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0 + }, + "bulk-size": 5000, + "force-merge": true, + "clients": { + "count": 8 + } + }, + { + "name": "search", + "type": "search", + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "default", + "body": { + "query": { + "match_all": {} + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "schedule": [ + "index-append-default-settings", + "search" + ] + } + ] + } - from esrally.track import track - GEONAMES_INDEX_NAME = "geonames" - - class SampleQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "sample") - - def run(self, es): - return es.search(index=GEONAMES_INDEX_NAME) - - geonamesTrackSpec = track.Track( - name="geonames", - short_description="Demo benchmark", - description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elastic.co/corpora/geonames", - index_name=GEONAMES_INDEX_NAME, - type_name="type", - number_of_documents=8647880, - compressed_size_in_bytes=197857614, - uncompressed_size_in_bytes=2790927196, - 
document_file_name="documents.json.bz2", - mapping_file_name="mappings.json", - # Queries to use in the search benchmark - queries=[SampleQuery()], - challenges=track.challenges - - -In case you want to add multiple indices this is possible too. The same track needs to specified as follows then: :: - - - from esrally.track import track - - GEONAMES_INDEX_NAME = "geonames" - - class SampleQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "sample") - - def run(self, es): - return es.search(index=GEONAMES_INDEX_NAME) - - geonamesTrackSpec = track.Track( - name="geonames", - short_description="Demo benchmark", - description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elastic.co/corpora/geonames", - indices=[ - track.Index(name=GEONAMES_INDEX_NAME, types=[ - track.Type( - name="type", - mapping_file_name="mappings.json", - document_file_name="documents.json.bz2", - number_of_documents=8647880, - compressed_size_in_bytes=197857614, - uncompressed_size_in_bytes=2790927196) - ]) - ], - # Queries to use in the search benchmark - queries=[SampleQuery()], - challenges=track.challenges) A few things to note: -* You can either use the standard challenges provided with Rally or add your own. Note that Rally assumes that the challenge that should be run by default is called "append-no-conflicts". It is possible to not use this name but it is more convenient for users. Otherwise, they have to provide the command line option ``--challenge``. +* Rally assumes that the challenge that should be run by default is called "append-no-conflicts". If you want to run a different challenge, provide the command line option ``--challenge=YOUR_CHALLENGE_NAME``. * You can add as many queries as you want. We use the `official Python Elasticsearch client `_ to issue queries. 
-* The numbers are needed to verify integrity and provide progress reports. +* The numbers below the ``types`` property are needed to verify integrity and provide progress reports. + +.. note:: + + We have defined a `JSON schema for tracks `_ which you can use to check how to define your track. You should also check the tracks provided by Rally for inspiration. When you invoke ``esrally list tracks``, the new track should now appear:: @@ -165,8 +173,8 @@ When you invoke ``esrally list tracks``, the new track should now appear:: Available tracks: Name Description Challenges - ---------- -------------------------------------------------------- ----------------------------------------------------------------------- - geonames Standard benchmark in Rally (8.6M POIs from Geonames) append-no-conflicts,append-fast-no-conflicts,append-fast-with-conflicts + ---------- -------------------------------------------------------- ------------------- + geonames Standard benchmark in Rally (8.6M POIs from Geonames) append-no-conflicts Congratulations, you have created your first track! You can test it with ``esrally --track=geonames`` (or whatever the name of your track is) and run specific challenges with ``esrally --track=geonames --challenge=append-fast-with-conflicts``. 
diff --git a/esrally/driver.py b/esrally/driver.py index 97741d098..c3d26a9eb 100644 --- a/esrally/driver.py +++ b/esrally/driver.py @@ -59,39 +59,43 @@ def __init__(self, cfg, clock, track, challenge, cluster, phase): class LatencyBenchmark(Benchmark): - def __init__(self, cfg, clock, track, challenge, cluster, phase, queries, repetitions=1000): + def __init__(self, cfg, clock, track, challenge, cluster, phase, queries, warmup_repetitions=1000, repetitions=1000): Benchmark.__init__(self, cfg, clock, track, challenge, cluster, phase) self.queries = queries + self.warmup_repetitions = warmup_repetitions self.repetitions = repetitions self.stop_watch = self.clock.stop_watch() def run(self): logger.info("Running warmup iterations") - self._run_benchmark(" Benchmarking %s (warmup iteration %d/%d)") + self._run_benchmark(" Benchmarking %s (warmup iteration %d/%d)", self.warmup_repetitions) logger.info("Running measurement iterations") - times = self._run_benchmark(" Benchmarking %s (iteration %d/%d)") + times = self._run_benchmark(" Benchmarking %s (iteration %d/%d)", self.repetitions) for q in self.queries: latencies = [t[q.name] for t in times] for latency in latencies: self.metrics_store.put_value_cluster_level("query_latency_%s" % q.name, convert.seconds_to_ms(latency), "ms") - def _run_benchmark(self, message): + def _run_benchmark(self, message, repetitions): times = [] quiet = self.quiet_mode if not quiet: - self._print_progress(message, 0) - for iteration in range(1, self.repetitions + 1): + self._print_progress(message, 0, repetitions) + for iteration in range(1, repetitions + 1): if not quiet: - self._print_progress(message, iteration) + self._print_progress(message, iteration, repetitions) times.append(self._run_one_round()) self.progress.finish() return times - def _print_progress(self, message, iteration): - if iteration % (self.repetitions // 20) == 0: - progress_percent = round(100 * iteration / self.repetitions) - self.progress.print(message % 
(self.phase.name, iteration, self.repetitions), "[%3d%% done]" % progress_percent) + def _print_progress(self, message, iteration, repetitions): + if repetitions == 0: + self.progress.print(message % (self.phase.name, iteration, repetitions), "[100% done]") + else: + progress_percent = round(100 * iteration / repetitions) + if ((100 * iteration) / repetitions) % 5 == 0: + self.progress.print(message % (self.phase.name, iteration, repetitions), "[%3d%% done]" % progress_percent) def _run_one_round(self): d = {} @@ -107,7 +111,9 @@ def _run_one_round(self): class SearchBenchmark(LatencyBenchmark): def __init__(self, cfg, clock, t, challenge, cluster): - super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.search, t.queries, + super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.search, + challenge.benchmark[track.BenchmarkPhase.search].queries, + challenge.benchmark[track.BenchmarkPhase.search].warmup_iteration_count, challenge.benchmark[track.BenchmarkPhase.search].iteration_count) @@ -134,7 +140,8 @@ def __init__(self, cfg, clock, t, challenge, cluster): super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.stats, [ StatsQueryAdapter("indices_stats", cluster.indices_stats, metric="_all", level="shards"), StatsQueryAdapter("node_stats", cluster.nodes_stats, metric="_all", level="shards"), - ], challenge.benchmark[track.BenchmarkPhase.stats].iteration_count) + ], challenge.benchmark[track.BenchmarkPhase.stats].warmup_iteration_count, + challenge.benchmark[track.BenchmarkPhase.stats].iteration_count) class IndexedDocumentCountProbe: @@ -226,7 +233,7 @@ def __init__(self, config, clock, track, challenge, cluster): super().__init__(config, clock, track, challenge, cluster) def index_documents(self, ids): - num_client_threads = self.challenge.clients + num_client_threads = self.challenge.benchmark[track.BenchmarkPhase.index].clients logger.info("Launching %d client bulk indexing threads" % num_client_threads) 
self.stop_watch.start() diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index 9798975ce..44da49512 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -261,9 +261,9 @@ def start(self, command): if command == "list": self._list(ctx) elif command == "race": - pipeline = self._choose(pipelines, "pipeline", + pipeline = self._choose(lambda n: pipelines[n], "pipeline", "You can list the available pipelines with %s list pipelines." % PROGRAM_NAME)(ctx) - t = self._choose(track.tracks, "track", "You can list the available tracks with %s list tracks." % PROGRAM_NAME) + t = self._choose(lambda n: ctx.track_reader.read(n), "track", "You can list the available tracks with %s list tracks." % PROGRAM_NAME) metrics.race_store(self._config).store_race(t) pipeline.run(t) return True @@ -287,11 +287,11 @@ def start(self, command): logging.exception("A fatal error occurred while the running benchmark.") raise e - def _choose(self, source, what, help): + def _choose(self, loader, what, help): + name = self._config.opts("system", what) try: - name = self._config.opts("system", what) - return source[name] - except KeyError: + return loader(name) + except (KeyError, FileNotFoundError): raise exceptions.ImproperlyConfigured("Unknown %s [%s]. 
%s" % (what, name, help)) def _list(self, ctx): @@ -302,8 +302,9 @@ def _list(self, ctx): print("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.") elif what == "tracks": print("Available tracks:\n") - print(tabulate.tabulate([[t.name, t.short_description, ",".join(map(str, t.challenges))] for t in track.tracks.values()], - headers=["Name", "Description", "Challenges"])) + print(tabulate.tabulate( + tabular_data=[[t.name, t.short_description, ",".join(map(str, t.challenges))] for t in ctx.track_reader.all_tracks()], + headers=["Name", "Description", "Challenges"])) elif what == "pipelines": print("Available pipelines:\n") @@ -318,8 +319,7 @@ def _list(self, ctx): print(tabulate.tabulate(races, headers=["Race Timestamp", "Track", "Challenge", "Car", "User Tag"])) elif what == "cars": print("Available cars:\n") - print(tabulate.tabulate([[car.name] for car in track.cars], - headers=["Name"])) + print(tabulate.tabulate([[car.name] for car in track.cars], headers=["Name"])) else: raise exceptions.ImproperlyConfigured("Cannot list unknown configuration option [%s]" % what) @@ -331,3 +331,4 @@ def __init__(self, cfg): self.marshal = track.Marshal(cfg) self.reporter = reporter.SummaryReporter(cfg) self.sweeper = sweeper.Sweeper(cfg) + self.track_reader = track.TrackFileReader(cfg) diff --git a/esrally/reporter.py b/esrally/reporter.py index f7975e4be..948d1b8eb 100644 --- a/esrally/reporter.py +++ b/esrally/reporter.py @@ -144,9 +144,9 @@ def report(self, t): store.open(invocation, t.name, challenge.name, selected_car) stats = Stats(store, - self.sample_size(challenge, track.BenchmarkPhase.stats), - t.queries, - self.sample_size(challenge, track.BenchmarkPhase.search)) + self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.stats].iteration_count), + self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.search].queries), + self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.search].iteration_count)) 
metrics_table = [] if track.BenchmarkPhase.index in challenge.benchmark: @@ -169,10 +169,10 @@ def report(self, t): print_internal(tabulate.tabulate(metrics_table, headers=["Metric", "Value"], numalign="right", stralign="right")) - def sample_size(self, challenge, benchmark_phase): - if track.BenchmarkPhase.stats in challenge.benchmark: - return challenge.benchmark[benchmark_phase].iteration_count - else: + def guarded(self, op): + try: + return op() + except KeyError: return None def report_index_throughput(self, stats): diff --git a/esrally/track/__init__.py b/esrally/track/__init__.py index 23ee9fe54..e69de29bb 100644 --- a/esrally/track/__init__.py +++ b/esrally/track/__init__.py @@ -1,6 +0,0 @@ -from os.path import dirname, basename, isfile -import glob - -# "Autodiscover" all available tracks -modules = glob.glob(dirname(__file__) + "/*.py") -__all__ = [basename(f)[:-3] for f in modules if isfile(f)] diff --git a/esrally/track/geonames.json b/esrally/track/geonames.json new file mode 100644 index 000000000..5e6a76696 --- /dev/null +++ b/esrally/track/geonames.json @@ -0,0 +1,196 @@ +{ + "meta": { + "short-description": "Standard benchmark in Rally (8.6M POIs from Geonames)", + "description": "This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk request against Elasticsearch", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames" + }, + "indices": [ + { + "name": "geonames", + "types": [ + { + "name": "type", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 8647880, + "compressed-bytes": 197857614, + "uncompressed-bytes": 2790927196 + } + ] + } + ], + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0 + }, + "bulk-size": 5000, + "force-merge": true, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-fast-settings", + "type": 
"index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "force-merge": true, + "bulk-size": 5000, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-update-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "bulk-size": 5000, + "force-merge": true, + "conflicts": "sequential", + "clients": { + "count": 8 + } + }, + { + "name": "stats", + "type": "stats", + "warmup-iterations": 100, + "iterations": 100, + "clients": { + "count": 1 + } + }, + { + "name": "search", + "type": "search", + "target-throughput": 1, + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "default", + "body": { + "query": { + "match_all": {} + } + } + }, + { + "name": "term", + "body": { + "query": { + "term": { + "country_code": "AT" + } + } + } + }, + { + "name": "phrase", + "body": { + "query": { + "match_phrase": { + "name": "Sankt Georgen" + } + } + } + }, + { + "name": "country_agg_uncached", + "cache": false, + "body": { + "size": 0, + "aggs": { + "country_population": { + "terms": { + "field": "country_code" + }, + "aggs": { + "sum_population": { + "sum": { + "field": "population" + } + } + } + } + } + } + }, + { + "name": "country_agg_cached", + "cache": true, + "body": { + "size": 0, + "aggs": { + "country_population": { + "terms": { + "field": "country_code" + }, + "aggs": { + "sum_population": { + "sum": { + "field": "population" + } + } + } + } + } + } + }, + { + "name": "scroll", + "query-type": "scroll", + "pages": 25, + "results-per-page": 1000, + "body": { + "query": { + "match_all": {} + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "schedule": [ + 
"index-append-default-settings", + "stats", + "search" + ] + }, + { + "name": "append-fast-no-conflicts", + "description": "", + "schedule": [ + "index-append-fast-settings" + ] + }, + { + "name": "append-fast-with-conflicts", + "description": "", + "schedule": [ + "index-append-update-fast-settings" + ] + } + + ] +} + diff --git a/esrally/track/geonames_track.py b/esrally/track/geonames_track.py deleted file mode 100644 index a0c7fe9d1..000000000 --- a/esrally/track/geonames_track.py +++ /dev/null @@ -1,119 +0,0 @@ -from esrally.track import track - -GEO_NAMES_INDEX_NAME = "geonames" -GEO_NAMES_TYPE_NAME = "type" - - -class DefaultQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "default") - - def run(self, es): - return es.search(index=GEO_NAMES_INDEX_NAME) - - -class TermQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "term") - - def run(self, es): - return es.search(index=GEO_NAMES_INDEX_NAME, doc_type=GEO_NAMES_TYPE_NAME, q="country_code:AT") - - -class PhraseQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "phrase") - - def run(self, es): - return es.search(index=GEO_NAMES_INDEX_NAME, doc_type=GEO_NAMES_TYPE_NAME, body=''' -{ - "query": { - "match_phrase": { - "name": "Sankt Georgen" - } - } -}''') - - -class CountryAggQuery(track.Query): - def __init__(self, suffix="", use_request_cache=True): - track.Query.__init__(self, "country_agg" + suffix) - self.use_request_cache = use_request_cache - - def run(self, es): - return es.search(index=GEO_NAMES_INDEX_NAME, doc_type=GEO_NAMES_TYPE_NAME, - request_cache=self.use_request_cache, body=''' - { - "size": 0, - "aggs": { - "country_population": { - "terms": { - "field": "country_code" - }, - "aggs": { - "sum_population": { - "sum": { - "field": "population" - } - } - } - } - } - }''') - - -class ScrollQuery(track.Query): - PAGES = 25 - ITEMS_PER_PAGE = 1000 - - def __init__(self): - track.Query.__init__(self, "scroll", 
normalization_factor=self.PAGES) - self.scroll_id = None - - def run(self, es): - r = es.search( - index=GEO_NAMES_INDEX_NAME, - doc_type=GEO_NAMES_TYPE_NAME, - sort="_doc", - scroll="10s", - size=self.ITEMS_PER_PAGE) - self.scroll_id = r["_scroll_id"] - # Note that starting with ES 2.0, the initial call to search() returns already the first result page - # so we have to retrieve one page less - for i in range(self.PAGES - 1): - hit_count = len(r["hits"]["hits"]) - if hit_count == 0: - # done - break - r = es.scroll(scroll_id=self.scroll_id, scroll="10s") - - def close(self, es): - if self.scroll_id: - es.clear_scroll(scroll_id=self.scroll_id) - self.scroll_id = None - - -geonamesTrackSpec = track.Track( - name="geonames", - short_description="Standard benchmark in Rally (8.6M POIs from Geonames)", - description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames", - index_name=GEO_NAMES_INDEX_NAME, - type_name=GEO_NAMES_TYPE_NAME, - number_of_documents=8647880, - compressed_size_in_bytes=197857614, - uncompressed_size_in_bytes=2790927196, - document_file_name="documents.json.bz2", - mapping_file_name="mappings.json", - # Queries to use in the search benchmark - queries=[ - DefaultQuery(), - TermQuery(), - PhraseQuery(), - CountryAggQuery(use_request_cache=False), - CountryAggQuery(suffix="_cached", use_request_cache=True), - ScrollQuery() - ], - challenges=track.challenges -) diff --git a/esrally/track/geopoint.json b/esrally/track/geopoint.json new file mode 100644 index 000000000..dbcab075a --- /dev/null +++ b/esrally/track/geopoint.json @@ -0,0 +1,168 @@ +{ + "meta": { + "short-description": "60.8M POIs from PlanetOSM", + "description": "This test indexes 60.8M documents (POIs from PlanetOSM, total 2.3 GB json) using 8 client threads and 5000 docs per bulk request 
against Elasticsearch", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geopoint" + }, + "indices": [ + { + "name": "osmgeopoints", + "types": [ + { + "name": "type", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 60844404, + "compressed-bytes": 505295401, + "uncompressed-bytes": 2448564579 + } + ] + } + ], + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0 + }, + "bulk-size": 5000, + "force-merge": true, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "force-merge": true, + "bulk-size": 5000, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-update-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "bulk-size": 5000, + "force-merge": true, + "conflicts": "sequential", + "clients": { + "count": 8 + } + }, + { + "name": "stats", + "type": "stats", + "warmup-iterations": 100, + "iterations": 100, + "clients": { + "count": 1 + } + }, + { + "name": "search", + "type": "search", + "target-throughput": 1, + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "polygon", + "body": { + "query" : { + "geo_polygon" : { + "location" : { + "points" : [[-0.1, 49.0], + [5.0, 48.0], + [15.0, 49.0], + [14.0, 60.0], + [-0.1, 61.0], + [-0.1, 49.0]] + } + } + } + } + }, + { + "name": "bbox", + "body": { + "query" : { + "geo_bounding_box" : { + "location" : { + "top_left" : [-0.1, 61.0], + "bottom_right" : [15.0, 48.0] + } + } + } + } + }, + { + "name": "distance", + "body": { + 
"query" : { + "geo_distance" : { + "distance" : "200km", + "location" : [7.0, 55.0] + } + } + } + }, + { + "name": "distanceRange", + "body": { + "query" : { + "geo_distance_range" : { + "from" : "200km", + "to" : "400km", + "location" : [7.0, 55.0] + } + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "schedule": [ + "index-append-default-settings", + "stats", + "search" + ] + }, + { + "name": "append-fast-no-conflicts", + "description": "", + "schedule": [ + "index-append-fast-settings" + ] + }, + { + "name": "append-fast-with-conflicts", + "description": "", + "schedule": [ + "index-append-update-fast-settings" + ] + } + + ] +} + diff --git a/esrally/track/geopoint_track.py b/esrally/track/geopoint_track.py deleted file mode 100644 index b77778765..000000000 --- a/esrally/track/geopoint_track.py +++ /dev/null @@ -1,104 +0,0 @@ -from esrally.track import track - -GEO_POINT_INDEX_NAME = "osmgeopoints" -GEO_POINT_TYPE_NAME = "type" - - -class DefaultQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "default") - - def run(self, es): - return es.search(index=GEO_POINT_INDEX_NAME) - - -class BBoxQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "bbox") - - def run(self, es): - return es.search(index=GEO_POINT_INDEX_NAME, doc_type=GEO_POINT_TYPE_NAME, body=''' - { - "query" : { - "geo_bounding_box" : { - "location" : { - "top_left" : [-0.1, 61.0], - "bottom_right" : [15.0, 48.0] - } - } - } - }''') - - -class DistanceQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "distance") - - def run(self, es): - return es.search(index=GEO_POINT_INDEX_NAME, doc_type=GEO_POINT_TYPE_NAME, body=''' - { - "query" : { - "geo_distance" : { - "distance" : "200km", - "location" : [7.0, 55.0] - } - } - }''') - - -class DistanceRangeQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "distanceRange") - - def run(self, es): - return 
es.search(index=GEO_POINT_INDEX_NAME, doc_type=GEO_POINT_TYPE_NAME, body=''' - { - "query" : { - "geo_distance_range" : { - "from" : "200km", - "to" : "400km", - "location" : [7.0, 55.0] - } - } - }''') - - -class PolygonQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "polygon") - - def run(self, es): - return es.search(index=GEO_POINT_INDEX_NAME, doc_type=GEO_POINT_TYPE_NAME, body=''' - { - "query" : { - "geo_polygon" : { - "location" : { - "points" : [[-0.1, 49.0], - [5.0, 48.0], - [15.0, 49.0], - [14.0, 60.0], - [-0.1, 61.0], - [-0.1, 49.0]] - } - } - } - }''') - - -geopointTrackSpec = track.Track( - name="geopoint", - short_description="60.8M POIs from PlanetOSM", - description="This test indexes 60.8M documents (POIs from PlanetOSM, total 2.3 GB json) using 8 client threads and 5000 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geopoint", - index_name=GEO_POINT_INDEX_NAME, - type_name=GEO_POINT_TYPE_NAME, - number_of_documents=60844404, - compressed_size_in_bytes=505295401, - uncompressed_size_in_bytes=2448564579, - document_file_name="documents.json.bz2", - mapping_file_name="mappings.json", - # Queries to use in the search benchmark - queries=[PolygonQuery(), BBoxQuery(), DistanceQuery(), DistanceRangeQuery()], - challenges=track.challenges -) diff --git a/esrally/track/logging_track.py b/esrally/track/logging_track.py deleted file mode 100644 index 5495e103c..000000000 --- a/esrally/track/logging_track.py +++ /dev/null @@ -1,173 +0,0 @@ -from datetime import datetime -from esrally.track import track - -# actually this should be one index per day (for testing Instant Kibana) -LOGGING_INDEX_PREFIX = "logs-" -LOGGING_INDEX_PATTERN = "%s*" % LOGGING_INDEX_PREFIX -LOGGING_TYPE_NAME = "type" - - -class DefaultQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "default") - - def run(self, es): - return 
es.search(index=LOGGING_INDEX_PATTERN) - - -class TermQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "term") - - def run(self, es): - # TODO dm: Fix me: Add a proper term query here - return es.search(index=LOGGING_INDEX_PATTERN, doc_type=LOGGING_TYPE_NAME, q="") - - -class RangeQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "range") - first_day = datetime.strptime("26.04.1998", "%d.%m.%Y") - now = datetime.now() - # always query all but the first 10 days in the data set (there are 88 days) - self.diff_days = (now - first_day).days - 10 - - def run(self, es): - return es.search(index=LOGGING_INDEX_PATTERN, doc_type=LOGGING_TYPE_NAME, body=''' -{ - "query": { - "range": { - "@timestamp": { - "gte": "now-%dd/d", - "lt": "now/d" - } - } - } -} -''' % self.diff_days) - - -#TODO dm: Turn off request cache? -class HourlyAggQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "hourly_agg") - - def run(self, es): - return es.search(index=LOGGING_INDEX_PATTERN, doc_type=LOGGING_TYPE_NAME, body=''' - { - "size": 0, - "aggs": { - "by_hour": { - "date_histogram": { - "field": "@timestamp", - "interval": "hour" - } - } - } -}''') - - -class ScrollQuery(track.Query): - PAGES = 25 - ITEMS_PER_PAGE = 1000 - - def __init__(self): - track.Query.__init__(self, "scroll", normalization_factor=self.PAGES) - self.scroll_id = None - - def run(self, es): - r = es.search( - index=LOGGING_INDEX_PATTERN, - doc_type=LOGGING_TYPE_NAME, - sort="_doc", - scroll="10s", - size=self.ITEMS_PER_PAGE) - self.scroll_id = r["_scroll_id"] - # Note that starting with ES 2.0, the initial call to search() returns already the first result page - # so we have to retrieve one page less - for i in range(self.PAGES - 1): - hit_count = len(r["hits"]["hits"]) - if hit_count == 0: - # done - break - r = es.scroll(scroll_id=self.scroll_id, scroll="10s") - - def close(self, es): - if self.scroll_id: - es.clear_scroll(scroll_id=self.scroll_id) - 
self.scroll_id = None - - - -logging_indices = [ - # date pattern, uncompressed, compressed, num docs - # ("30041998", 42761, 2574, 325), - # ("01051998", 159533957, 5995048, 1193254), - # ("02051998", 101584272, 3892369, 755705), - # ("03051998", 102351889, 3930074, 759462), - # ("04051998", 170093830, 6403163, 1262355), - # ("05051998", 203888981, 7735877, 1521490), - # ("06051998", 226694959, 8572377, 1687378), - # ("07051998", 229348975, 8645476, 1706185), - # ("08051998", 220548339, 8368724, 1649294), - # ("09051998", 136087579, 5259605, 1015056), - # ("10051998", 115069802, 4459406, 856126), - # ("11051998", 229599377, 8722505, 1710378), - # ("12051998", 313548483, 10250401, 2390900), - # ("13051998", 232771638, 9301395, 1728940), - # ("14051998", 256276051, 10053754, 1907968), - # ("15051998", 279662653, 10737399, 2083564), - # ("16051998", 233338908, 8867329, 1748699), - # ("17051998", 198815169, 7705242, 1483014), - # ("18051998", 206312374, 7819108, 1544561), - # ("19051998", 380093428, 13792061, 2848216), - # ("20051998", 407446295, 15179923, 3038231), - # ("21051998", 317587117, 11822364, 2372684), - # ("22051998", 439888196, 16435208, 3278410), - # ("23051998", 313929092, 11903914, 2339981), - # ("24051998", 298974313, 11322717, 2225196), - ("25051998", 467676639, 17520064, 3480705), - ("26051998", 278213029, 10460503, 2072251), - ("27051998", 522837325, 19715617, 3899565), - ("28051998", 28420256, 1088523, 212087), - ("29051998", 71993994, 2718454, 537171), - ("30051998", 12985999, 496377, 96914), - ("31051998", 56192882, 2163396, 418067), - ("01061998", 85402320, 3251168, 635635), - ("02061998", 80028474, 3057268, 596576), - ("03061998", 219224461, 8360743, 1636775), -] - - -def create_indices(): - indices = [] - for index in logging_indices: - if index: - date, uncompressed, compressed, num_docs = index - indices.append(track.Index(name=LOGGING_INDEX_PREFIX + date, types=[ - track.Type( - name=LOGGING_TYPE_NAME, - mapping_file_name="mappings.json", 
- document_file_name="documents-%s.json.bz2" % date, - number_of_documents=num_docs, - compressed_size_in_bytes=compressed, - uncompressed_size_in_bytes=uncompressed)])) - return indices - - -loggingTrackSpec = track.Track( - name="logging", - short_description="Logging benchmark", - description="This benchmark indexes HTTP server log data from the 1998 world cup.", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/logging", - indices=create_indices(), - queries=[ - DefaultQuery(), - TermQuery(), - RangeQuery(), - HourlyAggQuery(), - ScrollQuery() - ], - challenges=track.challenges) - - diff --git a/esrally/track/percolator.json b/esrally/track/percolator.json new file mode 100644 index 000000000..c4d38601c --- /dev/null +++ b/esrally/track/percolator.json @@ -0,0 +1,215 @@ +{ + "meta": { + "short-description": "Percolator benchmark based on 2M AOL queries", + "description": "This benchmark indexes 2M AOL queries and use the percolate query to match", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/percolator" + }, + "indices": [ + { + "name": "queries", + "types": [ + { + "name": "percolator", + "mapping": "queries-mapping.json", + "documents": "queries.json.bz2", + "document-count": 2000000, + "compressed-bytes": 123502, + "uncompressed-bytes": 148039748 + }, + { + "name": "content", + "mapping": "document-mapping.json" + } + ] + } + ], + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "#COMMENT#": "TODO #69: To benchmark 5.0.0-alpha1, we need to set 'index.queries.cache.type': 'none' instead of 'index.queries.cache.enabled'", + "index-settings": { + "index.number_of_replicas": 0, + "index.queries.cache.enabled": false + }, + "bulk-size": 5000, + "force-merge": true, + "clients": { + "count": 8 + } + }, + { + "name": "stats", + "type": "stats", + "warmup-iterations": 100, + "iterations": 100, + "clients": { + "count": 1 + } + }, + { + "name": "search", + "type": 
"search", + "target-throughput": 1, + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "percolator_with_content_president_bush", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "president bush" + } + } + } + } + }, + { + "name": "percolator_with_content_saddam_hussein", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "saddam hussein" + } + } + } + } + }, + { + "name": "percolator_with_content_hurricane_katrina", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "hurricane katrina" + } + } + } + } + }, + { + "name": "percolator_with_content_google", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "google" + } + } + } + } + }, + { + "name": "percolator_no_score_with_content_google", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "constant_score": { + "query": { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "google" + } + } + } + } + } + } + }, { + "name": "percolator_with_highlighting", + "index": "queries", + "type": "percolator", + "body": { + "query": { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "Israeli prime minister Ariel Sharon suffers a massive stroke; he is replaced by acting prime minister Ehud Olmert" + } + } + }, + "highlight": { + "fields": { + "body": {} + } + } + } + }, + { + "name": "percolator_with_content_ignore_me", + "index": "queries", + "type": "percolator", + "body": { + 
"query" : { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "ignore me" + } + } + } + } + }, + { + "name": "percolator_no_score_with_content_ignore_me", + "index": "queries", + "type": "percolator", + "body": { + "query" : { + "constant_score": { + "query": { + "percolate" : { + "field" : "query", + "document_type" : "content", + "document" : { + "body" : "ignore me" + } + } + } + } + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "schedule": [ + "index-append-default-settings", + "stats", + "search" + ] + } + ] +} + diff --git a/esrally/track/percolator_track.py b/esrally/track/percolator_track.py deleted file mode 100644 index 9a526b5fd..000000000 --- a/esrally/track/percolator_track.py +++ /dev/null @@ -1,134 +0,0 @@ -from esrally.track import track - -PERCOLATOR_INDEX_NAME = "queries" -PERCOLATOR_TYPE_NAME = "percolator" - -# Workaround to support multiple versions (this is not how this will be handled in the future..) 
-percolatorIndexSettings = { - "master": { - "index.number_of_replicas": 0, - "index.queries.cache.enabled": False - }, - "5.0.0-alpha2": { - "index.number_of_replicas": 0, - "index.queries.cache.enabled": False - }, - "5.0.0-alpha1": { - "index.number_of_replicas": 0, - "index.queries.cache.type": "none" - } -} - - -class PercolatorQuery(track.Query): - def __init__(self, content): - track.Query.__init__(self, "percolator query with content: %s" % content) - self.content = content - - def run(self, es): - return es.search(index=PERCOLATOR_INDEX_NAME, doc_type=PERCOLATOR_TYPE_NAME, body=''' - { - "query" : { - "percolate" : { - "field" : "query", - "document_type" : "content", - "document" : { - "body" : "%s" - } - } - } - }''' % self.content) - - -class PercolatorQueryNoScoring(track.Query): - def __init__(self, content): - track.Query.__init__(self, "non scoring percolator query with content: %s" % content) - self.content = content - - def run(self, es): - return es.search(index=PERCOLATOR_INDEX_NAME, doc_type=PERCOLATOR_TYPE_NAME, body=''' - { - "query" : { - "constant_score": { - "query": { - "percolate" : { - "field" : "query", - "document_type" : "content", - "document" : { - "body" : "%s" - } - } - } - } - } - }''' % self.content) - - -class PercolatorQueryWithHighlighting(track.Query): - def __init__(self): - track.Query.__init__(self, "percolator query with highlighting") - - def run(self, es): - return es.search(index=PERCOLATOR_INDEX_NAME, doc_type=PERCOLATOR_TYPE_NAME, body=''' - { - "query": { - "percolate" : { - "field" : "query", - "document_type" : "content", - "document" : { - "body" : "Israeli prime minister Ariel Sharon suffers a massive stroke; he is replaced by acting prime minister Ehud Olmert" - } - } - }, - "highlight": { - "fields": { - "body": {} - } - } - }''') - - -percolatorTrackSpec = track.Track( - name="percolator", - short_description="Percolator benchmark based on 2M AOL queries", - description="This benchmark indexes 2M AOL 
queries and use the percolate query to match", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/percolator", - indices=[ - track.Index(name="queries", types=[ - # The type for the percolator queries: - track.Type( - name="percolator", - mapping_file_name="queries-mapping.json", - document_file_name="queries.json.bz2", - number_of_documents=2000000, - compressed_size_in_bytes=123502, - uncompressed_size_in_bytes=148039748 - ), - # The type used for documents being percolated: - track.Type( - name="content", - mapping_file_name="document-mapping.json" - ) - ]) - ], - # Queries to use in the search benchmark - queries=[ - PercolatorQuery(content="president bush"), - PercolatorQuery(content="saddam hussein"), - PercolatorQuery(content="hurricane katrina"), - PercolatorQuery(content="google"), - PercolatorQueryNoScoring(content="google"), - PercolatorQueryWithHighlighting(), - PercolatorQuery(content="ignore me"), - PercolatorQueryNoScoring(content="ignore me") - ], - challenges=[track.Challenge( - name="append-no-conflicts", - description="Append documents without any ID conflicts", - benchmark={ - track.BenchmarkPhase.index: track.IndexBenchmarkSettings(index_settings=percolatorIndexSettings), - track.BenchmarkPhase.stats: track.LatencyBenchmarkSettings(iteration_count=100), - track.BenchmarkPhase.search: track.LatencyBenchmarkSettings(iteration_count=100) - } - )] -) diff --git a/esrally/track/pmc.json b/esrally/track/pmc.json new file mode 100644 index 000000000..77afd7a2b --- /dev/null +++ b/esrally/track/pmc.json @@ -0,0 +1,183 @@ +{ + "meta": { + "short-description": "Full text benchmark containing 574.199 papers from PMC", + "description": "This test indexes 574.199 papers from PMC (total 23.2 GB json) using 8 client threads and 500 docs per bulk request against Elasticsearch", + "data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/pmc" + }, + "indices": [ + { + "name": "pmc", + "types": [ + { + "name": 
"articles", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 574199, + "compressed-bytes": 5928712141, + "uncompressed-bytes": 23256051757 + } + ] + } + ], + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0 + }, + "bulk-size": 500, + "force-merge": true, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "force-merge": true, + "bulk-size": 500, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-update-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "bulk-size": 500, + "force-merge": true, + "conflicts": "sequential", + "clients": { + "count": 8 + } + }, + { + "name": "stats", + "type": "stats", + "warmup-iterations": 100, + "iterations": 100, + "clients": { + "count": 1 + } + }, + { + "name": "search", + "type": "search", + "target-throughput": 1, + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "default", + "body": { + "query": { + "match_all": {} + } + } + }, + { + "name": "term", + "body": { + "query": { + "term": { + "body": "physician" + } + } + } + }, + { + "name": "phrase", + "body": { + "query": { + "match_phrase": { + "body": "newspaper coverage" + } + } + } + }, + { + "name": "articles_monthly_agg_uncached", + "cache": false, + "body": { + "size": 0, + "aggs": { + "articles_over_time": { + "date_histogram": { + "field": "timestamp", + "interval": "month" + } + } + } + } + }, + { + "name": "articles_monthly_agg_cached", + "cache": true, + "body": { + "size": 0, + "aggs": { + 
"articles_over_time": { + "date_histogram": { + "field": "timestamp", + "interval": "month" + } + } + } + } + }, + { + "name": "scroll", + "query-type": "scroll", + "pages": 25, + "results-per-page": 1000, + "body": { + "query": { + "match_all": {} + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "schedule": [ + "index-append-default-settings", + "stats", + "search" + ] + }, + { + "name": "append-fast-no-conflicts", + "description": "", + "schedule": [ + "index-append-fast-settings" + ] + }, + { + "name": "append-fast-with-conflicts", + "description": "", + "schedule": [ + "index-append-update-fast-settings" + ] + } + ] +} + diff --git a/esrally/track/pmc_track.py b/esrally/track/pmc_track.py deleted file mode 100644 index 84b1d2b42..000000000 --- a/esrally/track/pmc_track.py +++ /dev/null @@ -1,143 +0,0 @@ -from esrally.utils import sysstats -from esrally.track import track - -PMC_INDEX_NAME = "pmc" -PMC_TYPE_NAME = "articles" - - -class DefaultQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "default") - - def run(self, es): - return es.search(index=PMC_INDEX_NAME) - - -class TermQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "term") - - def run(self, es): - return es.search(index=PMC_INDEX_NAME, doc_type=PMC_TYPE_NAME, q="body:physician") - - -class PhraseQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "phrase") - - def run(self, es): - return es.search(index=PMC_INDEX_NAME, doc_type=PMC_TYPE_NAME, body=''' -{ - "query": { - "match_phrase": { - "body": "newspaper coverage" - } - } -}''') - - -class MonthlyArticlesAggQuery(track.Query): - def __init__(self, suffix="", use_request_cache=True): - track.Query.__init__(self, "articles_monthly_agg" + suffix) - self.use_request_cache = use_request_cache - - def run(self, es): - return es.search(index=PMC_INDEX_NAME, doc_type=PMC_TYPE_NAME, - request_cache=self.use_request_cache, body=''' - { - 
"size": 0, - "aggs": { - "articles_over_time" : { - "date_histogram" : { - "field" : "timestamp", - "interval" : "month" - } - } - } - }''') - - -class ScrollQuery(track.Query): - PAGES = 25 - ITEMS_PER_PAGE = 1000 - - def __init__(self): - track.Query.__init__(self, "scroll", normalization_factor=self.PAGES) - self.scroll_id = None - - def run(self, es): - r = es.search( - index=PMC_INDEX_NAME, - doc_type=PMC_TYPE_NAME, - sort="_doc", - scroll="10s", - size=self.ITEMS_PER_PAGE) - self.scroll_id = r["_scroll_id"] - # Note that starting with ES 2.0, the initial call to search() returns already the first result page - # so we have to retrieve one page less - for i in range(self.PAGES - 1): - hit_count = len(r["hits"]["hits"]) - if hit_count == 0: - # done - break - r = es.scroll(scroll_id=self.scroll_id, scroll="10s") - - def close(self, es): - if self.scroll_id: - es.clear_scroll(scroll_id=self.scroll_id) - self.scroll_id = None - - -pmc_challenges = [ - track.Challenge( - name="append-no-conflicts", - description="Append documents without any ID conflicts", - benchmark={ - track.BenchmarkPhase.index: track.IndexBenchmarkSettings(index_settings=track.greenNodeSettings, bulk_size=500), - track.BenchmarkPhase.stats: track.LatencyBenchmarkSettings(iteration_count=100), - track.BenchmarkPhase.search: track.LatencyBenchmarkSettings(iteration_count=1000) - } - ), - track.Challenge( - name="append-fast-no-conflicts", - description="append-only, using 4 GB heap, and these settings:
%s
" % track.benchmarkFastSettings, - benchmark={ - track.BenchmarkPhase.index: track.IndexBenchmarkSettings(index_settings=track.benchmarkFastSettings, bulk_size=500) - } - ), - - track.Challenge( - name="append-fast-with-conflicts", - description="the same as fast, except we pass in an ID (worst case random UUID) for each document and 25% of the time the ID " - "already exists in the index.", - benchmark={ - track.BenchmarkPhase.index: track.IndexBenchmarkSettings(index_settings=track.benchmarkFastSettings, bulk_size=500, - id_conflicts=track.IndexIdConflict.SequentialConflicts) - } - ) -] - -pmcTrackSpec = track.Track( - name="pmc", - short_description="Full text benchmark containing 574.199 papers from PMC", - description="This test indexes 574.199 papers from PMC (total 23.2 GB json) using 8 client threads and 500 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/pmc", - index_name=PMC_INDEX_NAME, - type_name=PMC_TYPE_NAME, - number_of_documents=574199, - compressed_size_in_bytes=5928712141, - uncompressed_size_in_bytes=23256051757, - document_file_name="documents.json.bz2", - mapping_file_name="mappings.json", - # Queries to use in the search benchmark - queries=[ - DefaultQuery(), - TermQuery(), - PhraseQuery(), - MonthlyArticlesAggQuery(use_request_cache=False), - MonthlyArticlesAggQuery(suffix="_cached", use_request_cache=True), - ScrollQuery() - ], - challenges=pmc_challenges -) diff --git a/esrally/track/tiny.json b/esrally/track/tiny.json new file mode 100644 index 000000000..5da86a673 --- /dev/null +++ b/esrally/track/tiny.json @@ -0,0 +1,201 @@ +{ + "meta": { + "short-description": "First 2k documents of the geonames track for local tests", + "description": "This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk request against Elasticsearch", + "data-url": 
"http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/tiny" + }, + "indices": [ + { + "name": "tiny", + "types": [ + { + "#COMMENT#": "Allow generators here - how do we run the generator (i.e. arbitrary Python code)?", + "name": "type", + "mapping": "mappings.json", + "documents": "documents.json.bz2", + "document-count": 2000, + "compressed-bytes": 28333, + "uncompressed-bytes": 564930 + } + ] + } + ], + "#COMMENT": "'operations' just define all possible operations but this is not the actual execution schedule. The execution is defined in the 'challenges' block and it just refers to the defined operations. The intention between this separation is to allow reuse of operations", + "operations": [ + { + "name": "index-append-default-settings", + "type": "index", + "#COMMENT#": "Rally will refuse to benchmark against a cluster that is in any other state as 'green'", + "index-settings": { + "index.number_of_replicas": 0 + }, + "bulk-size": 5000, + "force-merge": true, + "clients": { + "count": 8, + "#COMMENT#": "No authentication here - depends on server-side setup" + } + }, + { + "name": "index-append-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "force-merge": true, + "bulk-size": 5000, + "clients": { + "count": 8 + } + }, + { + "name": "index-append-update-fast-settings", + "type": "index", + "index-settings": { + "index.number_of_replicas": 0, + "index.refresh_interval": "30s", + "index.number_of_shards": 6, + "index.translog.flush_threshold_size": "4g" + }, + "bulk-size": 5000, + "force-merge": true, + "conflicts": "sequential", + "clients": { + "count": 8 + } + }, + { + "name": "stats", + "type": "stats", + "warmup-iterations": 100, + "iterations": 100, + "clients": { + "count": 1 + } + }, + { + "name": "search", + "type": "search", + "#COMMENT#": "This means 1 operation per second over all 
clients", + "target-throughput": 1, + "warmup-iterations": 1000, + "iterations": 1000, + "clients": { + "count": 1 + }, + "queries": [ + { + "name": "default", + "body": { + "query": { + "match_all": {} + } + } + }, + { + "name": "term", + "body": { + "query": { + "term": { + "country_code": "AT" + } + } + } + }, + { + "name": "phrase", + "body": { + "query": { + "match_phrase": { + "name": "Sankt Georgen" + } + } + } + }, + { + "name": "country_agg_uncached", + "cache": false, + "body": { + "size": 0, + "aggs": { + "country_population": { + "terms": { + "field": "country_code" + }, + "aggs": { + "sum_population": { + "sum": { + "field": "population" + } + } + } + } + } + } + }, + { + "name": "country_agg_cached", + "cache": true, + "body": { + "size": 0, + "aggs": { + "country_population": { + "terms": { + "field": "country_code" + }, + "aggs": { + "sum_population": { + "sum": { + "field": "population" + } + } + } + } + } + } + }, + { + "name": "scroll", + "query-type": "scroll", + "pages": 25, + "results-per-page": 1000, + "body": { + "query": { + "match_all": {} + } + } + } + ] + } + ], + "challenges": [ + { + "name": "append-no-conflicts", + "description": "", + "#COMMENT#": "We will introduce concepts which allow to define traffic mixes, parallel execution etc. etc. 
Let's keep it simple for the first step", + "schedule": [ + "index-append-default-settings", + "stats", + "search" + ] + }, + { + "name": "append-fast-no-conflicts", + "description": "", + "schedule": [ + "index-append-fast-settings" + ] + }, + { + "name": "append-fast-with-conflicts", + "description": "", + "schedule": [ + "index-append-update-fast-settings" + ] + } + ] +} + diff --git a/esrally/track/tiny_track.py b/esrally/track/tiny_track.py deleted file mode 100644 index 7647726e6..000000000 --- a/esrally/track/tiny_track.py +++ /dev/null @@ -1,123 +0,0 @@ -from esrally.track import track - -TINY_INDEX_NAME = "tiny" -TINY_TYPE_NAME = "type" - - -class DefaultQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "default") - - def run(self, es): - return es.search(index=TINY_INDEX_NAME) - - -class TermQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "term") - - def run(self, es): - return es.search(index=TINY_INDEX_NAME, doc_type=TINY_TYPE_NAME, q="country_code:AT") - - -class PhraseQuery(track.Query): - def __init__(self): - track.Query.__init__(self, "phrase") - - def run(self, es): - return es.search(index=TINY_INDEX_NAME, doc_type=TINY_TYPE_NAME, body=''' -{ - "query": { - "match_phrase": { - "name": "Sankt Georgen" - } - } -}''') - - -class CountryAggQuery(track.Query): - def __init__(self, suffix="", use_request_cache=True): - track.Query.__init__(self, "country_agg" + suffix) - self.use_request_cache = use_request_cache - - def run(self, es): - return es.search(index=TINY_INDEX_NAME, doc_type=TINY_TYPE_NAME, - request_cache=self.use_request_cache, body=''' - { - "size": 0, - "aggs": { - "country_population": { - "terms": { - "field": "country_code" - }, - "aggs": { - "sum_population": { - "sum": { - "field": "population" - } - } - } - } - } - }''') - - -class ScrollQuery(track.Query): - PAGES = 25 - ITEMS_PER_PAGE = 1000 - - def __init__(self): - track.Query.__init__(self, "scroll", normalization_factor=self.PAGES) 
- self.scroll_id = None - - def run(self, es): - r = es.search( - index=TINY_INDEX_NAME, - doc_type=TINY_TYPE_NAME, - sort="_doc", - scroll="10s", - size=self.ITEMS_PER_PAGE) - self.scroll_id = r["_scroll_id"] - # Note that starting with ES 2.0, the initial call to search() returns already the first result page - # so we have to retrieve one page less - for i in range(self.PAGES - 1): - hit_count = len(r["hits"]["hits"]) - if hit_count == 0: - # done - break - r = es.scroll(scroll_id=self.scroll_id, scroll="10s") - - def close(self, es): - if self.scroll_id: - es.clear_scroll(scroll_id=self.scroll_id) - self.scroll_id = None - - -tinyTrackSpec = track.Track( - name="tiny", - short_description="First 2k documents of the geonames track for local tests", - description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk " - "request against Elasticsearch", - source_root_url="http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/tiny", - indices=[ - track.Index(name=TINY_INDEX_NAME, types=[ - track.Type( - name=TINY_TYPE_NAME, - mapping_file_name="mappings.json", - document_file_name="documents.json.bz2", - number_of_documents=2000, - compressed_size_in_bytes=28333, - uncompressed_size_in_bytes=564930) - ]) - ], - # Queries to use in the search benchmark - queries=[ - DefaultQuery(), - TermQuery(), - PhraseQuery(), - CountryAggQuery(use_request_cache=False), - CountryAggQuery(suffix="_cached", use_request_cache=True), - ScrollQuery() - ], - challenges=track.challenges) - diff --git a/esrally/track/track-schema.json b/esrally/track/track-schema.json new file mode 100644 index 000000000..8643e9153 --- /dev/null +++ b/esrally/track/track-schema.json @@ -0,0 +1,239 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "track", + "description": "Specification of tracks for Rally", + "type": "object", + "properties": { + "meta": { + "type": "object", + "properties": { + 
"short-description": { + "type": "string", + "description": "A short description of this track suitable for command line usage. It should be less than 80 characters." + }, + "description": { + "type": "string", + "description": "A longer description of this track." + }, + "data-url": { + "type": "string", + "format": "uri", + "description": "The root URL for track data. Has to be a publicly accessible http or https URL." + } + }, + "required": [ + "short-description", + "description", + "data-url" + ] + }, + "indices": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Index", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Name of the index to create." + }, + "types": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Type", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Name of the type to create." + }, + "mapping": { + "type": "string", + "description": "File name of the corresponding mapping file. Example: mappings-mytype.json" + }, + "documents": { + "type": "string", + "description": "File name of the corresponding documents that should be indexed. This file has to be compressed either as bz2, zip or tar.gz and must contain exactly one JSON file with the same name (Examples: documents.json.bz2, documents.zip (which should contain one file called 'documents.json')). At least one type must set this property and also document-count, compressed-bytes and uncompressed-bytes." + }, + "document-count": { + "type": "integer", + "minimum": 1, + "description": "Number of documents in the documents file. This number will be used to verify that all documents have been indexed successfully." + }, + "compressed-bytes": { + "type": "integer", + "minimum": 1, + "description": "The size in bytes of the compressed document file. 
This number is used to show users how much data will be downloaded by Rally and also to check whether the download is complete." + }, + "uncompressed-bytes": { + "type": "integer", + "minimum": 1, + "description": "The size in bytes of the documents file after decompression." + } + }, + "required": [ + "name", + "mapping" + ] + } + } + }, + "required": [ + "name", + "types" + ] + } + }, + "operations": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "description": "The 'operations' block describes the operations that can be executed. These can be reused later in the 'challenges' block which describes the actual execution schedule.", + "items": { + "title": "Operation", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "A human-readable name of this operation" + }, + "type": { + "type": "string", + "enum": ["index", "stats", "search"], + "description": "Type of this operation. Possible values are: 'index', 'stats', 'search'." + }, + "index-settings": { + "type": "object", + "description": "[Only for type == 'index']: Defines the index settings of the benchmark candidate when the index is created." + }, + "bulk-size": { + "type": "integer", + "minimum": 1, + "description": "[Only for type == 'index']: Defines the bulk size." + }, + "force-merge": { + "type": "boolean", + "description": "[Only for type == 'index']: Whether to perform a force merge after the index benchmark is done. It is recommended to do a force-merge when benchmarking search after indexing in order to reduce run-to-run variance." + }, + "conflicts": { + "type": "string", + "enum": ["sequential", "random"], + "description": "[Only for type == 'index']: Type of index conflicts to simulate. If not specified, no conflicts will be simulated. Valid values are: 'sequential' (A document id is replaced with a document id with a sequentially increasing id), 'random' (A document id is replaced with a document id with a random other id)." 
+ }, + "clients": { + "type": "object", + "properties": { + "count": { + "type": "integer", + "minimum": 1, + "maximum": 1024, + "description": "Number of clients which should execute this operation." + }, + "compression": { + "type": "boolean", + "description": "Whether to compress requests on the client." + } + }, + "required": ["count"] + }, + "warmup-iterations": { + "type": "integer", + "minimum": 0, + "description": "[Only for types 'stats', 'search']: Defines the number of times to run the operation in order to warmup the benchmark candidate. Warmup iterations will not be considered in the benchmark result." + }, + "iterations": { + "type": "integer", + "minimum": 1, + "description": "[Only for types 'stats', 'search']: Defines the number of times to run the operation." + }, + "queries": { + "type": "array", + "minItems": 1, + "description": "[Only for type 'search']: Defines the queries to execute.", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "A descriptive name for the query. It is recommended to use short names and to hyphenate parts of the words, e.g. 'query-most-recent-docs'. This name can be used to create graphs per query in Kibana." + }, + "cache": { + "type": "boolean", + "description": "Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version." + }, + "query-type": { + "type": "string", + "enum": ["scroll"], + "description": "The type of query. Should currently only be set for 'scroll' queries (as they require additional properties and also behave differently). If not defined, a request body search will be executed." + }, + "index": { + "type": "string", + "description": "The index or index pattern against which the query should be executed. This property is only necessary if there is more than one index or the index contains more than one type. 
Otherwise, Rally will derive the index and type by itself." + }, + "type": { + "type": "string", + "description": "The type against which the query should be executed. This property is only necessary if there is more than one index or the index contains more than one type. Otherwise, Rally will derive the index and type by itself." + }, + "pages": { + "type": "integer", + "minimum": 1, + "description": "[Only for query-type 'scroll'] Number of pages to retrieve." + }, + "results-per-page": { + "type": "integer", + "minimum": 1, + "description": "[Only for query-type 'scroll'] Number of documents to retrieve per page." + }, + "body": { + "type": "object", + "description": "The query body." + } + }, + "required": ["name"] + } + } + }, + "required": ["name", "type"] + } + }, + "challenges": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Challenge", + "type": "object", + "description": "Defines the concrete execution order", + "properties": { + "name": { + "type": "string", + "description": "A descriptive name of the challenge. Should not contain spaces in order to simplify handling on the command line for users." + }, + "description": { + "type": "string", + "description": "A human readable description of the challenge" + }, + "schedule": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "description": "Defines the concrete execution order of operations. Currently, all operations are executed sequentially.", + "items": { + "type": "string", + "description": "The name of an operation that should be executed. This name must match the operation name in the 'operations' block." 
+ } + } + }, + "required": [ + "name", + "description", + "schedule" + ] + } + } + } +} diff --git a/esrally/track/track.py b/esrally/track/track.py index c8ff78eab..29ce1b6c0 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -1,32 +1,15 @@ import os import logging -import collections +import json import urllib.error from enum import Enum +import jsonschema from esrally import config, exceptions from esrally.utils import io, sysstats, convert, process, net logger = logging.getLogger("rally.track") -# Ensure cluster status green even for single nodes. Please don't add anything else here except to get the cluster to status -# 'green' even with a single node. -greenNodeSettings = { - "index.number_of_replicas": 0 -} - -mergePartsSettings = { - "index.number_of_replicas": 0, - "index.merge.scheduler.auto_throttle": False -} - -benchmarkFastSettings = { - "index.refresh_interval": "30s", - "index.number_of_shards": 6, - "index.number_of_replicas": 0, - "index.translog.flush_threshold_size": "4g" -} - mergePartsLogConfig = ''' es.logger.level: INFO rootLogger: ${es.logger.level}, console, file @@ -77,11 +60,6 @@ conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n" ''' -# Specific tracks add themselves to this dictionary. -# -# key = Track name, value = Track instance -tracks = collections.OrderedDict() - class Index: """ @@ -112,7 +90,8 @@ class Type: Defines a type in Elasticsearch. 
""" - def __init__(self, name, mapping_file_name, document_file_name=None, number_of_documents=0, compressed_size_in_bytes=0, + def __init__(self, name, mapping_file_name, document_file_name=None, number_of_documents=0, + compressed_size_in_bytes=0, uncompressed_size_in_bytes=0): """ @@ -136,14 +115,22 @@ def __init__(self, name, mapping_file_name, document_file_name=None, number_of_d self.compressed_size_in_bytes = compressed_size_in_bytes self.uncompressed_size_in_bytes = uncompressed_size_in_bytes + def has_valid_document_data(self): + return self.document_file_name is not None and \ + self.number_of_documents > 0 and \ + self.compressed_size_in_bytes > 0 and \ + self.uncompressed_size_in_bytes > 0 + class Track: """ A track defines the data set that is used. It corresponds loosely to a use case (e.g. logging, event processing, analytics, ...) """ - def __init__(self, name, short_description, description, source_root_url, challenges, queries, index_name=None, type_name=None, - number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, document_file_name=None, + def __init__(self, name, short_description, description, source_root_url, challenges, index_name=None, + type_name=None, + number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, + document_file_name=None, mapping_file_name=None, indices=None): """ @@ -163,7 +150,6 @@ def __init__(self, name, short_description, description, source_root_url, challe :param challenges: A list of one or more challenges to use. If in doubt, reuse the predefined list "track.challenges". Rally's default configuration assumes that each track defines at least one challenge with the name "append-no-conflicts". This is not required but simplifies usage. - :param queries: A list of queries to run in case searching should be benchmarked. :param index_name: The name of the index to create. :param type_name: The type name for this index. 
:param number_of_documents: The number of documents in the benchmark document. Needed for proper progress reporting. @@ -179,7 +165,6 @@ def __init__(self, name, short_description, description, source_root_url, challe self.description = description self.source_root_url = source_root_url self.challenges = challenges - self.queries = queries self.readme_file_name = "README.txt" # multiple indices if index_name is None: @@ -191,8 +176,6 @@ def __init__(self, name, short_description, description, source_root_url, challe Type(type_name, mapping_file_name, document_file_name, number_of_documents, compressed_size_in_bytes, uncompressed_size_in_bytes) ])] - # self-register - tracks[name] = self @property def number_of_documents(self): @@ -254,14 +237,27 @@ class BenchmarkPhase(Enum): class LatencyBenchmarkSettings: - def __init__(self, iteration_count=1000): + def __init__(self, queries=None, warmup_iteration_count=1000, iteration_count=1000): + """ + Creates new LatencyBenchmarkSettings. + + :param queries: A list of queries to run. Optional. + :param warmup_iteration_count: The number of times each query should be run for warmup. Defaults to 1000 iterations. + :param iteration_count: The number of times each query should be run. Defaults to 1000 iterations. + """ + if queries is None: + queries = [] + self.queries = queries + self.warmup_iteration_count = warmup_iteration_count self.iteration_count = iteration_count class IndexBenchmarkSettings: - def __init__(self, index_settings=None, bulk_size=5000, id_conflicts=IndexIdConflict.NoConflicts, force_merge=True): + def __init__(self, index_settings=None, clients=8, bulk_size=5000, id_conflicts=IndexIdConflict.NoConflicts, + force_merge=True): """ :param index_settings: A hash of index-level settings that will be set when the index is created. + :param clients: Number of concurrent clients that should index data. :param bulk_size: The number of documents to submit in a single bulk (Default: 5000). 
:param id_conflicts: Whether to produce index id conflicts during indexing (Default: NoConflicts). :param force_merge: Whether to do a force merge after the index benchmark (Default: True). @@ -269,6 +265,7 @@ def __init__(self, index_settings=None, bulk_size=5000, id_conflicts=IndexIdConf if index_settings is None: index_settings = {} self.index_settings = index_settings + self.clients = clients self.bulk_size = bulk_size self.id_conflicts = id_conflicts self.force_merge = force_merge @@ -282,13 +279,10 @@ class Challenge: def __init__(self, name, description, - #TODO dm: This should probably be changeable by step (indexing, querying) - clients=8, benchmark=None): if benchmark is None: benchmark = {} self.name = name - self.clients = clients self.description = description self.benchmark = benchmark @@ -345,7 +339,8 @@ def setup(self, track): distribution_version = self._config.opts("source", "distribution.version", mandatory=False) if distribution_version and len(distribution_version.strip()) > 0: msg = "Could not download mapping file [%s] for Elasticsearch distribution [%s] from [%s]. Please note that only " \ - "versions starting from 5.0.0-alpha1 are supported." % (mapping_file_name, distribution_version, mapping_url) + "versions starting from 5.0.0-alpha1 are supported." % ( + mapping_file_name, distribution_version, mapping_url) else: msg = "Could not download mapping file [%s] from [%s]. Please check that the data are available." % \ (mapping_file_name, mapping_url) @@ -397,7 +392,8 @@ def _download(self, url, local_path, size_in_bytes=None, force_download=False, r elif url.startswith("s3"): self._do_download_via_s3(url, local_path, size_in_bytes) else: - raise exceptions.SystemSetupError("Cannot download benchmark data from [%s]. Only http(s) and s3 are supported." % url) + raise exceptions.SystemSetupError( + "Cannot download benchmark data from [%s]. Only http(s) and s3 are supported." 
% url) if size_in_bytes: print("Done") except urllib.error.URLError: @@ -408,10 +404,12 @@ def _download(self, url, local_path, size_in_bytes=None, force_download=False, r # file must exist at this point -> verify if not os.path.isfile(local_path): if offline: - raise exceptions.SystemSetupError("Cannot find %s. Please disable offline mode and retry again." % local_path) + raise exceptions.SystemSetupError( + "Cannot find %s. Please disable offline mode and retry again." % local_path) else: - raise exceptions.SystemSetupError("Could not download from %s to %s. Please verify that data are available at %s and " - "check your internet connection." % (url, local_path, url)) + raise exceptions.SystemSetupError( + "Could not download from %s to %s. Please verify that data are available at %s and " + "check your internet connection." % (url, local_path, url)) def _decompress(self, data_set_path): # we assume that track data are always compressed and try to decompress them before running the benchmark @@ -431,7 +429,8 @@ def _do_download_via_s3(self, url, data_set_path, size_in_bytes): # cleanup probably corrupt data file... if os.path.isfile(tmp_data_set_path): os.remove(tmp_data_set_path) - raise RuntimeError("Could not get benchmark data from S3: '%s'. Is s3cmd installed and set up properly?" % s3cmd) + raise RuntimeError( + "Could not get benchmark data from S3: '%s'. Is s3cmd installed and set up properly?" 
% s3cmd) except: logger.info("Removing temp file %s" % tmp_data_set_path) os.remove(tmp_data_set_path) @@ -448,31 +447,253 @@ def _do_download_via_s3(self, url, data_set_path, size_in_bytes): Car(name="verbose_iw", logging_config=mergePartsLogConfig) ] -challenges = [ - Challenge( - name="append-no-conflicts", - description="Append documents without any ID conflicts", - benchmark={ - BenchmarkPhase.index: IndexBenchmarkSettings(index_settings=greenNodeSettings), - BenchmarkPhase.stats: LatencyBenchmarkSettings(iteration_count=100), - BenchmarkPhase.search: LatencyBenchmarkSettings(iteration_count=1000) - } - ), - Challenge( - name="append-fast-no-conflicts", - description="append-only, using 4 GB heap, and these settings:
%s
" % benchmarkFastSettings, - benchmark={ - BenchmarkPhase.index: IndexBenchmarkSettings(index_settings=benchmarkFastSettings) - } - ), - - Challenge( - name="append-fast-with-conflicts", - description="the same as fast, except we pass in an ID (worst case random UUID) for each document and 25% of the time the ID " - "already exists in the index.", - benchmark={ - BenchmarkPhase.index: IndexBenchmarkSettings(index_settings=benchmarkFastSettings, - id_conflicts=IndexIdConflict.SequentialConflicts) - } - ) -] + +class TrackSyntaxError(exceptions.RallyError): + """ + Raised whenever a syntax problem is encountered when loading the track specification. + """ + pass + + +class TrackFileReader: + def __init__(self, cfg): + self.cfg = cfg + track_schema_file = "%s/track/track-schema.json" % (self.cfg.opts("system", "rally.root")) + self.track_schema = json.loads(open(track_schema_file).read()) + + def _all_track_names(self): + # TODO dm: This will do for now. Resolve this on the file system later (#69). + return ["geonames", "geopoint", "pmc", "percolator", "tiny"] + + def all_tracks(self): + return [self.read(track_name) for track_name in self._all_track_names()] + + def read(self, track_name): + # TODO dm: This will change with #69 + track_file = "%s/track/%s.json" % (self.cfg.opts("system", "rally.root"), track_name) + try: + track_spec = json.loads(open(track_file).read()) + except json.JSONDecodeError as e: + logger.exception("Could not load [%s]." 
% track_file) + raise TrackSyntaxError("Could not load '%s'" % track_file, e) + try: + jsonschema.validate(track_spec, self.track_schema) + except jsonschema.exceptions.ValidationError as ve: + raise TrackSyntaxError( + "Track '%s' is invalid.\n\nError details: %s\nInstance: %s\nPath: %s\nSchema path: %s" + % (track_name, ve.message, + json.dumps(ve.instance, indent=4, sort_keys=True), ve.absolute_path, ve.absolute_schema_path)) + return TrackReader().read(track_name, track_spec) + + +class TrackReader: + def __init__(self): + self.name = None + + def read(self, track_name, track_specification): + self.name = track_name + short_description = self._r(track_specification, ["meta", "short-description"]) + description = self._r(track_specification, ["meta", "description"]) + source_root_url = self._r(track_specification, ["meta", "data-url"]) + indices = [self._create_index(index) for index in self._r(track_specification, "indices")] + challenges = self._create_challenges(track_specification, indices) + + return Track(name=self.name, short_description=short_description, description=description, + source_root_url=source_root_url, + challenges=challenges, indices=indices) + + def _error(self, msg): + raise TrackSyntaxError("Track '%s' is invalid. %s" % (self.name, msg)) + + def _r(self, root, path, error_ctx=None, mandatory=True, default_value=None): + if isinstance(path, str): + path = [path] + + structure = root + try: + for k in path: + structure = structure[k] + return structure + except KeyError: + if mandatory: + if error_ctx: + self._error("Mandatory element '%s' is missing in '%s'." % (".".join(path), error_ctx)) + else: + self._error("Mandatory element '%s' is missing." 
% ".".join(path)) + else: + return default_value + + def _create_index(self, index_spec): + index_name = self._r(index_spec, "name") + types = [self._create_type(type_spec) for type_spec in self._r(index_spec, "types")] + valid_document_data = False + for type in types: + if type.has_valid_document_data(): + valid_document_data = True + break + if not valid_document_data: + self._error("Index '%s' is invalid. At least one of its types needs to define documents." % index_name) + + return Index(name=index_name, types=types) + + def _create_type(self, type_spec): + return Type(name=self._r(type_spec, "name"), + mapping_file_name=self._r(type_spec, "mapping"), + document_file_name=self._r(type_spec, "documents", mandatory=False), + number_of_documents=self._r(type_spec, "document-count", mandatory=False, default_value=0), + compressed_size_in_bytes=self._r(type_spec, "compressed-bytes", mandatory=False), + uncompressed_size_in_bytes=self._r(type_spec, "uncompressed-bytes", mandatory=False) + ) + + def _create_challenges(self, track_spec, indices): + ops = self._parse_operations(self._r(track_spec, "operations"), indices) + challenges = [] + for challenge in self._r(track_spec, "challenges"): + challenge_name = self._r(challenge, "name", error_ctx="challenges") + challenge_description = self._r(challenge, "description", error_ctx=challenge_name) + benchmarks = {} + + operations_per_type = {} + for op in self._r(challenge, "schedule", error_ctx=challenge_name): + if op not in ops: + self._error("'schedule' for challenge '%s' contains a non-existing operation '%s'. " + "Please add an operation '%s' to the 'operations' block." % (challenge_name, op, op)) + + benchmark_type, benchmark_spec = ops[op] + if benchmark_type in benchmarks: + new_op_name = op + old_op_name = operations_per_type[benchmark_type] + self._error( + "'schedule' for challenge '%s' contains multiple operations of type '%s' which is currently " + "unsupported. 
Please remove one of these operations: '%s', '%s'" % + (challenge_name, benchmark_type, old_op_name, new_op_name)) + benchmarks[benchmark_type] = benchmark_spec + operations_per_type[benchmark_type] = op + challenges.append(Challenge(name=challenge_name, + description=challenge_description, + benchmark=benchmarks)) + + return challenges + + def _parse_operations(self, ops_specs, indices): + # key = name, value = (BenchmarkPhase instance, BenchmarkSettings instance) + ops = {} + for op_spec in ops_specs: + ops_spec_name = self._r(op_spec, "name", error_ctx="operations") + ops[ops_spec_name] = self._create_op(ops_spec_name, op_spec, indices) + + return ops + + def _create_op(self, ops_spec_name, ops_spec, indices): + benchmark_type = self._r(ops_spec, "type") + if benchmark_type == "index": + id_conflicts = self._r(ops_spec, "conflicts", mandatory=False) + if not id_conflicts: + id_conflicts = IndexIdConflict.NoConflicts + elif id_conflicts == "sequential": + id_conflicts = IndexIdConflict.SequentialConflicts + elif id_conflicts == "random": + id_conflicts = IndexIdConflict.RandomConflicts + else: + raise TrackSyntaxError("Unknown conflict type '%s' for operation '%s'" % (id_conflicts, ops_spec)) + + return (BenchmarkPhase.index, + IndexBenchmarkSettings(index_settings=self._r(ops_spec, "index-settings", error_ctx=ops_spec_name), + clients=self._r(ops_spec, ["clients", "count"], error_ctx=ops_spec_name), + bulk_size=self._r(ops_spec, "bulk-size", error_ctx=ops_spec_name), + force_merge=self._r(ops_spec, "force-merge", error_ctx=ops_spec_name), + id_conflicts=id_conflicts)) + elif benchmark_type == "search": + # TODO: Honor clients settings + return (BenchmarkPhase.search, + LatencyBenchmarkSettings(queries=self._create_queries(self._r(ops_spec, "queries", error_ctx=ops_spec_name), indices), + warmup_iteration_count=self._r(ops_spec, "warmup-iterations", error_ctx=ops_spec_name), + iteration_count=self._r(ops_spec, "iterations"))) + elif benchmark_type == "stats": + # 
TODO: Honor clients settings + return (BenchmarkPhase.stats, + LatencyBenchmarkSettings( + warmup_iteration_count=self._r(ops_spec, "warmup-iterations", error_ctx=ops_spec_name), + iteration_count=self._r(ops_spec, "iterations", error_ctx=ops_spec_name))) + else: + raise TrackSyntaxError("Unknown benchmark type '%s' for operation '%s'" % (benchmark_type, ops_spec_name)) + + def _create_queries(self, queries_spec, indices): + if len(indices) == 1 and len(indices[0].types) == 1: + default_index = indices[0].name + default_type = indices[0].types[0].name + else: + default_index = None + default_type = None + queries = [] + for query_spec in queries_spec: + query_name = self._r(query_spec, "name") + query_type = self._r(query_spec, "query-type", mandatory=False, default_value="default", error_ctx=query_name) + + index_name = self._r(query_spec, "index", mandatory=False, default_value=default_index, error_ctx=query_name) + type_name = self._r(query_spec, "type", mandatory=False, default_value=default_type, error_ctx=query_name) + + if not index_name or not type_name: + raise TrackSyntaxError("Query '%s' requires an index and a type." 
% query_name) + request_cache = self._r(query_spec, "cache", mandatory=False, default_value=False, error_ctx=query_name) + if query_type == "default": + query_body = self._r(query_spec, "body", error_ctx=query_name) + queries.append(DefaultQuery(index=index_name, type=type_name, name=query_name, + body=query_body, use_request_cache=request_cache)) + elif query_type == "scroll": + query_body = self._r(query_spec, "body", mandatory=False, error_ctx=query_name) + pages = self._r(query_spec, "pages", error_ctx=query_name) + items_per_page = self._r(query_spec, "results-per-page", error_ctx=query_name) + queries.append(ScrollQuery(index=index_name, type=type_name, name=query_name, body=query_body, + use_request_cache=request_cache, pages=pages, items_per_page=items_per_page)) + else: + raise TrackSyntaxError("Unknown query type '%s' in query '%s'" % (query_type, query_name)) + return queries + + +class DefaultQuery(Query): + def __init__(self, index, type, name, body, use_request_cache=False): + Query.__init__(self, name) + self.index = index + self.type = type + self.body = body + self.use_request_cache = use_request_cache + + def run(self, es): + return es.search(index=self.index, doc_type=self.type, request_cache=self.use_request_cache, body=self.body) + + +class ScrollQuery(Query): + def __init__(self, index, type, name, body, use_request_cache, pages, items_per_page): + Query.__init__(self, name, normalization_factor=pages) + self.index = index + self.type = type + self.pages = pages + self.items_per_page = items_per_page + self.body = body + self.use_request_cache = use_request_cache + self.scroll_id = None + + def run(self, es): + r = es.search( + index=self.index, + doc_type=self.type, + body=self.body, + sort="_doc", + scroll="10s", + size=self.items_per_page, + request_cache=self.use_request_cache) + self.scroll_id = r["_scroll_id"] + # Note that starting with ES 2.0, the initial call to search() returns already the first result page + # so we have to retrieve 
one page less + for i in range(self.pages - 1): + hit_count = len(r["hits"]["hits"]) + if hit_count == 0: + # done + break + r = es.scroll(scroll_id=self.scroll_id, scroll="10s") + + def close(self, es): + if self.scroll_id: + es.clear_scroll(scroll_id=self.scroll_id) + self.scroll_id = None diff --git a/setup.py b/setup.py index d96fe101f..1fb4aec89 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ "psutil==4.1.0", "py-cpuinfo==0.2.3", "tabulate==0.7.5", + "jsonschema==2.5.1", # always use the latest version, these are certificate files... "certifi" ] diff --git a/tests/track/track_test.py b/tests/track/track_test.py index 2a8af1442..9044817b1 100644 --- a/tests/track/track_test.py +++ b/tests/track/track_test.py @@ -22,3 +22,84 @@ def test_no_distribution_version_for_source_distro(self): marshal = track.Marshal(cfg) self.assertEqual(marshal.mapping_file_name(t), "test-mapping.json") + + +class TrackReaderTests(TestCase): + def test_missing_description_raises_syntax_error(self): + track_specification = { + "meta": { + "description": "unittest track" + } + } + reader = track.TrackReader() + with self.assertRaises(track.TrackSyntaxError) as ctx: + reader.read("unittest", track_specification) + self.assertEqual("Track 'unittest' is invalid. 
Mandatory element 'meta.short-description' is missing.", ctx.exception.args[0]) + + def test_parse_valid_track_specification(self): + track_specification = { + "meta": { + "short-description": "short description for unit test", + "description": "longer description of this track for unit test", + "data-url": "https://localhost/data" + }, + "indices": [ + { + "name": "index-historical", + "types": [ + { + "name": "main", + "documents": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000, + "mapping": "main-type-mappings.json" + }, + { + "name": "secondary", + "documents": "documents-secondary.json.bz2", + "document-count": 20, + "compressed-bytes": 200, + "uncompressed-bytes": 20000, + "mapping": "secondary-type-mappings.json" + } + + ] + } + ], + "operations": [ + { + "name": "index-append", + "type": "index", + "index-settings": {}, + "clients": { + "count": 8 + }, + "bulk-size": 5000, + "force-merge": False + } + ], + "challenges": [ + { + "name": "default-challenge", + "description": "Default challenge", + "schedule": [ + "index-append" + ] + } + + ] + } + reader = track.TrackReader() + resulting_track = reader.read("unittest", track_specification) + self.assertEqual("unittest", resulting_track.name) + self.assertEqual("short description for unit test", resulting_track.short_description) + self.assertEqual("longer description of this track for unit test", resulting_track.description) + self.assertEqual(1, len(resulting_track.indices)) + self.assertEqual("index-historical", resulting_track.indices[0].name) + self.assertEqual(2, len(resulting_track.indices[0].types)) + self.assertEqual("main", resulting_track.indices[0].types[0].name) + self.assertEqual("secondary", resulting_track.indices[0].types[1].name) + self.assertEqual(1, len(resulting_track.challenges)) + self.assertEqual("default-challenge", resulting_track.challenges[0].name) +