From 9c3efc0db321eb15e2fa9b2d6a8a0ad667562371 Mon Sep 17 00:00:00 2001 From: Yosuke Otosu Date: Mon, 20 Jan 2025 14:23:33 +0900 Subject: [PATCH] Extract events from async contract instance to reduce memory usage --- app/utils/contract_utils.py | 17 ++++++++++++++++- batch/indexer_issue_redeem.py | 13 +++++++------ batch/indexer_transfer.py | 11 ++++++----- batch/indexer_transfer_approval.py | 8 +++++--- batch/processor_create_utxo.py | 18 +++++++++++------- 5 files changed, 45 insertions(+), 22 deletions(-) diff --git a/app/utils/contract_utils.py b/app/utils/contract_utils.py index 6e29671b..e54c0a2e 100644 --- a/app/utils/contract_utils.py +++ b/app/utils/contract_utils.py @@ -27,6 +27,7 @@ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import Session from web3.contract import AsyncContract, Contract +from web3.contract.async_contract import AsyncContractEvents from web3.exceptions import ( ABIEventNotFound, ABIFunctionNotFound, @@ -290,6 +291,20 @@ def get_event_logs( return result +class AsyncContractEventsView: + def __init__(self, address: str, contract_events: AsyncContractEvents) -> None: + self._address = address + self._events = contract_events + + @property + def address(self) -> str: + return self._address + + @property + def events(self) -> AsyncContractEvents: + return self._events + + class AsyncContractUtils: factory_map: dict[str, Type[AsyncContract]] = {} @@ -585,7 +600,7 @@ async def get_block_by_transaction_hash(tx_hash: str): @staticmethod async def get_event_logs( - contract: AsyncContract, + contract: AsyncContract | AsyncContractEventsView, event: str, block_from: int = None, block_to: int = None, diff --git a/batch/indexer_issue_redeem.py b/batch/indexer_issue_redeem.py index 1e81ab6e..2db85aac 100644 --- a/batch/indexer_issue_redeem.py +++ b/batch/indexer_issue_redeem.py @@ -27,7 +27,6 @@ from sqlalchemy import and_, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from web3.contract import AsyncContract from app.database import BatchAsyncSessionLocal from app.exceptions import ServiceUnavailableError @@ -38,7 +37,7 @@ IDXIssueRedeemEventType, Token, ) -from app.utils.contract_utils import AsyncContractUtils +from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils from app.utils.web3_utils import AsyncWeb3Wrapper from batch.utils import batch_log from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL @@ -51,7 +50,7 @@ class Processor: def __init__(self): - self.token_list: dict[str, AsyncContract] = {} + self.token_list: dict[str, AsyncContractEventsView] = {} async def sync_new_logs(self): db_session = BatchAsyncSessionLocal() @@ -145,7 +144,9 @@ async def __get_token_list(self, db_session: AsyncSession): token_contract = web3.eth.contract( address=load_required_token.token_address, abi=load_required_token.abi ) - self.token_list[load_required_token.token_address] = token_contract + self.token_list[load_required_token.token_address] = ( + AsyncContractEventsView(token_contract.address, token_contract.events) + ) @staticmethod async def __get_idx_issue_redeem_block_number(db_session: AsyncSession): @@ -207,7 +208,7 @@ async def __sync_issue( else: await self.__insert_index( db_session=db_session, - event_type=IDXIssueRedeemEventType.ISSUE.value, + event_type=IDXIssueRedeemEventType.ISSUE, transaction_hash=transaction_hash, token_address=to_checksum_address(token.address), locked_address=args["lockAddress"], @@ -248,7 +249,7 @@ async def __sync_redeem( else: await self.__insert_index( db_session=db_session, - event_type=IDXIssueRedeemEventType.REDEEM.value, + event_type=IDXIssueRedeemEventType.REDEEM, transaction_hash=transaction_hash, token_address=to_checksum_address(token.address), locked_address=args["lockAddress"], diff --git a/batch/indexer_transfer.py b/batch/indexer_transfer.py index 7ed8e2ed..9e3e5525 100644 --- a/batch/indexer_transfer.py +++ b/batch/indexer_transfer.py @@ -29,7 +29,6 @@ from sqlalchemy import and_, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from web3.contract import AsyncContract from app.database import BatchAsyncSessionLocal from app.exceptions import ServiceUnavailableError @@ -41,7 +40,7 @@ IDXTransferSourceEventType, Token, ) -from app.utils.contract_utils import AsyncContractUtils +from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils from app.utils.web3_utils import AsyncWeb3Wrapper from batch.utils import batch_log from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL, ZERO_ADDRESS @@ -54,7 +53,7 @@ class Processor: def __init__(self): - self.token_list: dict[str, AsyncContract] = {} + self.token_list: dict[str, AsyncContractEventsView] = {} async def sync_new_logs(self): db_session = BatchAsyncSessionLocal() @@ -151,7 +150,9 @@ async def __get_token_list(self, db_session: AsyncSession): token_contract = web3.eth.contract( address=load_required_token.token_address, abi=load_required_token.abi ) - self.token_list[load_required_token.token_address] = token_contract + self.token_list[load_required_token.token_address] = ( + AsyncContractEventsView(token_contract.address, token_contract.events) + ) @staticmethod async def __get_idx_transfer_block_number(db_session: AsyncSession): @@ -303,7 +304,7 @@ async def __sink_on_transfer( transfer_record.from_address = from_address transfer_record.to_address = to_address transfer_record.amount = amount - transfer_record.source_event = source_event.value + transfer_record.source_event = source_event transfer_record.data = data transfer_record.message = message transfer_record.block_timestamp = block_timestamp diff --git a/batch/indexer_transfer_approval.py b/batch/indexer_transfer_approval.py index 70c7543c..764a80d4 100644 --- a/batch/indexer_transfer_approval.py +++ b/batch/indexer_transfer_approval.py @@ -41,7 +41,7 @@ Token, TokenType, ) -from app.utils.contract_utils import AsyncContractUtils +from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils from app.utils.web3_utils import AsyncWeb3Wrapper from batch.utils import batch_log from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL, ZERO_ADDRESS @@ -71,7 +71,7 @@ class Processor: def __init__(self): - self.token_list: dict[str, AsyncContract] = {} + self.token_list: dict[str, AsyncContractEventsView] = {} self.exchange_list: list[AsyncContract] = [] self.token_type_map: dict[str, TokenType] = {} @@ -165,7 +165,9 @@ async def __get_contract_list(self, db_session: AsyncSession): token_contract = web3.eth.contract( address=load_required_token.token_address, abi=load_required_token.abi ) - self.token_list[load_required_token.token_address] = token_contract + self.token_list[load_required_token.token_address] = ( + AsyncContractEventsView(token_contract.address, token_contract.events) + ) self.token_type_map[load_required_token.token_address] = ( load_required_token.type ) diff --git a/batch/processor_create_utxo.py b/batch/processor_create_utxo.py index 0d2dcbc0..0073f7ee 100644 --- a/batch/processor_create_utxo.py +++ b/batch/processor_create_utxo.py @@ -27,13 +27,12 @@ from sqlalchemy import and_, create_engine, select from sqlalchemy.exc import DataError, SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from web3.contract import AsyncContract from app.database import BatchAsyncSessionLocal from app.exceptions import ServiceUnavailableError from app.model.blockchain import IbetShareContract, IbetStraightBondContract from app.model.db import UTXO, Account, Token, TokenType, UTXOBlockNumber -from app.utils.contract_utils import AsyncContractUtils +from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils from app.utils.ledger_utils import request_ledger_creation from app.utils.web3_utils import AsyncWeb3Wrapper from batch.utils import batch_log @@ -60,7 +59,7 @@ class Processor: def __init__(self): - self.token_contract_list: list[AsyncContract] = [] + self.token_contract_list: list[AsyncContractEventsView] = [] self.token_type_map: dict[str, TokenType] = {} async def process(self): @@ -151,7 +150,12 @@ async def __refresh_token_contract_list(self, db_session: AsyncSession): token_contract = AsyncContractUtils.get_contract( contract_name=_token.type, contract_address=_token.token_address ) - self.token_contract_list.append(token_contract) + self.token_contract_list.append( + AsyncContractEventsView( + token_contract.address, + token_contract.events, + ) + ) self.token_type_map[_token.token_address] = _token.type @staticmethod @@ -177,7 +181,7 @@ async def __set_utxo_block_number(db_session: AsyncSession, block_number: int): async def __process_transfer( self, db_session: AsyncSession, - token_contract: AsyncContract, + token_contract: AsyncContractEventsView, block_from: int, block_to: int, ): @@ -333,7 +337,7 @@ async def __process_transfer( async def __process_issue( self, db_session: AsyncSession, - token_contract: AsyncContract, + token_contract: AsyncContractEventsView, block_from: int, block_to: int, ): @@ -389,7 +393,7 @@ async def __process_issue( async def __process_redeem( self, db_session: AsyncSession, - token_contract: AsyncContract, + token_contract: AsyncContractEventsView, block_from: int, block_to: int, ):