Skip to content

Commit

Permalink
SubscriptionManager class, bugfixes (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Aug 3, 2021
1 parent 1c43ebf commit 65b9ea3
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 101 deletions.
12 changes: 1 addition & 11 deletions src/dipdup/__main__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
import logging

from dipdup.cli import cli
from dipdup.exceptions import DipDupError

if __name__ == '__main__':
try:
cli(prog_name='dipdup', standalone_mode=False) # type: ignore
except KeyboardInterrupt:
pass
except DipDupError as e:
logging.critical(e.__repr__())
logging.info(e.format())
quit(e.exit_code)
cli(prog_name='dipdup', standalone_mode=False) # type: ignore
27 changes: 26 additions & 1 deletion src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
from dataclasses import dataclass
from functools import wraps
from os.path import dirname, exists, join
from typing import List, cast

Expand All @@ -16,7 +17,7 @@
from dipdup.codegen import DEFAULT_DOCKER_ENV_FILE, DEFAULT_DOCKER_IMAGE, DEFAULT_DOCKER_TAG, DipDupCodeGenerator
from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig
from dipdup.dipdup import DipDup
from dipdup.exceptions import ConfigurationError, MigrationRequiredError
from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError
from dipdup.hasura import HasuraGateway
from dipdup.utils import set_decimal_context, tortoise_wrapper

Expand All @@ -30,6 +31,21 @@ class CLIContext:
logging_config: LoggingConfig


def cli_wrapper(fn):
@wraps(fn)
async def wrapper(*args, **kwargs):
try:
return await fn(*args, **kwargs)
except KeyboardInterrupt:
pass
except DipDupError as e:
_logger.critical(e.__repr__())
_logger.info(e.format())
quit(e.exit_code)

return wrapper


def init_sentry(config: DipDupConfig) -> None:
if not config.sentry:
return
Expand Down Expand Up @@ -59,6 +75,7 @@ def init_sentry(config: DipDupConfig) -> None:
@click.option('--env-file', '-e', type=str, multiple=True, help='Path to .env file', default=[])
@click.option('--logging-config', '-l', type=str, help='Path to logging YAML config', default='logging.yml')
@click.pass_context
@cli_wrapper
async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
try:
path = join(os.getcwd(), logging_config)
Expand Down Expand Up @@ -96,6 +113,7 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
@click.option('--reindex', is_flag=True, help='Drop database and start indexing from scratch')
@click.option('--oneshot', is_flag=True, help='Synchronize indexes wia REST and exit without starting WS connection')
@click.pass_context
@cli_wrapper
async def run(ctx, reindex: bool, oneshot: bool) -> None:
config: DipDupConfig = ctx.obj.config
config.initialize()
Expand All @@ -106,6 +124,7 @@ async def run(ctx, reindex: bool, oneshot: bool) -> None:

@cli.command(help='Initialize new dipdup project')
@click.pass_context
@cli_wrapper
async def init(ctx):
config: DipDupConfig = ctx.obj.config
config.pre_initialize()
Expand All @@ -115,6 +134,7 @@ async def init(ctx):

@cli.command(help='Migrate project to the new spec version')
@click.pass_context
@cli_wrapper
async def migrate(ctx):
def _bump_spec_version(spec_version: str):
for config_path in ctx.obj.config_paths:
Expand All @@ -141,12 +161,14 @@ def _bump_spec_version(spec_version: str):

@cli.command(help='Clear development request cache')
@click.pass_context
@cli_wrapper
async def clear_cache(ctx):
FileCache('dipdup', flag='cs').clear()


@cli.group()
@click.pass_context
@cli_wrapper
async def docker(ctx):
...

Expand All @@ -156,20 +178,23 @@ async def docker(ctx):
@click.option('--tag', '-t', type=str, help='DipDup Docker tag', default=DEFAULT_DOCKER_TAG)
@click.option('--env-file', '-e', type=str, help='Path to env_file', default=DEFAULT_DOCKER_ENV_FILE)
@click.pass_context
@cli_wrapper
async def docker_init(ctx, image: str, tag: str, env_file: str):
config: DipDupConfig = ctx.obj.config
await DipDupCodeGenerator(config, {}).generate_docker(image, tag, env_file)


@cli.group()
@click.pass_context
@cli_wrapper
async def hasura(ctx):
...


@hasura.command(name='configure', help='Configure Hasura GraphQL Engine')
@click.option('--reset', is_flag=True, help='Reset metadata before configuring')
@click.pass_context
@cli_wrapper
async def hasura_configure(ctx, reset: bool):
config: DipDupConfig = ctx.obj.config
url = config.database.connection_string
Expand Down
4 changes: 2 additions & 2 deletions src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
OperationIndexConfig,
TzktDatasourceConfig,
)
from dipdup.datasources import DatasourceT
from dipdup.datasources.datasource import Datasource
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigurationError
from dipdup.utils import import_submodules, mkdir_p, pascal_to_snake, snake_to_pascal, touch, write
Expand Down Expand Up @@ -68,7 +68,7 @@ def load_template(name: str) -> Template:
class DipDupCodeGenerator:
"""Generates package based on config, invoked from `init` CLI command"""

def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, DatasourceT]) -> None:
def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, Datasource]) -> None:
self._logger = logging.getLogger('dipdup.codegen')
self._config = config
self._datasources = datasources
Expand Down
64 changes: 52 additions & 12 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from enum import Enum
from os import environ as env
from os.path import dirname
from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union, cast
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Type, Union, cast
from urllib.parse import urlparse

from pydantic import Field, validator
Expand Down Expand Up @@ -285,6 +285,24 @@ def initialize_storage_cls(self, package: str, module_name: str) -> None:
self.storage_type_cls = storage_type_cls


@dataclass
class ParentMixin:
"""`parent` field for index and template configs"""

def __post_init_post_parse__(self):
self._parent: Optional['IndexConfig'] = None

@property
def parent(self) -> Optional['IndexConfig']:
return self._parent

@parent.setter
def parent(self, config: 'IndexConfig') -> None:
if self._parent:
raise RuntimeError('Can\'t unset parent once set')
self._parent = config


@dataclass
class ParameterTypeMixin:
"""`parameter_type_cls` field"""
Expand Down Expand Up @@ -459,10 +477,11 @@ def originated_contract_config(self) -> ContractConfig:


@dataclass
class HandlerConfig:
class HandlerConfig(NameMixin):
callback: str

def __post_init_post_parse__(self):
super().__post_init_post_parse__()
self._callback_fn = None
if self.callback in (ROLLBACK_HANDLER, CONFIGURE_HANDLER):
raise ConfigurationError(f'`{self.callback}` callback name is reserved')
Expand Down Expand Up @@ -509,12 +528,13 @@ def template_values(self, value: Dict[str, str]) -> None:


@dataclass
class IndexConfig(TemplateValuesMixin, NameMixin):
class IndexConfig(TemplateValuesMixin, NameMixin, ParentMixin):
datasource: Union[str, TzktDatasourceConfig]

def __post_init_post_parse__(self) -> None:
TemplateValuesMixin.__post_init_post_parse__(self)
NameMixin.__post_init_post_parse__(self)
ParentMixin.__post_init_post_parse__(self)

def hash(self) -> str:
config_json = json.dumps(self, default=pydantic_encoder)
Expand All @@ -532,10 +552,11 @@ def datasource_config(self) -> TzktDatasourceConfig:
class OperationIndexConfig(IndexConfig):
"""Operation index config
:param datasource: Alias of datasource in `datasources` block
:param contract: Alias of contract to fetch operations for
:param first_block: First block to process
:param last_block: Last block to process
:param datasource: Alias of index datasource in `datasources` section
:param contracts: Aliases of contracts being indexed in `contracts` section
:param stateless: Makes index dynamic. DipDup will synchronize index from the first block on every run
:param first_block: First block to process (use with `--oneshot` run argument)
:param last_block: Last block to process (use with `--oneshot` run argument)
:param handlers: List of indexer handlers
"""

Expand All @@ -557,6 +578,15 @@ def contract_configs(self) -> List[ContractConfig]:
raise RuntimeError('Config is not initialized')
return cast(List[ContractConfig], self.contracts)

@property
def entrypoints(self) -> Set[str]:
entrypoints = set()
for handler in self.handlers:
for pattern in handler.pattern:
if isinstance(pattern, OperationHandlerTransactionPatternConfig) and pattern.entrypoint:
entrypoints.add(pattern.entrypoint)
return entrypoints


@dataclass
class BigMapHandlerConfig(HandlerConfig):
Expand Down Expand Up @@ -611,7 +641,7 @@ def contracts(self) -> List[ContractConfig]:


@dataclass
class IndexTemplateConfig:
class IndexTemplateConfig(ParentMixin):
kind = 'template'
template: str
values: Dict[str, str]
Expand Down Expand Up @@ -692,12 +722,12 @@ class DipDupConfig:
spec_version: str
package: str
datasources: Dict[str, DatasourceConfigT]
database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
contracts: Dict[str, ContractConfig] = Field(default_factory=dict)
indexes: Dict[str, IndexConfigT] = Field(default_factory=dict)
templates: Dict[str, IndexConfigTemplateT] = Field(default_factory=dict)
database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
jobs: Dict[str, JobConfig] = Field(default_factory=dict)
hasura: Optional[HasuraConfig] = None
jobs: Optional[Dict[str, JobConfig]] = None
sentry: Optional[SentryConfig] = None

def __post_init_post_parse__(self):
Expand All @@ -721,20 +751,27 @@ def validate(self) -> None:
raise ConfigurationError('SQLite DB engine is not supported by Hasura')

def get_contract(self, name: str) -> ContractConfig:
if name.startswith('<') and name.endswith('>'):
raise ConfigurationError(f'`{name}` variable of index template is not set')

try:
return self.contracts[name]
except KeyError as e:
raise ConfigurationError(f'Contract `{name}` not found in `contracts` config section') from e

def get_datasource(self, name: str) -> DatasourceConfigT:
if name.startswith('<') and name.endswith('>'):
raise ConfigurationError(f'`{name}` variable of index template is not set')

try:
return self.datasources[name]
except KeyError as e:
raise ConfigurationError(f'Datasource `{name}` not found in `datasources` config section') from e

def get_template(self, name: str) -> IndexConfigTemplateT:
if not self.templates:
raise ConfigurationError('`templates` section is missing')
if name.startswith('<') and name.endswith('>'):
raise ConfigurationError(f'`{name}` variable of index template is not set')

try:
return self.templates[name]
except KeyError as e:
Expand Down Expand Up @@ -772,6 +809,7 @@ def resolve_index_templates(self) -> None:
json_template = json.loads(raw_template)
new_index_config = template.__class__(**json_template)
new_index_config.template_values = index_config.values
new_index_config.parent = index_config.parent
self.indexes[index_name] = new_index_config

def _pre_initialize_index(self, index_name: str, index_config: IndexConfigT) -> None:
Expand Down Expand Up @@ -831,6 +869,8 @@ def pre_initialize(self) -> None:
contract_config.name = name
for name, datasource_config in self.datasources.items():
datasource_config.name = name
for name, job_config in self.jobs.items():
job_config.name = name

self.resolve_index_templates()
for index_name, index_config in self.indexes.items():
Expand Down
41 changes: 30 additions & 11 deletions src/dipdup/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from tortoise import Tortoise
from tortoise.transactions import in_transaction

from dipdup.config import ContractConfig, DipDupConfig, IndexTemplateConfig, PostgresDatabaseConfig
from dipdup.datasources import DatasourceT
from dipdup.config import ContractConfig, DipDupConfig, IndexConfig, IndexTemplateConfig, PostgresDatabaseConfig
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ContractAlreadyExistsError, IndexAlreadyExistsError
from dipdup.utils import FormattedLogger


# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic in HandlerContext.
# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic serialization.
class DipDupContext:
def __init__(
self,
datasources: Dict[str, DatasourceT],
datasources: Dict[str, Datasource],
config: DipDupConfig,
) -> None:
self.datasources = datasources
Expand Down Expand Up @@ -74,16 +74,18 @@ class HandlerContext(DipDupContext):

def __init__(
self,
datasources: Dict[str, DatasourceT],
datasources: Dict[str, Datasource],
config: DipDupConfig,
logger: FormattedLogger,
template_values: Optional[Dict[str, str]],
datasource: DatasourceT,
datasource: Datasource,
index_config: IndexConfig,
) -> None:
super().__init__(datasources, config)
self.logger = logger
self.template_values = template_values
self.datasource = datasource
self.index_config = index_config

def add_contract(self, name: str, address: str, typename: Optional[str] = None) -> None:
if name in self.config.contracts:
Expand All @@ -102,21 +104,38 @@ def add_index(self, name: str, template: str, values: Dict[str, Any]) -> None:
template=template,
values=values,
)
# NOTE: Notify datasource to subscribe to operations by entrypoint if enabled in index config
self.config.indexes[name].parent = self.index_config
self._updated = True


class RollbackHandlerContext(HandlerContext):
template_values: None
class JobContext(DipDupContext):
"""Job handler context."""

def __init__(
self,
datasources: Dict[str, DatasourceT],
datasources: Dict[str, Datasource],
config: DipDupConfig,
logger: FormattedLogger,
datasource: DatasourceT,
) -> None:
super().__init__(datasources, config)
self.logger = logger

# TODO: Spawning indexes from jobs?


class RollbackHandlerContext(DipDupContext):
def __init__(
self,
datasources: Dict[str, Datasource],
config: DipDupConfig,
logger: FormattedLogger,
datasource: Datasource,
from_level: int,
to_level: int,
) -> None:
super().__init__(datasources, config, logger, None, datasource)
super().__init__(datasources, config)
self.logger = logger
self.datasource = datasource
self.from_level = from_level
self.to_level = to_level
Loading

0 comments on commit 65b9ea3

Please sign in to comment.