diff --git a/src/dipdup/context.py b/src/dipdup/context.py
index eaec5a402..ad8d1f65d 100644
--- a/src/dipdup/context.py
+++ b/src/dipdup/context.py
@@ -12,20 +12,14 @@
 
 
 # TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic in HandlerContext.
-class HandlerContext:
-    """Common handler context."""
-
+class DipDupContext:
     def __init__(
         self,
         datasources: Dict[str, DatasourceT],
         config: DipDupConfig,
-        logger: FormattedLogger,
-        template_values: Optional[Dict[str, str]],
     ) -> None:
         self.datasources = datasources
         self.config = config
-        self.logger = logger
-        self.template_values = template_values
         self._updated: bool = False
 
     def commit(self) -> None:
@@ -70,6 +64,23 @@ async def reindex(self) -> None:
             await Tortoise._drop_databases()
             await self.restart()
 
+
+class HandlerContext(DipDupContext):
+    """Common handler context."""
+
+    def __init__(
+        self,
+        datasources: Dict[str, DatasourceT],
+        config: DipDupConfig,
+        logger: FormattedLogger,
+        template_values: Optional[Dict[str, str]],
+        datasource: DatasourceT,
+    ) -> None:
+        super().__init__(datasources, config)
+        self.logger = logger
+        self.template_values = template_values
+        self.datasource = datasource
+
     def add_contract(self, name: str, address: str, typename: Optional[str] = None) -> None:
         if name in self.config.contracts:
             raise ConfigurationError(f'Contract `{name}` is already exists')
@@ -91,16 +102,17 @@ def add_index(self, name: str, template: str, values: Dict[str, Any]) -> None:
 
 
 class RollbackHandlerContext(HandlerContext):
+    template_values: None
+
     def __init__(
         self,
        datasources: Dict[str, DatasourceT],
         config: DipDupConfig,
         logger: FormattedLogger,
-        datasource: str,
+        datasource: DatasourceT,
         from_level: int,
         to_level: int,
     ) -> None:
-        super().__init__(datasources, config, logger, None)
-        self.datasource = datasource
+        super().__init__(datasources, config, logger, None, datasource)
         self.from_level = from_level
         self.to_level = to_level
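
With the split above, `DipDupContext` keeps only the shared `datasources` and `config`, while `HandlerContext` adds the per-callback `logger`, `template_values` and, new in this change, the `datasource` that produced the matched data; `RollbackHandlerContext` now carries the datasource instance instead of its config name. A minimal sketch of a rollback handler under the new shape, assuming the callback receives only this context object (as the `RollbackHandlerContext` construction in `IndexDispatcher._rollback` further down suggests); the depth policy is purely illustrative:

from dipdup.context import RollbackHandlerContext


async def on_rollback(ctx: RollbackHandlerContext) -> None:
    # ctx.datasource is now the emitting datasource object itself, not a name to look up.
    ctx.logger.warning('Rollback from %s to %s on %s', ctx.from_level, ctx.to_level, ctx.datasource)
    # Illustrative policy: ignore single-level reorgs, reindex on anything deeper.
    if ctx.from_level - ctx.to_level > 1:
        await ctx.reindex()
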
diff --git a/src/dipdup/datasources/datasource.py b/src/dipdup/datasources/datasource.py
new file mode 100644
index 000000000..383efabc6
--- /dev/null
+++ b/src/dipdup/datasources/datasource.py
@@ -0,0 +1,56 @@
+from abc import ABC
+from enum import Enum
+from typing import Awaitable, List, Protocol
+
+from pyee import AsyncIOEventEmitter  # type: ignore
+
+from dipdup.models import BigMapData, OperationData
+
+
+class EventType(Enum):
+    operations = 'operations'
+    big_maps = 'big_maps'
+    rollback = 'rollback'
+
+
+class OperationsCallback(Protocol):
+    def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData]) -> Awaitable[None]:
+        ...
+
+
+class BigMapsCallback(Protocol):
+    def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
+        ...
+
+
+class RollbackCallback(Protocol):
+    def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int) -> Awaitable[None]:
+        ...
+
+
+class IndexDatasource(ABC, AsyncIOEventEmitter):
+    def on(self, event, f=None) -> None:
+        raise RuntimeError('Do not use `on` directly')
+
+    def emit(self, event: str, *args, **kwargs) -> None:
+        if event not in ('new_listener', 'error'):
+            raise RuntimeError('Do not use `emit` directly')
+        super().emit(event, *args, **kwargs)
+
+    def on_operations(self, fn: OperationsCallback) -> None:
+        super().on(EventType.operations, fn)
+
+    def on_big_maps(self, fn: BigMapsCallback) -> None:
+        super().on(EventType.big_maps, fn)
+
+    def on_rollback(self, fn: RollbackCallback) -> None:
+        super().on(EventType.rollback, fn)
+
+    def emit_operations(self, operations: List[OperationData]) -> None:
+        super().emit(EventType.operations, datasource=self, operations=operations)
+
+    def emit_big_maps(self, big_maps: List[BigMapData]) -> None:
+        super().emit(EventType.big_maps, datasource=self, big_maps=big_maps)
+
+    def emit_rollback(self, from_level: int, to_level: int) -> None:
+        super().emit(EventType.rollback, datasource=self, from_level=from_level, to_level=to_level)
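
The new `IndexDatasource` base wraps pyee's stringly-typed events in typed `on_*`/`emit_*` helpers and rejects direct `on`/`emit` calls. A self-contained sketch of the wiring, assuming this branch of `dipdup` is importable; `DummyDatasource` and both callbacks are illustrative only:

import asyncio
from typing import List

from dipdup.datasources.datasource import IndexDatasource
from dipdup.models import OperationData


class DummyDatasource(IndexDatasource):
    """Bare subclass for demonstration; real datasources add fetching and subscription logic."""


async def handle_operations(datasource: IndexDatasource, operations: List[OperationData]) -> None:
    print(f'{len(operations)} operations from {datasource}')


async def handle_rollback(datasource: IndexDatasource, from_level: int, to_level: int) -> None:
    print(f'rollback {from_level} -> {to_level} from {datasource}')


async def main() -> None:
    datasource = DummyDatasource()
    datasource.on_operations(handle_operations)
    datasource.on_rollback(handle_rollback)

    # Typed emitters call AsyncIOEventEmitter.emit internally and schedule the async callbacks.
    datasource.emit_operations(operations=[])
    datasource.emit_rollback(from_level=1337, to_level=1336)

    # The raw pyee API is fenced off:
    try:
        datasource.emit('operations', [])
    except RuntimeError as exc:
        print(exc)

    await asyncio.sleep(0.1)  # let the scheduled callbacks run


asyncio.run(main())
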
diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index f35b6b20b..2ce6b9f03 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -7,7 +7,6 @@
 from aiosignalrcore.hub_connection_builder import HubConnectionBuilder  # type: ignore
 from aiosignalrcore.messages.completion_message import CompletionMessage  # type: ignore
 from aiosignalrcore.transport.websockets.connection import ConnectionState  # type: ignore
-from pyee import AsyncIOEventEmitter  # type: ignore
 
 from dipdup.config import (
     BigMapIndexConfig,
@@ -16,6 +15,7 @@
     OperationHandlerOriginationPatternConfig,
     OperationIndexConfig,
 )
+from dipdup.datasources.datasource import IndexDatasource
 from dipdup.datasources.proxy import DatasourceRequestProxy
 from dipdup.datasources.tzkt.enums import TzktMessageType
 from dipdup.models import BigMapAction, BigMapData, OperationData
@@ -254,7 +254,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap
             yield big_maps[0].level, big_maps[: i + 1]
 
 
-class TzktDatasource(AsyncIOEventEmitter):
+class TzktDatasource(IndexDatasource):
     """Bridge between REST/WS TzKT endpoints and DipDup.
 
     * Converts raw API data to models
@@ -452,7 +452,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
         else:
             raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')
 
-        await self.on_connect()
+        await self._on_connect()
 
     def _get_client(self) -> BaseHubConnection:
         """Create SignalR client, register message callbacks"""
@@ -471,10 +471,10 @@ def _get_client(self) -> BaseHubConnection:
                 )
             ).build()
 
-            self._client.on_open(self.on_connect)
-            self._client.on_error(self.on_error)
-            self._client.on('operations', self.on_operation_message)
-            self._client.on('bigmaps', self.on_big_map_message)
+            self._client.on_open(self._on_connect)
+            self._client.on_error(self._on_error)
+            self._client.on('operations', self._on_operation_message)
+            self._client.on('bigmaps', self._on_big_map_message)
 
         return self._client
 
@@ -485,7 +485,7 @@ async def run(self) -> None:
         self._logger.info('Starting websocket client')
         await self._get_client().start()
 
-    async def on_connect(self) -> None:
+    async def _on_connect(self) -> None:
         """Subscribe to all required channels on established WS connection"""
         if self._get_client().transport.state != ConnectionState.connected:
             return
@@ -499,7 +499,7 @@ async def on_connect(self) -> None:
         for address, paths in self._big_map_subscriptions.items():
             await self.subscribe_to_big_maps(address, paths)
 
-    def on_error(self, message: CompletionMessage) -> NoReturn:
+    def _on_error(self, message: CompletionMessage) -> NoReturn:
         """Raise exception from WS server's error message"""
         raise Exception(message.error)
 
@@ -554,7 +554,7 @@ async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None:
             ],
         )
 
-    async def on_operation_message(
+    async def _on_operation_message(
         self,
         message: List[Dict[str, Any]],
     ) -> None:
@@ -577,18 +577,19 @@ async def on_operation_message(
                     if operation.status != 'applied':
                         continue
                     operations.append(operation)
-                self.emit("operations", operations)
+                self.emit_operations(operations)
 
             elif message_type == TzktMessageType.REORG:
-                self.emit("rollback", self.level, current_level)
+                if self.level is None:
+                    raise RuntimeError
+                self.emit_rollback(self.level, current_level)
 
             else:
                 raise NotImplementedError
 
-    async def on_big_map_message(
+    async def _on_big_map_message(
         self,
         message: List[Dict[str, Any]],
-        sync=False,
     ) -> None:
         """Parse and emit raw big map diffs from WS"""
         for item in message:
@@ -606,10 +607,12 @@ async def on_big_map_message(
                 for big_map_json in item['data']:
                     big_map = self.convert_big_map(big_map_json)
                     big_maps.append(big_map)
-                self.emit("big_maps", big_maps)
+                self.emit_big_maps(big_maps)
 
             elif message_type == TzktMessageType.REORG:
-                self.emit("rollback", self.level, current_level)
+                if self.level is None:
+                    raise RuntimeError
+                self.emit_rollback(self.level, current_level)
 
             else:
                 raise NotImplementedError
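
On the TzKT side, the WebSocket callbacks become private and the string emits are replaced with the typed emitters, plus a guard for REORG messages that arrive before any level has been processed. A stripped-down sketch of that message-handling contract, assuming an `IndexDatasource` subclass that tracks `level` the way `TzktDatasource` does; the class and method names here are illustrative, not the real implementation:

from typing import List, Optional

from dipdup.datasources.datasource import IndexDatasource
from dipdup.models import OperationData


class SketchDatasource(IndexDatasource):
    def __init__(self) -> None:
        super().__init__()
        self.level: Optional[int] = None

    def handle_data(self, level: int, operations: List[OperationData]) -> None:
        # Forward only applied operations, as _on_operation_message does, and track the level.
        self.level = level
        self.emit_operations([op for op in operations if op.status == 'applied'])

    def handle_reorg(self, current_level: int) -> None:
        # A reorg received before the first data message has nothing to roll back.
        if self.level is None:
            raise RuntimeError('REORG message received before any data message')
        self.emit_rollback(self.level, current_level)
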
diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index 3744442d5..dbf3760c7 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -1,7 +1,6 @@
 import asyncio
 import hashlib
 import logging
-from functools import partial
 from os.path import join
 from posix import listdir
 from typing import Dict, List, cast
@@ -26,18 +25,19 @@
     StaticTemplateConfig,
     TzktDatasourceConfig,
 )
-from dipdup.context import RollbackHandlerContext
+from dipdup.context import DipDupContext, RollbackHandlerContext
 from dipdup.datasources import DatasourceT
 from dipdup.datasources.bcd.datasource import BcdDatasource
+from dipdup.datasources.datasource import IndexDatasource
 from dipdup.datasources.tzkt.datasource import TzktDatasource
 from dipdup.exceptions import ConfigurationError, HandlerImportError
 from dipdup.hasura import configure_hasura
-from dipdup.index import BigMapIndex, HandlerContext, Index, OperationIndex
+from dipdup.index import BigMapIndex, Index, OperationIndex
 from dipdup.models import BigMapData, IndexType, OperationData, State
 
 
 class IndexDispatcher:
-    def __init__(self, ctx: HandlerContext) -> None:
+    def __init__(self, ctx: DipDupContext) -> None:
         self._ctx = ctx
 
         self._logger = logging.getLogger(__name__)
@@ -82,21 +82,21 @@ async def reload_config(self) -> None:
 
         self._ctx.reset()
 
-    async def dispatch_operations(self, operations: List[OperationData]) -> None:
+    async def dispatch_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None:
         assert len(set(op.level for op in operations)) == 1
         level = operations[0].level
         for index in self._indexes.values():
-            if isinstance(index, OperationIndex):
+            if isinstance(index, OperationIndex) and index.datasource == datasource:
                 index.push(level, operations)
 
-    async def dispatch_big_maps(self, big_maps: List[BigMapData]) -> None:
+    async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
         assert len(set(op.level for op in big_maps)) == 1
         level = big_maps[0].level
         for index in self._indexes.values():
-            if isinstance(index, BigMapIndex):
+            if isinstance(index, BigMapIndex) and index.datasource == datasource:
                 index.push(level, big_maps)
 
-    async def _rollback(self, datasource: str, from_level: int, to_level: int) -> None:
+    async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
         logger = utils.FormattedLogger(ROLLBACK_HANDLER)
         rollback_fn = self._ctx.config.get_rollback_fn()
         ctx = RollbackHandlerContext(
@@ -111,12 +111,12 @@ async def _rollback(self, datasource: str, from_level: int, to_level: int) -> No
 
     async def run(self, oneshot=False) -> None:
         self._logger.info('Starting index dispatcher')
-        for name, datasource in self._ctx.datasources.items():
-            if not isinstance(datasource, TzktDatasource):
+        for datasource in self._ctx.datasources.values():
+            if not isinstance(datasource, IndexDatasource):
                 continue
-            datasource.on('operations', self.dispatch_operations)
-            datasource.on('big_maps', self.dispatch_big_maps)
-            datasource.on('rollback', partial(self._rollback, datasource=name))
+            datasource.on_operations(self.dispatch_operations)
+            datasource.on_big_maps(self.dispatch_big_maps)
+            datasource.on_rollback(self._rollback)
 
         self._ctx.commit()
 
@@ -143,11 +143,9 @@ def __init__(self, config: DipDupConfig) -> None:
         self._config = config
         self._datasources: Dict[str, DatasourceT] = {}
         self._datasources_by_config: Dict[DatasourceConfigT, DatasourceT] = {}
-        self._ctx = HandlerContext(
+        self._ctx = DipDupContext(
             config=self._config,
             datasources=self._datasources,
-            logger=utils.FormattedLogger(__name__),
-            template_values=None,
         )
         self._index_dispatcher = IndexDispatcher(self._ctx)
 
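
`IndexDispatcher` now receives the emitting datasource in every callback and feeds a batch only to the indexes bound to that datasource, which keeps several TzKT datasources in one project from cross-feeding each other. A condensed, runnable illustration of that routing rule (not the real dispatcher):

from typing import Dict, List


class FakeIndex:
    """Stand-in for OperationIndex/BigMapIndex: remembers which levels it was fed."""

    def __init__(self, datasource: object) -> None:
        self.datasource = datasource
        self.received: List[int] = []

    def push(self, level: int, batch: list) -> None:
        self.received.append(level)


def dispatch(indexes: Dict[str, FakeIndex], emitting_datasource: object, level: int, batch: list) -> None:
    # Mirrors the `index.datasource == datasource` check added to dispatch_operations/dispatch_big_maps.
    for index in indexes.values():
        if index.datasource == emitting_datasource:
            index.push(level, batch)


mainnet, testnet = object(), object()
indexes = {'mainnet_index': FakeIndex(mainnet), 'testnet_index': FakeIndex(testnet)}
dispatch(indexes, mainnet, 1_500_000, [])
assert indexes['mainnet_index'].received == [1_500_000]
assert indexes['testnet_index'].received == []
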
diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index 8064cb76e..74d16835e 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -16,7 +16,7 @@
     OperationIndexConfig,
     OperationType,
 )
-from dipdup.context import HandlerContext
+from dipdup.context import DipDupContext, HandlerContext
 from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource
 from dipdup.models import BigMapAction, BigMapData, BigMapDiff, OperationData, Origination, State, TemporaryState, Transaction
 from dipdup.utils import FormattedLogger, in_global_transaction
@@ -25,7 +25,7 @@
 
 
 class Index:
-    def __init__(self, ctx: HandlerContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
+    def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
         self._ctx = ctx
         self._config = config
         self._datasource = datasource
@@ -33,6 +33,10 @@ def __init__(self, ctx: HandlerContext, config: IndexConfigTemplateT, datasource
         self._logger = logging.getLogger(__name__)
         self._state: Optional[State] = None
 
+    @property
+    def datasource(self) -> TzktDatasource:
+        return self._datasource
+
     async def get_state(self) -> State:
         """Get state of index containing current level and config hash"""
         if self._state is None:
@@ -90,7 +94,7 @@ async def _initialize_index_state(self) -> None:
 class OperationIndex(Index):
     _config: OperationIndexConfig
 
-    def __init__(self, ctx: HandlerContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
+    def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
         super().__init__(ctx, config, datasource)
         self._queue: Deque[Tuple[int, List[OperationData]]] = deque()
         self._contract_hashes: Dict[str, Tuple[str, str]] = {}
@@ -283,6 +287,7 @@ async def _on_match(
             config=self._ctx.config,
             logger=logger,
             template_values=self._config.template_values,
+            datasource=self.datasource,
         )
         await handler_config.callback_fn(handler_context, *args)
 
@@ -325,7 +330,7 @@ async def _get_contract_hashes(self, address: str) -> Tuple[str, str]:
 class BigMapIndex(Index):
     _config: BigMapIndexConfig
 
-    def __init__(self, ctx: HandlerContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
+    def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
         super().__init__(ctx, config, datasource)
         self._queue: Deque[Tuple[int, List[BigMapData]]] = deque()
 
@@ -423,6 +428,7 @@ async def _on_match(
             config=self._ctx.config,
             logger=logger,
             template_values=self._config.template_values,
+            datasource=self.datasource,
        )
         await handler_config.callback_fn(handler_context, big_map_context)
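
Finally, every `Index` exposes its datasource and threads it into the `HandlerContext` built in `_on_match`, so handler code can reach the exact datasource its index runs on without a name lookup. A minimal sketch under that assumption; the handler name, argument type, and the contract/template names are placeholders:

from typing import Any

from dipdup.context import HandlerContext
from dipdup.datasources.tzkt.datasource import TzktDatasource


async def on_transfer(ctx: HandlerContext, transaction: Any) -> None:
    # The datasource attached in _on_match is the TzktDatasource the index is bound to.
    assert isinstance(ctx.datasource, TzktDatasource)
    network = (ctx.template_values or {}).get('network', 'unknown')
    ctx.logger.info('Transfer on %s via %s', network, ctx.datasource)

    # Runtime config updates still go through the context helpers shown in context.py above.
    ctx.add_contract(name='new_token', address='KT1...', typename='token')  # placeholder address
    ctx.add_index(name='new_token_index', template='token_template', values={'contract': 'new_token'})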