Skip to content

Commit

Permalink
Merge pull request #1588 from BoostryJP/feature/#1587
Browse files Browse the repository at this point in the history
Switch to asyncpg for async DB engine
  • Loading branch information
YoshihitoAso authored Jan 10, 2025
2 parents 722befc + 2d5294e commit df56eae
Show file tree
Hide file tree
Showing 14 changed files with 2,636 additions and 2,020 deletions.
1 change: 1 addition & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
or "postgresql://ethuser:ethpass@localhost:5432/ethcache"
)
DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg://")
ASYNC_DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
DB_ECHO = True if CONFIG["database"]["echo"] == "yes" else False
DATABASE_TYPE = "postgresql" if DATABASE_URL.startswith("postgresql") else "mysql"
DATABASE_SCHEMA = os.environ.get("DATABASE_SCHEMA")
Expand Down
4 changes: 2 additions & 2 deletions app/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ def get_batch_async_engine(uri: str):

# Create Engine
engine = get_engine(config.DATABASE_URL)
async_engine = get_async_engine(get_async_uri(config.DATABASE_URL))
batch_async_engine = get_batch_async_engine(get_async_uri(config.DATABASE_URL))
async_engine = get_async_engine(get_async_uri(config.ASYNC_DATABASE_URL))
batch_async_engine = get_batch_async_engine(get_async_uri(config.ASYNC_DATABASE_URL))


# Create Session Mapker
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"httpx<1.0.0,>=0.27.0",
"memray<2.0.0,>=1.14.0",
"py-spy>=0.4.0",
"asyncpg>=0.30.0",
]

[tool.uv]
Expand All @@ -41,7 +42,7 @@ dev-dependencies = [
"pytest-alembic<1.0.0,>=0.10.7",
"pytest-freezer<1.0.0,>=0.4.8",
"textual-dev<2.0.0,>=1.2.1",
"pytest-asyncio==0.23.8",
"pytest-asyncio==0.25.0",
"pytest-aiohttp<2.0.0,>=1.0.5",
"ruamel-yaml<1.0.0,>=0.18.6",
"pytest-memray<2.0.0,>=1.6.0",
Expand Down Expand Up @@ -102,6 +103,7 @@ addopts = "-m 'not alembic'"
markers = [
"alembic: tests for alembic",
]
asyncio_default_fixture_loop_scope = "session"

[tool.coverage.run]
branch = true
Expand Down
127 changes: 74 additions & 53 deletions tests/batch/indexer_Token_Detail_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from web3 import Web3
from web3.middleware import ExtraDataToPOAMiddleware

Expand Down Expand Up @@ -68,7 +67,7 @@ def test_module(shared_contract):


@pytest.fixture(scope="function")
def processor(test_module, session):
def processor(test_module):
config.BOND_TOKEN_ENABLED = True
config.SHARE_TOKEN_ENABLED = True
config.MEMBERSHIP_TOKEN_ENABLED = True
Expand All @@ -88,6 +87,7 @@ def main_func(test_module):
LOG.setLevel(default_log_level)


@pytest.mark.asyncio
@mock.patch("app.config.BOND_TOKEN_ENABLED", True)
@mock.patch("app.config.SHARE_TOKEN_ENABLED", True)
@mock.patch("app.config.COUPON_TOKEN_ENABLED", True)
Expand All @@ -100,21 +100,21 @@ class TestProcessor:
agent = eth_account["agent"]

@staticmethod
def listing_token(token_address, token_template, session):
async def listing_token(token_address, token_template, async_session):
_listing = Listing()
_listing.token_address = token_address
_listing.is_public = True
_listing.max_holding_quantity = 1000000
_listing.max_sell_amount = 1000000
_listing.owner_address = TestProcessor.issuer["account_address"]
session.add(_listing)
async_session.add(_listing)

_idx_token_list_item = IDXTokenListItem()
_idx_token_list_item.token_address = token_address
_idx_token_list_item.token_template = token_template
_idx_token_list_item.owner_address = TestProcessor.issuer["account_address"]
session.add(_idx_token_list_item)
session.commit()
async_session.add(_idx_token_list_item)
await async_session.commit()

@staticmethod
def issue_token_bond_with_args(issuer, token_list, args):
Expand Down Expand Up @@ -153,11 +153,11 @@ def issue_token_membership_with_args(issuer, token_list, args):
###########################################################################

# <Normal_1>
def test_normal_1(
async def test_normal_1(
self,
processor: Processor,
shared_contract,
session: Session,
async_session,
block_number: None,
):
token_list_contract = shared_contract["TokenList"]
Expand Down Expand Up @@ -206,7 +206,9 @@ def test_normal_1(
token = self.issue_token_bond_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetStraightBond", session)
await self.listing_token(
token["address"], "IbetStraightBond", async_session
)
args = {
re.sub("([A-Z])", lambda x: "_" + x.group(1).lower(), k): v
for k, v in args.items()
Expand Down Expand Up @@ -241,7 +243,7 @@ def test_normal_1(
token = self.issue_token_share_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetShare", session)
await self.listing_token(token["address"], "IbetShare", async_session)
args = {
re.sub("([A-Z])", lambda x: "_" + x.group(1).lower(), k): v
for k, v in args.items()
Expand Down Expand Up @@ -279,7 +281,7 @@ def test_normal_1(
token = self.issue_token_membership_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetMembership", session)
await self.listing_token(token["address"], "IbetMembership", async_session)

args["totalSupply"] = args["initialSupply"]
del args["initialSupply"]
Expand Down Expand Up @@ -311,7 +313,7 @@ def test_normal_1(
token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)
args = {
re.sub("([A-Z])", lambda x: "_" + x.group(1).lower(), k): v
for k, v in args.items()
Expand All @@ -322,46 +324,61 @@ def test_normal_1(

# Run target process
processor.SEC_PER_RECORD = 0
asyncio.run(processor.process())
await processor.process()

# assertion
for _expect_dict in _bond_token_expected_list:
_bond_token: BondTokenModel = session.scalars(
select(BondTokenModel)
.where(BondTokenModel.token_address == _expect_dict["token_address"])
.limit(1)
_bond_token: BondTokenModel = (
await async_session.scalars(
select(BondTokenModel)
.where(
BondTokenModel.token_address == _expect_dict["token_address"]
)
.limit(1)
)
).first()
_bond_token_obj = BondToken.from_model(_bond_token)
for k, v in _expect_dict.items():
assert v == getattr(_bond_token_obj, k)

for _expect_dict in _share_token_expected_list:
_share_token: ShareTokenModel = session.scalars(
select(ShareTokenModel)
.where(ShareTokenModel.token_address == _expect_dict["token_address"])
.limit(1)
_share_token: ShareTokenModel = (
await async_session.scalars(
select(ShareTokenModel)
.where(
ShareTokenModel.token_address == _expect_dict["token_address"]
)
.limit(1)
)
).first()
_share_token_obj = ShareToken.from_model(_share_token)
for k, v in _expect_dict.items():
assert v == getattr(_share_token_obj, k)

for _expect_dict in _membership_token_expected_list:
_membership_token: MembershipTokenModel = session.scalars(
select(MembershipTokenModel)
.where(
MembershipTokenModel.token_address == _expect_dict["token_address"]
_membership_token: MembershipTokenModel = (
await async_session.scalars(
select(MembershipTokenModel)
.where(
MembershipTokenModel.token_address
== _expect_dict["token_address"]
)
.limit(1)
)
.limit(1)
).first()
_membership_token_obj = MembershipToken.from_model(_membership_token)
for k, v in _expect_dict.items():
assert v == getattr(_membership_token_obj, k)

for _expect_dict in _coupon_token_expected_list:
_coupon_token: CouponTokenModel = session.scalars(
select(CouponTokenModel)
.where(CouponTokenModel.token_address == _expect_dict["token_address"])
.limit(1)
_coupon_token: CouponTokenModel = (
await async_session.scalars(
select(CouponTokenModel)
.where(
CouponTokenModel.token_address == _expect_dict["token_address"]
)
.limit(1)
)
).first()
_coupon_token_obj = CouponToken.from_model(_coupon_token)
for k, v in _expect_dict.items():
Expand All @@ -375,7 +392,9 @@ def test_normal_1(
# <Error_2>: ServiceUnavailable occurs and is handled in mainloop.

# <Error_1_1>: ServiceUnavailable occurs in __sync_xx method.
def test_error_1_1(self, processor: Processor, shared_contract, session):
async def test_error_1_1(
self, processor: Processor, shared_contract, async_session
):
# Issue Token
token_list_contract = shared_contract["TokenList"]
exchange_contract = shared_contract["IbetStraightBondExchange"]
Expand All @@ -395,7 +414,7 @@ def test_error_1_1(self, processor: Processor, shared_contract, session):
token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)

# Expect that process() raises ServiceUnavailable.
with (
Expand All @@ -405,18 +424,18 @@ def test_error_1_1(self, processor: Processor, shared_contract, session):
),
pytest.raises(ServiceUnavailable),
):
asyncio.run(processor.process())
await processor.process()

# Assertion
_coupon_token_list: List[CouponTokenModel] = session.scalars(
select(CouponTokenModel)
_coupon_token_list: List[CouponTokenModel] = (
await async_session.scalars(select(CouponTokenModel))
).all()
assert len(_coupon_token_list) == 0

token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)

# Expect that process() raises ServiceUnavailable.
with (
Expand All @@ -426,17 +445,19 @@ def test_error_1_1(self, processor: Processor, shared_contract, session):
),
pytest.raises(ServiceUnavailable),
):
asyncio.run(processor.process())
await processor.process()

# Assertion
session.rollback()
_coupon_token_list: List[CouponTokenModel] = session.scalars(
select(CouponTokenModel)
await async_session.rollback()
_coupon_token_list: List[CouponTokenModel] = (
await async_session.scalars(select(CouponTokenModel))
).all()
assert len(_coupon_token_list) == 0

# <Error_1_2>: SQLAlchemyError occurs in "process".
def test_error_1_2(self, processor: Processor, shared_contract, session):
async def test_error_1_2(
self, processor: Processor, shared_contract, async_session
):
# Issue Token
token_list_contract = shared_contract["TokenList"]
exchange_contract = shared_contract["IbetStraightBondExchange"]
Expand All @@ -456,42 +477,42 @@ def test_error_1_2(self, processor: Processor, shared_contract, session):
token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)

# Expect that process() raises SQLAlchemyError.
with (
mock.patch.object(AsyncSession, "commit", side_effect=SQLAlchemyError()),
pytest.raises(SQLAlchemyError),
):
asyncio.run(processor.process())
await processor.process()

# Assertion
_coupon_token_list: List[CouponTokenModel] = session.scalars(
select(CouponTokenModel)
_coupon_token_list: List[CouponTokenModel] = (
await async_session.scalars(select(CouponTokenModel))
).all()
assert len(_coupon_token_list) == 0

token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)

# Expect that process() raises SQLAlchemyError.
with (
mock.patch.object(AsyncSession, "commit", side_effect=SQLAlchemyError()),
pytest.raises(SQLAlchemyError),
):
asyncio.run(processor.process())
await processor.process()

# Assertion
session.rollback()
_coupon_token_list: List[CouponTokenModel] = session.scalars(
select(CouponTokenModel)
await async_session.rollback()
_coupon_token_list: List[CouponTokenModel] = (
await async_session.scalars(select(CouponTokenModel))
).all()
assert len(_coupon_token_list) == 0

# <Error_2>: ServiceUnavailable occurs and is handled in mainloop.
def test_error_2(self, main_func, shared_contract, session, caplog):
async def test_error_2(self, main_func, shared_contract, async_session, caplog):
# Issue Token
token_list_contract = shared_contract["TokenList"]
exchange_contract = shared_contract["IbetStraightBondExchange"]
Expand All @@ -511,7 +532,7 @@ def test_error_2(self, main_func, shared_contract, session, caplog):
token = self.issue_token_coupon_with_args(
self.issuer, token_list_contract, args
)
self.listing_token(token["address"], "IbetCoupon", session)
await self.listing_token(token["address"], "IbetCoupon", async_session)
# Mocking time.sleep to break mainloop
time_mock = MagicMock(wraps=asyncio)
time_mock.sleep.side_effect = [TypeError()]
Expand All @@ -526,7 +547,7 @@ def test_error_2(self, main_func, shared_contract, session, caplog):
pytest.raises(TypeError),
):
# Expect that process() raises ServiceUnavailable and handled in mainloop.
asyncio.run(main_func())
await main_func()

assert 1 == caplog.record_tuples.count(
(LOG.name, logging.INFO, "Service started successfully")
Expand Down
Loading

0 comments on commit df56eae

Please sign in to comment.