Skip to content

Commit

Permalink
Add retry instance that records throttling metric.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Jun 13, 2024
1 parent c001c3a commit c941895
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 4 deletions.
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ class GcsIO(object):
"""Google Cloud Storage I/O client."""
def __init__(self, storage_client=None, pipeline_options=None):
# type: (Optional[storage.Client], Optional[Union[dict, PipelineOptions]]) -> None
if not pipeline_options:
pipeline_options = PipelineOptions()
elif isinstance(pipeline_options, dict):
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
if storage_client is None:
if not pipeline_options:
pipeline_options = PipelineOptions()
elif isinstance(pipeline_options, dict):
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
storage_client = create_storage_client(pipeline_options)
self.client = storage_client
self._rewrite_cb = None
Expand Down
62 changes: 62 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Throttling Handler for GCSIO
"""

import logging
import math
import random
import time

from apache_beam.metrics.metric import Metrics
from google.api_core import retry
from google.api_core import exceptions as api_exceptions
from google.cloud.storage.retry import _should_retry

_LOGGER = logging.getLogger(__name__)

__all__ = ['DEFAULT_RETRY_WITH_THROTTLING']


class ThrottlingHandler(object):
_THROTTLED_SECS = Metrics.counter('gcsio', "cumulativeThrottlingSeconds")

def __init__(self, max_retries=10, max_retry_wait=600):
self._max_retries = max_retries
self._max_retry_wait = max_retry_wait
self._num_retries = 0
self._total_retry_wait = 0

def _get_next_wait_time(self):
wait_time = 2**self._num_retries
max_jitter = wait_time / 4.0
wait_time += random.uniform(-max_jitter, max_jitter)
return max(1, min(wait_time, self._max_retry_wait))

def __call__(self, exc):
if isinstance(exc, api_exceptions.TooManyRequests):
_LOGGER.debug('Caught GCS quota error (%s), retrying.', exc.reason)
self._num_retries += 1
sleep_seconds = self._get_next_wait_time()
ThrottlingHandler._THROTTLED_SECS.inc(math.ceil(sleep_seconds))
time.sleep(sleep_seconds)


DEFAULT_RETRY_WITH_THROTTLING = retry.Retry(
predicate=_should_retry, on_error=ThrottlingHandler())
75 changes: 75 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_retry_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tests for Throttling Handler of GCSIO."""

import unittest
from unittest.mock import Mock

from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
from apache_beam.runners.worker import statesampler
from apache_beam.utils import counters

try:
from apache_beam.io.gcp import gcsio_retry
from google.api_core import exceptions as api_exceptions
except ImportError:
gcsio_retry = None
api_exceptions = None


@unittest.skipIf((gcsio_retry is None or api_exceptions is None), 'GCP dependencies are not installed')
class TestGCSIORetry(unittest.TestCase):
def test_retry_on_non_retriable(self):
mock = Mock(side_effect=[
Exception('Something wrong!'),
])
retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING
with self.assertRaises(Exception):
retry(mock)()

def test_retry_on_throttling(self):
mock = Mock(
side_effect=[api_exceptions.TooManyRequests("Slow down!"), 12345])
retry = gcsio_retry.DEFAULT_RETRY_WITH_THROTTLING

sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state = sampler.scoped_state(
'my_step', 'my_state', metrics_container=MetricsContainer('my_step'))
try:
sampler.start()
with state:
container = MetricsEnvironment.current_container()

self.assertEqual(
container.get_counter(
MetricName('gcsio',
"cumulativeThrottlingSeconds")).get_cumulative(),
0)

self.assertEqual(12345, retry(mock)())

self.assertGreater(
container.get_counter(
MetricName('gcsio',
"cumulativeThrottlingSeconds")).get_cumulative(),
1)
finally:
sampler.stop()

0 comments on commit c941895

Please sign in to comment.