diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index beffb18a17179..67c3a4fb9d6a2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -43,6 +43,7 @@
 
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp import auth
+from apache_beam.io.gcp import gcsio_retry
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
@@ -147,6 +148,7 @@ def __init__(self, storage_client=None, pipeline_options=None):
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
+    self._storage_client_retry = gcsio_retry.get_retry(pipeline_options)
 
   def get_project_number(self, bucket):
     if bucket not in self.bucket_to_project_number:
@@ -159,7 +161,8 @@ def get_project_number(self, bucket):
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
+      return self.client.lookup_bucket(
+          bucket_name, retry=self._storage_client_retry)
     except NotFound:
       return None
 
@@ -180,7 +183,7 @@ def create_bucket(
         bucket_or_name=bucket,
         project=project,
         location=location,
-    )
+        retry=self._storage_client_retry)
     if kms_key:
       bucket.default_kms_key_name(kms_key)
       bucket.patch()
@@ -213,10 +216,11 @@ def open(
 
     if mode == 'r' or mode == 'rb':
       blob = bucket.blob(blob_name)
-      return BeamBlobReader(blob, chunk_size=read_buffer_size)
+      return BeamBlobReader(
+          blob, chunk_size=read_buffer_size, retry=self._storage_client_retry)
     elif mode == 'w' or mode == 'wb':
       blob = bucket.blob(blob_name)
-      return BeamBlobWriter(blob, mime_type)
+      return BeamBlobWriter(blob, mime_type, retry=self._storage_client_retry)
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
@@ -448,7 +452,7 @@ def _gcs_object(self, path):
     """
     bucket_name, blob_name = parse_gcs_path(path)
     bucket = self.client.bucket(bucket_name)
-    blob = bucket.get_blob(blob_name)
+    blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
     if blob:
       return blob
     else:
@@ -496,7 +500,8 @@ def list_files(self, path, with_metadata=False):
     else:
       _LOGGER.debug("Starting the size estimation of the input")
       bucket = self.client.bucket(bucket_name)
-      response = self.client.list_blobs(bucket, prefix=prefix)
+      response = self.client.list_blobs(
+          bucket, prefix=prefix, retry=self._storage_client_retry)
       for item in response:
         file_name = 'gs://%s/%s' % (item.bucket.name, item.name)
         if file_name not in file_info:
@@ -544,18 +549,24 @@ def is_soft_delete_enabled(self, gcs_path):
 
 
 class BeamBlobReader(BlobReader):
-  def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
-    super().__init__(blob, chunk_size=chunk_size)
+  def __init__(
+      self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, retry=DEFAULT_RETRY):
+    super().__init__(blob, chunk_size=chunk_size, retry=retry)
     self.mode = "r"
 
 
 class BeamBlobWriter(BlobWriter):
   def __init__(
-      self, blob, content_type, chunk_size=16 * 1024 * 1024, ignore_flush=True):
+      self,
+      blob,
+      content_type,
+      chunk_size=16 * 1024 * 1024,
+      ignore_flush=True,
+      retry=DEFAULT_RETRY):
     super().__init__(
         blob,
         content_type=content_type,
         chunk_size=chunk_size,
         ignore_flush=ignore_flush,
-        retry=DEFAULT_RETRY)
+        retry=retry)
     self.mode = "w"
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry.py b/sdks/python/apache_beam/io/gcp/gcsio_retry.py
index 0f004d6400d5c..2cafb244bf3a2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_retry.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_retry.py
@@ -27,7 +27,10 @@
 from apache_beam.metrics.metric import Metrics
 from google.api_core import retry
 from google.api_core import exceptions as api_exceptions
-from google.cloud.storage.retry import _should_retry
+from google.cloud.storage.retry import _should_retry  # pylint: disable=protected-access
+from google.cloud.storage.retry import DEFAULT_RETRY
+
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -58,5 +61,12 @@ def __call__(self, exc):
       time.sleep(sleep_seconds)
 
 
-DEFAULT_RETRY_WITH_THROTTLING = retry.Retry(
+DEFAULT_RETRY_WITH_THROTTLING_COUNTERS = retry.Retry(
     predicate=_should_retry, on_error=ThrottlingHandler())
+
+
+def get_retry(pipeline_options):
+  if pipeline_options.view_as(GoogleCloudOptions).no_gcsio_throttling_counters:
+    return DEFAULT_RETRY
+  else:
+    return DEFAULT_RETRY_WITH_THROTTLING_COUNTERS
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py
index b9dd777ac9eab..21ce8f8d3f248 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_retry_test.py
@@ -34,7 +34,8 @@
   api_exceptions = None
 
 
-@unittest.skipIf((gcsio_retry is None or api_exceptions is None), 'GCP dependencies are not installed')
+@unittest.skipIf((gcsio_retry is None or api_exceptions is None),
+                 'GCP dependencies are not installed')
 class TestGCSIORetry(unittest.TestCase):
   def test_retry_on_non_retriable(self):
     mock = Mock(side_effect=[
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 42aee47a957e8..35ff1a3b5d101 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -900,6 +900,12 @@ def _add_argparse_args(cls, parser):
             'Controls the OAuth scopes that will be requested when creating '
             'GCP credentials. Note: If set programmatically, must be set as a '
             'list of strings'))
+    parser.add_argument(
+        '--no_gcsio_throttling_counters',
+        default=False,
+        action='store_true',
+        help='Throttling counters in GcsIO are enabled by default. Set '
+        '--no_gcsio_throttling_counters to disable them.')
 
   def _create_default_gcs_bucket(self):
     try:
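
A minimal usage sketch of the new gcsio_retry.get_retry helper introduced above (illustrative only, not part of the patch; assumes the GCP extras, including google-cloud-storage, are installed):

    from apache_beam.io.gcp import gcsio_retry
    from apache_beam.options.pipeline_options import PipelineOptions

    # Default: throttling counters stay enabled, so GcsIO is handed the
    # throttling-aware retry object defined in gcsio_retry.
    opts = PipelineOptions([])
    retry_used = gcsio_retry.get_retry(opts)
    assert retry_used is gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS

    # With the new flag, the plain client-library DEFAULT_RETRY (no
    # throttling handler) is returned instead.
    opts = PipelineOptions(['--no_gcsio_throttling_counters'])
    retry_used = gcsio_retry.get_retry(opts)
    assert retry_used is gcsio_retry.DEFAULT_RETRY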