From ccb9519cff4196662b6511b11bd6a0cd224cf25e Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Mon, 8 Apr 2024 23:50:00 +0000 Subject: [PATCH] feat: optimize db entity handling --- scripts/exporters/transactions.py | 109 +++++++------- scripts/exporters/treasury_transactions.py | 42 +++--- yearn/entities.py | 24 +-- yearn/outputs/postgres/utils.py | 161 +++++++++++++-------- yearn/partners/snapshot.py | 10 +- 5 files changed, 205 insertions(+), 141 deletions(-) diff --git a/scripts/exporters/transactions.py b/scripts/exporters/transactions.py index 0e3113f59..7129109e4 100644 --- a/scripts/exporters/transactions.py +++ b/scripts/exporters/transactions.py @@ -5,24 +5,28 @@ from decimal import Decimal from typing import Optional +import dank_mids import pandas as pd import sentry_sdk +from async_lru import alru_cache from brownie import ZERO_ADDRESS, chain, web3 from brownie.exceptions import BrownieEnvironmentWarning +from brownie.network.event import _EventItem from multicall.utils import await_awaitable from pony.orm import db_session from web3._utils.abi import filter_by_name from web3._utils.events import construct_event_topic_set -from y.networks import Network +from y import ERC20, Network, get_block_timestamp_async from y.utils.events import get_logs_asap from yearn.entities import UserTx from yearn.events import decode_logs from yearn.exceptions import BatchSizeError -from yearn.outputs.postgres.utils import (cache_address, cache_chain, - cache_token, last_recorded_block) +from yearn.outputs.postgres.utils import (address_dbid, chain_dbid, cache_token, + last_recorded_block, token_dbid) from yearn.prices.magic import _get_price from yearn.typing import Block +from yearn.utils import threads from yearn.yearn import Yearn sentry_sdk.set_tag('script','transactions_exporter') @@ -39,7 +43,7 @@ Network.Gnosis: 2_000_000, Network.Arbitrum: 1_500_000, Network.Optimism: 4_000_000, - Network.Base: 500_000, + Network.Base: 2_000_000, }[chain.id] FIRST_END_BLOCK = { @@ -56,13 +60,11 @@ def main(): while True: cached_thru = last_recorded_block(UserTx) _check_for_infinite_loop(_cached_thru_from_last_run, cached_thru) - process_and_cache_user_txs(cached_thru) + await_awaitable(process_and_cache_user_txs(cached_thru)) _cached_thru_from_last_run = cached_thru time.sleep(1) - -@db_session -def process_and_cache_user_txs(last_saved_block=None): +async def process_and_cache_user_txs(last_saved_block=None): # NOTE: We look 50 blocks back to avoid uncles and reorgs max_block_to_cache = chain.height - 50 start_block = last_saved_block + 1 if last_saved_block else None @@ -73,38 +75,42 @@ def process_and_cache_user_txs(last_saved_block=None): ) if start_block and start_block > end_block: end_block = start_block - vaults = await_awaitable(yearn.active_vaults_at(end_block)) - df = pd.concat(await_awaitable(asyncio.gather(*[get_token_transfers(vault.vault, start_block, end_block) for vault in vaults]))) if vaults else pd.DataFrame() + vaults = await yearn.active_vaults_at(end_block) + df = pd.concat(await asyncio.gather(*[get_token_transfers(vault.vault, start_block, end_block) for vault in vaults])) if vaults else pd.DataFrame() if len(df): # NOTE: We want to insert txs in the order they took place, so wallet exporter # won't have issues in the event that transactions exporter fails mid-run. df = df.sort_values('block') - for index, row in df.iterrows(): - # this addresses one tx with a crazy price due to yvpbtc v1 pricePerFullShare bug. - price = row.price if len(str(round(row.price))) <= 20 else 99999999999999999999 - usd = row.value_usd if len(str(round(row.value_usd))) <= 20 else 99999999999999999999 - - UserTx( - chain=cache_chain(), - vault=cache_token(row.token), - timestamp=row.timestamp, - block=row.block, - hash=row.hash, - log_index=row.log_index, - type=row.type, - from_address=cache_address(row['from']), - to_address=cache_address(row['to']), - amount = row.amount, - price = Decimal(price), - value_usd = Decimal(usd), - gas_used = row.gas_used, - gas_price = row.gas_price - ) + await asyncio.gather(*(insert_user_tx(row) for index, row in df.iterrows())) if start_block == end_block: logger.info(f'{len(df)} user txs exported to postrges [block {start_block}]') else: logger.info(f'{len(df)} user txs exported to postrges [blocks {start_block}-{end_block}]') +async def insert_user_tx(row) -> None: + chain_pk = chain_dbid() + vault_dbid, from_address_dbid, to_address_dbid = await asyncio.gather(threads.run(token_dbid, row.token), threads.run(address_dbid, row['from']), threads.run(address_dbid, row['to'])) + # this addresses one tx with a crazy price due to yvpbtc v1 pricePerFullShare bug. + price = row.price if len(str(round(row.price))) <= 20 else 99999999999999999999 + usd = row.value_usd if len(str(round(row.value_usd))) <= 20 else 99999999999999999999 + + await threads.run( + db_session(UserTx), + chain=chain_pk, + vault=vault_dbid, + timestamp=row.timestamp, + block=row.block, + hash=row.hash, + log_index=row.log_index, + type=row.type, + from_address=from_address_dbid, + to_address=to_address_dbid, + amount = row.amount, + price = Decimal(price), + value_usd = Decimal(usd), + gas_used = row.gas_used, + gas_price = row.gas_price, + ) # Helper functions async def get_token_transfers(token, start_block, end_block) -> pd.DataFrame: @@ -112,43 +118,48 @@ async def get_token_transfers(token, start_block, end_block) -> pd.DataFrame: filter_by_name('Transfer', token.abi)[0], web3.codec, ) - events = decode_logs( - await get_logs_asap(token.address, topics, from_block=start_block, to_block=end_block, sync=False) - ) - token_entity = cache_token(token.address) - transfers = await asyncio.gather(*[_process_transfer_event(event, token_entity) for event in events]) + logs = await get_logs_asap(token.address, topics, from_block=start_block, to_block=end_block, sync=False) + transfers = await asyncio.gather(*[_process_transfer_event(event) for event in decode_logs(logs)]) return pd.DataFrame(transfers) -async def _process_transfer_event(event, token_entity) -> dict: +async def _process_transfer_event(event: _EventItem) -> dict: sender, receiver, amount = event.values() - cache_address(sender) - cache_address(receiver) - price = await get_price(token_entity.address.address, event.block_number) + txhash = event.transaction_hash.hex() + tx, tx_receipt, timestamp, token_dbid, price, scale = await asyncio.gather( + dank_mids.eth.get_transaction(txhash), + dank_mids.eth.get_transaction_receipt(txhash), + get_block_timestamp_async(event.block_number), + get_token_dbid(event.address), # NOTE: we don't use this output but we call this to ensure the token fk is in the db before we insert the transfer + get_price(event.address, event.block_number), + ERC20(event.address, asynchronous=True).scale, + ) if ( # NOTE magic.get_price() returns erroneous price due to erroneous ppfs - token_entity.address.address == '0x7F83935EcFe4729c4Ea592Ab2bC1A32588409797' + event.address == '0x7F83935EcFe4729c4Ea592Ab2bC1A32588409797' and event.block_number == 12869164 ): price = 99999 - txhash = event.transaction_hash.hex() return { 'chainid': chain.id, 'block': event.block_number, - 'timestamp': chain[event.block_number].timestamp, + 'timestamp': int(timestamp), 'hash': txhash, 'log_index': event.log_index, - 'token': token_entity.address.address, - 'type': _event_type(sender, receiver, token_entity.address.address), + 'token': event.address, + 'type': _event_type(sender, receiver, event.address), 'from': sender, 'to': receiver, - 'amount': Decimal(amount) / Decimal(10 ** token_entity.decimals), + 'amount': Decimal(amount) / Decimal(scale), 'price': price, - 'value_usd': Decimal(amount) / Decimal(10 ** token_entity.decimals) * Decimal(price), - 'gas_used': web3.eth.getTransactionReceipt(txhash).gasUsed, - 'gas_price': web3.eth.getTransaction(txhash).gasPrice + 'value_usd': Decimal(amount) / Decimal(scale) * Decimal(price), + 'gas_used': tx_receipt.gasUsed, + 'gas_price': tx.gasPrice } +@alru_cache(maxsize=None) +async def get_token_dbid(address: str) -> int: + return await threads.run(token_dbid, address) async def get_price(token_address, block): try: diff --git a/scripts/exporters/treasury_transactions.py b/scripts/exporters/treasury_transactions.py index b76db8242..d6d05ab44 100644 --- a/scripts/exporters/treasury_transactions.py +++ b/scripts/exporters/treasury_transactions.py @@ -18,8 +18,7 @@ from y.time import get_block_timestamp_async from yearn.entities import TreasuryTx, deduplicate_internal_transfers -from yearn.outputs.postgres.utils import (cache_address, cache_chain, - cache_token) +from yearn.outputs.postgres.utils import address_dbid, chain_dbid, token_dbid from yearn.treasury import accountant from yearn.treasury.treasury import YearnTreasury @@ -29,6 +28,8 @@ logger = logging.getLogger('yearn.treasury_transactions_exporter') +GNOSIS_SINGLETON = "0xd9Db270c1B5E3Bd161E8c8503c55cEABeE709552" + treasury = YearnTreasury(load_prices=True, asynchronous=True) @@ -54,11 +55,16 @@ def main() -> NoReturn: @a_sync(default='sync') async def load_new_txs(start_block: Block, end_block: Block) -> int: """returns: number of new txs""" - futs = [ - asyncio.create_task(insert_treasury_tx(entry)) - async for entry in treasury.ledger._get_and_yield(start_block, end_block) - if not isinstance(entry, _Done) and entry.value - ] + futs = [] + async for entry in treasury.ledger[start_block: end_block]: + if isinstance(entry, InternalTransfer) and entry.to_address == GNOSIS_SINGLETON: + # TODO: move this into eth-port + logger.debug("internal transfer to gnosis singleton, these are goofy and not real. ignoring %s", entry) + continue + if not entry.value: + logger.debug("zero value transfer, skipping %s", entry) + continue + futs.append(asyncio.create_task(insert_treasury_tx(entry))) if not futs: return 0 to_sort = sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres")) @@ -81,26 +87,25 @@ async def insert_treasury_tx(entry: LedgerEntry) -> int: await sort_thread.run(accountant.sort_tx, txid) return 0 - @db_session -def insert_to_db(entry: LedgerEntry, ts: int) -> bool: +def insert_to_db(entry: LedgerEntry, ts: int) -> int: if isinstance(entry, TokenTransfer): log_index = entry.log_index - token = cache_token(entry.token_address) + token = token_dbid(entry.token_address) gas = None else: log_index = None - token = cache_token(EEE_ADDRESS) + token = token_dbid(EEE_ADDRESS) gas = entry.gas try: entity = TreasuryTx( - chain=cache_chain(), + chain=chain_dbid(), block = entry.block_number, timestamp = ts, hash = entry.hash, log_index = log_index, - from_address = cache_address(entry.from_address) if entry.from_address else None, - to_address = cache_address(entry.to_address) if entry.to_address else None, + from_address = address_dbid(entry.from_address) if entry.from_address else None, + to_address = address_dbid(entry.to_address) if entry.to_address else None, token = token, amount = entry.value, price = entry.price, @@ -120,11 +125,14 @@ def insert_to_db(entry: LedgerEntry, ts: int) -> bool: @db_session def _validate_integrity_error(entry: LedgerEntry, log_index: int) -> None: ''' Raises AssertionError if existing object that causes a TransactionIntegrityError is not an EXACT MATCH to the attempted insert. ''' - existing_object = TreasuryTx.get(hash=entry.hash, log_index=log_index, chain=cache_chain()) + existing_object = TreasuryTx.get(hash=entry.hash, log_index=log_index, chain=chain_dbid()) if existing_object is None: - existing_objects = list(TreasuryTx.select(lambda tx: tx.hash==entry.hash and tx.log_index==log_index and tx.chain==cache_chain())) + existing_objects = list(TreasuryTx.select(lambda tx: tx.hash==entry.hash and tx.log_index==log_index and tx.chain==chain_dbid())) raise ValueError(f'unable to `.get` due to multiple entries: {existing_objects}') - assert entry.to_address == existing_object.to_address.address, (entry.to_address,existing_object.to_address.address) + if entry.to_address: + assert entry.to_address == existing_object.to_address.address, (entry.to_address, existing_object.to_address.address) + else: + assert existing_object.to_address is None, (entry.to_address, existing_object.to_address) assert entry.from_address == existing_object.from_address.address, (entry.from_address, existing_object.from_address.address) assert entry.value == existing_object.amount or entry.value == -1 * existing_object.amount, (entry.value, existing_object.amount) assert entry.block_number == existing_object.block, (entry.block_number, existing_object.block) diff --git a/yearn/entities.py b/yearn/entities.py index d590ed190..6fb6b584d 100644 --- a/yearn/entities.py +++ b/yearn/entities.py @@ -273,8 +273,8 @@ def get_or_create_entity(cls, log: _EventItem) -> "Stream": try: return Stream[stream_id] except ObjectNotFound: - from yearn.outputs.postgres.utils import (cache_address, - cache_token, + from yearn.outputs.postgres.utils import (address_dbid, + token_dbid, cache_txgroup) txgroup = { @@ -283,10 +283,10 @@ def get_or_create_entity(cls, log: _EventItem) -> "Stream": }.get(to_address, "Other Grants") txgroup = cache_txgroup(txgroup) - stream_contract = cache_address(log.address) - token = cache_token(Contract(log.address).token()) - from_address = cache_address(from_address) - to_address = cache_address(to_address) + stream_contract = address_dbid(log.address) + token = token_dbid(Contract(log.address).token()) + from_address = address_dbid(from_address) + to_address = address_dbid(to_address) entity = Stream( stream_id = stream_id, @@ -430,18 +430,18 @@ def rugged_on(self) -> typing.Optional[date]: @staticmethod def get_or_create_entity(event: _EventItem) -> "VestingEscrow": - from yearn.outputs.postgres.utils import cache_address, cache_token + from yearn.outputs.postgres.utils import address_dbid, cache_token print(event) funder, token, recipient, escrow, amount, start, duration, cliff_length = event.values() - escrow_address_entity = cache_address(escrow) - escrow = VestingEscrow.get(address=escrow_address_entity) + escrow_address_dbid = address_dbid(escrow) + escrow = VestingEscrow.get(address=escrow_address_dbid) if escrow is None: token_entity = cache_token(token) escrow = VestingEscrow( - address = escrow_address_entity, - funder = cache_address(funder), - recipient = cache_address(recipient), + address = escrow_address_dbid, + funder = address_dbid(funder), + recipient = address_dbid(recipient), token = token_entity, amount = amount / token_entity.scale, start_timestamp = datetime.fromtimestamp(start), diff --git a/yearn/outputs/postgres/utils.py b/yearn/outputs/postgres/utils.py index 9c1d7a622..fdc833cd1 100644 --- a/yearn/outputs/postgres/utils.py +++ b/yearn/outputs/postgres/utils.py @@ -1,10 +1,11 @@ import logging from decimal import Decimal +from functools import lru_cache from typing import Dict, Optional from brownie import ZERO_ADDRESS, chain, convert from brownie.convert.datatypes import HexString -from pony.orm import db_session, select +from pony.orm import IntegrityError, TransactionIntegrityError, db_session, commit, select from y import Contract, ContractNotVerified, Network from yearn.entities import (Address, Chain, Token, TreasuryTx, TxGroup, UserTx, @@ -18,90 +19,134 @@ Network.Mainnet: "0xC36442b4a4522E871399CD717aBDD847Ab11FE88", }.get(chain.id, 'not on this chain') +@lru_cache(maxsize=1) @db_session -def cache_chain(): - return Chain.get(chainid=chain.id) or Chain( +def cache_chain() -> Chain: + entity = Chain.get(chainid=chain.id) or Chain( chain_name=Network.name(), chainid=chain.id, victoria_metrics_label=Network.label(), ) + commit() + return entity + +@lru_cache(maxsize=1) +def chain_dbid() -> int: + return cache_chain().chain_dbid @db_session def cache_address(address: str) -> Address: address = convert.to_address(address) - chain = cache_chain() - address_entity = Address.get(address=address, chain=chain) - if not address_entity: - if is_contract(address): - try: - nickname = f"Contract: {Contract(address)._build['contractName']}" - except ContractNotVerified as e: - nickname = f"Non-Verified Contract: {address}" - address_entity = Address( - address=address, - chain=chain, - is_contract=True, - nickname=nickname, - ) - else: - address_entity = Address( - address=address, - chain=chain, - is_contract=False, - ) - return address_entity + chain = chain_dbid() + tries = 0 + while True: + if entity := Address.get(address=address, chain=chain): + return entity + try: + # if another thread beat us to the insert, we will get an exception that won't recurr next iteration + if is_contract(address): + try: + nickname = f"Contract: {Contract(address)._build['contractName']}" + except ContractNotVerified as e: + nickname = f"Non-Verified Contract: {address}" + entity = Address( + address=address, + chain=chain, + is_contract=True, + nickname=nickname, + ) + else: + entity = Address( + address=address, + chain=chain, + is_contract=False, + ) + commit() + break + except (IntegrityError, TransactionIntegrityError) as e: + if tries > 4: + raise e + logger.debug("%s for %s %s", e, address_dbid, address) + tries += 1 + return entity + +@lru_cache(maxsize=None) +def address_dbid(address: str) -> int: + return cache_address(address).address_id @db_session def cache_token(address: str) -> Token: + if token := Token.get(address=address_dbid(address)): + return token + + address = convert.to_address(address) + + # get token attributes + if address == "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE": + symbol, name = { + Network.Mainnet: ("ETH","Ethereum"), + Network.Fantom: ("FTM","Fantom"), + Network.Arbitrum: ("ETH", "Ethereum"), + Network.Optimism: ("ETH", "Ethereum"), + }[chain.id] + decimals = 18 + else: + token = Contract(address) + symbol, name, decimals = fetch_multicall([token,'symbol'], [token,'name'], [token,'decimals']) + + # MKR contract returns name and symbol as bytes32 which is converted to a brownie HexString + # try to decode it + if isinstance(name, HexString): + name = hex_to_string(name) + if isinstance(symbol, HexString): + symbol = hex_to_string(symbol) + + # update address nickname for token address_entity = cache_address(address) - token = Token.get(address=address_entity) - if not token: - address = convert.to_address(address) - if address == "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE": - symbol, name = { - Network.Mainnet: ("ETH","Ethereum"), - Network.Fantom: ("FTM","Fantom"), - Network.Arbitrum: ("ETH", "Ethereum"), - Network.Optimism: ("ETH", "Ethereum"), - }[chain.id] - decimals = 18 - else: - token = Contract(address) - symbol, name, decimals = fetch_multicall([token,'symbol'],[token,'name'],[token,'decimals']) - - # MKR contract returns name and symbol as bytes32 which is converted to a brownie HexString - # try to decode it - if isinstance(name, HexString): - name = hex_to_string(name) - if isinstance(symbol, HexString): - symbol = hex_to_string(symbol) - - if address_entity.nickname is None or address_entity.nickname.startswith("Contract: "): - # Don't overwrite any intentionally set nicknames, if applicable - address_entity.nickname = f"Token: {name}" - + if address_entity.nickname is None or address_entity.nickname.startswith("Contract: "): + # Don't overwrite any intentionally set nicknames, if applicable + address_entity.nickname = f"Token: {name}" + + # insert to db + tries = 0 + while True: + if token := Token.get(address=address_dbid(address)): + return token try: token = Token( - address=address_entity, + address=address_dbid(address), symbol=symbol, name=name, decimals= 0 if address == UNI_V3_POS or decimals is None else decimals, - chain=cache_chain() + chain=chain_dbid() ) + commit() + logger.info(f'token {symbol} added to postgres') + except (IntegrityError, TransactionIntegrityError) as e: + if tries >= 4: + raise e + tries += 1 + logger.warning("%s for %s %s", e, cache_token, address) except ValueError as e: """ Give us a little more info to help with debugging. """ raise ValueError(str(e), token.address, symbol, name, decimals) - logger.info(f'token {symbol} added to postgres') - return token + +@lru_cache(maxsize=None) +def token_dbid(address: str) -> int: + return cache_token(address).token_id @db_session def cache_txgroup(name: str, parent: Optional[TxGroup] = None) -> TxGroup: - _txgroup = TxGroup.get(name=name) - if not _txgroup: - _txgroup = TxGroup(name=name, parent_txgroup=parent) - logger.info(f'TxGroup {name} added to postgres') + while not (_txgroup := TxGroup.get(name=name)): + try: + TxGroup(name=name, parent_txgroup=parent) + commit() + logger.info(f'TxGroup {name} added to postgres') + except (IntegrityError, TransactionIntegrityError) as e: + logger.warning("%s for %s %s", e, cache_txgroup, name) if parent and parent != _txgroup.parent_txgroup: _txgroup.parent_txgroup = parent + commit() return _txgroup @db_session diff --git a/yearn/partners/snapshot.py b/yearn/partners/snapshot.py index 1800fbb63..e1e584ca0 100644 --- a/yearn/partners/snapshot.py +++ b/yearn/partners/snapshot.py @@ -40,8 +40,8 @@ try: from yearn.entities import PartnerHarvestEvent - from yearn.outputs.postgres.utils import (cache_address, cache_chain, - cache_token) + from yearn.outputs.postgres.utils import (address_dbid, chain_dbid, + token_dbid) USE_POSTGRES_CACHE = True except OperationalError as e: if "Is the server running on that host and accepting TCP/IP connections?" in str(e): @@ -578,7 +578,7 @@ def cache_data(wrap: DataFrame) -> None: ''' for i, row in wrap.iterrows(): PartnerHarvestEvent( - chain=cache_chain(), + chain=chain_dbid(), block=row.block, timestamp=int(row.timestamp.timestamp()), balance=row.balance, @@ -589,8 +589,8 @@ def cache_data(wrap: DataFrame) -> None: payout_base=row.payout_base, protocol_fee=row.protocol_fee, # Use cache_address instead of cache_token because some wrappers aren't verified - wrapper=cache_address(row.wrapper), - vault=cache_token(row.vault), + wrapper=address_dbid(row.wrapper), + vault=token_dbid(row.vault), ) commit()