Skip to content

Commit

Permalink
Refactor HTTP stack, update User-Agent header, fix caching, add default datasource ratelimits (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Jul 12, 2021
1 parent 5773bc0 commit e397af5
Show file tree
Hide file tree
Showing 18 changed files with 282 additions and 231 deletions.
15 changes: 5 additions & 10 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,21 @@
from dataclasses import dataclass
from functools import wraps
from os.path import dirname, join
from typing import List
from typing import List, NoReturn, cast
from typing import List, cast

import click
import sentry_sdk
from fcache.cache import FileCache # type: ignore
from sentry_sdk.integrations.aiohttp import AioHttpIntegration

from dipdup import __spec_version__, __version__, spec_version_mapping
from dipdup.config import DipDupConfig, LoggingConfig
from dipdup.dipdup import DipDup
from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError
from dipdup import __spec_version__, __version__
from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig
from dipdup.dipdup import DipDup
from dipdup.exceptions import ConfigurationError
from dipdup.hasura import HasuraManager
from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError
from dipdup.hasura import HasuraGateway
from dipdup.utils import tortoise_wrapper

_logger = logging.getLogger(__name__)
_logger = logging.getLogger('dipdup.cli')


def click_command_wrapper(fn):
Expand Down Expand Up @@ -148,7 +143,7 @@ async def configure_hasura(ctx, reset: bool):
if not config.hasura:
_logger.error('`hasura` config section is empty')
return
hasura = HasuraManager(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database))
hasura = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database))

async with tortoise_wrapper(url, models):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class DipDupCodeGenerator:
"""Generates package based on config, invoked from `init` CLI command"""

def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, DatasourceT]) -> None:
self._logger = logging.getLogger(__name__)
self._logger = logging.getLogger('dipdup.codegen')
self._config = config
self._datasources = datasources
self._schemas: Dict[TzktDatasourceConfig, Dict[str, Dict[str, Any]]] = {}
Expand Down
15 changes: 11 additions & 4 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
DEFAULT_RETRY_SLEEP = 1

sys.path.append(os.getcwd())
_logger = logging.getLogger(__name__)
_logger = logging.getLogger('dipdup.config')


class OperationType(Enum):
Expand Down Expand Up @@ -91,12 +91,19 @@ def valid_immune_tables(cls, v):

@dataclass
class HTTPConfig:
cache: bool = True
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP
cache: Optional[bool] = None
retry_count: Optional[int] = None
retry_sleep: Optional[int] = None
ratelimit_rate: Optional[int] = None
ratelimit_period: Optional[int] = None

def merge(self, other: Optional['HTTPConfig']) -> None:
    """Overlay non-None fields of `other` onto this config in place.

    Fields left as None in `other` keep this config's current value,
    so a partially-filled user config can selectively override defaults.
    """
    if not other:
        return
    # Copy every explicitly-set (non-None) attribute from `other`.
    for k, v in other.__dict__.items():
        if v is not None:
            setattr(self, k, v)


@dataclass
class ContractConfig:
Expand Down
8 changes: 1 addition & 7 deletions src/dipdup/configs/debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@
class: logging.StreamHandler
stream : ext://sys.stdout
loggers:
dipdup.dipdup:
level: DEBUG
dipdup.index:
level: DEBUG
dipdup.tzkt:
level: DEBUG
dipdup.bcd:
dipdup:
level: DEBUG

SignalRCoreClient:
Expand Down
8 changes: 1 addition & 7 deletions src/dipdup/configs/logging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@
class: logging.StreamHandler
stream : ext://sys.stdout
loggers:
dipdup.dipdup:
level: INFO
dipdup.index:
level: INFO
dipdup.tzkt:
level: INFO
dipdup.bcd:
dipdup:
level: INFO

SignalRCoreClient:
Expand Down
14 changes: 4 additions & 10 deletions src/dipdup/configs/warning.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,15 @@
class: logging.StreamHandler
stream : ext://sys.stdout
loggers:
dipdup.dipdup:
level: INFO
dipdup.index:
level: INFO
dipdup.tzkt:
level: INFO
dipdup.bcd:
level: INFO
dipdup:
level: WARNING

SignalRCoreClient:
formatter: brief
aiosqlite:
level: INFO
level: WARNING
db_client:
level: INFO
level: WARNING
root:
level: WARNING
handlers:
Expand Down
32 changes: 17 additions & 15 deletions src/dipdup/datasources/bcd/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,21 @@
from typing import Any, Dict, List, Optional

from dipdup.config import HTTPConfig
from dipdup.datasources.proxy import HTTPRequestProxy
from dipdup.http import HTTPGateway

TOKENS_REQUEST_LIMIT = 10


class BcdDatasource:
class BcdDatasource(HTTPGateway):
def __init__(
self,
url: str,
network: str,
http_config: Optional[HTTPConfig] = None,
) -> None:
if http_config is None:
http_config = HTTPConfig()

self._url = url.rstrip('/')
self._network = network
self._proxy = HTTPRequestProxy(http_config)
super().__init__(url, http_config)
self._logger = logging.getLogger('dipdup.bcd')

async def close_session(self) -> None:
await self._proxy.close_session()
self._network = network

async def run(self) -> None:
pass
Expand All @@ -34,9 +27,9 @@ async def resync(self) -> None:
async def get_tokens(self, address: str) -> List[Dict[str, Any]]:
tokens, offset = [], 0
while True:
tokens_batch = await self._proxy.http_request(
tokens_batch = await self._http.request(
'get',
url=f'{self._url}/v1/contract/{self._network}/{address}/tokens?offset={offset}',
url=f'v1/contract/{self._network}/{address}/tokens?offset={offset}',
)
tokens += tokens_batch
offset += TOKENS_REQUEST_LIMIT
Expand All @@ -45,7 +38,16 @@ async def get_tokens(self, address: str) -> List[Dict[str, Any]]:
return tokens

async def get_token(self, address: str, token_id: int) -> Dict[str, Any]:
return await self._proxy.http_request(
return await self._http.request(
'get',
url=f'{self._url}/v1/contract/{self._network}/{address}/tokens?token_id={token_id}',
url=f'v1/contract/{self._network}/{address}/tokens?token_id={token_id}',
)

def _default_http_config(self) -> HTTPConfig:
    """Return the default HTTP settings for the BCD API datasource.

    # NOTE(review): presumably consumed by HTTPGateway and merged with any
    # user-supplied HTTPConfig — confirm against the http module.
    """
    return HTTPConfig(
        cache=True,
        retry_count=3,
        retry_sleep=1,
        # 100 requests per 30 seconds — assumed to match BCD's published
        # ratelimit; TODO confirm against the BCD API docs.
        ratelimit_rate=100,
        ratelimit_period=30,
    )
38 changes: 18 additions & 20 deletions src/dipdup/datasources/coinbase/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,16 @@

from dipdup.config import HTTPConfig
from dipdup.datasources.coinbase.models import CandleData, CandleInterval
from dipdup.datasources.proxy import HTTPRequestProxy
from dipdup.http import HTTPGateway

CANDLES_REQUEST_LIMIT = 300
REST_API_URL = 'https://api.pro.coinbase.com'
WEBSOCKET_API_URL = 'wss://ws-feed.pro.coinbase.com'
API_URL = 'https://api.pro.coinbase.com'


class CoinbaseDatasource:
def __init__(self, http_config: Optional[HTTPConfig] = None) -> None:
if http_config is None:
http_config = HTTPConfig(
cache=True,
ratelimit_rate=10,
ratelimit_period=1,
)

class CoinbaseDatasource(HTTPGateway):
def __init__(self, url: str = API_URL, http_config: Optional[HTTPConfig] = None) -> None:
    """Create a Coinbase datasource; `url` defaults to the public REST API."""
    super().__init__(url, http_config)
    self._logger = logging.getLogger('dipdup.coinbase')
self._proxy = HTTPRequestProxy(http_config)

async def close_session(self) -> None:
await self._proxy.close_session()

async def run(self) -> None:
pass
Expand All @@ -33,17 +22,17 @@ async def resync(self) -> None:
pass

async def get_oracle_prices(self) -> Dict[str, Any]:
return await self._proxy.http_request(
return await self._http.request(
'get',
url=f'{REST_API_URL}/oracle',
url='oracle',
)

async def get_candles(self, since: datetime, until: datetime, interval: CandleInterval, ticker: str = 'XTZ-USD') -> List[CandleData]:
candles = []
for _since, _until in self._split_candle_requests(since, until, interval):
candles_json = await self._proxy.http_request(
candles_json = await self._http.request(
'get',
url=f'{REST_API_URL}/products/{ticker}/candles',
url=f'products/{ticker}/candles',
params={
'start': _since.replace(tzinfo=timezone.utc).isoformat(),
'end': _until.replace(tzinfo=timezone.utc).isoformat(),
Expand All @@ -54,6 +43,15 @@ async def get_candles(self, since: datetime, until: datetime, interval: CandleIn
candles += [CandleData.from_json(c) for c in candles_json]
return sorted(candles, key=lambda c: c.timestamp)

def _default_http_config(self) -> HTTPConfig:
    """Return the default HTTP settings for the Coinbase API datasource.

    # NOTE(review): presumably consumed by HTTPGateway and merged with any
    # user-supplied HTTPConfig — confirm against the http module.
    """
    return HTTPConfig(
        cache=True,
        retry_count=3,
        retry_sleep=1,
        # 10 requests per second — assumed to match Coinbase Pro's public
        # ratelimit; TODO confirm against the Coinbase API docs.
        ratelimit_rate=10,
        ratelimit_period=1,
    )

def _split_candle_requests(self, since: datetime, until: datetime, interval: CandleInterval) -> List[Tuple[datetime, datetime]]:
request_interval_limit = timedelta(seconds=interval.seconds * CANDLES_REQUEST_LIMIT)
request_intervals = []
Expand Down
11 changes: 8 additions & 3 deletions src/dipdup/datasources/datasource.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from abc import ABC
from enum import Enum
from typing import Awaitable, List, Protocol
from typing import Awaitable, List, Optional, Protocol

from pyee import AsyncIOEventEmitter # type: ignore

from dipdup.config import HTTPConfig
from dipdup.http import HTTPGateway
from dipdup.models import BigMapData, OperationData


Expand All @@ -28,7 +29,11 @@ def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int
...


class IndexDatasource(ABC, AsyncIOEventEmitter):
class IndexDatasource(HTTPGateway, AsyncIOEventEmitter):
def __init__(self, url: str, http_config: Optional[HTTPConfig] = None) -> None:
    """Initialize both base classes explicitly.

    The two bases take different constructor arguments (HTTPGateway needs
    the URL and HTTP config, AsyncIOEventEmitter takes none), so each
    __init__ is called directly rather than via cooperative super().
    """
    HTTPGateway.__init__(self, url, http_config)
    AsyncIOEventEmitter.__init__(self)

def on(self, event, f=None) -> None:
    # Deliberately disabled: direct subscription through the inherited
    # emitter API is forbidden for this class.
    raise RuntimeError('Do not use `on` directly')

Expand Down
62 changes: 0 additions & 62 deletions src/dipdup/datasources/proxy.py

This file was deleted.

Loading

0 comments on commit e397af5

Please sign in to comment.