Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RetryBackoff to support custom @retry backoff strategies #247

Merged
merged 14 commits into from
Jan 6, 2022
Merged
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,5 @@ Pipfile.lock
# macOS
.DS_Store

# VS Code
.vscode/
17 changes: 17 additions & 0 deletions docs/source/dev/decorators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ to specify one of the alternative approaches exposed through the
def get_user(self, user):
"""Get user by username."""

You can implement a custom backoff strategy by extending the class
:class:`uplink.retry.RetryBackoff`:

.. code-block:: python
:emphasize-lines: 3,7

from uplink.retry import RetryBackoff

class MyCustomBackoff(RetryBackoff):
...

class GitHub(uplink.Consumer):
@uplink.retry(backoff=MyCustomBackoff())
@uplink.get("/users/{user}")
def get_user(self, user):
pass

.. automodule:: uplink.retry.backoff
:members:

Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def get_user(self, user):
def get_issue(self, user, repo, issue):
pass

@retry(max_attempts=3, on_exception=retry.CONNECTION_TIMEOUT)
@retry(
stop=retry.stop.after_attempt(3), on_exception=retry.CONNECTION_TIMEOUT
)
@get("repos/{user}/{repo}/project/{project}")
def get_project(self, user, repo, project):
pass
Expand Down
36 changes: 20 additions & 16 deletions tests/unit/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ def test_fixed_backoff():
assert next(iterator) == 10


def test_compose_backoff(mocker):
left = backoff.from_iterable([0, 1])
right = backoff.from_iterable([2])
mocker.spy(left, "handle_after_final_retry")
mocker.spy(right, "handle_after_final_retry")
strategy = left | right

# Should return None after both strategies are exhausted
assert strategy.get_timeout_after_response(None, None) == 0
assert strategy.get_timeout_after_exception(None, None, None, None) == 1
assert strategy.get_timeout_after_response(None, None) == 2
assert strategy.get_timeout_after_exception(None, None, None, None) is None

# Should invoke both strategies after_stop method
strategy.handle_after_final_retry()

left.handle_after_final_retry.assert_called_once_with()
right.handle_after_final_retry.assert_called_once_with()


def test_retry_stop_default():
decorator = retry()
assert stop.NEVER == decorator._stop
Expand Down Expand Up @@ -75,22 +95,6 @@ def test_stop_or_with_none():
assert stop1 is (stop1 | None)


def test_retry_custom_stop():
def custom_stop(*_):
return True

decorator = retry(stop=custom_stop)
assert decorator._stop == custom_stop


def test_retry_backoff():
def custom_backoff(*_):
return True

decorator = retry(backoff=custom_backoff)
assert decorator._backoff == custom_backoff


def test_retry_decorator_exposes_submodules_as_properties():
assert retry.backoff is backoff
assert retry.stop is stop
Expand Down
3 changes: 2 additions & 1 deletion uplink/retry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from uplink.retry.retry import retry
from uplink.retry.when import RetryPredicate
from uplink.retry.backoff import RetryBackoff

__all__ = ["retry", "RetryPredicate"]
__all__ = ["retry", "RetryPredicate", "RetryBackoff"]
171 changes: 152 additions & 19 deletions uplink/retry/backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,132 @@
__all__ = ["jittered", "exponential", "fixed"]


def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
def from_iterable(iterable):
"""Creates a retry strategy from an iterable of timeouts"""

class IterableRetryBackoff(_IterableBackoff):
def __iter__(self):
return iter(iterable)

return IterableRetryBackoff()


def from_iterable_factory(iterable_factory):
"""
Creates a retry strategy from a function that returns an iterable
of timeouts.
"""

class IterableRetryBackoff(_IterableBackoff):
def __iter__(self):
return iter(iterable_factory())

return IterableRetryBackoff()


class RetryBackoff(object):
"""
Base class for a strategy that calculates the timeout between
retry attempts.

You can compose two ``RetryBackoff`` instances by using the ``|``
operator::

CustomBackoffA() | CustomBackoffB()

The resulting backoff strategy will first compute the timeout using
the left-hand instance. If that timeout is ``None``, the strategy
will try to compute a fallback using the right-hand instance. If
both instances return ``None``, the resulting strategy will also
return ``None``.
"""

def get_timeout_after_response(self, request, response):
"""
Returns the number of seconds to wait before retrying the
request, or ``None`` to indicate that the given response should
be returned.
"""
raise NotImplementedError # pragma: no cover

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
"""
Returns the number of seconds to wait before retrying the
request, or ``None`` to indicate that the given exception
should be raised.
"""
raise NotImplementedError # pragma: no cover

def handle_after_final_retry(self):
"""
Handles any clean-up necessary following the final retry
attempt.
"""
pass # pragma: no cover

def __or__(self, other):
"""Composes the current strategy with another."""
assert isinstance(
other, RetryBackoff
), "Both objects should be backoffs."
return _Or(self, other)


class _Or(RetryBackoff):
def __init__(self, left, right):
self._left = left
self._right = right

def get_timeout_after_response(self, request, response):
timeout = self._left.get_timeout_after_response(request, response)
if timeout is None:
return self._right.get_timeout_after_response(request, response)
return timeout

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
timeout = self._left.get_timeout_after_exception(
request, exc_type, exc_val, exc_tb
)
if timeout is None:
return self._right.get_timeout_after_exception(
request, exc_type, exc_val, exc_tb
)
return timeout

def handle_after_final_retry(self):
self._left.handle_after_final_retry()
self._right.handle_after_final_retry()


class _IterableBackoff(RetryBackoff):
__iterator = None

def __iter__(self):
raise NotImplementedError # pragma: no cover

def __call__(self):
return iter(self)

def __next(self):
if self.__iterator is None:
self.__iterator = iter(self)

try:
return next(self.__iterator)
except StopIteration:
return None

def get_timeout_after_response(self, request, response):
return self.__next()

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
return self.__next()

def handle_after_final_retry(self):
self.__iterator = None


class jittered(_IterableBackoff):
"""
Waits using capped exponential backoff and full jitter.

Expand All @@ -18,35 +143,43 @@ def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
time of competing clients in a distributed system experiencing
high contention.
"""
exp_backoff = exponential(base, multiplier, minimum, maximum)
return lambda *_: iter(
random.uniform(0, 1) * delay for delay in exp_backoff()
) # pragma: no cover

def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
self._exp_backoff = exponential(base, multiplier, minimum, maximum)

def __iter__(self):
for delay in self._exp_backoff(): # pragma: no branch
yield random.uniform(0, 1) * delay

def exponential(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):

class exponential(_IterableBackoff):
"""
Waits using capped exponential backoff, meaning that the delay
is multiplied by a constant ``base`` after each attempt, up to
an optional ``maximum`` value.
"""

def wait_iterator():
delay = multiplier
while minimum > delay:
delay *= base
while True:
yield min(delay, maximum)
delay *= base
def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
self._base = base
self._multiplier = multiplier
self._minimum = minimum
self._maximum = maximum

return wait_iterator
def __iter__(self):
delay = self._multiplier
while self._minimum > delay:
delay *= self._base
while True:
yield min(delay, self._maximum)
delay *= self._base


def fixed(seconds):
class fixed(_IterableBackoff):
"""Waits for a fixed number of ``seconds`` before each retry."""

def wait_iterator():
while True:
yield seconds
def __init__(self, seconds):
self._seconds = seconds

return wait_iterator
def __iter__(self):
while True:
yield self._seconds
Loading