feat: cache prices, LogFilter and TraceFilter classes #378

Merged
110 commits, merged Oct 3, 2023

Commits (110)
e734db0
fix: syntax err
BobTheBuidler Sep 28, 2023
d3fcefa
feat: Logs and Events containers
BobTheBuidler Sep 28, 2023
79b8cd9
feat: faster loading chainlink
BobTheBuidler Sep 28, 2023
4f4d4e7
fix: refactor Logs for empty batches
BobTheBuidler Sep 28, 2023
d3e0414
fix: unsupported callable
BobTheBuidler Sep 28, 2023
a23a468
fix: unexpected arg
BobTheBuidler Sep 28, 2023
45fe7c3
fix: missing arg
BobTheBuidler Sep 28, 2023
adbc773
fix: syntax error
BobTheBuidler Sep 28, 2023
7cc3483
fix: AttributeError
BobTheBuidler Sep 28, 2023
6a189a6
fix: handle excs in loaders
BobTheBuidler Sep 28, 2023
d32d04b
feat: cache Logs to disk
BobTheBuidler Sep 28, 2023
2840ed1
fix: missing reverse attribute
BobTheBuidler Sep 28, 2023
0dc3426
fix: broken db generation
BobTheBuidler Sep 28, 2023
94cc801
fix: mapping not generated
BobTheBuidler Sep 28, 2023
5008436
fix: TypeError
BobTheBuidler Sep 28, 2023
9683914
fix: caching logic
BobTheBuidler Sep 28, 2023
152d676
fix: load cache
BobTheBuidler Sep 28, 2023
565e6ee
chore: refactor
BobTheBuidler Sep 28, 2023
f21d805
feat: better log caching
BobTheBuidler Sep 30, 2023
c89baa8
fix: cache info err
BobTheBuidler Sep 30, 2023
a76115d
fix: composite key
BobTheBuidler Sep 30, 2023
0c78300
fix: missing import
BobTheBuidler Sep 30, 2023
875a369
fix: TransactionIntegrityError
BobTheBuidler Sep 30, 2023
321ad78
fix: missing import
BobTheBuidler Sep 30, 2023
82dad7d
fix: cache info logic
BobTheBuidler Sep 30, 2023
5e30656
fix: mixing objects from diff transactions
BobTheBuidler Sep 30, 2023
a264d64
fix: none type not iterable
BobTheBuidler Sep 30, 2023
671f2fb
fix: unpacking err
BobTheBuidler Sep 30, 2023
2620ba6
fix: set cache info
BobTheBuidler Sep 30, 2023
128cd27
feat: temp logger
BobTheBuidler Sep 30, 2023
eac15ca
feat: adjustable batch size
BobTheBuidler Sep 30, 2023
36eb8fb
fix: cached property err
BobTheBuidler Sep 30, 2023
68dcde2
feat: cache incrementally
BobTheBuidler Sep 30, 2023
7df4666
fix: AttributeError
BobTheBuidler Sep 30, 2023
421597d
feat: verbose fetching
BobTheBuidler Sep 30, 2023
148b65a
fix: syntax err
BobTheBuidler Sep 30, 2023
a917778
fix: syntax err
BobTheBuidler Sep 30, 2023
4d11428
fix: set_cache_info
BobTheBuidler Oct 1, 2023
30dcb0f
fix: log cache info key
BobTheBuidler Oct 1, 2023
7baccb5
fix: log cache info key
BobTheBuidler Oct 1, 2023
5a210d2
fix: missing commit
BobTheBuidler Oct 1, 2023
2d268f7
fix: incorrect select
BobTheBuidler Oct 1, 2023
2f7ed2b
fix: log cache info
BobTheBuidler Oct 1, 2023
08fb2b7
fix: log cache info
BobTheBuidler Oct 1, 2023
edbc0c6
fix: log cache info
BobTheBuidler Oct 1, 2023
e93cad7
chore: add logger
BobTheBuidler Oct 1, 2023
1bae681
fix: UnboundLocalError
BobTheBuidler Oct 1, 2023
6176f88
feat: cache prices
BobTheBuidler Oct 1, 2023
355f476
fix: missing subclass def
BobTheBuidler Oct 1, 2023
8e6b16b
fix: missing reverse attribute
BobTheBuidler Oct 1, 2023
e3654a7
fix: cache price for eee
BobTheBuidler Oct 1, 2023
11b270f
feat: skip_cache kwarg
BobTheBuidler Oct 1, 2023
830c00e
fix: walrus
BobTheBuidler Oct 1, 2023
8a68175
fix: TransactionIntegrityError
BobTheBuidler Oct 1, 2023
cda94d1
feat: better entity specs
BobTheBuidler Oct 1, 2023
afadc84
feat: dont block to cache
BobTheBuidler Oct 1, 2023
5cb3b19
fix: caching out of order
BobTheBuidler Oct 1, 2023
65e8675
fix: _load_cache
BobTheBuidler Oct 1, 2023
c4d6dcf
fix: missing import
BobTheBuidler Oct 1, 2023
481c25d
fix: _load_cache
BobTheBuidler Oct 1, 2023
56a5e88
fix: _cache_log
BobTheBuidler Oct 1, 2023
95dea9c
fix: OptimisticCheckError
BobTheBuidler Oct 1, 2023
76497a2
fix: mixing objs from diff txs
BobTheBuidler Oct 1, 2023
81a7403
fix: unencodable type
BobTheBuidler Oct 1, 2023
9e6834a
fix: set_price
BobTheBuidler Oct 1, 2023
4b9c93d
fix: _cache_log
BobTheBuidler Oct 1, 2023
c14f3a5
feat: enc_hook
BobTheBuidler Oct 1, 2023
5c426eb
fix: TypeError
BobTheBuidler Oct 1, 2023
393e7eb
fix: TypeError
BobTheBuidler Oct 1, 2023
50400b4
fix: AttributeError
BobTheBuidler Oct 1, 2023
71a44f8
fix: _cache_log
BobTheBuidler Oct 1, 2023
2de91e6
fix: missing import
BobTheBuidler Oct 1, 2023
dec36e8
fix: UnboundLocalError
BobTheBuidler Oct 1, 2023
ba5d9e4
fix: cache info updates
BobTheBuidler Oct 1, 2023
017a6fe
chore: refactor
BobTheBuidler Oct 1, 2023
dfe5fc9
chore: refactor db files
BobTheBuidler Oct 1, 2023
448ebb1
chore: refactor logs files
BobTheBuidler Oct 1, 2023
1f6603f
fix: InterfaceError
BobTheBuidler Oct 1, 2023
279204d
fix: db already bound
BobTheBuidler Oct 1, 2023
b4aa5c5
fix: AttributeError
BobTheBuidler Oct 1, 2023
6fcd7b0
fix: query too complex for pony
BobTheBuidler Oct 1, 2023
474a8dc
fix: broken import
BobTheBuidler Oct 1, 2023
befc7a9
feat: v0 trace caching
BobTheBuidler Oct 2, 2023
1be0932
fix: missing import
BobTheBuidler Oct 2, 2023
2cef7ec
fix: missing reverse attribute
BobTheBuidler Oct 2, 2023
9c642e1
fix: AttributeError
BobTheBuidler Oct 2, 2023
fe39658
chore: refactor
BobTheBuidler Oct 2, 2023
ccdc66f
fix: wrong name
BobTheBuidler Oct 2, 2023
03c3127
fix: wrong name
BobTheBuidler Oct 2, 2023
39f77cf
fix: invalid kwarg
BobTheBuidler Oct 2, 2023
e00e44d
feat: pruning
BobTheBuidler Oct 2, 2023
3c1a5b1
fix: some stuff
BobTheBuidler Oct 2, 2023
c599c93
fix: raise e
BobTheBuidler Oct 2, 2023
83edf05
feat: debug loggers
BobTheBuidler Oct 2, 2023
b44ff6c
fix: improper dunder
BobTheBuidler Oct 2, 2023
9f2c297
feat: debug loggers
BobTheBuidler Oct 2, 2023
95dcb00
fix: arg err
BobTheBuidler Oct 2, 2023
8a0e595
fix: typo
BobTheBuidler Oct 2, 2023
9ce48d2
feat: debug loggers
BobTheBuidler Oct 2, 2023
ae2b0ba
fix: if check causing loop
BobTheBuidler Oct 2, 2023
8dba4b1
feat: ProcessedEvents container
BobTheBuidler Oct 2, 2023
19f7dc1
fix: syntax err
BobTheBuidler Oct 2, 2023
d190318
feat: extract ASyncIterable classes
BobTheBuidler Oct 2, 2023
03ef5b7
fix: extra kwarg
BobTheBuidler Oct 2, 2023
e47cfd9
fix: abc init err
BobTheBuidler Oct 2, 2023
8975dd4
fix: chainlink feeds
BobTheBuidler Oct 3, 2023
8053a4a
chore: refactor
BobTheBuidler Oct 3, 2023
68c8dc6
fix: refactor chainlink
BobTheBuidler Oct 3, 2023
46e27a3
fix: type err
BobTheBuidler Oct 3, 2023
96b8646
feat: chunks per batch
BobTheBuidler Oct 3, 2023

Files changed

2 changes: 1 addition & 1 deletion requirements.txt
@@ -4,7 +4,7 @@ checksum_dict>=1.1.1
dank_mids>=4.20.50
eth-brownie>=1.18.1,<1.20
eth_retry>=0.1.17,<0.2
ez-a-sync>=0.7.4
ez-a-sync>=0.9.0
joblib>=1.0.1
multicall>=0.8.1
pony
8 changes: 5 additions & 3 deletions tests/prices/test_chainlink.py
@@ -124,7 +124,7 @@ def test_chainlink_get_feed(token):
async def test_chainlink_latest(token):
if not await chainlink.get_price(token):
feed = await chainlink.get_feed(token)
assert await feed.aggregator.coroutine() == ZERO_ADDRESS, 'no current price available'
assert await feed.contract.aggregator.coroutine() == ZERO_ADDRESS, 'no current price available'


@mainnet_only
@@ -136,10 +136,12 @@ async def test_chainlink_before_registry(token):
feed = await chainlink.get_feed(token, sync=False)
if await contract_creation_block_async(feed.address) > test_block:
pytest.skip('not applicable to feeds deployed after test block')
price = await chainlink.get_price(token, block=test_block)
print(type(chainlink.get_price))
price = chainlink.get_price(token, block=test_block)
price = await price
if not price:
feed = await chainlink.get_feed(token)
assert await feed.aggregator.coroutine() == ZERO_ADDRESS, 'no price available before registry'
assert await feed.contract.aggregator.coroutine() == ZERO_ADDRESS, 'no price available before registry'


def test_chainlink_nonexistent():
235 changes: 235 additions & 0 deletions y/_db/common.py
@@ -0,0 +1,235 @@

import abc
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import (Any, AsyncIterator, Callable, Generic, List, NoReturn,
Optional, Type, TypeVar)

from a_sync.iter import ASyncIterable
from a_sync.primitives.executor import _AsyncExecutorMixin
from a_sync.primitives.locks.counter import CounterLock
from dank_mids.semaphores import BlockSemaphore
from hexbytes import HexBytes
from tqdm.asyncio import tqdm_asyncio
from web3.datastructures import AttributeDict
from web3.middleware.filter import block_ranges

from y._db.exceptions import CacheNotPopulatedError
from y.constants import BIG_VALUE, thread_pool_executor
from y.utils.dank_mids import dank_w3

T = TypeVar('T')
S = TypeVar('S')
M = TypeVar('M')

logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(16)

def enc_hook(obj: Any) -> bytes:
if isinstance(obj, AttributeDict):
return dict(obj)
elif isinstance(obj, HexBytes):
return obj.hex()
raise NotImplementedError(obj)

def dec_hook(typ: Type[T], obj: bytes) -> T:
if typ == HexBytes:
return HexBytes(obj)
raise ValueError(f"{typ} is not a valid type for decoding")

class DiskCache(Generic[S, M], metaclass=abc.ABCMeta):
__slots__ = []
@abc.abstractmethod
def set_metadata(self, from_block: int, done_thru: int) -> None:
"""Updates the cache metadata to indicate the cache is populated from block `from_block` to block `to_block`."""
@abc.abstractmethod
def is_cached_thru(self, from_block: int) -> int:
"""Returns max cached block for this cache or 0 if not cached."""
@abc.abstractmethod
def select(self, from_block: int, to_block: int) -> List[S]:
"""Selects all cached objects from block `from_block` to block `to_block`"""
def check_and_select(self, from_block: int, to_block: int) -> List[S]:
"""
Selects all cached objects from block `from_block` to block `to_block` if the cache is fully populated.
Raises `CacheNotPopulatedError` if it is not.
"""
if self.is_cached_thru(from_block) >= to_block:
return self.select(from_block, to_block)
else:
raise CacheNotPopulatedError(self, from_block, to_block)

C = TypeVar('C', bound=DiskCache)

class _DiskCachedMixin(Generic[T, C], metaclass=abc.ABCMeta):
__slots__ = 'is_reusable', '_cache', '_executor', '_objects', '_pruned'
def __init__(
self,
executor: _AsyncExecutorMixin = thread_pool_executor,
is_reusable: bool = True,
):
self.is_reusable = is_reusable
self._cache = None
self._executor = executor
self._objects: List[T] = []
self._pruned = 0
@abc.abstractproperty
def cache(self) -> C:
...
@abc.abstractproperty
def insert_to_db(self) -> Callable[[T], None]:
...
def _extend(self, objs) -> None:
"""Override this to pre-process objects before storing."""
return self._objects.extend(objs)
def _remove(self, obj: T) -> None:
self._objects.remove(obj)
self._pruned += 1
async def _load_cache(self, from_block: int) -> int:
"""
Loads cached logs from disk.
Returns max block of logs loaded from cache.
"""
logger.debug('checking to see if %s is cached in local db', self)
if cached_thru := await self._executor.run(self.cache.is_cached_thru, from_block):
logger.debug('%s is cached thru block %s, loading from db', self, cached_thru)
self._extend(await self._executor.run(self.cache.select, from_block, cached_thru))
logger.info('%s loaded %s objects thru block %s from disk', self, len(self._objects), cached_thru)
return cached_thru
return from_block - 1

class Filter(ASyncIterable[T], _DiskCachedMixin[T, C]):
__slots__ = 'from_block', 'to_block', '_chunk_size', '_chunks_per_batch', '_exc', '_interval', '_lock', '_semaphore', '_task', '_verbose'
def __init__(
self,
from_block: int,
*,
chunk_size: int = 10_000,
chunks_per_batch: int = 20,
interval: int = 300,
semaphore: Optional[BlockSemaphore] = None,
executor: _AsyncExecutorMixin = thread_pool_executor,
is_reusable: bool = True,
verbose: bool = False,
):
self.from_block = from_block
self._chunk_size = chunk_size
self._chunks_per_batch = chunks_per_batch
self._exc = None
self._interval = interval
self._lock = CounterLock()
self._semaphore = semaphore
self._task = None
self._verbose = verbose
super().__init__(executor=executor, is_reusable=is_reusable)

def __aiter__(self) -> AsyncIterator[T]:
return self._objects_thru(block=None).__aiter__()

@abc.abstractmethod
async def _fetch_range(self, from_block: int, to_block: int) -> List[T]:
...

@property
def semaphore(self) -> BlockSemaphore:
if self._semaphore is None:
self._semaphore = BlockSemaphore(self._chunks_per_batch)
return self._semaphore

def _get_block_for_obj(self, obj: T) -> int:
"""Override this as needed for different object types"""
return obj['blockNumber']

async def _objects_thru(self, block: Optional[int]) -> AsyncIterator[T]:
self._ensure_task()
yielded = 0
done_thru = 0
while True:
if block is None or done_thru < block:
await self._lock.wait_for(done_thru + 1)
if self._exc:
raise self._exc
if to_yield := self._objects[yielded-self._pruned:]:
for obj in to_yield:
if block and self._get_block_for_obj(obj) > block:
return
if not self.is_reusable:
self._remove(obj)
yield obj
yielded += 1
elif block and done_thru > block:
return
done_thru = self._lock.value

async def __fetch(self) -> NoReturn:
try:
await self._fetch()
except Exception as e:
logger.exception(e)
self._exc = e
self._lock.set(BIG_VALUE)
raise e

async def _fetch(self) -> NoReturn:
"""Override this if you want"""
await self._loop(self.from_block)

async def _fetch_range_wrapped(self, i: int, range_start: int, range_end: int) -> List[T]:
async with self.semaphore[range_end]:
logger.debug("fetching %s block %s to %s", self, range_start, range_end)
return i, range_end, await self._fetch_range(range_start, range_end)

async def _loop(self, from_block: int) -> NoReturn:
logger.debug('starting work loop for %s', self)
self._lock.set(await self._load_cache(from_block))
while True:
await self._load_new_objects(start_from_block=from_block)
await asyncio.sleep(self._interval)

async def _load_new_objects(self, to_block: Optional[int] = None, start_from_block: Optional[int] = None) -> None:
logger.debug('loading new objects for %s', self)
start = v + 1 if (v := self._lock.value) else start_from_block or self.from_block
end = to_block or await dank_w3.eth.block_number
await self._load_range(start, end)

async def _load_range(self, from_block: int, to_block: int) -> None:
logger.debug('loading block range %s to %s', from_block, to_block)
db_insert_tasks = []
cache_info_tasks = []
batches_yielded = 0
done = {}
as_completed = tqdm_asyncio.as_completed if self._verbose else asyncio.as_completed
coros = [
self._fetch_range_wrapped(i, start, end)
for i, (start, end)
in enumerate(block_ranges(from_block, to_block, self._chunk_size))
if i <= self._chunks_per_batch
]
for objs in as_completed(coros, timeout=None):
i, end, objs = await objs
done[i] = end, objs
for i in range(len(coros)):
if batches_yielded > i:
continue
if i not in done:
if db_insert_tasks:
await asyncio.gather(*db_insert_tasks)
db_insert_tasks.clear()
if cache_info_tasks:
await cache_info_tasks[-1]
cache_info_tasks.clear()
break
end, objs = done.pop(i)
self._extend(objs)
db_insert_tasks.extend(self._executor.run(self.insert_to_db, obj) for obj in objs)
cache_info_tasks.append(self._executor.run(self.cache.set_metadata, from_block, end))
batches_yielded += 1
self._lock.set(end)

def _ensure_task(self) -> None:
if self._task is None:
logger.debug('creating task for %s', self)
self._task = asyncio.create_task(self.__fetch())
if self._task.done() and (e := self._task.exception()):
raise e
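
A minimal usage sketch of the new `Filter` base class above. Everything in it (`SimpleLogFilter`, `NoopCache`, the address, block number, and the bare `eth_getLogs` call via `dank_w3.eth.get_logs`) is illustrative and not part of this PR; the real `LogFilter` and `TraceFilter` classes wire the `cache` and `insert_to_db` hooks to the Pony entities added below instead of the no-op stubs shown here.

```python
# Hypothetical sketch only: a concrete Filter subclass with no disk caching.
import asyncio
from typing import Callable, List

from y._db.common import DiskCache, Filter
from y.utils.dank_mids import dank_w3


class NoopCache(DiskCache[dict, None]):
    """A do-nothing cache so the sketch satisfies Filter's abstract hooks."""
    def set_metadata(self, from_block: int, done_thru: int) -> None:
        pass
    def is_cached_thru(self, from_block: int) -> int:
        return 0  # pretend nothing is cached yet
    def select(self, from_block: int, to_block: int) -> List[dict]:
        return []


class SimpleLogFilter(Filter[dict, NoopCache]):
    def __init__(self, address: str, from_block: int, **kwargs):
        self.address = address
        super().__init__(from_block, **kwargs)

    @property
    def cache(self) -> NoopCache:
        return NoopCache()

    @property
    def insert_to_db(self) -> Callable[[dict], None]:
        return lambda obj: None  # the real classes persist each log/trace here

    async def _fetch_range(self, from_block: int, to_block: int) -> List[dict]:
        # one chunk of the overall range, sized by `chunk_size`
        return await dank_w3.eth.get_logs(
            {"address": self.address, "fromBlock": from_block, "toBlock": to_block}
        )


async def main() -> None:
    logs = SimpleLogFilter("0x...", from_block=18_000_000)
    async for log in logs:  # yields objects as each chunk of blocks completes
        print(log["blockNumber"])


if __name__ == "__main__":
    asyncio.run(main())
```

Note that with no end block the iterator never terminates on its own: `_objects_thru(block=None)` keeps yielding while the fetch loop polls for new blocks every `interval` seconds.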

68 changes: 64 additions & 4 deletions y/_db/entities.py
@@ -1,8 +1,12 @@

from contextlib import suppress
from datetime import datetime
from decimal import Decimal
from typing import Any

from pony.orm import (Database, Optional, PrimaryKey, Required, Set,
composite_key)
TransactionIntegrityError, commit, composite_key,
db_session)

db = Database()

@@ -27,6 +31,8 @@ class Chain(db.Entity, _AsyncEntityMixin):

blocks = Set("Block")
addresses = Set("Address")
log_cached = Set("LogCacheInfo")
trace_caches = Set("TraceCacheInfo")

class Block(db.Entity, _AsyncEntityMixin):
_pk = PrimaryKey(int, auto=True)
@@ -38,7 +44,10 @@ class Block(db.Entity, _AsyncEntityMixin):
composite_key(chain, number)
composite_key(chain, hash)

prices = Set("Price", reverse="block", cascade_delete=False)
contracts_deployed = Set("Contract", reverse="deploy_block")
logs = Set("Log", reverse="block", cascade_delete=False)
traces = Set("Trace", reverse="block", cascade_delete=False)

class Address(db.Entity, _AsyncEntityMixin):
_pk = PrimaryKey(int, auto=True)
@@ -51,11 +60,62 @@ class Address(db.Entity, _AsyncEntityMixin):
contracts_deployed = Set("Contract", reverse="deployer")

class Contract(Address):
deployer = Optional(Address, reverse='contracts_deployed', lazy=True)
deploy_block = Optional(Block, reverse='contracts_deployed', lazy=True)
deployer = Optional(Address, reverse='contracts_deployed', lazy=True, cascade_delete=False)
deploy_block = Optional(Block, reverse='contracts_deployed', lazy=True, cascade_delete=False)

class Token(Contract):
symbol = Optional(str, lazy=True)
name = Optional(str, lazy=True)
decimals = Optional(int, lazy=True)
bucket = Optional(str, lazy=True)
bucket = Optional(str, index=True, lazy=True)

prices = Set("Price", reverse="token")

class Price(db.Entity):
dbid = PrimaryKey(int, auto=True)
block = Required(Block, index=True, lazy=True)
token = Required(Token, index=True, lazy=True)
composite_key(block, token)
price = Required(Decimal)

class TraceCacheInfo(db.Entity):
chain = Required(Chain, index=True)
to_addresses = Required(bytes, index=True)
from_addresses = Required(bytes, index=True)
composite_key(chain, to_addresses, from_addresses)
cached_from = Required(int)
cached_thru = Required(int)

class LogCacheInfo(db.Entity):
chain = Required(Chain, index=True)
address = Required(str, index=True)
topics = Required(bytes)
composite_key(chain, address, topics)
cached_from = Required(int)
cached_thru = Required(int)

class Log(db.Entity):
block = Required(Block, index=True, lazy=True)
transaction_hash = Required(str, lazy=True)
log_index = Required(int, lazy=True)
composite_key(block, transaction_hash, log_index)

address = Required(str, index=True, lazy=True)
topic0 = Required(str, index=True, lazy=True)
topic1 = Optional(str, index=True, lazy=True)
topic2 = Optional(str, index=True, lazy=True)
topic3 = Optional(str, index=True, lazy=True)
raw = Required(bytes, lazy=True)

class Trace(db.Entity):
block = Required(Block, index=True, lazy=True)
hash = Required(str, index=True, lazy=True)
from_address = Required(str, index=True, lazy=True)
to_address = Required(str, index=True, lazy=True)
raw = Required(bytes)

@db_session
def insert(type: db.Entity, **kwargs: Any) -> None:
with suppress(TransactionIntegrityError):
type(**kwargs)
commit()
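
A short, hedged usage sketch of the new `insert` helper; the function name and the idea of passing pre-fetched `Block`/`Token` rows are illustrative, not part of the PR.

```python
# Illustrative only: caching a fetched price via the insert() helper.
# `block` and `token` are assumed to be existing Block/Token rows.
from decimal import Decimal

from pony.orm import db_session

from y._db.entities import Block, Price, Token, insert


@db_session
def cache_price(block: Block, token: Token, price: Decimal) -> None:
    # insert() suppresses TransactionIntegrityError, so a duplicate
    # (block, token) row written by a concurrent task is silently ignored
    insert(Price, block=block, token=token, price=price)
```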
3 changes: 3 additions & 0 deletions y/_db/exceptions.py
@@ -0,0 +1,3 @@

class CacheNotPopulatedError(Exception):
pass
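
`CacheNotPopulatedError` is what `DiskCache.check_and_select` raises when the requested range is not fully on disk. A hedged sketch of the intended fallback pattern follows; the function and variable names are illustrative.

```python
# Illustrative fallback around DiskCache.check_and_select: serve the range from
# disk only when the cache metadata says it is fully populated.
from typing import List, Optional

from y._db.common import DiskCache
from y._db.exceptions import CacheNotPopulatedError


def load_from_disk_or_none(cache: DiskCache, from_block: int, to_block: int) -> Optional[List]:
    try:
        return cache.check_and_select(from_block, to_block)
    except CacheNotPopulatedError:
        # not fully cached yet -- the caller falls back to fetching from the node
        return None
```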
18 changes: 18 additions & 0 deletions y/_db/utils/__init__.py
@@ -0,0 +1,18 @@

from pony.orm import BindingError, TransactionError

from y._db.config import connection_settings
from y._db.entities import db
from y._db.utils.utils import get_chain, get_block

try:
db.bind(**connection_settings, create_db=True)
db.generate_mapping(create_tables=True)
except TransactionError as e:
if str(e) != "@db_session-decorated create_tables() function with `ddl` option cannot be called inside of another db_session":
raise e
except BindingError as e:
if not str(e).startswith('Database object was already bound to'):
raise e

__all__ = 'get_chain', 'get_block'