Skip to content

Commit

Permalink
Sentry environment, job config interval, fix shutdown, minor improv…
Browse files Browse the repository at this point in the history
…ements (#93)
  • Loading branch information
droserasprout authored Jul 16, 2021
1 parent f99c269 commit ee7803e
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 114 deletions.
45 changes: 30 additions & 15 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ aiohttp = "^3.7.4"
asyncpg = "0.23.0"
datamodel-code-generator = "^0.11.1"
"ruamel.yaml" = "^0.17.2"
tortoise-orm = "0.17.4"
tortoise-orm = "0.17.5"
pydantic = "^1.8.1"
aiosignalrcore = "^0.9.2"
fcache = "^0.4.7"
Expand Down
13 changes: 7 additions & 6 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dipdup.dipdup import DipDup
from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError
from dipdup.hasura import HasuraGateway
from dipdup.utils import tortoise_wrapper
from dipdup.utils import set_decimal_context, tortoise_wrapper

_logger = logging.getLogger('dipdup.cli')

Expand Down Expand Up @@ -66,9 +66,12 @@ async def cli(ctx, config: List[str], logging_config: str):
if _config.sentry:
sentry_sdk.init(
dsn=_config.sentry.dsn,
environment=_config.sentry.environment,
integrations=[AioHttpIntegration()],
)

set_decimal_context(_config.package)

ctx.obj = CLIContext(
config_paths=config,
config=_config,
Expand Down Expand Up @@ -143,10 +146,8 @@ async def configure_hasura(ctx, reset: bool):
if not config.hasura:
_logger.error('`hasura` config section is empty')
return
hasura = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database))
hasura_gateway = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database))

async with tortoise_wrapper(url, models):
try:
await hasura.configure(reset)
finally:
await hasura.close_session()
async with hasura_gateway:
await hasura_gateway.configure(reset)
25 changes: 18 additions & 7 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ def valid_immune_tables(cls, v):
class HTTPConfig:
cache: Optional[bool] = None
retry_count: Optional[int] = None
retry_sleep: Optional[int] = None
retry_sleep: Optional[float] = None
retry_multiplier: Optional[float] = None
ratelimit_rate: Optional[int] = None
ratelimit_period: Optional[int] = None
connection_limit: Optional[int] = None
batch_size: Optional[int] = None

def merge(self, other: Optional['HTTPConfig']) -> None:
if not other:
Expand Down Expand Up @@ -162,12 +165,13 @@ class TzktDatasourceConfig(NameMixin):
def __hash__(self):
return hash(self.url)

@validator('url', allow_reuse=True)
def valid_url(cls, v):
parsed_url = urlparse(v)
def __post_init_post_parse__(self) -> None:
super().__post_init_post_parse__()
if self.http and self.http.batch_size and self.http.batch_size > 10000:
raise ConfigurationError('`batch_size` must be less than 10000')
parsed_url = urlparse(self.url)
if not (parsed_url.scheme and parsed_url.netloc):
raise ConfigurationError(f'`{v}` is not a valid datasource URL')
return v
raise ConfigurationError(f'`{self.url}` is not a valid datasource URL')


@dataclass
Expand Down Expand Up @@ -666,14 +670,21 @@ def headers(self) -> Dict[str, str]:

@dataclass
class JobConfig(HandlerConfig):
crontab: str
crontab: Optional[str] = None
interval: Optional[int] = None
args: Optional[Dict[str, Any]] = None
atomic: bool = False

def __post_init_post_parse__(self):
if int(bool(self.crontab)) + int(bool(self.interval)) != 1:
raise ConfigurationError('Either `interval` or `crontab` field must be specified')
super().__post_init_post_parse__()


@dataclass
class SentryConfig:
dsn: str
environment: Optional[str] = None


@dataclass
Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/datasources/bcd/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ async def get_token(self, address: str, token_id: int) -> Optional[Dict[str, Any
def _default_http_config(self) -> HTTPConfig:
return HTTPConfig(
cache=True,
retry_count=3,
retry_sleep=1,
retry_multiplier=1.1,
ratelimit_rate=100,
ratelimit_period=30,
connection_limit=25,
)
6 changes: 3 additions & 3 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

OperationID = int

TZKT_HTTP_REQUEST_LIMIT = 10000
TZKT_ORIGINATIONS_REQUEST_LIMIT = 100
OPERATION_FIELDS = (
"type",
Expand Down Expand Up @@ -291,7 +290,7 @@ def __init__(

@property
def request_limit(self) -> int:
return TZKT_HTTP_REQUEST_LIMIT
return self._http_config.batch_size or 10000

@property
def level(self) -> Optional[int]:
Expand Down Expand Up @@ -583,10 +582,11 @@ async def subscribe_to_head(self) -> None:
def _default_http_config(self) -> HTTPConfig:
return HTTPConfig(
cache=True,
retry_count=3,
retry_sleep=1,
retry_multiplier=1.1,
ratelimit_rate=100,
ratelimit_period=30,
connection_limit=25,
)

async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None:
Expand Down
Loading

0 comments on commit ee7803e

Please sign in to comment.