Skip to content

Commit

Permalink
Merge pull request #79 from mozilla-releng/asyncio
Browse files Browse the repository at this point in the history
Add asyncio support to redo module
  • Loading branch information
hneiva authored Jun 27, 2024
2 parents 7fda7ac + 543cf58 commit fa0bdf1
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 4 deletions.
50 changes: 49 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Redo - Utilities to retry Python callables
Introduction
************

Redo provides various means to add seamless ability to retry to any Python callable. Redo includes a plain function ``(redo.retry)``, a decorator ``(redo.retriable)``, and a context manager ``(redo.retrying)`` to enable you to integrate it in the best possible way for your project. As a bonus, a standalone interface is also included ``("retry")``.
Redo provides various means to add seamless ability to retry to any Python callable. Redo includes plain functions (``redo.retry``, ``redo.retry_async``), decorators (``redo.retriable``, ``redo.retriable_async``), and a context manager (``redo.retrying``) to enable you to integrate it in the best possible way for your project. As a bonus, a standalone interface is also included (``"retry"``).

Installation
************
Expand All @@ -22,7 +22,9 @@ Below is the list of functions available

* retrier
* retry
* retry_async
* retriable
* retriable_async
* retrying (contextmanager)

retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1)
Expand Down Expand Up @@ -102,6 +104,31 @@ Calls an action function until it succeeds, or we give up.
3
'success!'

retry_async(func, attempts=5, sleeptime_callback=calculate_sleep_time, retry_exceptions=Exception, args=(), kwargs={}, sleeptime_kwargs=None)
---------------------------------------------------------------------------------------------------------------------------------------------

An asynchronous function that retries a given async callable.

**Arguments Detail:**

1. **func (function):** an awaitable function to retry
2. **attempts (int):** maximum number of attempts; defaults to 5
3. **sleeptime_callback (function):** function to determine sleep time after each attempt; defaults to `calculate_sleep_time`
4. **retry_exceptions (list or exception):** exceptions to retry on; defaults to `Exception`
5. **args (list):** arguments to pass to `func`
6. **kwargs (dict):** keyword arguments to pass to `func`
7. **sleeptime_kwargs (dict):** keyword arguments to pass to `sleeptime_callback`

**Output:** The value from a successful `func` call or raises an exception after exceeding attempts.

**Example:**

::

>>> async def async_action():
...     return "done"  # your async code here
>>> result = await retry_async(async_action)

retriable(\*retry_args, \*\*retry_kwargs)
-----------------------------------------

Expand Down Expand Up @@ -130,6 +157,27 @@ A decorator factory for ``retry()``. Wrap your function in ``@retriable(...)`` t
3
'success!'

retriable_async(retry_exceptions=Exception, sleeptime_kwargs=None)
------------------------------------------------------------------

A decorator for asynchronously retrying a function.

**Arguments Detail:**

1. **retry_exceptions (list or exception):** exceptions to retry on; defaults to `Exception`
2. **sleeptime_kwargs (dict):** keyword arguments to pass to the sleeptime callback

**Output:** A function decorator that applies `retry_async` to the decorated function.

**Example:**

::

>>> @retriable_async()
... async def async_action():
...     return "done"  # your async code here
>>> result = await async_action()

retrying(func, \*retry_args, \*\*retry_kwargs)
----------------------------------------------

Expand Down
124 changes: 122 additions & 2 deletions src/redo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
# ***** END LICENSE BLOCK *****

import asyncio
import functools
import logging
import random
import time
from contextlib import contextmanager
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Tuple, Type, Union

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -210,7 +212,7 @@ def retriable(*retry_args, **retry_kwargs):
"""

def _retriable_factory(func):
@wraps(func)
@functools.wraps(func)
def _retriable_wrapper(*args, **kwargs):
return retry(func, args=args, kwargs=kwargs, *retry_args, **retry_kwargs)

Expand Down Expand Up @@ -248,3 +250,121 @@ def retrying(func, *retry_args, **retry_kwargs):
'success!'
"""
yield retriable(*retry_args, **retry_kwargs)(func)


def calculate_sleep_time(attempt, delay_factor=5.0, randomization_factor=0.5, max_delay=120):
    """Return how long to sleep before the given retry attempt, in seconds.

    Adapted from `taskcluster.utils.calculateSleepTime`, but with the constant
    `delay_factor`/`randomization_factor`/`max_delay` values exposed as
    keyword arguments. The taskcluster function generally slept for less than
    a second, which didn't always get past server issues.

    Args:
        attempt (int): the retry attempt number
        delay_factor (float, optional): a multiplier for the delay time.
            Defaults to 5.
        randomization_factor (float, optional): a randomization multiplier for
            the delay time. Defaults to .5.
        max_delay (float, optional): the max delay to sleep. Defaults to 120
            (seconds).

    Returns:
        float: the time to sleep, in seconds.
    """
    if attempt <= 0:
        return 0

    # Exponential backoff: attempts 1, 2, 3, ... use exponents 0, 1, 2, ...
    base_delay = float(delay_factor) * float(2 ** (attempt - 1))
    # Jitter only ever lengthens the delay, by at most `randomization_factor`
    # of the base delay.
    jittered_delay = base_delay * (1 + randomization_factor * random.random())
    # Clamp to the configured ceiling.
    return min(jittered_delay, max_delay)


async def retry_async(
    func: Callable[..., Awaitable[Any]],
    attempts: int = 5,
    sleeptime_callback: Callable[..., Any] = calculate_sleep_time,
    retry_exceptions: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    sleeptime_kwargs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Await ``func`` until it succeeds, retrying on failure.

    Args:
        func (function): an awaitable function.
        attempts (int, optional): the number of attempts to make. Default is 5.
        sleeptime_callback (function, optional): the function to use to
            determine how long to sleep after each attempt. Defaults to
            ``calculate_sleep_time``.
        retry_exceptions (exception or tuple, optional): the exception(s) to
            retry on. Defaults to ``Exception``.
        args (list, optional): the args to pass to ``func``. Defaults to ().
        kwargs (dict, optional): the kwargs to pass to ``func``. Defaults to {}.
        sleeptime_kwargs (dict, optional): the kwargs to pass to
            ``sleeptime_callback``. If None, use {}. Defaults to None.

    Returns:
        object: the value from a successful ``func`` call.

    Raises:
        Exception: the exception from a failed ``func`` call, either outside
            of the retry_exceptions, or one of those once we pass the max
            ``attempts``.
    """
    call_kwargs = kwargs or {}
    attempt = 1
    while True:
        try:
            return await func(*args, **call_kwargs)
        except retry_exceptions:
            attempt += 1
            # Re-raises the in-flight exception once attempts are exhausted.
            _check_number_of_attempts(attempt, attempts, func, "retry_async")
            delay = _define_sleep_time(sleeptime_kwargs, sleeptime_callback, attempt, func, "retry_async")
            await asyncio.sleep(delay)


def _check_number_of_attempts(attempt: int, attempts: int, func: Callable[..., Any], retry_function_name: str) -> None:
    """Re-raise the in-flight exception if ``attempt`` exceeds ``attempts``.

    NOTE: must be called from inside an ``except`` block — the bare ``raise``
    re-raises whatever exception is currently being handled.
    """
    if attempt <= attempts:
        return
    message = "{}: {}: too many retries!".format(retry_function_name, func.__name__)
    log.warning(message)
    raise


def _define_sleep_time(
    sleeptime_kwargs: Optional[Dict[str, Any]],
    sleeptime_callback: Callable[..., int],
    attempt: int,
    func: Callable[..., Any],
    retry_function_name: str,
) -> float:
    """Compute, log, and return the pre-retry sleep time for ``attempt``."""
    callback_kwargs = {} if not sleeptime_kwargs else sleeptime_kwargs
    sleep_time = sleeptime_callback(attempt, **callback_kwargs)
    message = "{}: {}: sleeping {} seconds before retry".format(retry_function_name, func.__name__, sleep_time)
    log.debug(message)
    return sleep_time


def retriable_async(
    retry_exceptions: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception,
    sleeptime_kwargs: Optional[Dict[str, Any]] = None,
) -> Callable[..., Callable[..., Awaitable[Any]]]:
    """Decorator factory that wraps an awaitable in ``retry_async``.

    Args:
        retry_exceptions (exception or tuple, optional): the exception(s) to
            retry on. Defaults to ``Exception``.
        sleeptime_kwargs (dict, optional): the kwargs to pass to the sleeptime
            callback. If None, use {}. Defaults to None.

    Returns:
        function: the decorator to apply to an awaitable function.
    """

    def _decorator(coro_func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(coro_func)
        async def _retrying_wrapper(*args: Any, **kwargs: Any) -> Any:
            # Delegate all retry logic (and its defaults) to retry_async.
            return await retry_async(
                coro_func,
                retry_exceptions=retry_exceptions,
                args=args,
                kwargs=kwargs,
                sleeptime_kwargs=sleeptime_kwargs,
            )

        return _retrying_wrapper

    return _decorator
2 changes: 1 addition & 1 deletion tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _raiseCustomException():
return _succeedOnSecondAttempt(exception=NewError)


@pytest.yield_fixture
@pytest.fixture
def check_logging(caplog):
"""
Ensure that all log messages can be formatted.
Expand Down
107 changes: 107 additions & 0 deletions tests/test_retry_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Use the stdlib unittest.mock (available since Python 3.3) instead of the
# legacy third-party ``mock`` backport; the bound name ``mock`` is unchanged.
from unittest import mock

import pytest

from redo import calculate_sleep_time, retriable_async, retry_async

# Per-coroutine call counters shared by the tests below; each test resets
# its own key before running.
retry_count = {}


async def always_fail(*args, **kwargs):
    """Coroutine that raises on every call, counting invocations in ``retry_count``."""
    global retry_count
    retry_count["always_fail"] = retry_count.get("always_fail", 0) + 1
    raise Exception("fail")


async def fail_first(*args, **kwargs):
    """Coroutine that raises on the first call and succeeds on later ones."""
    global retry_count
    retry_count["fail_first"] += 1
    if retry_count["fail_first"] >= 2:
        return "yay"
    raise Exception("first")


async def fake_sleep(*args, **kwargs):
    """No-op stand-in for ``asyncio.sleep`` so the retry tests run instantly."""
    return None


@pytest.mark.asyncio
async def test_retry_async_fail_first():
    """retry_async returns the value once the callable finally succeeds."""
    global retry_count
    retry_count["fail_first"] = 0
    result = await retry_async(fail_first, sleeptime_kwargs={"delay_factor": 0})
    assert result == "yay"
    assert retry_count["fail_first"] == 2


@pytest.mark.asyncio
async def test_retry_async_always_fail():
    """retry_async re-raises after exhausting all (default 5) attempts."""
    global retry_count
    retry_count["always_fail"] = 0
    with mock.patch("asyncio.sleep", new=fake_sleep):
        with pytest.raises(Exception):
            # The call raises, so no statement after it inside this block
            # would ever run (the old ``assert status is None`` was dead code).
            await retry_async(always_fail, sleeptime_kwargs={"delay_factor": 0})
    assert retry_count["always_fail"] == 5


@pytest.mark.asyncio
async def test_retriable_async_fail_first():
    """The decorator retries until the wrapped coroutine succeeds."""
    global retry_count

    @retriable_async(sleeptime_kwargs={"delay_factor": 0})
    async def decorated_fail_first(*args, **kwargs):
        return await fail_first(*args, **kwargs)

    retry_count["fail_first"] = 0
    assert await decorated_fail_first() == "yay"
    assert retry_count["fail_first"] == 2


@pytest.mark.asyncio
async def test_retriable_async_always_fail_async():
    """The decorator gives up after the maximum number of attempts."""
    global retry_count

    @retriable_async(sleeptime_kwargs={"delay_factor": 0})
    async def decorated_always_fail(*args, **kwargs):
        return await always_fail(*args, **kwargs)

    retry_count["always_fail"] = 0
    with mock.patch("asyncio.sleep", new=fake_sleep), pytest.raises(Exception):
        await decorated_always_fail()

    assert retry_count["always_fail"] == 5


@pytest.mark.parametrize("attempt", (-1, 0))
def test_calculate_no_sleep_time(attempt):
    """Non-positive attempt numbers produce no delay at all."""
    assert calculate_sleep_time(attempt) == 0


@pytest.mark.parametrize(
    "attempt,kwargs,min_expected,max_expected",
    [
        # randomization_factor=0: delay is exact (delay_factor * 2**(attempt-1)).
        (1, {"delay_factor": 5.0, "randomization_factor": 0, "max_delay": 15}, 5.0, 5.0),
        # With jitter, the delay lands in [base, base * (1 + randomization_factor)].
        (2, {"delay_factor": 5.0, "randomization_factor": 0.25, "max_delay": 15}, 10.0, 12.5),
        # max_delay clamps the jittered result.
        (3, {"delay_factor": 5.0, "randomization_factor": 0.25, "max_delay": 10}, 10.0, 10.0),
    ],
)
def test_calculate_sleep_time(attempt, kwargs, min_expected, max_expected):
    """The computed delay stays within the expected jitter window."""
    delay = calculate_sleep_time(attempt, **kwargs)
    assert min_expected <= delay <= max_expected

0 comments on commit fa0bdf1

Please sign in to comment.