Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some Websocket issues, remove Index.head relation #144

Merged
merged 50 commits into master from fix/ws-message-race
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a4e1731
Fix action order in callbacks, refactor datasource class
droserasprout Sep 26, 2021
3f705e8
Fix race in emitter
droserasprout Sep 26, 2021
30422f2
typo
droserasprout Sep 26, 2021
c9bf752
Another typo
droserasprout Sep 26, 2021
93b7bff
Another typo
droserasprout Sep 26, 2021
ce814ab
Another typo
droserasprout Sep 26, 2021
c3b0747
Another typo
droserasprout Sep 26, 2021
733adfc
Let callbacks handle locks by themselves
droserasprout Sep 26, 2021
b4632b1
FFFFUUU
droserasprout Sep 26, 2021
6ff14d5
Fix missing level on emit
droserasprout Sep 26, 2021
f523d10
Another stupid typo
droserasprout Sep 26, 2021
e5601bf
Emit timeout
droserasprout Sep 26, 2021
a24e5ae
Capture exceptions early
droserasprout Sep 26, 2021
57b9613
Log dead tasks
droserasprout Sep 26, 2021
5936752
Merge branch 'master' into fix/ws-message-race
droserasprout Sep 28, 2021
a088564
Merge branch 'master' into fix/ws-message-race
droserasprout Sep 28, 2021
0c7b8ba
Drop pyee ☠
droserasprout Sep 28, 2021
ba2a4a7
Rename callbacks
droserasprout Sep 28, 2021
212eb43
Unlink head
droserasprout Sep 28, 2021
20481f3
Cleanup
droserasprout Sep 28, 2021
31fd05b
Cleanup, fix ugly database logger setting
droserasprout Sep 28, 2021
615eb22
Typos
droserasprout Sep 28, 2021
2e7d499
Update changelog
droserasprout Sep 28, 2021
f0d5b38
BlockCache improvements
droserasprout Sep 28, 2021
51448a2
typo;
droserasprout Sep 28, 2021
8079152
Ignore missing head
droserasprout Sep 28, 2021
64282a3
Fix dipdup initialization order
droserasprout Sep 28, 2021
ba43f59
Changelog
droserasprout Sep 28, 2021
a33398e
Cleanup
droserasprout Sep 28, 2021
83ccb7c
Fix cleaning up old blocks
droserasprout Sep 29, 2021
659b43d
Drop Index.hash
droserasprout Sep 29, 2021
26978d3
Cleanup
droserasprout Sep 29, 2021
d15ae13
Cleanup
droserasprout Sep 29, 2021
262bcef
Drop `stateless` fields
droserasprout Sep 29, 2021
46e8acf
Drop BlockCache, simplify block hash checks
droserasprout Sep 29, 2021
9b02c68
Cleanup
droserasprout Sep 29, 2021
bb97c1d
Check operation status before converting
droserasprout Sep 29, 2021
0ba5cb5
Comment
droserasprout Sep 29, 2021
a60e8a5
Changelog
droserasprout Sep 29, 2021
a766727
Cleanup, drop useless self.level check in datasource
droserasprout Sep 29, 2021
f06c4e8
Head constraint
droserasprout Sep 29, 2021
5fe6021
Revert multiple heads, add docs
droserasprout Sep 30, 2021
f613517
Changelog
droserasprout Sep 30, 2021
ffda67b
Lint
droserasprout Sep 30, 2021
c007ba4
Kill me please
droserasprout Sep 30, 2021
933cc90
Dont do sql in ws loop
droserasprout Sep 30, 2021
65d401f
Head sub, typing
droserasprout Sep 30, 2021
7b2aff4
Codestyle, cache blocks on init, bigmap fixes
droserasprout Sep 30, 2021
5939568
Typo
droserasprout Sep 30, 2021
071e8ba
Docs
droserasprout Sep 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/dipdup/datasources/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@


class OperationsCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData], block: HeadBlockData) -> Awaitable[None]:
def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData]) -> Awaitable[None]:
...


class BigMapsCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData], block: HeadBlockData) -> Awaitable[None]:
def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
...


Expand All @@ -23,7 +23,7 @@ def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int


class HeadCallback(Protocol):
def __call__(self, datasource: 'IndexDatasource', block: HeadBlockData) -> Awaitable[None]:
def __call__(self, datasource: 'IndexDatasource', head: HeadBlockData) -> Awaitable[None]:
...


Expand Down Expand Up @@ -55,18 +55,18 @@ def on_big_maps(self, fn: BigMapsCallback) -> None:
def on_rollback(self, fn: RollbackCallback) -> None:
self._on_rollback.add(fn)

async def emit_head(self, block: HeadBlockData) -> None:
async def emit_head(self, head: HeadBlockData) -> None:
for fn in self._on_head:
await fn(self, block)
await fn(self, head)

async def emit_operations(self, operations: List[OperationData], block: HeadBlockData) -> None:
async def emit_operations(self, operations: List[OperationData]) -> None:
for fn in self._on_operations:
await fn(self, operations, block)
await fn(self, operations)

async def emit_big_maps(self, big_maps: List[BigMapData], block: HeadBlockData) -> None:
async def emit_big_maps(self, big_maps: List[BigMapData]) -> None:
for fn in self._on_big_maps:
await fn(self, big_maps, block)
await fn(self, big_maps)

async def emit_rollback(self, from_level: int, to_level: int) -> None:
for fn in self._on_rollback:
fn(self, from_level, to_level)
await fn(self, from_level, to_level)
132 changes: 46 additions & 86 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from typing import Any, AsyncGenerator, DefaultDict, Dict, List, NoReturn, Optional, Set, Tuple, cast
from typing import Any, AsyncGenerator, DefaultDict, Dict, List, NoReturn, Optional, Set, Tuple, Union, cast

from aiohttp import ClientResponseError
from aiosignalrcore.hub.base_hub_connection import BaseHubConnection # type: ignore
Expand All @@ -23,6 +23,7 @@
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.tzkt.enums import TzktMessageType
from dipdup.enums import MessageType
from dipdup.exceptions import DipDupException
from dipdup.models import BigMapAction, BigMapData, BlockData, Head, HeadBlockData, OperationData, QuoteData
from dipdup.utils import groupby, split_by_chunks

Expand Down Expand Up @@ -280,40 +281,54 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap
yield big_maps[0].level, big_maps[: i + 1]


BlockDataT = Union[HeadBlockData, BlockData]


class BlockCache:
def __init__(self) -> None:
def __init__(self, datasource: 'TzktDatasource') -> None:
self._datasource = datasource
self._limit = 10
self._timeout = 60
self._blocks: DefaultDict[int, Optional[HeadBlockData]] = defaultdict(lambda: None)
self._events: DefaultDict[int, asyncio.Event] = defaultdict(asyncio.Event)
self._initial_block: Optional[BlockDataT] = None
self._blocks: Dict[int, BlockDataT] = {}
self._heads: Dict[int, Head] = {}

async def add_block(self, block: HeadBlockData) -> None:
if self._blocks[block.level]:
raise RuntimeError('Attempt to add block {block.level} which is already cached')
async def initialize(self) -> None:
block = await self._datasource.get_head_block()
self._initial_block = block
await self.add_block(block)

async def add_block(self, block: BlockDataT) -> None:
self._blocks[block.level] = block
self._events[block.level].set()

# FIXME: Refactor this
last_blocks = sorted(self._blocks.keys())[-self._limit :]
self._blocks = defaultdict(lambda: None, ({k: v for k, v in self._blocks.items() if k in last_blocks}))
self._events = defaultdict(asyncio.Event, ({k: v for k, v in self._events.items() if k in last_blocks}))
old_blocks = sorted(self._blocks.keys())[-self._limit :]
map(self._blocks.pop, old_blocks)
droserasprout marked this conversation as resolved.
Show resolved Hide resolved

async def get_block(self, level: int, required: bool = False) -> Optional[BlockDataT]:
if level not in self._blocks:
if required:
self._blocks[level] = await self._datasource.get_block(level)
# save Head
else:
return None

async def get_block(self, level: int) -> HeadBlockData:
if self._blocks and level < sorted(self._blocks.keys())[-1] - self._limit:
raise RuntimeError(f'Attemps to get block older than {self._limit} levels from head')
return self._blocks[level]

try:
await asyncio.wait_for(
fut=self._events[level].wait(),
timeout=self._timeout,
)
except asyncio.TimeoutError as e:
raise RuntimeError(f'Block {level} hasn\'t arrived in {self._timeout} seconds. Forgot to subscribe to head?') from e
async def get_initial_block(self) -> BlockDataT:
if not self._initial_block:
raise DipDupException('Attempted to get initial block but cache is not initialized')
return self._initial_block

block = self._blocks[level]
if not block:
raise RuntimeError('Event is set but block is missing in cache')
return block
async def verify_latest_blocks(self) -> None:
...
# NOTE: No need to check hashes of indexes which are not synchronized.
# head = await self.state.head
# if head and self.state.status == IndexStatus.REALTIME:
# block = await self._datasource.get_block(head.level)
# if head.hash != block.hash:
# await self._ctx.reindex(ReindexingReason.BLOCK_HASH_MISMATCH)

async def cleanup_heads(self) -> None:
...


class TzktDatasource(IndexDatasource):
Expand Down Expand Up @@ -347,8 +362,7 @@ def __init__(
self._big_map_subscriptions: Dict[str, Set[str]] = {}
self._ws_client: Optional[BaseHubConnection] = None

self._block_cache: BlockCache = BlockCache()
self._head: Optional[Head] = None
self.block_cache: BlockCache = BlockCache(self)
self._level: Optional[int] = None
self._sync_level: Optional[int] = None

Expand All @@ -364,10 +378,6 @@ def level(self) -> Optional[int]:
def sync_level(self) -> Optional[int]:
return self._sync_level

@property
def head(self) -> Optional[Head]:
return self._head

async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]:
"""Get list of contracts sharing the same code hash or type hash"""
entrypoint = 'same' if strict else 'similar'
Expand Down Expand Up @@ -730,81 +740,31 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
async def _on_operations_message(self, message: List[Dict[str, Any]]) -> None:
"""Parse and emit raw operations from WS"""
async for level, data in self._extract_message_data(MessageType.operation, message):
# NOTE: Wait for head message to arrive
block = await self._block_cache.get_block(level)

operations = []
for operation_json in data:
operation = self.convert_operation(operation_json)
if operation.status != 'applied':
continue
operations.append(operation)
if operations:
await self.emit_operations(operations, block)
await self.emit_operations(operations)

async def _on_big_maps_message(self, message: List[Dict[str, Any]]) -> None:
"""Parse and emit raw big map diffs from WS"""
async for level, data in self._extract_message_data(MessageType.big_map, message):
# NOTE: Wait for head message to arrive
block = await self._block_cache.get_block(level)

big_maps = []
for big_map_json in data:
big_map = self.convert_big_map(big_map_json)
big_maps.append(big_map)
await self.emit_big_maps(big_maps, block)
await self.emit_big_maps(big_maps)

async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
"""Parse and emit raw head block from WS"""
async for _, data in self._extract_message_data(MessageType.head, message):
block = self.convert_head_block(data)
await self._update_head(block)
await self.block_cache.add_block(block)
await self.emit_head(block)

async def _update_head(self, block: HeadBlockData) -> None:
"""Update Head model linked to datasource from WS head message"""
await self._block_cache.add_block(block)
created = False
if self._head is None:
self._head, created = await Head.get_or_create(
name=self._http._url,
defaults=dict(
level=block.level,
hash=block.hash,
timestamp=block.timestamp,
),
)
if not created:
self._head.level = block.level # type: ignore
self._head.hash = block.hash # type: ignore
self._head.timestamp = block.timestamp # type: ignore
await self._head.save()

# FIXME: I don't like this approach, too hacky.
async def set_head_from_http(self) -> None:
"""Set block from `get_head_block` HTTP method for indexes to use the same level during initial sync"""
if self._head:
raise RuntimeError('Head is already set')

block = await self.get_head_block()
self._head, created = await Head.get_or_create(
name=self._http._url,
defaults=dict(
level=block.level,
hash=block.hash,
timestamp=block.timestamp,
),
)
if not created:
self._head.level = block.level # type: ignore
self._head.hash = block.hash # type: ignore
self._head.timestamp = block.timestamp # type: ignore
await self._head.save()

self._logger.info('Datasource head set to block with level %s', self._head.level)

# NOTE: No need to emit?

@classmethod
def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
"""Convert raw operation message from WS/REST into dataclass"""
Expand Down
21 changes: 9 additions & 12 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dipdup.exceptions import ConfigInitializationException, DipDupException, ReindexingReason
from dipdup.hasura import HasuraGateway
from dipdup.index import BigMapIndex, Index, OperationIndex
from dipdup.models import BigMapData, Contract, HeadBlockData
from dipdup.models import BigMapData, Contract
from dipdup.models import Index as IndexState
from dipdup.models import IndexStatus, OperationData, Schema
from dipdup.scheduler import add_job, create_scheduler
Expand All @@ -53,7 +53,6 @@ async def run(
) -> None:
self._logger.info('Starting index dispatcher')
await self._subscribe_to_datasource_events()
await self._set_datasource_heads()
await self._load_index_states()

while not self._stopped:
Expand All @@ -76,7 +75,7 @@ async def run(
if start_scheduler_event and not start_scheduler_event.is_set():
# NOTE: Do not check with every_index_is, indexes become REALTIME after first message from WS is received
for index in self._indexes.values():
if index.state.level != index.datasource.head.level:
if index.state.status != IndexStatus.REALTIME:
break
else:
start_scheduler_event.set()
Expand Down Expand Up @@ -109,11 +108,6 @@ async def _subscribe_to_datasource_events(self) -> None:
datasource.on_big_maps(self._on_big_maps) # type: ignore
datasource.on_rollback(self._on_rollback) # type: ignore

async def _set_datasource_heads(self) -> None:
for datasource in self._ctx.datasources.values():
if isinstance(datasource, TzktDatasource):
await datasource.set_head_from_http()

async def _load_index_states(self) -> None:
await self._fetch_contracts()
index_states = await IndexState.filter().all()
Expand All @@ -137,19 +131,19 @@ async def _load_index_states(self) -> None:
else:
self._logger.warning('Index `%s` was removed from config, ignoring', name)

async def _on_operations(self, datasource: TzktDatasource, operations: List[OperationData], block: HeadBlockData) -> None:
async def _on_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None:
assert len(set(op.level for op in operations)) == 1
level = operations[0].level
for index in self._indexes.values():
if isinstance(index, OperationIndex) and index.datasource == datasource:
index.push(level, operations, block)
index.push(level, operations)

async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData], block: HeadBlockData) -> None:
async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
assert len(set(op.level for op in big_maps)) == 1
level = big_maps[0].level
for index in self._indexes.values():
if isinstance(index, BigMapIndex) and index.datasource == datasource:
index.push(level, big_maps, block)
index.push(level, big_maps)

async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
if from_level - to_level == 1:
Expand Down Expand Up @@ -333,6 +327,9 @@ async def _set_up_datasources(self, stack: AsyncExitStack) -> None:
for datasource in self._datasources.values():
await stack.enter_async_context(datasource)

if isinstance(datasource, TzktDatasource):
await datasource.block_cache.initialize()

async def _set_up_index_dispatcher(
self,
tasks: Set[Task],
Expand Down
Loading