Merge remote-tracking branch 'origin/external-tracks'
Closes #26
danielmitterdorfer committed May 28, 2016
2 parents a90fa5a + 97803b9 commit 3d9a225
Showing 20 changed files with 1,689 additions and 970 deletions.
142 changes: 75 additions & 67 deletions docs/adding_benchmarks.rst
@@ -80,77 +80,85 @@ Ensure to create a file called "README.txt" which can contain more information a

Upload all three files to a place where they are publicly available. We choose ``http://benchmarks.elastic.co/corpora/geonames`` for this example. For initial local testing you can also place all files in the data directory, which is located below the root directory you specified when initially configuring Rally. Let's say you specified ``/Users/daniel/benchmarks`` as the root directory. Then you have to place the data for a track with the name "geonames" in ``/Users/daniel/benchmarks/data/geonames`` so Rally can pick it up. Additionally, you have to specify the ``--offline`` option when running Rally so it does not try to download any benchmark data.
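
If you go the local route, a small helper script along these lines can stage the files (a minimal sketch, assuming the example root directory ``/Users/daniel/benchmarks`` and that the three files sit in the current working directory): ::

    import os
    import shutil

    # Assumptions for this example: the Rally root directory chosen during
    # configuration, and the three corpus files in the current directory.
    root_dir = "/Users/daniel/benchmarks"
    track_data_dir = os.path.join(root_dir, "data", "geonames")

    os.makedirs(track_data_dir, exist_ok=True)
    for file_name in ["documents.json.bz2", "mappings.json", "README.txt"]:
        shutil.copy(file_name, track_data_dir)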

Finally, add a new Python source file in Rally's project directory. By convention, the file should be called "$BENCHMARK_NAME_track.py", so for our example the file is called "geonames_track.py". It is placed in "esrally/track/". ::
Finally, add a new JSON file in Rally's project directory. By convention, the file should be called "$BENCHMARK_NAME.json", so for our example the file is called "geonames.json". It is placed in "esrally/track/". ::

{
"meta": {
"short-description": "Standard benchmark in Rally (8.6M POIs from Geonames)",
"description": "This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk request against Elasticsearch",
"data-url": "http://benchmarks.elasticsearch.org.s3.amazonaws.com/corpora/geonames"
},
"indices": [
{
"name": "geonames",
"types": [
{
"name": "type",
"mapping": "mappings.json",
"documents": "documents.json.bz2",
"document-count": 8647880,
"compressed-bytes": 197857614,
"uncompressed-bytes": 2790927196
}
]
}
],
"operations": [
{
"name": "index-append-default-settings",
"type": "index",
"index-settings": {
"index.number_of_replicas": 0
},
"bulk-size": 5000,
"force-merge": true,
"clients": {
"count": 8
}
},
{
"name": "search",
"type": "search",
"warmup-iterations": 1000,
"iterations": 1000,
"clients": {
"count": 1
},
"queries": [
{
"name": "default",
"body": {
"query": {
"match_all": {}
}
}
}
]
}
],
"challenges": [
{
"name": "append-no-conflicts",
"description": "",
"schedule": [
"index-append-default-settings",
"search"
]
}
]
}

from esrally.track import track

GEONAMES_INDEX_NAME = "geonames"
class SampleQuery(track.Query):
def __init__(self):
track.Query.__init__(self, "sample")
def run(self, es):
return es.search(index=GEONAMES_INDEX_NAME)
geonamesTrackSpec = track.Track(
name="geonames",
short_description="Demo benchmark",
description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk "
"request against Elasticsearch",
source_root_url="http://benchmarks.elastic.co/corpora/geonames",
index_name=GEONAMES_INDEX_NAME,
type_name="type",
number_of_documents=8647880,
compressed_size_in_bytes=197857614,
uncompressed_size_in_bytes=2790927196,
document_file_name="documents.json.bz2",
mapping_file_name="mappings.json",
# Queries to use in the search benchmark
queries=[SampleQuery()],
challenges=track.challenges)


If you want to add multiple indices, this is possible too. The same track then needs to be specified as follows: ::


from esrally.track import track

GEONAMES_INDEX_NAME = "geonames"

class SampleQuery(track.Query):
def __init__(self):
track.Query.__init__(self, "sample")

def run(self, es):
return es.search(index=GEONAMES_INDEX_NAME)

geonamesTrackSpec = track.Track(
name="geonames",
short_description="Demo benchmark",
description="This test indexes 8.6M documents (POIs from Geonames, total 2.8 GB json) using 8 client threads and 5000 docs per bulk "
"request against Elasticsearch",
source_root_url="http://benchmarks.elastic.co/corpora/geonames",
indices=[
track.Index(name=GEONAMES_INDEX_NAME, types=[
track.Type(
name="type",
mapping_file_name="mappings.json",
document_file_name="documents.json.bz2",
number_of_documents=8647880,
compressed_size_in_bytes=197857614,
uncompressed_size_in_bytes=2790927196)
])
],
# Queries to use in the search benchmark
queries=[SampleQuery()],
challenges=track.challenges)

A few things to note:

* You can either use the standard challenges provided with Rally or add your own. Note that Rally assumes that the challenge to run by default is called "append-no-conflicts". You are not required to use this name, but it is more convenient for users; otherwise they have to provide the command line option ``--challenge``.
* Rally assumes that the challenge that should be run by default is called "append-no-conflicts". If you want to run a different challenge, provide the command line option ``--challenge=YOUR_CHALLENGE_NAME``.
* You can add as many queries as you want. We use the `official Python Elasticsearch client <http://elasticsearch-py.readthedocs.org/>`_ to issue queries.
* The numbers are needed to verify integrity and provide progress reports.
* The numbers below the ``types`` property are needed to verify integrity and provide progress reports.
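
One way to derive these numbers is a short script like the following (a sketch only, assuming the corpus file from this example with one JSON document per line): ::

    import bz2
    import os

    # Assumption: the compressed corpus from this example, one JSON document per line.
    compressed_file = "documents.json.bz2"

    compressed_bytes = os.path.getsize(compressed_file)
    document_count = 0
    uncompressed_bytes = 0
    with bz2.open(compressed_file, "rb") as f:
        for line in f:
            document_count += 1
            uncompressed_bytes += len(line)

    print("document-count:     %d" % document_count)
    print("compressed-bytes:   %d" % compressed_bytes)
    print("uncompressed-bytes: %d" % uncompressed_bytes)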

.. note::

We have defined a `JSON schema for tracks <https://github.com/elastic/rally/tree/master/esrally/track/track-schema.json>`_ which you can use to check how to define your track. You should also check the tracks provided by Rally for inspiration.
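
One way to check a track file against that schema is a few lines with the third-party ``jsonschema`` package (a minimal sketch; the file locations are assumptions based on this example): ::

    import json

    import jsonschema  # third-party package, e.g. installed via pip

    # Assumed locations relative to a Rally checkout; adjust to your setup.
    with open("esrally/track/track-schema.json") as f:
        schema = json.load(f)
    with open("esrally/track/geonames.json") as f:
        track_spec = json.load(f)

    # Raises jsonschema.exceptions.ValidationError if the track file does not match the schema.
    jsonschema.validate(track_spec, schema)
    print("geonames.json is valid")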

When you invoke ``esrally list tracks``, the new track should now appear::

Available tracks:
Name Description Challenges
---------- -------------------------------------------------------- -----------------------------------------------------------------------
geonames Standard benchmark in Rally (8.6M POIs from Geonames) append-no-conflicts,append-fast-no-conflicts,append-fast-with-conflicts
---------- -------------------------------------------------------- -------------------
geonames Standard benchmark in Rally (8.6M POIs from Geonames) append-no-conflicts

Congratulations, you have created your first track! You can test it with ``esrally --track=geonames`` (or whatever the name of your track is) and run specific challenges with ``esrally --track=geonames --challenge=append-fast-with-conflicts``.

35 changes: 21 additions & 14 deletions esrally/driver.py
@@ -59,39 +59,43 @@ def __init__(self, cfg, clock, track, challenge, cluster, phase):


class LatencyBenchmark(Benchmark):
def __init__(self, cfg, clock, track, challenge, cluster, phase, queries, repetitions=1000):
def __init__(self, cfg, clock, track, challenge, cluster, phase, queries, warmup_repetitions=1000, repetitions=1000):
Benchmark.__init__(self, cfg, clock, track, challenge, cluster, phase)
self.queries = queries
self.warmup_repetitions = warmup_repetitions
self.repetitions = repetitions
self.stop_watch = self.clock.stop_watch()

def run(self):
logger.info("Running warmup iterations")
self._run_benchmark(" Benchmarking %s (warmup iteration %d/%d)")
self._run_benchmark(" Benchmarking %s (warmup iteration %d/%d)", self.warmup_repetitions)
logger.info("Running measurement iterations")
times = self._run_benchmark(" Benchmarking %s (iteration %d/%d)")
times = self._run_benchmark(" Benchmarking %s (iteration %d/%d)", self.repetitions)

for q in self.queries:
latencies = [t[q.name] for t in times]
for latency in latencies:
self.metrics_store.put_value_cluster_level("query_latency_%s" % q.name, convert.seconds_to_ms(latency), "ms")

def _run_benchmark(self, message):
def _run_benchmark(self, message, repetitions):
times = []
quiet = self.quiet_mode
if not quiet:
self._print_progress(message, 0)
for iteration in range(1, self.repetitions + 1):
self._print_progress(message, 0, repetitions)
for iteration in range(1, repetitions + 1):
if not quiet:
self._print_progress(message, iteration)
self._print_progress(message, iteration, repetitions)
times.append(self._run_one_round())
self.progress.finish()
return times

def _print_progress(self, message, iteration):
if iteration % (self.repetitions // 20) == 0:
progress_percent = round(100 * iteration / self.repetitions)
self.progress.print(message % (self.phase.name, iteration, self.repetitions), "[%3d%% done]" % progress_percent)
def _print_progress(self, message, iteration, repetitions):
if repetitions == 0:
self.progress.print(message % (self.phase.name, iteration, repetitions), "[100% done]")
else:
progress_percent = round(100 * iteration / repetitions)
if ((100 * iteration) / repetitions) % 5 == 0:
self.progress.print(message % (self.phase.name, iteration, repetitions), "[%3d%% done]" % progress_percent)

def _run_one_round(self):
d = {}
@@ -107,7 +111,9 @@ def _run_one_round(self):

class SearchBenchmark(LatencyBenchmark):
def __init__(self, cfg, clock, t, challenge, cluster):
super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.search, t.queries,
super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.search,
challenge.benchmark[track.BenchmarkPhase.search].queries,
challenge.benchmark[track.BenchmarkPhase.search].warmup_iteration_count,
challenge.benchmark[track.BenchmarkPhase.search].iteration_count)


Expand All @@ -134,7 +140,8 @@ def __init__(self, cfg, clock, t, challenge, cluster):
super().__init__(cfg, clock, t, challenge, cluster, track.BenchmarkPhase.stats, [
StatsQueryAdapter("indices_stats", cluster.indices_stats, metric="_all", level="shards"),
StatsQueryAdapter("node_stats", cluster.nodes_stats, metric="_all", level="shards"),
], challenge.benchmark[track.BenchmarkPhase.stats].iteration_count)
], challenge.benchmark[track.BenchmarkPhase.stats].warmup_iteration_count,
challenge.benchmark[track.BenchmarkPhase.stats].iteration_count)


class IndexedDocumentCountProbe:
@@ -226,7 +233,7 @@ def __init__(self, config, clock, track, challenge, cluster):
super().__init__(config, clock, track, challenge, cluster)

def index_documents(self, ids):
num_client_threads = self.challenge.clients
num_client_threads = self.challenge.benchmark[track.BenchmarkPhase.index].clients
logger.info("Launching %d client bulk indexing threads" % num_client_threads)

self.stop_watch.start()
21 changes: 11 additions & 10 deletions esrally/racecontrol.py
@@ -261,9 +261,9 @@ def start(self, command):
if command == "list":
self._list(ctx)
elif command == "race":
pipeline = self._choose(pipelines, "pipeline",
pipeline = self._choose(lambda n: pipelines[n], "pipeline",
"You can list the available pipelines with %s list pipelines." % PROGRAM_NAME)(ctx)
t = self._choose(track.tracks, "track", "You can list the available tracks with %s list tracks." % PROGRAM_NAME)
t = self._choose(lambda n: ctx.track_reader.read(n), "track", "You can list the available tracks with %s list tracks." % PROGRAM_NAME)
metrics.race_store(self._config).store_race(t)
pipeline.run(t)
return True
@@ -287,11 +287,11 @@ def start(self, command):
logging.exception("A fatal error occurred while running the benchmark.")
raise e

def _choose(self, source, what, help):
def _choose(self, loader, what, help):
name = self._config.opts("system", what)
try:
name = self._config.opts("system", what)
return source[name]
except KeyError:
return loader(name)
except (KeyError, FileNotFoundError):
raise exceptions.ImproperlyConfigured("Unknown %s [%s]. %s" % (what, name, help))

def _list(self, ctx):
@@ -302,8 +302,9 @@ def _list(self, ctx):
print("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")
elif what == "tracks":
print("Available tracks:\n")
print(tabulate.tabulate([[t.name, t.short_description, ",".join(map(str, t.challenges))] for t in track.tracks.values()],
headers=["Name", "Description", "Challenges"]))
print(tabulate.tabulate(
tabular_data=[[t.name, t.short_description, ",".join(map(str, t.challenges))] for t in ctx.track_reader.all_tracks()],
headers=["Name", "Description", "Challenges"]))

elif what == "pipelines":
print("Available pipelines:\n")
@@ -318,8 +319,7 @@ def _list(self, ctx):
print(tabulate.tabulate(races, headers=["Race Timestamp", "Track", "Challenge", "Car", "User Tag"]))
elif what == "cars":
print("Available cars:\n")
print(tabulate.tabulate([[car.name] for car in track.cars],
headers=["Name"]))
print(tabulate.tabulate([[car.name] for car in track.cars], headers=["Name"]))
else:
raise exceptions.ImproperlyConfigured("Cannot list unknown configuration option [%s]" % what)

@@ -331,3 +331,4 @@ def __init__(self, cfg):
self.marshal = track.Marshal(cfg)
self.reporter = reporter.SummaryReporter(cfg)
self.sweeper = sweeper.Sweeper(cfg)
self.track_reader = track.TrackFileReader(cfg)
14 changes: 7 additions & 7 deletions esrally/reporter.py
@@ -144,9 +144,9 @@ def report(self, t):
store.open(invocation, t.name, challenge.name, selected_car)

stats = Stats(store,
self.sample_size(challenge, track.BenchmarkPhase.stats),
t.queries,
self.sample_size(challenge, track.BenchmarkPhase.search))
self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.stats].iteration_count),
self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.search].queries),
self.guarded(lambda: challenge.benchmark[track.BenchmarkPhase.search].iteration_count))

metrics_table = []
if track.BenchmarkPhase.index in challenge.benchmark:
@@ -169,10 +169,10 @@

print_internal(tabulate.tabulate(metrics_table, headers=["Metric", "Value"], numalign="right", stralign="right"))

def sample_size(self, challenge, benchmark_phase):
if track.BenchmarkPhase.stats in challenge.benchmark:
return challenge.benchmark[benchmark_phase].iteration_count
else:
def guarded(self, op):
try:
return op()
except KeyError:
return None

def report_index_throughput(self, stats):
6 changes: 0 additions & 6 deletions esrally/track/__init__.py
@@ -1,6 +0,0 @@
from os.path import dirname, basename, isfile
import glob

# "Autodiscover" all available tracks
modules = glob.glob(dirname(__file__) + "/*.py")
__all__ = [basename(f)[:-3] for f in modules if isfile(f)]
