Add support for Google Storage buckets (#1094)
This commit adds support for GCS-hosted buckets storing source data.

While at it, switch utils/net.py tests to pytest.
dliappis authored Oct 19, 2020
1 parent 1bdec4f commit e5c5164
Showing 5 changed files with 104 additions and 16 deletions.
6 changes: 5 additions & 1 deletion create-notice.sh
@@ -52,6 +52,8 @@ function main {
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
add_license "yappi" "https://raw.githubusercontent.com/sumerc/yappi/master/LICENSE"
add_license "ijson" "https://raw.githubusercontent.com/ICRAR/ijson/master/LICENSE.txt"
add_license "google-resumable-media" "https://raw.githubusercontent.com/googleapis/google-resumable-media-python/master/LICENSE"
add_license "google-auth" "https://raw.githubusercontent.com/googleapis/google-auth-library-python/master/LICENSE"

# transitive dependencies
# Jinja2 dependencies
@@ -65,14 +67,16 @@ function main {
add_license "attrs" "https://raw.githubusercontent.com/python-attrs/attrs/master/LICENSE"
add_license "chardet" "https://raw.githubusercontent.com/chardet/chardet/master/LICENSE"
add_license "multidict" "https://raw.githubusercontent.com/aio-libs/multidict/master/LICENSE"
add_license "yarl" "https://github.com/aio-libs/yarl/blob/master/LICENSE"
add_license "yarl" "https://raw.githubusercontent.com/aio-libs/yarl/master/LICENSE"
# yarl dependencies
add_license "idna" "https://raw.githubusercontent.com/kjd/idna/master/LICENSE.md"
# yarl dependency "multidict" is already coverered above
# boto3 dependencies
add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
add_license "jmespath" "https://raw.githubusercontent.com/jmespath/jmespath.py/develop/LICENSE.txt"
add_license "botocore" "https://raw.githubusercontent.com/boto/botocore/develop/LICENSE.txt"
# google-resumable-media dependencies
add_license "google-crc32c": "https://raw.githubusercontent.com/googleapis/python-crc32c/master/LICENSE"
}

main
5 changes: 4 additions & 1 deletion docs/track.rst
@@ -225,7 +225,10 @@ The ``corpora`` section contains all document corpora that are used by this track

Each entry in the ``documents`` list consists of the following properties:

* ``base-url`` (optional): A http(s) or S3 URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 buckets if access is properly `configured <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration>`_.
* ``base-url`` (optional): An http(s), S3 or Google Storage URL that points to the root path where Rally can obtain the corresponding source file. Rally can also download data from private S3 or Google Storage buckets if access is properly configured (an example entry is shown below):

* S3: see the boto3 `configuration docs <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration>`_.
* Google Storage: either using `client library authentication <https://cloud.google.com/storage/docs/reference/libraries#setting_up_authentication>`_ or by presenting an `oauth2 token <https://cloud.google.com/storage/docs/authentication>`_ via the ``GOOGLE_AUTH_TOKEN`` environment variable, typically done using: ``export GOOGLE_AUTH_TOKEN=$(gcloud auth print-access-token)``.
* ``source-format`` (optional, default: ``bulk``): Defines in which format Rally should interpret the data file specified by ``source-file``. Currently, only ``bulk`` is supported.
* ``source-file`` (mandatory): File name of the corresponding documents. For local use, this file can be a ``.json`` file. If you provide a ``base-url`` we recommend that you provide a compressed file here. The following extensions are supported: ``.zip``, ``.bz2``, ``.gz``, ``.tar``, ``.tar.gz``, ``.tgz`` or ``.tar.bz2``. It must contain exactly one JSON file with the same name. The preferred file extension for our official tracks is ``.bz2``.
* ``includes-action-and-meta-data`` (optional, defaults to ``false``): Defines whether the documents file contains already an action and meta-data line (``true``) or only documents (``false``).
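For illustration, a corpus entry using a Google Storage ``base-url`` might look like this (a hedged sketch; the bucket and path are hypothetical, shown as a Python dict mirroring the JSON track format)::

    # hypothetical values throughout; the bucket must grant read access
    # to the configured credentials
    documents_entry = {
        "base-url": "gs://my-gcs-bucket/rally-data",
        "source-file": "documents.json.bz2",
    }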
65 changes: 57 additions & 8 deletions esrally/utils/net.py
@@ -14,11 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import functools
import logging
import os
import urllib.error

from urllib.parse import quote

import certifi
import urllib3

@@ -69,7 +71,6 @@ def finish(self):
self.p.finish()


# This function is not meant to be called externally and only exists for unit tests
def _download_from_s3_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
# lazily initialize S3 support - we might not need it
import boto3.s3.transfer
@@ -94,17 +95,64 @@ def __call__(self, bytes_amount):
Config=boto3.s3.transfer.TransferConfig(use_threads=False))


def download_s3(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
def _build_gcs_object_url(bucket_name, bucket_path):
# / and other special characters must be urlencoded in bucket and object names
# ref: https://cloud.google.com/storage/docs/request-endpoints#encoding

return functools.reduce(urllib.parse.urljoin, [
"https://storage.googleapis.com/storage/v1/b/",
f"{quote(bucket_name.strip('/'), safe='')}/",
"o/",
f"{quote(bucket_path.strip('/'), safe='')}",
"?alt=media"
])
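A quick illustration of the helper above (values mirror the unit test added below): slashes around the bucket and path are stripped, and slashes inside the object name are percent-encoded.

    url = _build_gcs_object_url("/unittest-bucket.test.me/", "/path/to/object/")
    assert url == ("https://storage.googleapis.com/storage/v1/b/"
                   "unittest-bucket.test.me/o/path%2Fto%2Fobject?alt=media")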


def _download_from_gcs_bucket(bucket_name, bucket_path, local_path, expected_size_in_bytes=None, progress_indicator=None):
# lazily initialize Google Cloud Storage support - we might not need it
import google.oauth2.credentials
import google.auth.transport.requests as tr_requests
import google.auth
# Use Google Resumable Media, as the standard storage library doesn't support progress reporting
# (https://github.com/googleapis/python-storage/issues/27)
from google.resumable_media.requests import ChunkedDownload
ro_scope = "https://www.googleapis.com/auth/devstorage.read_only"

access_token = os.environ.get("GOOGLE_AUTH_TOKEN")
if access_token:
credentials = google.oauth2.credentials.Credentials(token=access_token, scopes=(ro_scope, ))
else:
# https://google-auth.readthedocs.io/en/latest/user-guide.html
credentials, _ = google.auth.default(scopes=(ro_scope,))

transport = tr_requests.AuthorizedSession(credentials)
chunk_size = 50 * 1024 * 1024 # 50MB

with open(local_path, "wb") as local_fp:
media_url = _build_gcs_object_url(bucket_name, bucket_path)
download = ChunkedDownload(media_url, chunk_size, local_fp)
# consume the first chunk so that download.total_bytes is populated
download.consume_next_chunk(transport)
if not expected_size_in_bytes:
expected_size_in_bytes = download.total_bytes
while not download.finished:
if progress_indicator and download.bytes_downloaded and download.total_bytes:
progress_indicator(download.bytes_downloaded, expected_size_in_bytes)
download.consume_next_chunk(transport)
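
A minimal usage sketch of the helper above, assuming a hypothetical bucket and object and a simple print-based progress callback (real callers go through download_from_bucket below):

    def print_progress(bytes_downloaded, total_bytes):
        print(f"downloaded {bytes_downloaded} of {total_bytes} bytes")

    _download_from_gcs_bucket("my-gcs-bucket", "data/documents.json.bz2",
                              "/tmp/documents.json.bz2",
                              progress_indicator=print_progress)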


def download_from_bucket(blobstore, url, local_path, expected_size_in_bytes=None, progress_indicator=None):
blob_downloader = {"s3": _download_from_s3_bucket, "gs": _download_from_gcs_bucket}
logger = logging.getLogger(__name__)

bucket_and_path = url[5:]
bucket_and_path = url[5:]  # strip the 5-character scheme prefix ("s3://" or "gs://")
bucket_end_index = bucket_and_path.find("/")
bucket = bucket_and_path[:bucket_end_index]
# we need to remove the leading "/"
bucket_path = bucket_and_path[bucket_end_index + 1:]

logger.info("Downloading from S3 bucket [%s] and path [%s] to [%s].", bucket, bucket_path, local_path)
_download_from_s3_bucket(bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)
logger.info("Downloading from [%s] bucket [%s] and path [%s] to [%s].", blobstore, bucket, bucket_path, local_path)
blob_downloader[blobstore](bucket, bucket_path, local_path, expected_size_in_bytes, progress_indicator)

return expected_size_in_bytes
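
Worked example of the split performed above for a hypothetical URL (both supported schemes are exactly five characters including "://", so url[5:] works for s3 and gs alike):

    url = "gs://my-bucket/data/documents.json.bz2"
    bucket_and_path = url[5:]                # "my-bucket/data/documents.json.bz2"
    sep = bucket_and_path.find("/")
    bucket = bucket_and_path[:sep]           # "my-bucket"
    bucket_path = bucket_and_path[sep + 1:]  # "data/documents.json.bz2"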

@@ -146,8 +194,9 @@ def download(url, local_path, expected_size_in_bytes=None, progress_indicator=None):
"""
tmp_data_set_path = local_path + ".tmp"
try:
if url.startswith("s3"):
expected_size_in_bytes = download_s3(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
scheme = urllib3.util.parse_url(url).scheme
if scheme in ["s3", "gs"]:
expected_size_in_bytes = download_from_bucket(scheme, url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
else:
expected_size_in_bytes = download_http(url, tmp_data_set_path, expected_size_in_bytes, progress_indicator)
except BaseException:
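Taken together, corpora hosted on Google Storage can now be fetched through the public download helper; a minimal sketch with a hypothetical bucket (credentials must be configured as described in docs/track.rst above):

    from esrally.utils import net

    net.download("gs://my-gcs-bucket/data/documents.json.bz2",
                 "/tmp/documents.json.bz2")
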
8 changes: 7 additions & 1 deletion setup.py
@@ -79,7 +79,13 @@ def str_from_file(name):
# License: Apache 2.0
"yappi==1.2.3",
# License: BSD
"ijson==2.6.1"
"ijson==2.6.1",
# License: Apache 2.0
# transitive dependencies:
# google-crc32c: Apache 2.0
"google-resumable-media==1.1.0",
# License: Apache 2.0
"google-auth==1.21.1"
]

tests_require = [
36 changes: 31 additions & 5 deletions tests/utils/net_test.py
@@ -16,23 +16,49 @@
# under the License.
import random
import unittest.mock as mock
from unittest import TestCase

import pytest

from esrally.utils import net


class NetTests(TestCase):
class TestNetUtils:
# Mocking boto3 objects directly is too complex, so we keep all the code in a helper function and mock that instead
@pytest.mark.parametrize("seed", range(1))
@mock.patch("esrally.utils.net._download_from_s3_bucket")
def test_download_from_s3_bucket(self, download):
def test_download_from_s3_bucket(self, download, seed):
random.seed(seed)
expected_size = random.choice([None, random.randint(0, 1000)])
progress_indicator = random.choice([None, "some progress indicator"])

net.download_s3("s3://mybucket.elasticsearch.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
expected_size, progress_indicator)
net.download_from_bucket("s3", "s3://mybucket.elasticsearch.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
expected_size, progress_indicator)
download.assert_called_once_with("mybucket.elasticsearch.org", "data/documents.json.bz2",
"/tmp/documents.json.bz2", expected_size, progress_indicator)

@pytest.mark.parametrize("seed", range(1))
@mock.patch("esrally.utils.net._download_from_gcs_bucket")
def test_download_from_gs_bucket(self, download, seed):
random.seed(seed)
expected_size = random.choice([None, random.randint(0, 1000)])
progress_indicator = random.choice([None, "some progress indicator"])

net.download_from_bucket("gs", "gs://unittest-gcp-bucket.test.org/data/documents.json.bz2", "/tmp/documents.json.bz2",
expected_size, progress_indicator)
download.assert_called_once_with("unittest-gcp-bucket.test.org", "data/documents.json.bz2",
"/tmp/documents.json.bz2", expected_size, progress_indicator)

@pytest.mark.parametrize("seed", range(40))
def test_gcs_object_url(self, seed):
random.seed(seed)
bucket_name = random.choice(["unittest-bucket.test.me", "/unittest-bucket.test.me",
"/unittest-bucket.test.me/", "unittest-bucket.test.me/"])
bucket_path = random.choice(["path/to/object", "/path/to/object",
"/path/to/object/", "path/to/object/"])

assert net._build_gcs_object_url(bucket_name, bucket_path) == \
"https://storage.googleapis.com/storage/v1/b/unittest-bucket.test.me/o/path%2Fto%2Fobject?alt=media"

def test_progress(self):
progress = net.Progress("test")
mock_progress = mock.Mock()
