Fix dispatching bigmaps to indexes with wrong datasource, refactor event emitter related code #75

Merged · 2 commits · Jun 25, 2021
32 changes: 22 additions & 10 deletions src/dipdup/context.py
@@ -12,20 +12,14 @@


# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic in HandlerContext.
class HandlerContext:
"""Common handler context."""

class DipDupContext:
def __init__(
self,
datasources: Dict[str, DatasourceT],
config: DipDupConfig,
logger: FormattedLogger,
template_values: Optional[Dict[str, str]],
) -> None:
self.datasources = datasources
self.config = config
self.logger = logger
self.template_values = template_values
self._updated: bool = False

def commit(self) -> None:
@@ -70,6 +64,23 @@ async def reindex(self) -> None:
await Tortoise._drop_databases()
await self.restart()


class HandlerContext(DipDupContext):
"""Common handler context."""

def __init__(
self,
datasources: Dict[str, DatasourceT],
config: DipDupConfig,
logger: FormattedLogger,
template_values: Optional[Dict[str, str]],
datasource: DatasourceT,
) -> None:
super().__init__(datasources, config)
self.logger = logger
self.template_values = template_values
self.datasource = datasource

def add_contract(self, name: str, address: str, typename: Optional[str] = None) -> None:
if name in self.config.contracts:
raise ConfigurationError(f'Contract `{name}` is already exists')
@@ -91,16 +102,17 @@ def add_index(self, name: str, template: str, values: Dict[str, Any]) -> None:


class RollbackHandlerContext(HandlerContext):
template_values: None

def __init__(
self,
datasources: Dict[str, DatasourceT],
config: DipDupConfig,
logger: FormattedLogger,
datasource: str,
datasource: DatasourceT,
from_level: int,
to_level: int,
) -> None:
super().__init__(datasources, config, logger, None)
self.datasource = datasource
super().__init__(datasources, config, logger, None, datasource)
self.from_level = from_level
self.to_level = to_level
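
Note: this hunk splits the old `HandlerContext` into a minimal `DipDupContext` base (datasources and config only) and a `HandlerContext` subclass that additionally carries the logger, the template values and, new in this PR, the datasource that produced the matched data. Below is a minimal handler sketch under that assumption; the handler name and its payload type are hypothetical, not part of the diff:

```python
# Hypothetical handler sketch: `ctx.datasource` now points at the exact
# IndexDatasource that emitted the matched data, so handlers no longer
# need to look a datasource up by name in `ctx.datasources`.
from dipdup.context import HandlerContext
from dipdup.models import OperationData


async def on_transfer(ctx: HandlerContext, operation: OperationData) -> None:
    ctx.logger.info('Matched via datasource %s', ctx.datasource)
    if ctx.template_values:  # still available for template-spawned indexes
        ctx.logger.info('Template values: %s', ctx.template_values)
```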
56 changes: 56 additions & 0 deletions src/dipdup/datasources/datasource.py
@@ -0,0 +1,56 @@
from abc import ABC
from enum import Enum
from typing import Awaitable, List, Protocol

from pyee import AsyncIOEventEmitter # type: ignore

from dipdup.models import BigMapData, OperationData


class EventType(Enum):
operations = 'operations'
big_maps = 'big_maps'
rollback = 'rollback'


class OperationsCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData]) -> Awaitable[None]:
...


class BigMapsCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
...


class RollbackCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int) -> Awaitable[None]:
...


class IndexDatasource(ABC, AsyncIOEventEmitter):
def on(self, event, f=None) -> None:
raise RuntimeError('Do not use `on` directly')

def emit(self, event: str, *args, **kwargs) -> None:
if event not in ('new_listener', 'error'):
raise RuntimeError('Do not use `emit` directly')
super().emit(event, *args, **kwargs)

def on_operations(self, fn: OperationsCallback) -> None:
super().on(EventType.operations, fn)

def on_big_maps(self, fn: BigMapsCallback) -> None:
super().on(EventType.big_maps, fn)

def on_rollback(self, fn: RollbackCallback) -> None:
super().on(EventType.rollback, fn)

def emit_operations(self, operations: List[OperationData]) -> None:
super().emit(EventType.operations, datasource=self, operations=operations)

def emit_big_maps(self, big_maps: List[BigMapData]) -> None:
super().emit(EventType.big_maps, datasource=self, big_maps=big_maps)

def emit_rollback(self, from_level: int, to_level: int) -> None:
super().emit(EventType.rollback, datasource=self, from_level=from_level, to_level=to_level)
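
Note: `IndexDatasource` wraps pyee's `AsyncIOEventEmitter` behind typed `on_*`/`emit_*` helpers and blocks the raw `on`/`emit` methods, so producers and consumers cannot disagree on event names or payload shapes. A self-contained toy sketch of the intended flow follows; the `ToyDatasource` class and the callback are illustrative assumptions, not part of the PR:

```python
import asyncio
from typing import List

from dipdup.datasources.datasource import IndexDatasource
from dipdup.models import OperationData


class ToyDatasource(IndexDatasource):
    """Minimal stand-in for a real datasource, used only to show the event API."""


async def on_operations(datasource: IndexDatasource, operations: List[OperationData]) -> None:
    # The emitting datasource comes with the payload, so the consumer can
    # route the batch to the indexes bound to that datasource only.
    print(f'{len(operations)} operations from {datasource}')


async def main() -> None:
    ds = ToyDatasource()
    ds.on_operations(on_operations)       # typed subscription
    # ds.on('operations', on_operations)  # would raise RuntimeError by design
    ds.emit_operations([])                # typed emit with an empty batch
    await asyncio.sleep(0)                # let the emitter schedule the callback


asyncio.run(main())
```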
35 changes: 19 additions & 16 deletions src/dipdup/datasources/tzkt/datasource.py
@@ -7,7 +7,6 @@
from aiosignalrcore.hub_connection_builder import HubConnectionBuilder # type: ignore
from aiosignalrcore.messages.completion_message import CompletionMessage # type: ignore
from aiosignalrcore.transport.websockets.connection import ConnectionState # type: ignore
from pyee import AsyncIOEventEmitter # type: ignore

from dipdup.config import (
BigMapIndexConfig,
@@ -16,6 +15,7 @@
OperationHandlerOriginationPatternConfig,
OperationIndexConfig,
)
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.proxy import DatasourceRequestProxy
from dipdup.datasources.tzkt.enums import TzktMessageType
from dipdup.models import BigMapAction, BigMapData, OperationData
@@ -254,7 +254,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap
yield big_maps[0].level, big_maps[: i + 1]


class TzktDatasource(AsyncIOEventEmitter):
class TzktDatasource(IndexDatasource):
"""Bridge between REST/WS TzKT endpoints and DipDup.

* Converts raw API data to models
@@ -452,7 +452,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
else:
raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')

await self.on_connect()
await self._on_connect()

def _get_client(self) -> BaseHubConnection:
"""Create SignalR client, register message callbacks"""
@@ -471,10 +471,10 @@ def _get_client(self) -> BaseHubConnection:
)
).build()

self._client.on_open(self.on_connect)
self._client.on_error(self.on_error)
self._client.on('operations', self.on_operation_message)
self._client.on('bigmaps', self.on_big_map_message)
self._client.on_open(self._on_connect)
self._client.on_error(self._on_error)
self._client.on('operations', self._on_operation_message)
self._client.on('bigmaps', self._on_big_map_message)

return self._client

@@ -485,7 +485,7 @@ async def run(self) -> None:
self._logger.info('Starting websocket client')
await self._get_client().start()

async def on_connect(self) -> None:
async def _on_connect(self) -> None:
"""Subscribe to all required channels on established WS connection"""
if self._get_client().transport.state != ConnectionState.connected:
return
@@ -499,7 +499,7 @@ async def on_connect(self) -> None:
for address, paths in self._big_map_subscriptions.items():
await self.subscribe_to_big_maps(address, paths)

def on_error(self, message: CompletionMessage) -> NoReturn:
def _on_error(self, message: CompletionMessage) -> NoReturn:
"""Raise exception from WS server's error message"""
raise Exception(message.error)

@@ -554,7 +554,7 @@ async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None:
],
)

async def on_operation_message(
async def _on_operation_message(
self,
message: List[Dict[str, Any]],
) -> None:
@@ -577,18 +577,19 @@ async def on_operation_message(
if operation.status != 'applied':
continue
operations.append(operation)
self.emit("operations", operations)
self.emit_operations(operations)

elif message_type == TzktMessageType.REORG:
self.emit("rollback", self.level, current_level)
if self.level is None:
raise RuntimeError
self.emit_rollback(self.level, current_level)

else:
raise NotImplementedError

async def on_big_map_message(
async def _on_big_map_message(
self,
message: List[Dict[str, Any]],
sync=False,
) -> None:
"""Parse and emit raw big map diffs from WS"""
for item in message:
@@ -606,10 +607,12 @@ async def on_big_map_message(
for big_map_json in item['data']:
big_map = self.convert_big_map(big_map_json)
big_maps.append(big_map)
self.emit("big_maps", big_maps)
self.emit_big_maps(big_maps)

elif message_type == TzktMessageType.REORG:
self.emit("rollback", self.level, current_level)
if self.level is None:
raise RuntimeError
self.emit_rollback(self.level, current_level)

else:
raise NotImplementedError
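
Note: besides making the WS callbacks private and switching to the typed `emit_*` helpers, both message handlers gain a guard so that a REORG received before any level has been synced fails loudly instead of emitting a rollback from `None`. A small standalone paraphrase of that guard; the helper function is an illustration only, in the PR the check lives inline in `_on_operation_message` and `_on_big_map_message`:

```python
from typing import Optional, Tuple


def rollback_range(last_synced_level: Optional[int], current_level: int) -> Tuple[int, int]:
    """Return the (from_level, to_level) pair that would be passed to emit_rollback()."""
    if last_synced_level is None:
        # Mirrors the new `if self.level is None: raise RuntimeError` guard
        raise RuntimeError('REORG received before any level was synced')
    return last_synced_level, current_level


# Example: a reorg down to level 95 after having synced level 100
assert rollback_range(100, 95) == (100, 95)
```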
32 changes: 15 additions & 17 deletions src/dipdup/dipdup.py
@@ -1,7 +1,6 @@
import asyncio
import hashlib
import logging
from functools import partial
from os.path import join
from posix import listdir
from typing import Dict, List, cast
@@ -26,18 +25,19 @@
StaticTemplateConfig,
TzktDatasourceConfig,
)
from dipdup.context import RollbackHandlerContext
from dipdup.context import DipDupContext, RollbackHandlerContext
from dipdup.datasources import DatasourceT
from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigurationError, HandlerImportError
from dipdup.hasura import configure_hasura
from dipdup.index import BigMapIndex, HandlerContext, Index, OperationIndex
from dipdup.index import BigMapIndex, Index, OperationIndex
from dipdup.models import BigMapData, IndexType, OperationData, State


class IndexDispatcher:
def __init__(self, ctx: HandlerContext) -> None:
def __init__(self, ctx: DipDupContext) -> None:
self._ctx = ctx

self._logger = logging.getLogger(__name__)
@@ -82,21 +82,21 @@ async def reload_config(self) -> None:

self._ctx.reset()

async def dispatch_operations(self, operations: List[OperationData]) -> None:
async def dispatch_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None:
assert len(set(op.level for op in operations)) == 1
level = operations[0].level
for index in self._indexes.values():
if isinstance(index, OperationIndex):
if isinstance(index, OperationIndex) and index.datasource == datasource:
index.push(level, operations)

async def dispatch_big_maps(self, big_maps: List[BigMapData]) -> None:
async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
assert len(set(op.level for op in big_maps)) == 1
level = big_maps[0].level
for index in self._indexes.values():
if isinstance(index, BigMapIndex):
if isinstance(index, BigMapIndex) and index.datasource == datasource:
index.push(level, big_maps)

async def _rollback(self, datasource: str, from_level: int, to_level: int) -> None:
async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
logger = utils.FormattedLogger(ROLLBACK_HANDLER)
rollback_fn = self._ctx.config.get_rollback_fn()
ctx = RollbackHandlerContext(
@@ -111,12 +111,12 @@ async def _rollback(self, datasource: str, from_level: int, to_level: int) -> No

async def run(self, oneshot=False) -> None:
self._logger.info('Starting index dispatcher')
for name, datasource in self._ctx.datasources.items():
if not isinstance(datasource, TzktDatasource):
for datasource in self._ctx.datasources.values():
if not isinstance(datasource, IndexDatasource):
continue
datasource.on('operations', self.dispatch_operations)
datasource.on('big_maps', self.dispatch_big_maps)
datasource.on('rollback', partial(self._rollback, datasource=name))
datasource.on_operations(self.dispatch_operations)
datasource.on_big_maps(self.dispatch_big_maps)
datasource.on_rollback(self._rollback)

self._ctx.commit()

@@ -143,11 +143,9 @@ def __init__(self, config: DipDupConfig) -> None:
self._config = config
self._datasources: Dict[str, DatasourceT] = {}
self._datasources_by_config: Dict[DatasourceConfigT, DatasourceT] = {}
self._ctx = HandlerContext(
self._ctx = DipDupContext(
config=self._config,
datasources=self._datasources,
logger=utils.FormattedLogger(__name__),
template_values=None,
)
self._index_dispatcher = IndexDispatcher(self._ctx)

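
Note: this is the bugfix from the PR title. `dispatch_operations` and `dispatch_big_maps` now receive the emitting datasource and push a batch only into indexes bound to that datasource, instead of into every index of the matching kind. A toy sketch of that filter with simplified stand-ins (the real types are `OperationIndex`/`BigMapIndex` and `TzktDatasource`; the names below are made up):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class ToyIndex:
    datasource: str
    received: List[Tuple[int, List[Any]]] = field(default_factory=list)

    def push(self, level: int, batch: List[Any]) -> None:
        self.received.append((level, batch))


def dispatch_operations(indexes: Dict[str, ToyIndex], datasource: str, operations: List[Any]) -> None:
    level = 100  # in DipDup this is operations[0].level
    for index in indexes.values():
        if index.datasource == datasource:  # the filter added by this PR
            index.push(level, operations)


indexes = {'a': ToyIndex('tzkt_mainnet'), 'b': ToyIndex('tzkt_edo2net')}
dispatch_operations(indexes, 'tzkt_mainnet', ['op1', 'op2'])
assert indexes['a'].received and not indexes['b'].received  # only the matching index gets the batch
```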
14 changes: 10 additions & 4 deletions src/dipdup/index.py
@@ -16,7 +16,7 @@
OperationIndexConfig,
OperationType,
)
from dipdup.context import HandlerContext
from dipdup.context import DipDupContext, HandlerContext
from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource
from dipdup.models import BigMapAction, BigMapData, BigMapDiff, OperationData, Origination, State, TemporaryState, Transaction
from dipdup.utils import FormattedLogger, in_global_transaction
@@ -25,14 +25,18 @@


class Index:
def __init__(self, ctx: HandlerContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
self._ctx = ctx
self._config = config
self._datasource = datasource

self._logger = logging.getLogger(__name__)
self._state: Optional[State] = None

@property
def datasource(self) -> TzktDatasource:
return self._datasource

async def get_state(self) -> State:
"""Get state of index containing current level and config hash"""
if self._state is None:
@@ -90,7 +94,7 @@ async def _initialize_index_state(self) -> None:
class OperationIndex(Index):
_config: OperationIndexConfig

def __init__(self, ctx: HandlerContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
super().__init__(ctx, config, datasource)
self._queue: Deque[Tuple[int, List[OperationData]]] = deque()
self._contract_hashes: Dict[str, Tuple[str, str]] = {}
@@ -283,6 +287,7 @@ async def _on_match(
config=self._ctx.config,
logger=logger,
template_values=self._config.template_values,
datasource=self.datasource,
)

await handler_config.callback_fn(handler_context, *args)
@@ -325,7 +330,7 @@ async def _get_contract_hashes(self, address: str) -> Tuple[str, str]:
class BigMapIndex(Index):
_config: BigMapIndexConfig

def __init__(self, ctx: HandlerContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
super().__init__(ctx, config, datasource)
self._queue: Deque[Tuple[int, List[BigMapData]]] = deque()

@@ -423,6 +428,7 @@ async def _on_match(
config=self._ctx.config,
logger=logger,
template_values=self._config.template_values,
datasource=self.datasource,
)

await handler_config.callback_fn(handler_context, big_map_context)
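
Note: `Index` now exposes its datasource through a read-only property, and `_on_match` forwards it into the `HandlerContext` it builds for each matched handler. A condensed sketch of that construction; the helper function is hypothetical, in the PR the call happens inline in `OperationIndex._on_match` and `BigMapIndex._on_match`:

```python
from dipdup.context import HandlerContext
from dipdup.index import Index
from dipdup.utils import FormattedLogger


def build_handler_context(index: Index, logger: FormattedLogger) -> HandlerContext:
    # Hypothetical helper mirroring the inline construction in _on_match()
    return HandlerContext(
        datasources=index._ctx.datasources,
        config=index._ctx.config,
        logger=logger,
        template_values=index._config.template_values,
        datasource=index.datasource,  # new keyword argument added by this PR
    )
```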