From e397af512513aac2d2225f4fec3267a4c79d4c18 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 12 Jul 2021 13:27:35 +0300 Subject: [PATCH] Refactor HTTP stack, update User-Agent header, fix caching, add default datasource ratelimits (#87) --- src/dipdup/cli.py | 15 +- src/dipdup/codegen.py | 2 +- src/dipdup/config.py | 15 +- src/dipdup/configs/debug.yml | 8 +- src/dipdup/configs/logging.yml | 8 +- src/dipdup/configs/warning.yml | 14 +- src/dipdup/datasources/bcd/datasource.py | 32 ++-- src/dipdup/datasources/coinbase/datasource.py | 38 +++-- src/dipdup/datasources/datasource.py | 11 +- src/dipdup/datasources/proxy.py | 62 -------- src/dipdup/datasources/tzkt/datasource.py | 63 ++++---- src/dipdup/dipdup.py | 19 +-- src/dipdup/hasura.py | 42 +++--- src/dipdup/http.py | 138 ++++++++++++++++++ src/dipdup/index.py | 2 +- src/dipdup/models.py | 2 +- src/dipdup/utils.py | 24 +-- tests/integration_tests/test_hasura.py | 18 +-- 18 files changed, 282 insertions(+), 231 deletions(-) delete mode 100644 src/dipdup/datasources/proxy.py create mode 100644 src/dipdup/http.py diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index ad60dfc8c..177f14d0e 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -5,8 +5,7 @@ from dataclasses import dataclass from functools import wraps from os.path import dirname, join -from typing import List -from typing import List, NoReturn, cast +from typing import List, cast import click import sentry_sdk @@ -14,17 +13,13 @@ from sentry_sdk.integrations.aiohttp import AioHttpIntegration from dipdup import __spec_version__, __version__, spec_version_mapping -from dipdup.config import DipDupConfig, LoggingConfig -from dipdup.dipdup import DipDup -from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError -from dipdup import __spec_version__, __version__ from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig from dipdup.dipdup import DipDup -from dipdup.exceptions import ConfigurationError -from dipdup.hasura import HasuraManager +from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError +from dipdup.hasura import HasuraGateway from dipdup.utils import tortoise_wrapper -_logger = logging.getLogger(__name__) +_logger = logging.getLogger('dipdup.cli') def click_command_wrapper(fn): @@ -148,7 +143,7 @@ async def configure_hasura(ctx, reset: bool): if not config.hasura: _logger.error('`hasura` config section is empty') return - hasura = HasuraManager(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database)) + hasura = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database)) async with tortoise_wrapper(url, models): try: diff --git a/src/dipdup/codegen.py b/src/dipdup/codegen.py index 23f1bbd55..55cea9f68 100644 --- a/src/dipdup/codegen.py +++ b/src/dipdup/codegen.py @@ -49,7 +49,7 @@ class DipDupCodeGenerator: """Generates package based on config, invoked from `init` CLI command""" def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, DatasourceT]) -> None: - self._logger = logging.getLogger(__name__) + self._logger = logging.getLogger('dipdup.codegen') self._config = config self._datasources = datasources self._schemas: Dict[TzktDatasourceConfig, Dict[str, Dict[str, Any]]] = {} diff --git a/src/dipdup/config.py b/src/dipdup/config.py index d27aba5f5..b1e61384e 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -30,7 +30,7 @@ DEFAULT_RETRY_SLEEP = 1 sys.path.append(os.getcwd()) -_logger = 
logging.getLogger(__name__) +_logger = logging.getLogger('dipdup.config') class OperationType(Enum): @@ -91,12 +91,19 @@ def valid_immune_tables(cls, v): @dataclass class HTTPConfig: - cache: bool = True - retry_count: int = DEFAULT_RETRY_COUNT - retry_sleep: int = DEFAULT_RETRY_SLEEP + cache: Optional[bool] = None + retry_count: Optional[int] = None + retry_sleep: Optional[int] = None ratelimit_rate: Optional[int] = None ratelimit_period: Optional[int] = None + def merge(self, other: Optional['HTTPConfig']) -> None: + if not other: + return + for k, v in other.__dict__.items(): + if v is not None: + setattr(self, k, v) + @dataclass class ContractConfig: diff --git a/src/dipdup/configs/debug.yml b/src/dipdup/configs/debug.yml index b35d72ed1..fa93985cd 100644 --- a/src/dipdup/configs/debug.yml +++ b/src/dipdup/configs/debug.yml @@ -10,13 +10,7 @@ class: logging.StreamHandler stream : ext://sys.stdout loggers: - dipdup.dipdup: - level: DEBUG - dipdup.index: - level: DEBUG - dipdup.tzkt: - level: DEBUG - dipdup.bcd: + dipdup: level: DEBUG SignalRCoreClient: diff --git a/src/dipdup/configs/logging.yml b/src/dipdup/configs/logging.yml index f9a4e4cfa..4794d929c 100644 --- a/src/dipdup/configs/logging.yml +++ b/src/dipdup/configs/logging.yml @@ -10,13 +10,7 @@ class: logging.StreamHandler stream : ext://sys.stdout loggers: - dipdup.dipdup: - level: INFO - dipdup.index: - level: INFO - dipdup.tzkt: - level: INFO - dipdup.bcd: + dipdup: level: INFO SignalRCoreClient: diff --git a/src/dipdup/configs/warning.yml b/src/dipdup/configs/warning.yml index 10a16e782..919bf292d 100644 --- a/src/dipdup/configs/warning.yml +++ b/src/dipdup/configs/warning.yml @@ -10,21 +10,15 @@ class: logging.StreamHandler stream : ext://sys.stdout loggers: - dipdup.dipdup: - level: INFO - dipdup.index: - level: INFO - dipdup.tzkt: - level: INFO - dipdup.bcd: - level: INFO + dipdup: + level: WARNING SignalRCoreClient: formatter: brief aiosqlite: - level: INFO + level: WARNING db_client: - level: INFO + level: WARNING root: level: WARNING handlers: diff --git a/src/dipdup/datasources/bcd/datasource.py b/src/dipdup/datasources/bcd/datasource.py index 92fdd7c3d..4022f895b 100644 --- a/src/dipdup/datasources/bcd/datasource.py +++ b/src/dipdup/datasources/bcd/datasource.py @@ -2,28 +2,21 @@ from typing import Any, Dict, List, Optional from dipdup.config import HTTPConfig -from dipdup.datasources.proxy import HTTPRequestProxy +from dipdup.http import HTTPGateway TOKENS_REQUEST_LIMIT = 10 -class BcdDatasource: +class BcdDatasource(HTTPGateway): def __init__( self, url: str, network: str, http_config: Optional[HTTPConfig] = None, ) -> None: - if http_config is None: - http_config = HTTPConfig() - - self._url = url.rstrip('/') - self._network = network - self._proxy = HTTPRequestProxy(http_config) + super().__init__(url, http_config) self._logger = logging.getLogger('dipdup.bcd') - - async def close_session(self) -> None: - await self._proxy.close_session() + self._network = network async def run(self) -> None: pass @@ -34,9 +27,9 @@ async def resync(self) -> None: async def get_tokens(self, address: str) -> List[Dict[str, Any]]: tokens, offset = [], 0 while True: - tokens_batch = await self._proxy.http_request( + tokens_batch = await self._http.request( 'get', - url=f'{self._url}/v1/contract/{self._network}/{address}/tokens?offset={offset}', + url=f'v1/contract/{self._network}/{address}/tokens?offset={offset}', ) tokens += tokens_batch offset += TOKENS_REQUEST_LIMIT @@ -45,7 +38,16 @@ async def get_tokens(self, address: str) -> 
List[Dict[str, Any]]: return tokens async def get_token(self, address: str, token_id: int) -> Dict[str, Any]: - return await self._proxy.http_request( + return await self._http.request( 'get', - url=f'{self._url}/v1/contract/{self._network}/{address}/tokens?token_id={token_id}', + url=f'v1/contract/{self._network}/{address}/tokens?token_id={token_id}', + ) + + def _default_http_config(self) -> HTTPConfig: + return HTTPConfig( + cache=True, + retry_count=3, + retry_sleep=1, + ratelimit_rate=100, + ratelimit_period=30, ) diff --git a/src/dipdup/datasources/coinbase/datasource.py b/src/dipdup/datasources/coinbase/datasource.py index e72519e84..0526985d7 100644 --- a/src/dipdup/datasources/coinbase/datasource.py +++ b/src/dipdup/datasources/coinbase/datasource.py @@ -4,27 +4,16 @@ from dipdup.config import HTTPConfig from dipdup.datasources.coinbase.models import CandleData, CandleInterval -from dipdup.datasources.proxy import HTTPRequestProxy +from dipdup.http import HTTPGateway CANDLES_REQUEST_LIMIT = 300 -REST_API_URL = 'https://api.pro.coinbase.com' -WEBSOCKET_API_URL = 'wss://ws-feed.pro.coinbase.com' +API_URL = 'https://api.pro.coinbase.com' -class CoinbaseDatasource: - def __init__(self, http_config: Optional[HTTPConfig] = None) -> None: - if http_config is None: - http_config = HTTPConfig( - cache=True, - ratelimit_rate=10, - ratelimit_period=1, - ) - +class CoinbaseDatasource(HTTPGateway): + def __init__(self, url: str = API_URL, http_config: Optional[HTTPConfig] = None) -> None: + super().__init__(url, http_config) self._logger = logging.getLogger('dipdup.coinbase') - self._proxy = HTTPRequestProxy(http_config) - - async def close_session(self) -> None: - await self._proxy.close_session() async def run(self) -> None: pass @@ -33,17 +22,17 @@ async def resync(self) -> None: pass async def get_oracle_prices(self) -> Dict[str, Any]: - return await self._proxy.http_request( + return await self._http.request( 'get', - url=f'{REST_API_URL}/oracle', + url='oracle', ) async def get_candles(self, since: datetime, until: datetime, interval: CandleInterval, ticker: str = 'XTZ-USD') -> List[CandleData]: candles = [] for _since, _until in self._split_candle_requests(since, until, interval): - candles_json = await self._proxy.http_request( + candles_json = await self._http.request( 'get', - url=f'{REST_API_URL}/products/{ticker}/candles', + url=f'products/{ticker}/candles', params={ 'start': _since.replace(tzinfo=timezone.utc).isoformat(), 'end': _until.replace(tzinfo=timezone.utc).isoformat(), @@ -54,6 +43,15 @@ async def get_candles(self, since: datetime, until: datetime, interval: CandleIn candles += [CandleData.from_json(c) for c in candles_json] return sorted(candles, key=lambda c: c.timestamp) + def _default_http_config(self) -> HTTPConfig: + return HTTPConfig( + cache=True, + retry_count=3, + retry_sleep=1, + ratelimit_rate=10, + ratelimit_period=1, + ) + def _split_candle_requests(self, since: datetime, until: datetime, interval: CandleInterval) -> List[Tuple[datetime, datetime]]: request_interval_limit = timedelta(seconds=interval.seconds * CANDLES_REQUEST_LIMIT) request_intervals = [] diff --git a/src/dipdup/datasources/datasource.py b/src/dipdup/datasources/datasource.py index 383efabc6..734f58f00 100644 --- a/src/dipdup/datasources/datasource.py +++ b/src/dipdup/datasources/datasource.py @@ -1,9 +1,10 @@ -from abc import ABC from enum import Enum -from typing import Awaitable, List, Protocol +from typing import Awaitable, List, Optional, Protocol from pyee import AsyncIOEventEmitter # 
type: ignore +from dipdup.config import HTTPConfig +from dipdup.http import HTTPGateway from dipdup.models import BigMapData, OperationData @@ -28,7 +29,11 @@ def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int ... -class IndexDatasource(ABC, AsyncIOEventEmitter): +class IndexDatasource(HTTPGateway, AsyncIOEventEmitter): + def __init__(self, url: str, http_config: Optional[HTTPConfig] = None) -> None: + HTTPGateway.__init__(self, url, http_config) + AsyncIOEventEmitter.__init__(self) + def on(self, event, f=None) -> None: raise RuntimeError('Do not use `on` directly') diff --git a/src/dipdup/datasources/proxy.py b/src/dipdup/datasources/proxy.py deleted file mode 100644 index abb9f83b1..000000000 --- a/src/dipdup/datasources/proxy.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import hashlib -import logging -import pickle -from typing import Optional - -import aiohttp -from aiolimiter import AsyncLimiter -from fcache.cache import FileCache # type: ignore - -from dipdup.config import HTTPConfig # type: ignore -from dipdup.utils import http_request - - -class HTTPRequestProxy: - """Wrapper for aiohttp HTTP requests. - - Covers caching, retrying failed requests and ratelimiting""" - - def __init__(self, config: Optional[HTTPConfig] = None) -> None: - if config is None: - config = HTTPConfig() - self._logger = logging.getLogger(__name__) - self._config = config - self._cache = FileCache('dipdup', flag='cs') - self._ratelimiter = ( - AsyncLimiter(max_rate=config.ratelimit_rate, time_period=config.ratelimit_period) - if config.ratelimit_rate and config.ratelimit_period - else None - ) - self._session = aiohttp.ClientSession() - - async def _wrapped_request(self, method: str, **kwargs): - for attempt in range(self._config.retry_count): - self._logger.debug('HTTP request attempt %s/%s', attempt + 1, self._config.retry_count) - try: - return await http_request(self._session, method, **kwargs) - except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e: - if attempt + 1 == self._config.retry_count: - raise e - self._logger.warning('HTTP request failed: %s', e) - await asyncio.sleep(self._config.retry_sleep) - - async def http_request(self, method: str, cache: bool = False, weight: int = 1, **kwargs): - if self._config.cache and cache: - key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest() - try: - return self._cache[key] - except KeyError: - if self._ratelimiter: - await self._ratelimiter.acquire(weight) - response = await self._wrapped_request(method, **kwargs) - self._cache[key] = response - return response - else: - if self._ratelimiter: - await self._ratelimiter.acquire(weight) - response = await self._wrapped_request(method, **kwargs) - return response - - async def close_session(self) -> None: - await self._session.close() diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index 5eafed5b1..b30414ff7 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -17,7 +17,6 @@ OperationIndexConfig, ) from dipdup.datasources.datasource import IndexDatasource -from dipdup.datasources.proxy import HTTPRequestProxy from dipdup.datasources.tzkt.enums import TzktMessageType from dipdup.models import BigMapAction, BigMapData, OperationData from dipdup.utils import split_by_chunks @@ -91,7 +90,7 @@ def __init__( self._origination_addresses = origination_addresses self._cache = cache - self._logger = logging.getLogger('dipdup.tzkt.fetcher') + 
self._logger = logging.getLogger('dipdup.tzkt') self._head: int = 0 self._heads: Dict[OperationFetcherChannel, int] = {} self._offsets: Dict[OperationFetcherChannel, int] = {} @@ -219,6 +218,7 @@ def __init__( big_map_paths: Set[str], cache: bool = False, ) -> None: + self._logger = logging.getLogger('dipdup.tzkt') self._datasource = datasource self._first_level = first_level self._last_level = last_level @@ -226,8 +226,6 @@ def __init__( self._big_map_paths = big_map_paths self._cache = cache - self._logger = logging.getLogger('dipdup.tzkt.fetcher') - async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMapData]], None]: """Fetch big map diffs via Fetcher (not implemented yet) and pass to message callback""" @@ -277,12 +275,7 @@ def __init__( url: str, http_config: Optional[HTTPConfig] = None, ) -> None: - super().__init__() - if http_config is None: - http_config = HTTPConfig() - self._url = url.rstrip('/') - self._proxy = HTTPRequestProxy(http_config) - + super().__init__(url, http_config) self._logger = logging.getLogger('dipdup.tzkt') self._transaction_subscriptions: Set[str] = set() self._origination_subscriptions: bool = False @@ -305,17 +298,14 @@ def level(self) -> Optional[int]: def sync_level(self) -> Optional[int]: return self._sync_level - async def close_session(self) -> None: - await self._proxy.close_session() - async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]: """Get list of contracts sharing the same code hash or type hash""" entrypoint = 'same' if strict else 'similar' self._logger.info('Fetching %s contracts for address `%s', entrypoint, address) - contracts = await self._proxy.http_request( + contracts = await self._http.request( 'get', - url=f'{self._url}/v1/contracts/{address}/{entrypoint}', + url=f'v1/contracts/{address}/{entrypoint}', params=dict( select='address', limit=self.request_limit, @@ -326,9 +316,9 @@ async def get_similar_contracts(self, address: str, strict: bool = False) -> Lis async def get_originated_contracts(self, address: str) -> List[str]: """Get contracts originated from given address""" self._logger.info('Fetching originated contracts for address `%s', address) - contracts = await self._proxy.http_request( + contracts = await self._http.request( 'get', - url=f'{self._url}/v1/accounts/{address}/contracts', + url=f'v1/accounts/{address}/contracts', params=dict( limit=self.request_limit, ), @@ -338,25 +328,25 @@ async def get_originated_contracts(self, address: str) -> List[str]: async def get_contract_summary(self, address: str) -> Dict[str, Any]: """Get contract summary""" self._logger.info('Fetching contract summary for address `%s', address) - return await self._proxy.http_request( + return await self._http.request( 'get', - url=f'{self._url}/v1/contracts/{address}', + url=f'v1/contracts/{address}', ) async def get_contract_storage(self, address: str) -> Dict[str, Any]: """Get contract storage""" self._logger.info('Fetching contract storage for address `%s', address) - return await self._proxy.http_request( + return await self._http.request( 'get', - url=f'{self._url}/v1/contracts/{address}/storage', + url=f'v1/contracts/{address}/storage', ) async def get_jsonschemas(self, address: str) -> Dict[str, Any]: """Get JSONSchemas for contract's storage/parameter/bigmap types""" self._logger.info('Fetching jsonschemas for address `%s', address) - jsonschemas = await self._proxy.http_request( + jsonschemas = await self._http.request( 'get', - 
url=f'{self._url}/v1/contracts/{address}/interface', + url=f'v1/contracts/{address}/interface', cache=True, ) self._logger.debug(jsonschemas) @@ -365,9 +355,9 @@ async def get_jsonschemas(self, address: str) -> Dict[str, Any]: async def get_latest_block(self) -> Dict[str, Any]: """Get latest block (head)""" self._logger.info('Fetching latest block') - block = await self._proxy.http_request( + block = await self._http.request( 'get', - url=f'{self._url}/v1/head', + url='v1/head', ) self._logger.debug(block) return block @@ -380,9 +370,9 @@ async def get_originations( # NOTE: Chunk of 100 addresses seems like a reasonable choice - URL of ~3971 characters. # NOTE: Other operation requests won't hit that limit. for addresses_chunk in split_by_chunks(list(addresses), TZKT_ORIGINATIONS_REQUEST_LIMIT): - raw_originations += await self._proxy.http_request( + raw_originations += await self._http.request( 'get', - url=f'{self._url}/v1/operations/originations', + url='v1/operations/originations', params={ "originatedContract.in": ','.join(addresses_chunk), "offset": offset, @@ -405,9 +395,9 @@ async def get_originations( async def get_transactions( self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int, cache: bool = False ) -> List[OperationData]: - raw_transactions = await self._proxy.http_request( + raw_transactions = await self._http.request( 'get', - url=f'{self._url}/v1/operations/transactions', + url='v1/operations/transactions', params={ f"{field}.in": ','.join(addresses), "offset": offset, @@ -429,9 +419,9 @@ async def get_transactions( async def get_big_maps( self, addresses: Set[str], paths: Set[str], offset: int, first_level: int, last_level: int, cache: bool = False ) -> List[BigMapData]: - raw_big_maps = await self._proxy.http_request( + raw_big_maps = await self._http.request( 'get', - url=f'{self._url}/v1/bigmaps/updates', + url='v1/bigmaps/updates', params={ "contract.in": ",".join(addresses), "paths.in": ",".join(paths), @@ -481,7 +471,7 @@ def _get_client(self) -> BaseHubConnection: self._logger.info('Creating websocket client') self._client = ( HubConnectionBuilder() - .with_url(self._url + '/v1/events') + .with_url(self._http._url + '/v1/events') .with_automatic_reconnect( { "type": "raw", @@ -575,6 +565,15 @@ async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None: ], ) + def _default_http_config(self) -> HTTPConfig: + return HTTPConfig( + cache=True, + retry_count=3, + retry_sleep=1, + ratelimit_rate=100, + ratelimit_period=30, + ) + async def _on_operation_message( self, message: List[Dict[str, Any]], diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 1e4f61dbc..53b846b2b 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -35,7 +35,7 @@ from dipdup.datasources.datasource import IndexDatasource from dipdup.datasources.tzkt.datasource import TzktDatasource from dipdup.exceptions import ConfigurationError -from dipdup.hasura import HasuraManager +from dipdup.hasura import HasuraGateway from dipdup.index import BigMapIndex, Index, OperationIndex from dipdup.models import BigMapData, IndexType, OperationData, State from dipdup.scheduler import add_job, create_scheduler @@ -45,7 +45,7 @@ class IndexDispatcher: def __init__(self, ctx: DipDupContext) -> None: self._ctx = ctx - self._logger = logging.getLogger(__name__) + self._logger = logging.getLogger('dipdup') self._indexes: Dict[str, Index] = {} async def add_index(self, index_config: IndexConfigTemplateT) -> None: @@ -142,7 +142,7 @@ class DipDup: 
Spawns datasources, registers indexes, passes handler callbacks to executor""" def __init__(self, config: DipDupConfig) -> None: - self._logger = logging.getLogger(__name__) + self._logger = logging.getLogger('dipdup') self._config = config self._datasources: Dict[str, DatasourceT] = {} self._datasources_by_config: Dict[DatasourceConfigT, DatasourceT] = {} @@ -184,14 +184,14 @@ async def run(self, reindex: bool, oneshot: bool) -> None: datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()] worker_tasks = [] - hasura_manager: Optional[HasuraManager] + hasura_gateway: Optional[HasuraGateway] if self._config.hasura: if not isinstance(self._config.database, PostgresDatabaseConfig): raise RuntimeError - hasura_manager = HasuraManager(self._config.package, self._config.hasura, self._config.database) - worker_tasks.append(asyncio.create_task(hasura_manager.configure())) + hasura_gateway = HasuraGateway(self._config.package, self._config.hasura, self._config.database) + worker_tasks.append(asyncio.create_task(hasura_gateway.configure())) else: - hasura_manager = None + hasura_gateway = None if self._config.jobs and not oneshot: for job_name, job_config in self._config.jobs.items(): @@ -207,8 +207,8 @@ async def run(self, reindex: bool, oneshot: bool) -> None: finally: self._logger.info('Closing datasource sessions') await asyncio.gather(*[d.close_session() for d in self._datasources.values()]) - if hasura_manager: - await hasura_manager.close_session() + if hasura_gateway: + await hasura_gateway.close_session() # FIXME: AttributeError: 'NoneType' object has no attribute 'call_soon_threadsafe' with suppress(AttributeError, SchedulerNotRunningError): self._scheduler.shutdown(wait=True) @@ -254,6 +254,7 @@ async def _create_datasources(self) -> None: else: raise NotImplementedError + datasource.set_user_agent(self._config.package) self._datasources[name] = datasource self._datasources_by_config[datasource_config] = datasource diff --git a/src/dipdup/hasura.py b/src/dipdup/hasura.py index 7098673b6..81ab67abd 100644 --- a/src/dipdup/hasura.py +++ b/src/dipdup/hasura.py @@ -15,8 +15,8 @@ from tortoise.transactions import get_connection from dipdup.config import HasuraConfig, HTTPConfig, PostgresDatabaseConfig, pascal_to_snake -from dipdup.datasources.proxy import HTTPRequestProxy from dipdup.exceptions import ConfigurationError +from dipdup.http import HTTPGateway @dataclass @@ -50,17 +50,19 @@ class HasuraError(RuntimeError): ... 
-class HasuraManager: +class HasuraGateway(HTTPGateway): def __init__( - self, package: str, hasura_config: HasuraConfig, database_config: PostgresDatabaseConfig, proxy: Optional[HTTPRequestProxy] = None + self, + package: str, + hasura_config: HasuraConfig, + database_config: PostgresDatabaseConfig, + http_config: Optional[HTTPConfig] = None, ) -> None: - if proxy is None: - proxy = HTTPRequestProxy(HTTPConfig(cache=False)) - self._logger = logging.getLogger(__name__) + super().__init__(hasura_config.url, http_config) + self._logger = logging.getLogger('dipdup.hasura') self._package = package self._hasura_config = hasura_config self._database_config = database_config - self._proxy = proxy async def configure(self, reset: bool = False) -> None: """Generate Hasura metadata and apply to instance with credentials from `hasura` config section.""" @@ -116,12 +118,18 @@ async def configure(self, reset: bool = False) -> None: self._logger.info('Hasura instance has been configured') - async def close_session(self) -> None: - await self._proxy.close_session() + def _default_http_config(self) -> HTTPConfig: + return HTTPConfig( + cache=False, + retry_count=3, + retry_sleep=1, + ratelimit_rate=100, + ratelimit_period=1, + ) - async def _hasura_http_request(self, endpoint: str, json: Dict[str, Any]) -> Dict[str, Any]: + async def _hasura_request(self, endpoint: str, json: Dict[str, Any]) -> Dict[str, Any]: self._logger.debug('Sending `%s` request: %s', endpoint, json) - result = await self._proxy.http_request( + result = await self._http.request( method='post', cache=False, url=f'{self._hasura_config.url}/v1/{endpoint}', @@ -137,7 +145,7 @@ async def _healthcheck(self) -> None: self._logger.info('Waiting for Hasura instance to be ready') for _ in range(self._hasura_config.connection_timeout): with suppress(ClientConnectorError, ClientOSError): - response = await self._proxy._session.get(f'{self._hasura_config.url}/healthz') + response = await self._http._session.get(f'{self._hasura_config.url}/healthz') if response.status == 200: break await asyncio.sleep(1) @@ -146,7 +154,7 @@ async def _healthcheck(self) -> None: async def _reset_metadata(self) -> None: self._logger.info('Resetting metadata') - await self._hasura_http_request( + await self._hasura_request( endpoint='metadata', json={ "type": "clear_metadata", @@ -156,7 +164,7 @@ async def _reset_metadata(self) -> None: async def _fetch_metadata(self) -> Dict[str, Any]: self._logger.info('Fetching existing metadata') - return await self._hasura_http_request( + return await self._hasura_request( endpoint='metadata', json={ "type": "export_metadata", @@ -166,7 +174,7 @@ async def _fetch_metadata(self) -> Dict[str, Any]: async def _replace_metadata(self, metadata: Dict[str, Any]) -> None: self._logger.info('Replacing metadata') - await self._hasura_http_request( + await self._hasura_request( endpoint='query', json={ "type": "replace_metadata", @@ -305,7 +313,7 @@ async def _get_fields(self, name: str = 'query_root') -> List[Field]: ).replace( ' ', '' ) - result = await self._hasura_http_request( + result = await self._hasura_request( endpoint='graphql', json={ 'query': query, @@ -374,7 +382,7 @@ async def _apply_camelcase(self) -> None: }, } - await self._hasura_http_request( + await self._hasura_request( endpoint='metadata', json={ 'type': 'pg_set_table_customization', diff --git a/src/dipdup/http.py b/src/dipdup/http.py new file mode 100644 index 000000000..5871246f5 --- /dev/null +++ b/src/dipdup/http.py @@ -0,0 +1,138 @@ +import asyncio +import 
hashlib +import logging +import pickle +import platform +from abc import ABC, abstractmethod +from contextlib import suppress +from http import HTTPStatus +from typing import Mapping, Optional, Tuple, cast + +import aiohttp +from aiolimiter import AsyncLimiter +from fcache.cache import FileCache # type: ignore + +from dipdup import __version__ +from dipdup.config import HTTPConfig # type: ignore + + +class HTTPGateway(ABC): + def __init__(self, url: str, http_config: Optional[HTTPConfig] = None) -> None: + self._http_config = self._default_http_config() + self._http_config.merge(http_config) + self._http = _HTTPGateway(url.rstrip('/'), self._http_config) + + @abstractmethod + def _default_http_config(self) -> HTTPConfig: + ... + + async def close_session(self) -> None: + await self._http.close_session() + + def set_user_agent(self, *args: str) -> None: + self._http.set_user_agent(*args) + + +class _HTTPGateway: + """Wrapper for aiohttp HTTP requests. + + Covers caching, retrying failed requests and ratelimiting""" + + def __init__(self, url: str, config: HTTPConfig) -> None: + self._logger = logging.getLogger('dipdup.http') + self._url = url + self._config = config + self._user_agent_args: Tuple[str, ...] = () + self._user_agent: Optional[str] = None + self._cache = FileCache('dipdup', flag='cs') + self._ratelimiter = ( + AsyncLimiter(max_rate=config.ratelimit_rate, time_period=config.ratelimit_period) + if config.ratelimit_rate and config.ratelimit_period + else None + ) + self._session = aiohttp.ClientSession() + + @property + def user_agent(self) -> str: + if self._user_agent is None: + user_agent_args = (platform.system(), platform.machine()) + (self._user_agent_args or ()) + user_agent = f'dipdup/{__version__} ({"; ".join(user_agent_args)})' + user_agent += ' ' + aiohttp.http.SERVER_SOFTWARE + self._user_agent = user_agent + return self._user_agent + + async def _wrapped_request(self, method: str, url: str, **kwargs): + attempts = list(range(self._config.retry_count)) if self._config.retry_count else [0] + for attempt in attempts: + self._logger.debug('HTTP request attempt %s/%s', attempt + 1, self._config.retry_count) + try: + return await self._request( + method=method, + url=url, + **kwargs, + ) + except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e: + if attempt + 1 == attempts[-1]: + raise e + self._logger.warning('HTTP request failed: %s', e) + await asyncio.sleep(self._config.retry_sleep or 0) + except aiohttp.ClientResponseError as e: + if e.code == HTTPStatus.TOO_MANY_REQUESTS: + ratelimit_sleep = 5 + # TODO: Parse Retry-After in UTC date format + with suppress(KeyError, ValueError): + e.headers = cast(Mapping, e.headers) + ratelimit_sleep = int(e.headers['Retry-After']) + + self._logger.warning('HTTP request failed: %s', e) + await asyncio.sleep(ratelimit_sleep) + else: + if attempt + 1 == attempts[-1]: + raise e + self._logger.warning('HTTP request failed: %s', e) + await asyncio.sleep(self._config.retry_sleep or 0) + + async def _request(self, method: str, url: str, **kwargs): + """Wrapped aiohttp call with preconfigured headers and logging""" + headers = { + **kwargs.pop('headers', {}), + 'User-Agent': self.user_agent, + } + if not url.startswith(self._url): + url = self._url + '/' + url.lstrip('/') + params = kwargs.get('params', {}) + params_string = '&'.join([f'{k}={v}' for k, v in params.items()]) + request_string = f'{url}?{params_string}'.rstrip('?') + self._logger.debug('Calling `%s`', request_string) + async with self._session.request( + 
method=method, + url=url, + headers=headers, + raise_for_status=True, + **kwargs, + ) as response: + return await response.json() + + async def request(self, method: str, url: str, cache: bool = False, weight: int = 1, **kwargs): + if self._config.cache and cache: + key = hashlib.sha256(pickle.dumps([method, url, kwargs])).hexdigest() + try: + return self._cache[key] + except KeyError: + if self._ratelimiter: + await self._ratelimiter.acquire(weight) + response = await self._wrapped_request(method, url, **kwargs) + self._cache[key] = response + return response + else: + if self._ratelimiter: + await self._ratelimiter.acquire(weight) + response = await self._wrapped_request(method, url, **kwargs) + return response + + async def close_session(self) -> None: + await self._session.close() + + def set_user_agent(self, *args: str) -> None: + self._user_agent_args = args + self._user_agent = None diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 91e934e19..ee3b5c43e 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -30,7 +30,7 @@ def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource: self._config = config self._datasource = datasource - self._logger = logging.getLogger(__name__) + self._logger = logging.getLogger('dipdup.index') self._state: Optional[State] = None @property diff --git a/src/dipdup/models.py b/src/dipdup/models.py index d79ec0c90..417992f6f 100644 --- a/src/dipdup/models.py +++ b/src/dipdup/models.py @@ -16,7 +16,7 @@ ValueType = TypeVar('ValueType', bound=BaseModel) -_logger = logging.getLogger(__name__) +_logger = logging.getLogger('dipdup.models') class IndexType(Enum): diff --git a/src/dipdup/utils.py b/src/dipdup/utils.py index b059d08ec..e3f5659a1 100644 --- a/src/dipdup/utils.py +++ b/src/dipdup/utils.py @@ -6,16 +6,13 @@ from logging import Logger from typing import Any, AsyncIterator, Iterator, List, Optional -import aiohttp from tortoise import Tortoise from tortoise.backends.asyncpg.client import AsyncpgDBClient from tortoise.backends.base.client import TransactionContext from tortoise.backends.sqlite.client import SqliteClient from tortoise.transactions import in_transaction -from dipdup import __version__ - -_logger = logging.getLogger(__name__) +_logger = logging.getLogger('dipdup.utils') @asynccontextmanager @@ -101,25 +98,6 @@ async def in_global_transaction(): Tortoise._connections['default'] = original_conn -async def http_request(session: aiohttp.ClientSession, method: str, **kwargs): - """Wrapped aiohttp call with preconfigured headers and logging""" - headers = { - **kwargs.pop('headers', {}), - 'User-Agent': f'dipdup/{__version__}', - } - url = kwargs['url'] - params = kwargs.get('params', {}) - params_string = '&'.join([f'{k}={v}' for k, v in params.items()]) - request_string = f'{url}?{params_string}'.rstrip('?') - _logger.debug('Calling `%s`', request_string) - async with getattr(session, method)( - skip_auto_headers={'User-Agent'}, - headers=headers, - **kwargs, - ) as response: - return await response.json() - - class FormattedLogger(Logger): def __init__( self, diff --git a/tests/integration_tests/test_hasura.py b/tests/integration_tests/test_hasura.py index 49d0ade3b..91329c5e5 100644 --- a/tests/integration_tests/test_hasura.py +++ b/tests/integration_tests/test_hasura.py @@ -6,7 +6,7 @@ from tortoise import Tortoise from dipdup.config import HasuraConfig, PostgresDatabaseConfig -from dipdup.hasura import HasuraManager +from dipdup.hasura import HasuraGateway from dipdup.utils import tortoise_wrapper @@ 
-34,11 +34,11 @@ async def test_configure_hasura(self): database_config = PostgresDatabaseConfig(kind='postgres', host='', port=0, user='', database='', schema_name='hic_et_nunc') hasura_config = HasuraConfig('http://localhost') - hasura_manager = HasuraManager('demo_hic_et_nunc', hasura_config, database_config) - hasura_manager._get_views = AsyncMock(return_value=[]) - await hasura_manager._proxy._session.close() - hasura_manager._proxy = Mock() - hasura_manager._proxy.http_request = AsyncMock( + hasura_gateway = HasuraGateway('demo_hic_et_nunc', hasura_config, database_config) + hasura_gateway._get_views = AsyncMock(return_value=[]) + await hasura_gateway.close_session() + hasura_gateway._http = Mock() + hasura_gateway._http.request = AsyncMock( side_effect=[ empty_metadata, {}, @@ -50,8 +50,8 @@ async def test_configure_hasura(self): {}, ] ) - hasura_manager._healthcheck = AsyncMock() + hasura_gateway._healthcheck = AsyncMock() - await hasura_manager.configure() + await hasura_gateway.configure() - self.assertEqual(hasura_manager._proxy.http_request.call_args[-1]['json'], replace_metadata_request) + self.assertEqual(hasura_gateway._http.request.call_args[-1]['json'], replace_metadata_request)
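
Note on the new config-override semantics: the behaviour added above (each datasource ships hard-coded defaults via `_default_http_config()`, and a user-supplied `http` section only overrides the fields it explicitly sets) can be illustrated with a minimal standalone sketch. This is a plain re-statement of the `HTTPConfig` dataclass from this patch for illustration only, not the installed dipdup package; the default values below are the TzKT datasource defaults introduced here, and the override shown is a hypothetical user config.

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class HTTPConfig:
        cache: Optional[bool] = None
        retry_count: Optional[int] = None
        retry_sleep: Optional[int] = None
        ratelimit_rate: Optional[int] = None
        ratelimit_period: Optional[int] = None

        def merge(self, other: Optional['HTTPConfig']) -> None:
            # Copy only fields the user actually set; None means "keep the default".
            if not other:
                return
            for k, v in other.__dict__.items():
                if v is not None:
                    setattr(self, k, v)


    # Built-in defaults matching the TzKT datasource in this patch.
    config = HTTPConfig(cache=True, retry_count=3, retry_sleep=1,
                        ratelimit_rate=100, ratelimit_period=30)
    # Hypothetical user override: loosen the ratelimit, keep everything else.
    config.merge(HTTPConfig(ratelimit_rate=200))
    assert config.retry_count == 3 and config.ratelimit_rate == 200

This mirrors what `HTTPGateway.__init__` does: it builds the datasource-specific defaults first, then merges the optional user config on top before constructing the underlying `_HTTPGateway`.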