Cache source artifacts by default (#998)
With this commit we extend the artifact cache introduced in #992 to plugins as
well, and implement a cache cleanup mechanism that evicts artifacts after seven
days by default (configurable with `cache.days`). As artifact caching can now
replace the pipeline `from-sources-skip-build`, we deprecate that pipeline
along with `from-sources-complete` and introduce a new pipeline `from-sources`
instead.
danielmitterdorfer authored May 25, 2020
1 parent 1ca68e9 commit 78a5fb5
Showing 11 changed files with 315 additions and 71 deletions.
2 changes: 1 addition & 1 deletion docs/command_line_reference.rst
@@ -437,7 +437,7 @@ This enables the HTTP exporter of `X-Pack Monitoring <https://www.elastic.co/pro

Selects the :doc:`pipeline </pipelines>` that Rally should run.

Rally can autodetect the pipeline in most cases. If you specify ``--distribution-version`` it will auto-select the pipeline ``from-distribution`` otherwise it will use ``from-sources-complete``.
Rally can autodetect the pipeline in most cases. If you specify ``--distribution-version`` it will auto-select the pipeline ``from-distribution`` otherwise it will use ``from-sources``.

.. _clr_enable_driver_profiling:

9 changes: 9 additions & 0 deletions docs/migrate.rst
@@ -1,6 +1,15 @@
Migration Guide
===============

Migrating to Rally 2.0.1
------------------------

Pipelines from-sources-complete and from-sources-skip-build are deprecated
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Rally 2.0.1 caches source artifacts automatically in ``~/.rally/benchmarks/distributions/src``. It is therefore no longer necessary to explicitly skip the build with ``--pipeline=from-sources-skip-build``. Specify ``--pipeline=from-sources`` instead. See the :doc:`pipeline reference documentation </pipelines>` for more details.
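As a sketch, the relevant settings (per the pipeline documentation updated in this commit; the values shown are the defaults) live in the ``source`` section of ``~/.rally/rally.ini``:

```ini
[source]
# set to false to disable source artifact caching
cache = true
# evict cached artifacts older than this many days
cache.days = 7
```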


Migrating to Rally 2.0.0
------------------------

27 changes: 16 additions & 11 deletions docs/pipelines.rst
@@ -11,10 +11,11 @@ You can get a list of all pipelines with ``esrally list pipelines``::

Name Description
----------------------- ---------------------------------------------------------------------------------------------
from-sources Builds and provisions Elasticsearch, runs a benchmark and reports results.
from-sources-complete Builds and provisions Elasticsearch, runs a benchmark and reports results [deprecated].
from-sources-skip-build Provisions Elasticsearch (skips the build), runs a benchmark and reports results [deprecated].
from-distribution Downloads an Elasticsearch distribution, provisions it, runs a benchmark and reports results.
from-sources-complete Builds and provisions Elasticsearch, runs a benchmark and reports results.
benchmark-only Assumes an already running Elasticsearch instance, runs a benchmark and reports results
from-sources-skip-build Provisions Elasticsearch (skips the build), runs a benchmark and reports results.

benchmark-only
~~~~~~~~~~~~~~
@@ -45,28 +46,32 @@ However, this feature is mainly intended for continuous integration environments

This pipeline is just mentioned for completeness but Rally will autoselect it for you. All you need to do is to define the ``--distribution-version`` flag.

.. _pipelines_from-sources-complete:

from-sources-complete
~~~~~~~~~~~~~~~~~~~~~
from-sources
~~~~~~~~~~~~

You should use this pipeline when you want to build and benchmark Elasticsearch from sources. This pipeline will only work from Elasticsearch 5.0 onwards because Elasticsearch switched from Maven to Gradle and Rally only supports one build tool in the interest of maintainability.

Remember that you also need git installed. If that's not the case you'll get an error and have to run ``esrally configure`` first. An example invocation::

esrally --pipeline=from-sources-complete --revision=latest
esrally --pipeline=from-sources --revision=latest

You have to specify a :ref:`revision <clr_revision>`.

.. note::

This pipeline is just mentioned for completeness but Rally will automatically select it for you. All you need to do is to define the ``--revision`` flag.

To enable artifact caching for source builds, set ``cache`` to ``true`` in the section ``source`` in the configuration file in ``~/.rally/rally.ini``. Source builds will then be cached in ``~/.rally/benchmarks/distributions`` but artifacts will not be evicted automatically.
Artifacts are cached for seven days by default in ``~/.rally/benchmarks/distributions/src``. Artifact caching can be configured with the following properties in the section ``source`` of the configuration file ``~/.rally/rally.ini``:

* ``cache`` (default: ``True``): Set to ``False`` to disable artifact caching.
* ``cache.days`` (default: ``7``): The maximum age in days of an artifact before it gets evicted from the artifact cache.
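The eviction policy these two settings control can be sketched as a standalone age check. This is a hypothetical helper mirroring the ``_prune`` function added in this commit, not part of Rally's public API:

```python
import datetime


def should_evict(created_at, now, max_age_days=7):
    """Return True if an artifact created at ``created_at`` is strictly older than ``max_age_days``."""
    cutoff = now - datetime.timedelta(days=max_age_days)
    return created_at < cutoff


now = datetime.datetime(2020, 5, 25)
fresh = datetime.datetime(2020, 5, 24)   # one day old -> keep
stale = datetime.datetime(2020, 5, 10)   # fifteen days old -> evict
print(should_evict(fresh, now), should_evict(stale, now))  # False True
```

An artifact that is exactly ``cache.days`` old is kept; only strictly older files are deleted, matching the ``created_at < max_age`` comparison in ``_prune``.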

from-sources-complete
~~~~~~~~~~~~~~~~~~~~~

This deprecated pipeline is an alias for ``from-sources`` and is only provided for backwards-compatibility. Use ``from-sources`` instead.

from-sources-skip-build
~~~~~~~~~~~~~~~~~~~~~~~

This pipeline is similar to ``from-sources-complete`` except that it assumes you have built the binary once. It saves time if you want to run a benchmark twice for the exact same version of Elasticsearch. Obviously it doesn't make sense to provide a revision: It is always the previously built revision. An example invocation::

esrally --pipeline=from-sources-skip-build
This deprecated pipeline is similar to ``from-sources-complete`` except that it assumes you have built the binary once. Use ``from-sources`` instead, which caches built artifacts automatically.
143 changes: 120 additions & 23 deletions esrally/mechanic/supplier.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import datetime
import glob
import logging
import os
@@ -34,6 +35,7 @@ def create(cfg, sources, distribution, build, car, plugins=None):
logger = logging.getLogger(__name__)
if plugins is None:
plugins = []
caching_enabled = cfg.opts("source", "cache", mandatory=False, default_value=True)
revisions = _extract_revisions(cfg.opts("mechanic", "source.revision"))
distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
supply_requirements = _supply_requirements(sources, distribution, build, plugins, revisions, distribution_version)
@@ -63,6 +65,18 @@ def create(cfg, sources, distribution, build, car, plugins=None):
# ... but the user can override it in rally.ini
dist_cfg.update(cfg.all_opts("distributions"))

if caching_enabled:
logger.info("Enabling source artifact caching.")
max_age_days = int(cfg.opts("source", "cache.days", mandatory=False, default_value=7))
if max_age_days <= 0:
raise exceptions.SystemSetupError(f"cache.days must be a positive number but is {max_age_days}")

source_distributions_root = os.path.join(distributions_root, "src")
_prune(source_distributions_root, max_age_days)
else:
logger.info("Disabling source artifact caching.")
source_distributions_root = None

if es_supplier_type == "source":
es_src_dir = os.path.join(_src_dir(cfg), _config_value(src_config, "elasticsearch.src.subdir"))

@@ -73,12 +87,12 @@ def create(cfg, sources, distribution, build, car, plugins=None):
builder=builder,
template_renderer=template_renderer)

if cfg.opts("source", "cache", mandatory=False, default_value=False):
logger.info("Enabling source artifact caching.")
source_supplier = CachedElasticsearchSourceSupplier(distributions_root,
source_supplier,
dist_cfg,
template_renderer)
if caching_enabled:
es_file_resolver = ElasticsearchFileNameResolver(dist_cfg, template_renderer)
source_supplier = CachedSourceSupplier(source_distributions_root,
source_supplier,
es_file_resolver)

suppliers.append(source_supplier)
repo = None
else:
@@ -94,14 +108,21 @@ def create(cfg, sources, distribution, build, car, plugins=None):
if supplier_type == "source":
if CorePluginSourceSupplier.can_handle(plugin):
logger.info("Adding core plugin source supplier for [%s].", plugin.name)
assert es_src_dir is not None, "Cannot build core plugin %s when Elasticsearch is not built from source." % plugin.name
suppliers.append(CorePluginSourceSupplier(plugin, es_src_dir, builder))
assert es_src_dir is not None, f"Cannot build core plugin {plugin.name} when Elasticsearch is not built from source."
plugin_supplier = CorePluginSourceSupplier(plugin, es_src_dir, builder)
elif ExternalPluginSourceSupplier.can_handle(plugin):
logger.info("Adding external plugin source supplier for [%s].", plugin.name)
suppliers.append(ExternalPluginSourceSupplier(plugin, plugin_version, _src_dir(cfg, mandatory=False), src_config, builder))
plugin_supplier = ExternalPluginSourceSupplier(plugin, plugin_version, _src_dir(cfg, mandatory=False), src_config, builder)
else:
raise exceptions.RallyError("Plugin %s can neither be treated as core nor as external plugin. Requirements: %s" %
(plugin.name, supply_requirements[plugin.name]))

if caching_enabled:
plugin_file_resolver = PluginFileNameResolver(plugin.name)
plugin_supplier = CachedSourceSupplier(source_distributions_root,
plugin_supplier,
plugin_file_resolver)
suppliers.append(plugin_supplier)
else:
logger.info("Adding plugin distribution supplier for [%s].", plugin.name)
assert repo is not None, "Cannot benchmark plugin %s from a distribution version but Elasticsearch from sources" % plugin.name
@@ -192,6 +213,35 @@ def _src_dir(cfg, mandatory=True):
" all prerequisites and reconfigure Rally with %s configure" % PROGRAM_NAME)


def _prune(root_path, max_age_days):
"""
Removes files that are older than ``max_age_days`` from ``root_path``. Subdirectories are not traversed.
:param root_path: A directory which should be checked.
:param max_age_days: Files that have been created more than ``max_age_days`` ago are deleted.
"""
logger = logging.getLogger(__name__)
if not os.path.exists(root_path):
logger.info("[%s] does not exist. Skipping pruning.", root_path)
return

for f in os.listdir(root_path):
artifact = os.path.join(root_path, f)
if os.path.isfile(artifact):
max_age = datetime.datetime.now() - datetime.timedelta(days=max_age_days)
try:
created_at = datetime.datetime.fromtimestamp(os.lstat(artifact).st_ctime)
if created_at < max_age:
logger.info("Deleting [%s] from artifact cache (reached max age).", f)
os.remove(artifact)
else:
logger.debug("Keeping [%s] (max age not yet reached)", f)
except OSError:
logger.exception("Could not check whether [%s] needs to be deleted from artifact cache.", artifact)
else:
logger.info("Skipping [%s] (not a file).", artifact)


class TemplateRenderer:
def __init__(self, version, os_name=None, arch=None):
self.version = version
@@ -231,16 +281,19 @@ def __call__(self, *args, **kwargs):
return binaries


class CachedElasticsearchSourceSupplier:
def __init__(self, distributions_root, source_supplier, distribution_config, template_renderer):
self.distributions_root = distributions_root
self.source_supplier = source_supplier
self.template_renderer = template_renderer
class ElasticsearchFileNameResolver:
def __init__(self, distribution_config, template_renderer):
self.cfg = distribution_config
self.runtime_jdk_bundled = convert.to_bool(self.cfg.get("runtime.jdk.bundled", False))
self.revision = None
self.cached_path = None
self.logger = logging.getLogger(__name__)
self.template_renderer = template_renderer

@property
def revision(self):
return self.template_renderer.version

@revision.setter
def revision(self, revision):
self.template_renderer.version = revision

@property
def file_name(self):
@@ -251,6 +304,29 @@ def file_name(self):
url = self.template_renderer.render(self.cfg[url_key])
return url[url.rfind("/") + 1:]

@property
def artifact_key(self):
return "elasticsearch"

def to_artifact_path(self, file_system_path):
return file_system_path

def to_file_system_path(self, artifact_path):
return artifact_path


class CachedSourceSupplier:
def __init__(self, distributions_root, source_supplier, file_resolver):
self.distributions_root = distributions_root
self.source_supplier = source_supplier
self.file_resolver = file_resolver
self.cached_path = None
self.logger = logging.getLogger(__name__)

@property
def file_name(self):
return self.file_resolver.file_name

@property
def cached(self):
return self.cached_path is not None and os.path.exists(self.cached_path)
@@ -259,7 +335,7 @@ def fetch(self):
resolved_revision = self.source_supplier.fetch()
if resolved_revision:
# ensure we use the resolved revision for rendering the artifact
self.template_renderer.version = resolved_revision
self.file_resolver.revision = resolved_revision
self.cached_path = os.path.join(self.distributions_root, self.file_name)

def prepare(self):
@@ -269,18 +345,18 @@ def prepare(self):
def add(self, binaries):
if self.cached:
self.logger.info("Using cached artifact in [%s]", self.cached_path)
binaries["elasticsearch"] = self.cached_path
binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
else:
self.source_supplier.add(binaries)
original_path = binaries["elasticsearch"]
original_path = self.file_resolver.to_file_system_path(binaries[self.file_resolver.artifact_key])
# this can be None if the Elasticsearch does not reside in a git repo and the user has only
# copied all source files. In that case, we cannot resolve a revision hash and thus we cannot cache.
if self.cached_path:
try:
io.ensure_dir(io.dirname(self.cached_path))
shutil.copy(original_path, self.cached_path)
self.logger.info("Caching artifact in [%s]", self.cached_path)
binaries["elasticsearch"] = self.cached_path
binaries[self.file_resolver.artifact_key] = self.file_resolver.to_artifact_path(self.cached_path)
except OSError:
self.logger.exception("Not caching [%s].", original_path)
else:
@@ -318,6 +394,26 @@ def resolve_binary(self):
raise SystemSetupError("Couldn't find a tar.gz distribution. Please run Rally with the pipeline 'from-sources-complete'.")


class PluginFileNameResolver:
def __init__(self, plugin_name):
self.plugin_name = plugin_name
self.revision = None

@property
def file_name(self):
return f"{self.plugin_name}-{self.revision}.zip"

@property
def artifact_key(self):
return self.plugin_name

def to_artifact_path(self, file_system_path):
return f"file://{file_system_path}"

def to_file_system_path(self, artifact_path):
return artifact_path[len("file://"):]
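Taken on its own, the ``PluginFileNameResolver`` above is a simple ``file://`` round trip between file-system paths and artifact paths. A runnable excerpt of the class as it appears in this diff, exercised with a hypothetical plugin name and revision:

```python
class PluginFileNameResolver:
    """Maps a plugin's cached artifact between file-system and file:// URL forms."""

    def __init__(self, plugin_name):
        self.plugin_name = plugin_name
        self.revision = None

    @property
    def file_name(self):
        # cached artifact name combines plugin name and resolved revision
        return f"{self.plugin_name}-{self.revision}.zip"

    @property
    def artifact_key(self):
        return self.plugin_name

    def to_artifact_path(self, file_system_path):
        # plugin installers expect a file:// URL
        return f"file://{file_system_path}"

    def to_file_system_path(self, artifact_path):
        # strip the file:// prefix to get back the plain path
        return artifact_path[len("file://"):]


resolver = PluginFileNameResolver("analysis-icu")
resolver.revision = "abc123"
print(resolver.file_name)  # analysis-icu-abc123.zip
path = "/home/user/.rally/benchmarks/distributions/src/analysis-icu-abc123.zip"
print(resolver.to_file_system_path(resolver.to_artifact_path(path)) == path)  # True
```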


class ExternalPluginSourceSupplier:
def __init__(self, plugin, revision, src_dir, src_config, builder):
assert not plugin.core_plugin, "Plugin %s is a core plugin" % plugin.name
@@ -348,7 +444,7 @@ def can_handle(plugin):
def fetch(self):
# optional (but then source code is assumed to be available locally)
plugin_remote_url = self.src_config.get("plugin.%s.remote.repo.url" % self.plugin.name)
SourceRepository(self.plugin.name, plugin_remote_url, self.plugin_src_dir).fetch(self.revision)
return SourceRepository(self.plugin.name, plugin_remote_url, self.plugin_src_dir).fetch(self.revision)

def prepare(self):
if self.builder:
@@ -380,7 +476,8 @@ def can_handle(plugin):
return plugin.core_plugin

def fetch(self):
pass
# Just retrieve the current revision *number* and assume that Elasticsearch has prepared the source tree.
return SourceRepository("Elasticsearch", None, self.es_src_dir).fetch(revision="current")

def prepare(self):
if self.builder:
11 changes: 7 additions & 4 deletions esrally/mechanic/team.py
@@ -353,12 +353,17 @@ def load_plugin(self, name, config_names, plugin_params=None):
self.logger.info("Loading plugin [%s] with default configuration.", name)

root_path = self._plugin_root_path(name)
# used to determine whether this is a core plugin
core_plugin = self._core_plugin(name)
if not config_names:
# maybe we only have a config folder but nothing else (e.g. if there is only an install hook)
if io.exists(root_path):
return PluginDescriptor(name=name, config=config_names, root_path=root_path, variables=plugin_params)
return PluginDescriptor(name=name,
core_plugin=core_plugin is not None,
config=config_names,
root_path=root_path,
variables=plugin_params)
else:
core_plugin = self._core_plugin(name, plugin_params)
if core_plugin:
return core_plugin
# If we just have a plugin name then we assume that this is a community plugin and the user has specified a download URL
@@ -371,8 +376,6 @@ def load_plugin(self, name, config_names, plugin_params=None):
config_paths = []
# used for deduplication
known_config_bases = set()
# used to determine whether this is a core plugin
core_plugin = self._core_plugin(name)

for config_name in config_names:
config_file = self._plugin_file(name, config_name)