Use retry with throttling counters by default. Add pipeline option.
shunping committed Jun 13, 2024
1 parent c941895 commit 4fddba3
Showing 4 changed files with 41 additions and 13 deletions.
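The change threads an explicit google-cloud-storage retry object through every GCS client call in GcsIO. By default that retry keeps the client library's retry predicate but adds an on_error hook (ThrottlingHandler) that tracks throttling time via Beam metrics; a new --no_gcsio_throttling_counters pipeline option falls back to the client library's plain DEFAULT_RETRY.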
31 changes: 21 additions & 10 deletions sdks/python/apache_beam/io/gcp/gcsio.py
@@ -43,6 +43,7 @@
 
 from apache_beam import version as beam_version
 from apache_beam.internal.gcp import auth
+from apache_beam.io.gcp import gcsio_retry
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
@@ -147,6 +148,7 @@ def __init__(self, storage_client=None, pipeline_options=None):
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
+    self._storage_client_retry = gcsio_retry.get_retry(pipeline_options)
 
   def get_project_number(self, bucket):
     if bucket not in self.bucket_to_project_number:
@@ -159,7 +161,8 @@ def get_project_number(self, bucket):
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      return self.client.lookup_bucket(bucket_name)
+      return self.client.lookup_bucket(
+          bucket_name, retry=self._storage_client_retry)
     except NotFound:
       return None
 
@@ -180,7 +183,7 @@ def create_bucket(
         bucket_or_name=bucket,
         project=project,
         location=location,
-    )
+        retry=self._storage_client_retry)
     if kms_key:
       bucket.default_kms_key_name(kms_key)
       bucket.patch()
@@ -213,10 +216,11 @@ def open(
 
     if mode == 'r' or mode == 'rb':
       blob = bucket.blob(blob_name)
-      return BeamBlobReader(blob, chunk_size=read_buffer_size)
+      return BeamBlobReader(
+          blob, chunk_size=read_buffer_size, retry=self._storage_client_retry)
     elif mode == 'w' or mode == 'wb':
       blob = bucket.blob(blob_name)
-      return BeamBlobWriter(blob, mime_type)
+      return BeamBlobWriter(blob, mime_type, retry=self._storage_client_retry)
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
Expand Down Expand Up @@ -448,7 +452,7 @@ def _gcs_object(self, path):
"""
bucket_name, blob_name = parse_gcs_path(path)
bucket = self.client.bucket(bucket_name)
blob = bucket.get_blob(blob_name)
blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
if blob:
return blob
else:
@@ -496,7 +500,8 @@ def list_files(self, path, with_metadata=False):
     else:
       _LOGGER.debug("Starting the size estimation of the input")
       bucket = self.client.bucket(bucket_name)
-      response = self.client.list_blobs(bucket, prefix=prefix)
+      response = self.client.list_blobs(
+          bucket, prefix=prefix, retry=self._storage_client_retry)
       for item in response:
         file_name = 'gs://%s/%s' % (item.bucket.name, item.name)
         if file_name not in file_info:
@@ -544,18 +549,24 @@ def is_soft_delete_enabled(self, gcs_path):
 
 
 class BeamBlobReader(BlobReader):
-  def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
-    super().__init__(blob, chunk_size=chunk_size)
+  def __init__(
+      self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, retry=DEFAULT_RETRY):
+    super().__init__(blob, chunk_size=chunk_size, retry=retry)
     self.mode = "r"
 
 
 class BeamBlobWriter(BlobWriter):
   def __init__(
-      self, blob, content_type, chunk_size=16 * 1024 * 1024, ignore_flush=True):
+      self,
+      blob,
+      content_type,
+      chunk_size=16 * 1024 * 1024,
+      ignore_flush=True,
+      retry=DEFAULT_RETRY):
     super().__init__(
         blob,
         content_type=content_type,
         chunk_size=chunk_size,
         ignore_flush=ignore_flush,
-        retry=DEFAULT_RETRY)
+        retry=retry)
     self.mode = "w"
14 changes: 12 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio_retry.py
@@ -27,7 +27,10 @@
 from apache_beam.metrics.metric import Metrics
 from google.api_core import retry
 from google.api_core import exceptions as api_exceptions
-from google.cloud.storage.retry import _should_retry
+from google.cloud.storage.retry import _should_retry  # pylint: disable=protected-access
+from google.cloud.storage.retry import DEFAULT_RETRY
+
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -58,5 +61,12 @@ def __call__(self, exc):
       time.sleep(sleep_seconds)
 
 
-DEFAULT_RETRY_WITH_THROTTLING = retry.Retry(
+DEFAULT_RETRY_WITH_THROTTLING_COUNTERS = retry.Retry(
     predicate=_should_retry, on_error=ThrottlingHandler())
+
+
+def get_retry(pipeline_options):
+  if pipeline_options.view_as(GoogleCloudOptions).no_gcsio_throttling_counters:
+    return DEFAULT_RETRY
+  else:
+    return DEFAULT_RETRY_WITH_THROTTLING_COUNTERS
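A short sketch of how the new get_retry helper selects between the two module-level retry objects, per the diff above:

from google.cloud.storage.retry import DEFAULT_RETRY

from apache_beam.io.gcp import gcsio_retry
from apache_beam.options.pipeline_options import PipelineOptions

# Default: the retry that also counts throttling time.
assert (gcsio_retry.get_retry(PipelineOptions([])) is
        gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS)

# With the new flag: the client library's stock retry.
opts = PipelineOptions(['--no_gcsio_throttling_counters'])
assert gcsio_retry.get_retry(opts) is DEFAULT_RETRY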
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio_retry_test.py
@@ -34,7 +34,8 @@
   api_exceptions = None
 
 
-@unittest.skipIf((gcsio_retry is None or api_exceptions is None), 'GCP dependencies are not installed')
+@unittest.skipIf((gcsio_retry is None or api_exceptions is None),
+                 'GCP dependencies are not installed')
 class TestGCSIORetry(unittest.TestCase):
   def test_retry_on_non_retriable(self):
     mock = Mock(side_effect=[
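For context, a hedged sketch of the retry behavior these tests exercise (assumes the GCP extras are installed; ServiceUnavailable is one of the client library's retriable error types):

from unittest.mock import Mock

from google.api_core import exceptions as api_exceptions

from apache_beam.io.gcp import gcsio_retry

# Fails once with a retriable error, then succeeds: the wrapped call
# should be retried to completion.
flaky = Mock(side_effect=[api_exceptions.ServiceUnavailable('transient'), 42])
wrapped = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING_COUNTERS(flaky)
assert wrapped() == 42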
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -900,6 +900,12 @@ def _add_argparse_args(cls, parser):
             'Controls the OAuth scopes that will be requested when creating '
             'GCP credentials. Note: If set programmatically, must be set as a '
             'list of strings'))
+    parser.add_argument(
+        '--no_gcsio_throttling_counters',
+        default=False,
+        action='store_true',
+        help='Throttling counters in GcsIO are enabled by default. Set '
+        '--no_gcsio_throttling_counters to disable them.')
 
   def _create_default_gcs_bucket(self):
     try:
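A quick sketch of how the new flag surfaces on GoogleCloudOptions (the option defaults to False when unset):

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

default_opts = PipelineOptions([])
assert not default_opts.view_as(GoogleCloudOptions).no_gcsio_throttling_counters

opt_out = PipelineOptions(['--no_gcsio_throttling_counters'])
assert opt_out.view_as(GoogleCloudOptions).no_gcsio_throttling_counters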
