diff --git a/create-notice.sh b/create-notice.sh
index 7f65d2415..88386cae1 100755
--- a/create-notice.sh
+++ b/create-notice.sh
@@ -52,6 +52,8 @@ function main {
     add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
     add_license "yappi" "https://raw.githubusercontent.com/sumerc/yappi/master/LICENSE"
     add_license "ijson" "https://raw.githubusercontent.com/ICRAR/ijson/master/LICENSE.txt"
+    add_license "google-resumable-media" "https://raw.githubusercontent.com/googleapis/google-resumable-media-python/master/LICENSE"
+    add_license "google-auth" "https://raw.githubusercontent.com/googleapis/google-auth-library-python/master/LICENSE"
 
     # transitive dependencies
     # Jinja2 dependencies
@@ -65,7 +67,7 @@ function main {
     add_license "attrs" "https://raw.githubusercontent.com/python-attrs/attrs/master/LICENSE"
     add_license "chardet" "https://raw.githubusercontent.com/chardet/chardet/master/LICENSE"
     add_license "multidict" "https://raw.githubusercontent.com/aio-libs/multidict/master/LICENSE"
-    add_license "yarl" "https://github.com/aio-libs/yarl/blob/master/LICENSE"
+    add_license "yarl" "https://raw.githubusercontent.com/aio-libs/yarl/master/LICENSE"
     # yarl dependencies
     add_license "idna" "https://raw.githubusercontent.com/kjd/idna/master/LICENSE.md"
     # yarl dependency "multidict" is already covered above
@@ -73,6 +75,8 @@ function main {
     add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
     add_license "jmespath" "https://raw.githubusercontent.com/jmespath/jmespath.py/develop/LICENSE.txt"
     add_license "botocore" "https://raw.githubusercontent.com/boto/botocore/develop/LICENSE.txt"
+    # google-resumable-media dependencies
+    add_license "google-crc32c" "https://raw.githubusercontent.com/googleapis/python-crc32c/master/LICENSE"
 }
 
 main
diff --git a/docs/track.rst b/docs/track.rst
index aeef51dc5..3c66cd5fb 100644
--- a/docs/track.rst
+++ b/docs/track.rst
@@ -225,7 +225,10 @@ The ``corpora`` section contains all document corpora that are used by this trac
 
 Each entry in the ``documents`` list consists of the following properties:
 
-* ``base-url`` (optional): A http(s) or S3 URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 buckets if access is properly `configured `_.
+* ``base-url`` (optional): An http(s), S3 or Google Storage URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 or Google Storage buckets if access is properly configured:
+
+  * S3: according to the `docs `_.
+  * Google Storage: either using `client library authentication `_ or by presenting an `oauth2 token `_ via the ``GOOGLE_AUTH_TOKEN`` environment variable, typically done using: ``export GOOGLE_AUTH_TOKEN=$(gcloud auth print-access-token)``.
 * ``source-format`` (optional, default: ``bulk``): Defines in which format Rally should interpret the data file specified by ``source-file``. Currently, only ``bulk`` is supported.
 * ``source-file`` (mandatory): File name of the corresponding documents. For local use, this file can be a ``.json`` file. If you provide a ``base-url`` we recommend that you provide a compressed file here. The following extensions are supported: ``.zip``, ``.bz2``, ``.gz``, ``.tar``, ``.tar.gz``, ``.tgz`` or ``.tar.bz2``. It must contain exactly one JSON file with the same name. The preferred file extension for our official tracks is ``.bz2``.
 * ``includes-action-and-meta-data`` (optional, defaults to ``false``): Defines whether the documents file already contains an action and meta-data line (``true``) or only documents (``false``).
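For users who cannot shell out to ``gcloud``, a read-only access token equivalent to ``gcloud auth print-access-token`` can also be minted programmatically with the ``google-auth`` library that this change adds. A minimal sketch (illustrative only, not part of the change; it assumes Application Default Credentials are already configured):

    import google.auth
    import google.auth.transport.requests

    # resolve Application Default Credentials, restricted to read-only storage access
    credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/devstorage.read_only"])
    # fetch a fresh OAuth2 access token; its value is what GOOGLE_AUTH_TOKEN expects
    credentials.refresh(google.auth.transport.requests.Request())
    print(credentials.token)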
diff --git a/esrally/utils/net.py b/esrally/utils/net.py
index 316070bba..3ac42147e 100644
--- a/esrally/utils/net.py
+++ b/esrally/utils/net.py
@@ -14,11 +14,13 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import functools
 import logging
 import os
 import urllib.error
+from urllib.parse import quote
+
 import certifi
 import urllib3
 
@@ -69,7 +71,6 @@ def finish(self):
         self.p.finish()
 
 
-# This function is not meant to be called externally and only exists for unit tests
 def _download_from_s3_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
     # lazily initialize S3 support - we might not need it
     import boto3.s3.transfer
@@ -94,17 +95,64 @@ def __call__(self, bytes_amount):
                                   Config=boto3.s3.transfer.TransferConfig(use_threads=False))
 
 
-def download_s3(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
+def _build_gcs_object_url(bucket_name, bucket_path):
+    # "/" and other special characters must be urlencoded in bucket and object names
+    # ref: https://cloud.google.com/storage/docs/request-endpoints#encoding
+
+    return functools.reduce(urllib.parse.urljoin, [
+        "https://storage.googleapis.com/storage/v1/b/",
+        f"{quote(bucket_name.strip('/'), safe='')}/",
+        "o/",
+        f"{quote(bucket_path.strip('/'), safe='')}",
+        "?alt=media"
+    ])
+
+
+def _download_from_gcs_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
+    # lazily initialize Google Cloud Storage support - we might not need it
+    import google.oauth2.credentials
+    import google.auth.transport.requests as tr_requests
+    import google.auth
+    # Using Google Resumable Media because the standard storage library doesn't support progress reporting
+    # (https://github.com/googleapis/python-storage/issues/27)
+    from google.resumable_media.requests import ChunkedDownload
+    ro_scope = "https://www.googleapis.com/auth/devstorage.read_only"
+
+    access_token = os.environ.get("GOOGLE_AUTH_TOKEN")
+    if access_token:
+        credentials = google.oauth2.credentials.Credentials(token=access_token, scopes=(ro_scope,))
+    else:
+        # https://google-auth.readthedocs.io/en/latest/user-guide.html
+        credentials, _ = google.auth.default(scopes=(ro_scope,))
+
+    transport = tr_requests.AuthorizedSession(credentials)
+    chunk_size = 50 * 1024 * 1024  # 50MB
+
+    with open(local_path, "wb") as local_fp:
+        media_url = _build_gcs_object_url(bucket_name, bucket_path)
+        download = ChunkedDownload(media_url, chunk_size, local_fp)
+        # consume the first chunk so that the total object size is known
+        download.consume_next_chunk(transport)
+        if not expected_size_in_bytes:
+            expected_size_in_bytes = download.total_bytes
+        while not download.finished:
+            if progress_indicator and download.bytes_downloaded and download.total_bytes:
+                progress_indicator(download.bytes_downloaded, expected_size_in_bytes)
+            download.consume_next_chunk(transport)
+
+
+def download_from_bucket(blobstore, url, local_path, expected_size_in_bytes=None, progress_indicator=None):
+    blob_downloader = {"s3": _download_from_s3_bucket, "gs": _download_from_gcs_bucket}
     logger = logging.getLogger(__name__)
     bucket_and_path = url[5:]  # s3:// or gs:// prefix for now
     bucket_end_index = bucket_and_path.find("/")
     bucket = bucket_and_path[:bucket_end_index]
     # we need to remove the leading "/"
     bucket_path = bucket_and_path[bucket_end_index + 1:]
-    logger.info("Downloading from S3 bucket [%s] and path [%s] to [%s].", bucket, bucket_path, local_path)
-    _download_from_s3_bucket(bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)
+    logger.info("Downloading from [%s] bucket [%s] and path [%s] to [%s].", blobstore, bucket, bucket_path, local_path)
+    blob_downloader[blobstore](bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)
     return expected_size_in_bytes
 
@@ -146,8 +194,9 @@ def download(url, local_path, expected_size_in_bytes=None, progress_indicator=No
     """
     tmp_data_set_path = local_path + ".tmp"
    try:
-        if url.startswith("s3"):
-            expected_size_in_bytes = download_s3(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
+        scheme = urllib3.util.parse_url(url).scheme
+        if scheme in ["s3", "gs"]:
+            expected_size_in_bytes = download_from_bucket(scheme, url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
         else:
             expected_size_in_bytes = download_http(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
     except BaseException:
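``_build_gcs_object_url`` targets the JSON API media endpoint, where the bucket and object names must each be percent-encoded as a single path segment, which is why ``quote(..., safe="")`` is used (it also encodes ``/``). A standalone sketch of the same construction, which should produce the same URL as the ``functools.reduce``/``urljoin`` chain above:

    from urllib.parse import quote

    def build_gcs_object_url(bucket_name, bucket_path):
        # hypothetical re-derivation for illustration; quote(..., safe="") also
        # encodes "/" so the object name stays a single path segment
        bucket = quote(bucket_name.strip("/"), safe="")
        blob = quote(bucket_path.strip("/"), safe="")
        return f"https://storage.googleapis.com/storage/v1/b/{bucket}/o/{blob}?alt=media"

    print(build_gcs_object_url("unittest-bucket.test.me/", "/path/to/object"))
    # https://storage.googleapis.com/storage/v1/b/unittest-bucket.test.me/o/path%2Fto%2Fobject?alt=media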
diff --git a/setup.py b/setup.py
index 1e054fd58..17bb70fc4 100644
--- a/setup.py
+++ b/setup.py
@@ -79,7 +79,13 @@ def str_from_file(name):
     # License: Apache 2.0
     "yappi==1.2.3",
     # License: BSD
-    "ijson==2.6.1"
+    "ijson==2.6.1",
+    # License: Apache 2.0
+    # transitive dependencies:
+    #   google-crc32c: Apache 2.0
+    "google-resumable-media==1.1.0",
+    # License: Apache 2.0
+    "google-auth==1.21.1"
 ]
 
 tests_require = [
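The reworked ``download`` dispatches on the URL scheme via ``urllib3.util.parse_url`` instead of a string-prefix check. A quick standalone sketch of the behavior this relies on:

    import urllib3

    # parse_url accepts arbitrary schemes, so "s3" and "gs" round-trip unchanged
    for url in ["s3://bucket/key", "gs://bucket/key", "https://example.org/file"]:
        print(urllib3.util.parse_url(url).scheme)  # s3, gs, https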
diff --git a/tests/utils/net_test.py b/tests/utils/net_test.py
index c7aaecd58..cf8ac1aae 100644
--- a/tests/utils/net_test.py
+++ b/tests/utils/net_test.py
@@ -16,23 +16,49 @@
 # under the License.
 
 import random
 import unittest.mock as mock
-from unittest import TestCase
+
+import pytest
 
 from esrally.utils import net
 
 
-class NetTests(TestCase):
+class TestNetUtils:
     # Mocking boto3 objects directly is too complex so we keep all code in a helper function and mock this instead
+    @pytest.mark.parametrize("seed", range(1))
     @mock.patch("esrally.utils.net._download_from_s3_bucket")
-    def test_download_from_s3_bucket(self, download):
+    def test_download_from_s3_bucket(self, download, seed):
+        random.seed(seed)
         expected_size = random.choice([None, random.randint(0, 1000)])
         progress_indicator = random.choice([None, "some progress indicator"])
 
-        net.download_s3("s3://mybucket.elasticsearch.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
-                        expected_size, progress_indicator)
+        net.download_from_bucket("s3", "s3://mybucket.elasticsearch.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
+                                 expected_size, progress_indicator)
         download.assert_called_once_with("mybucket.elasticsearch.org", "data/documents.json.bz2",
                                          "/tmp/documents.json.bz2", expected_size, progress_indicator)
 
+    @pytest.mark.parametrize("seed", range(1))
+    @mock.patch("esrally.utils.net._download_from_gcs_bucket")
+    def test_download_from_gs_bucket(self, download, seed):
+        random.seed(seed)
+        expected_size = random.choice([None, random.randint(0, 1000)])
+        progress_indicator = random.choice([None, "some progress indicator"])
+
+        net.download_from_bucket("gs", "gs://unittest-gcp-bucket.test.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
+                                 expected_size, progress_indicator)
+        download.assert_called_once_with("unittest-gcp-bucket.test.org", "data/documents.json.bz2",
+                                         "/tmp/documents.json.bz2", expected_size, progress_indicator)
+
+    @pytest.mark.parametrize("seed", range(40))
+    def test_gcs_object_url(self, seed):
+        random.seed(seed)
+        bucket_name = random.choice(["unittest-bucket.test.me", "/unittest-bucket.test.me",
+                                     "/unittest-bucket.test.me/", "unittest-bucket.test.me/"])
+        bucket_path = random.choice(["path/to/object", "/path/to/object",
+                                     "/path/to/object/", "path/to/object/"])
+
+        assert net._build_gcs_object_url(bucket_name, bucket_path) == \
+            "https://storage.googleapis.com/storage/v1/b/unittest-bucket.test.me/o/path%2Fto%2Fobject?alt=media"
+
     def test_progress(self):
         progress = net.Progress("test")
         mock_progress = mock.Mock()
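The new tests parametrize on an explicit ``seed`` and reseed the RNG per case, so the "random" inputs are reproducible when a test fails. A minimal illustration of the property the tests rely on:

    import random

    def pick(seed):
        # reseeding makes random.choice/random.randint deterministic per seed
        random.seed(seed)
        return random.choice([None, random.randint(0, 1000)])

    assert pick(0) == pick(0)  # same seed, same "random" expected_size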