Skip to content

Commit

Permalink
Retry failed requests in DatasourceRequestProxy (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 8, 2021
1 parent 1199414 commit def1a05
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 25 deletions.
14 changes: 14 additions & 0 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
CONFIGURE_HANDLER = 'on_configure'
BLOCK_HANDLER = 'on_block'
ENV_VARIABLE_REGEX = r'\${([\w]*):-(.*)}'
DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_SLEEP = 1

sys.path.append(os.getcwd())
_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,6 +142,10 @@ class TzktDatasourceConfig(NameMixin):
kind: Literal['tzkt']
url: str

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.url)

Expand All @@ -162,6 +168,10 @@ class BcdDatasourceConfig(NameMixin):
url: str
network: str

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.url + self.network)

Expand All @@ -180,6 +190,10 @@ class CoinbaseDatasourceConfig(NameMixin):
secret_key: Optional[str] = None
passphrase: Optional[str] = None

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.kind)

Expand Down
4 changes: 2 additions & 2 deletions src/dipdup/datasources/bcd/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@


class BcdDatasource:
def __init__(self, url: str, network: str, cache: bool) -> None:
def __init__(self, url: str, network: str, proxy=DatasourceRequestProxy()) -> None:
self._url = url.rstrip('/')
self._network = network
self._proxy = proxy
self._logger = logging.getLogger('dipdup.bcd')
self._proxy = DatasourceRequestProxy(cache)

async def close_session(self) -> None:
await self._proxy.close_session()
Expand Down
9 changes: 2 additions & 7 deletions src/dipdup/datasources/coinbase/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

from aiolimiter import AsyncLimiter

from dipdup.datasources.coinbase.models import CandleData, CandleInterval
from dipdup.datasources.proxy import DatasourceRequestProxy

Expand All @@ -13,12 +11,9 @@


class CoinbaseDatasource:
def __init__(self, cache: bool) -> None:
def __init__(self, proxy: DatasourceRequestProxy) -> None:
self._logger = logging.getLogger('dipdup.coinbase')
self._proxy = DatasourceRequestProxy(
cache=cache,
ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
)
self._proxy = proxy

async def close_session(self) -> None:
await self._proxy.close_session()
Expand Down
39 changes: 28 additions & 11 deletions src/dipdup/datasources/proxy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import logging
import pickle
Expand All @@ -7,16 +8,40 @@
from aiolimiter import AsyncLimiter
from fcache.cache import FileCache # type: ignore

from dipdup.config import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_SLEEP # type: ignore
from dipdup.utils import http_request


class DatasourceRequestProxy:
def __init__(self, cache: bool = False, ratelimiter: Optional[AsyncLimiter] = None) -> None:
"""Wrapper for datasource HTTP requests.
Covers caching, retrying failed requests and ratelimiting"""

def __init__(
self,
cache: bool = False,
retry_count: int = DEFAULT_RETRY_COUNT,
retry_sleep: int = DEFAULT_RETRY_SLEEP,
ratelimiter: Optional[AsyncLimiter] = None,
) -> None:
self._logger = logging.getLogger(__name__)
self._cache = FileCache('dipdup', flag='cs') if cache else None
self._retry_count = retry_count
self._retry_sleep = retry_sleep
self._ratelimiter = ratelimiter
self._session = aiohttp.ClientSession()

async def _wrapped_request(self, method: str, **kwargs):
for attempt in range(self._retry_count):
self._logger.debug('Datasource request attempt %s/%s', attempt + 1, self._retry_count)
try:
return await http_request(self._session, method, **kwargs)
except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e:
if attempt + 1 == self._retry_count:
raise e
self._logger.warning('Datasource request failed: %s', e)
await asyncio.sleep(self._retry_sleep)

async def http_request(self, method: str, skip_cache: bool = False, weight: int = 1, **kwargs):
if self._cache is not None and not skip_cache:
key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest()
Expand All @@ -25,21 +50,13 @@ async def http_request(self, method: str, skip_cache: bool = False, weight: int
except KeyError:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
**kwargs,
)
response = await self._wrapped_request(method, **kwargs)
self._cache[key] = response
return response
else:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
**kwargs,
)
response = await self._wrapped_request(method, **kwargs)
return response

async def close_session(self) -> None:
Expand Down
8 changes: 6 additions & 2 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,21 @@ class TzktDatasource(IndexDatasource):
* Calls Matchers to match received operation groups with indexes' pattern and spawn callbacks on match
"""

def __init__(self, url: str, cache: bool) -> None:
def __init__(
self,
url: str,
proxy: DatasourceRequestProxy,
) -> None:
super().__init__()
self._url = url.rstrip('/')
self._proxy = proxy

self._logger = logging.getLogger('dipdup.tzkt')
self._transaction_subscriptions: Set[str] = set()
self._origination_subscriptions: bool = False
self._big_map_subscriptions: Dict[str, List[str]] = {}

self._client: Optional[BaseHubConnection] = None
self._proxy = DatasourceRequestProxy(cache)

self._level: Optional[int] = None
self._sync_level: Optional[int] = None
Expand Down
25 changes: 22 additions & 3 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from posix import listdir
from typing import Dict, List, cast

from aiolimiter import AsyncLimiter
from apscheduler.schedulers import SchedulerNotRunningError # type: ignore
from genericpath import exists
from tortoise import Tortoise
Expand Down Expand Up @@ -33,6 +34,7 @@
from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.proxy import DatasourceRequestProxy
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigurationError
from dipdup.hasura import configure_hasura
Expand Down Expand Up @@ -228,20 +230,37 @@ async def _create_datasources(self) -> None:
if name in self._datasources:
continue

cache = self._config.cache_enabled if datasource_config.cache is None else datasource_config.cache
if isinstance(datasource_config, TzktDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
)
datasource = TzktDatasource(
url=datasource_config.url,
cache=self._config.cache_enabled,
proxy=proxy,
)
elif isinstance(datasource_config, BcdDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
)
datasource = BcdDatasource(
url=datasource_config.url,
network=datasource_config.network,
cache=self._config.cache_enabled,
proxy=proxy,
)
elif isinstance(datasource_config, CoinbaseDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
)
datasource = CoinbaseDatasource(
cache=self._config.cache_enabled,
proxy=proxy,
)
else:
raise NotImplementedError
Expand Down

0 comments on commit def1a05

Please sign in to comment.