diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index eacdf951..d482be89 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -26,13 +26,17 @@ from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.models import EthereumLabel from moonstreamdb.subscriptions import blockchain_by_subscription_id +from moonstreamdbv3.models_indexes import AbiJobs, AbiSubscriptions from slugify import slugify # type: ignore from sqlalchemy import text from sqlalchemy.orm import Session +from sqlalchemy.dialects.postgresql import insert from web3 import Web3 from web3._utils.validation import validate_abi + from . import data +from .admin.subscription_types import CANONICAL_SUBSCRIPTION_TYPES from .middleware import MoonstreamHTTPException from .reporter import reporter from .selectors_storage import selectors @@ -559,6 +563,7 @@ def apply_moonworm_tasks( ], } ) + except Exception as e: logger.error(f"Error get moonworm tasks: {str(e)}") reporter.error_report(e) @@ -576,6 +581,193 @@ def apply_moonworm_tasks( reporter.error_report(e) +def create_seer_subscription( + db_session: Session, + user_id: uuid.UUID, + customer_id: uuid.UUID, + address: str, + subscription_type: str, + abi: Any, + subscription_id: str, +) -> None: + + chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type].blockchain + + add_abi_to_db( + db_session=db_session, + user_id=user_id, + customer_id=customer_id, + address=address, + abis=abi, + chain=chain, + subscription_id=subscription_id, + ) + + +def delete_seer_subscription( + db_session: Session, + subscription_id, +) -> None: + """ + Delete seer subscription from db + If there are no more subscriptions for this address,abi_selector delete all abis + """ + + ## Delete subscription from db + + try: + db_session.query(AbiSubscriptions).filter( + AbiSubscriptions.subscription_id == subscription_id + ).delete(synchronize_session=False) + db_session.commit() + except Exception as e: + logger.error(f"Error delete subscription from db: {str(e)}") + db_session.rollback() + + not_connected_abi_jobs = ( + db_session.query(AbiJobs) + .join(AbiSubscriptions, AbiJobs.id == AbiSubscriptions.abi_job_id, isouter=True) + .filter(AbiSubscriptions.subscription_id == None) + .cte("not_connected_abi_jobs") + ) + + ## Delete abi jobs from db + + try: + db_session.query(AbiJobs).filter( + AbiJobs.id.in_(db_session.query(not_connected_abi_jobs.c.id)) + ).delete(synchronize_session=False) + + db_session.commit() + except Exception as e: + logger.error(f"Error delete abi jobs from db: {str(e)}") + db_session.rollback() + + +def add_abi_to_db( + db_session: Session, + user_id: uuid.UUID, + customer_id: uuid.UUID, + address: str, + abis: List[Dict[str, Any]], + chain: str, + subscription_id: Optional[str] = None, +) -> None: + abis_to_insert = [] + subscriptions_to_insert = [] + + try: + existing_abi_job = ( + db_session.query(AbiJobs) + .filter(AbiJobs.chain == chain) + .filter(AbiJobs.address == bytes.fromhex(address[2:])) + .filter(AbiJobs.customer_id == customer_id) + ).all() + except Exception as e: + logger.error(f"Error get abi from db: {str(e)}") + db_session.rollback() + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + job_by_abi_selector = {abi.abi_selector: abi for abi in existing_abi_job} + + for abi in abis: + + if abi["type"] not in ("event", "function"): + continue + + abi_selector = Web3.keccak( + text=abi["name"] + + "(" + + ",".join(map(lambda x: x["type"], abi["inputs"])) + + ")" + ) + + if abi["type"] == "function": + abi_selector = abi_selector[:4] + + abi_selector = abi_selector.hex() + + try: + + if abi_selector in job_by_abi_selector: + # ABI job already exists, create subscription link + if subscription_id: + subscriptions_to_insert.append( + { + "abi_job_id": job_by_abi_selector[abi_selector].id, + "subscription_id": subscription_id, + } + ) + else: + # ABI job does not exist, create new ABI job + abi_job = { + "address": ( + bytes.fromhex(address[2:]) if address is not None else None + ), + "user_id": user_id, + "customer_id": customer_id, + "abi_selector": abi_selector, + "chain": chain, + "abi_name": abi["name"], + "status": "active", + "historical_crawl_status": "pending", + "progress": 0, + "moonworm_task_pickedup": False, + "abi": json.dumps(abi), + } + + try: + abi_job_instance = AbiJobs(**abi_job) + except Exception as e: + logger.error( + f"Error validating abi for address {address}:{abi} {str(e)}" + ) + continue + + abis_to_insert.append(abi_job_instance) + if subscription_id: + subscriptions_to_insert.append( + { + "abi_job_id": abi_job_instance.id, + "subscription_id": subscription_id, + } + ) + + except Exception as e: + logger.error(f"Error creating abi for address {address}:{abi} {str(e)}") + continue + + try: + # Insert ABI jobs + if abis_to_insert: + + insert_stmt = ( + insert(AbiJobs) + .values([abi_job.__dict__ for abi_job in abis_to_insert]) + .on_conflict_do_nothing( + index_elements=["address", "chain", "abi_selector"] + ) + ) + db_session.execute(insert_stmt) + + # Insert corresponding subscriptions + if subscriptions_to_insert: + + insert_stmt = ( + insert(AbiSubscriptions) + .values(subscriptions_to_insert) + .on_conflict_do_nothing( + index_elements=["abi_job_id", "subscription_id"] + ) + ) + db_session.execute(insert_stmt) + + db_session.commit() + except Exception as e: + logger.error(f"Error inserting abi to db: {str(e)}") + db_session.rollback() + + def name_normalization(query_name: str) -> str: """ Sanitize provided query name. @@ -992,3 +1184,29 @@ def create_resource_for_user( raise MoonstreamHTTPException(status_code=500, internal_error=e) return resource + + +def chekc_user_resource_access( + customer_id: uuid.UUID, + user_token: uuid.UUID, +) -> bool: + """ + Check if user has access to customer_id + """ + + try: + response = bc.get_resource( + token=user_token, + resource_id=str(customer_id), + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + except BugoutResponseException as e: + if e.status_code == 404: + return False + raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail) + except Exception as e: + logger.error(f"Error get customer: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + return str(response.id) == customer_id diff --git a/moonstreamapi/moonstreamapi/data.py b/moonstreamapi/moonstreamapi/data.py index 7484f976..9848f1c9 100644 --- a/moonstreamapi/moonstreamapi/data.py +++ b/moonstreamapi/moonstreamapi/data.py @@ -245,6 +245,7 @@ class UpdateSubscriptionRequest(BaseModel): abi: Optional[str] = Form(None) description: Optional[str] = Form(None) tags: Optional[List[Dict[str, str]]] = Form(None) + customer_id: Optional[str] = Form(None) @validator("tags", pre=True, always=True) def transform_to_dict(cls, v): @@ -263,6 +264,7 @@ class CreateSubscriptionRequest(BaseModel): abi: Optional[str] = Form(None) description: Optional[str] = Form(None) tags: Optional[List[Dict[str, str]]] = Form(None) + customer_id: Optional[str] = Form(None) @validator("tags", pre=True, always=True) def transform_to_dict(cls, v): diff --git a/moonstreamapi/moonstreamapi/routes/subscriptions.py b/moonstreamapi/moonstreamapi/routes/subscriptions.py index b8806314..07a15d72 100644 --- a/moonstreamapi/moonstreamapi/routes/subscriptions.py +++ b/moonstreamapi/moonstreamapi/routes/subscriptions.py @@ -20,10 +20,13 @@ EntityJournalNotFoundException, apply_moonworm_tasks, check_if_smart_contract, + chekc_user_resource_access, get_entity_subscription_journal_id, get_list_of_support_interfaces, get_moonworm_tasks, validate_abi_json, + create_seer_subscription, + delete_seer_subscription, ) from ..admin import subscription_types from ..middleware import MoonstreamHTTPException @@ -32,6 +35,7 @@ MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_ENTITIES_RESERVED_TAGS, THREAD_TIMEOUT_SECONDS, + MOONSTREAM_DB_V3_INDEX_INSTANCE, ) from ..settings import bugout_client as bc from ..web3_provider import yield_web3_provider @@ -51,11 +55,13 @@ async def add_subscription_handler( request: Request, background_tasks: BackgroundTasks, web3: Web3 = Depends(yield_web3_provider), + db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session), ) -> data.SubscriptionResourceData: """ Add subscription to blockchain stream data for user. """ token = request.state.token + user = request.state.user form = await request.form() @@ -71,6 +77,7 @@ async def add_subscription_handler( description = form_data.description tags = form_data.tags subscription_type_id = form_data.subscription_type_id + customer_id = form_data.customer_id if subscription_type_id != "ethereum_whalewatch": try: @@ -94,6 +101,19 @@ async def add_subscription_handler( detail="Currently ethereum_whalewatch not supported", ) + if customer_id is not None: + + results = chekc_user_resource_access( + customer_id=customer_id, + user_token=token, + ) + + if not results: + raise MoonstreamHTTPException( + status_code=403, + detail="User has no access to this customer", + ) + active_subscription_types_response = subscription_types.list_subscription_types( active_only=True ) @@ -129,10 +149,7 @@ async def add_subscription_handler( content["abi_hash"] = hash background_tasks.add_task( - apply_moonworm_tasks, - subscription_type_id, - json_abi, - address, + apply_moonworm_tasks, subscription_type_id, json_abi, address ) if description: @@ -202,6 +219,16 @@ async def add_subscription_handler( if key not in MOONSTREAM_ENTITIES_RESERVED_TAGS ] + if entity_secondary_fields.get("abi") and customer_id is not None: + create_seer_subscription( + db_session=db_session, + user_id=user.id, + customer_id=customer_id, + subscription_id=entity.id, + abi=abi, + subscription_type_id=subscription_type_id, + ) + return data.SubscriptionResourceData( id=str(entity.id), user_id=str(user.id), @@ -223,7 +250,9 @@ async def add_subscription_handler( response_model=data.SubscriptionResourceData, ) async def delete_subscription_handler( - request: Request, subscription_id: str = Path(...) + request: Request, + subscription_id: str = Path(...), + db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session), ): """ Delete subscriptions. @@ -285,6 +314,11 @@ async def delete_subscription_handler( abi = deleted_entity.secondary_fields.get("abi") description = deleted_entity.secondary_fields.get("description") + delete_seer_subscription( + db_session=db_session, + subscription_id=subscription_id, + ) + return data.SubscriptionResourceData( id=str(deleted_entity.id), user_id=str(user.id), @@ -400,12 +434,12 @@ async def update_subscriptions_handler( request: Request, background_tasks: BackgroundTasks, subscription_id: str = Path(...), + db_session: Any = Depends(MOONSTREAM_DB_V3_INDEX_INSTANCE.yield_db_session), ) -> data.SubscriptionResourceData: """ Get user's subscriptions. """ token = request.state.token - user = request.state.user form = await request.form() @@ -419,6 +453,20 @@ async def update_subscriptions_handler( abi = form_data.abi description = form_data.description tags = form_data.tags + customer_id = form_data.customer_id + + if customer_id is not None: + + results = chekc_user_resource_access( + customer_id=customer_id, + user_token=token, + ) + + if not results: + raise MoonstreamHTTPException( + status_code=403, + detail="User has no access to this customer", + ) try: journal_id = get_entity_subscription_journal_id( @@ -541,13 +589,24 @@ async def update_subscriptions_handler( logger.error(f"Error update user subscriptions: {str(e)}") raise MoonstreamHTTPException(status_code=500, internal_error=e) - if abi: + if abi is not None and customer_id is not None: background_tasks.add_task( apply_moonworm_tasks, subscription_type_id, json_abi, address, ) + + create_seer_subscription( + db_session=db_session, + user_id=user.id, + customer_id=customer_id, + address=address, + subscription_type=subscription_type_id, + abi=json_abi, + subscription_id=subscription_id, + ) + subscription_required_fields = ( subscription.required_fields if subscription.required_fields is not None else {} ) diff --git a/moonstreamapi/moonstreamapi/settings.py b/moonstreamapi/moonstreamapi/settings.py index 70e0109a..3d1bc7c7 100644 --- a/moonstreamapi/moonstreamapi/settings.py +++ b/moonstreamapi/moonstreamapi/settings.py @@ -3,6 +3,7 @@ from bugout.app import Bugout from moonstreamdb.blockchain import AvailableBlockchainType +from moonstreamdbv3.db import MoonstreamDBIndexesEngine # Bugout BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev") @@ -408,3 +409,8 @@ MOONSTREAM_USAGE_REPORTS_JOURNAL_ID = os.environ.get( "MOONSTREAM_USAGE_REPORTS_JOURNAL_ID" ) + + +### Moonstreamdb v3 instance + +MOONSTREAM_DB_V3_INDEX_INSTANCE = MoonstreamDBIndexesEngine() diff --git a/moonstreamapi/moonstreamapi/version.py b/moonstreamapi/moonstreamapi/version.py index 80c607e1..cc7eaf07 100644 --- a/moonstreamapi/moonstreamapi/version.py +++ b/moonstreamapi/moonstreamapi/version.py @@ -2,4 +2,4 @@ Moonstream library and API version. """ -MOONSTREAMAPI_VERSION = "0.4.3" +MOONSTREAMAPI_VERSION = "0.4.4" diff --git a/moonstreamapi/requirements.txt b/moonstreamapi/requirements.txt index 623bb0e1..1bb95ee8 100644 --- a/moonstreamapi/requirements.txt +++ b/moonstreamapi/requirements.txt @@ -38,7 +38,7 @@ Mako==1.2.3 MarkupSafe==2.1.1 moonstream==0.1.1 moonstreamdb==0.4.5 -moonstreamdb-v3==0.0.9 +moonstreamdb-v3==0.0.13 multiaddr==0.0.9 multidict==6.0.2 netaddr==0.8.0 diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index b6b5b47f..4983a6cf 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -17,7 +17,7 @@ "fastapi", "moonstream", "moonstreamdb>=0.4.5", - "moonstreamdb-v3>=0.0.9", + "moonstreamdb-v3>=0.0.13", "humbug", "pydantic==1.10.2", "pyevmasm", diff --git a/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/env.py b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/env.py index 947e9ed0..8a31d7f6 100644 --- a/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/env.py +++ b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/env.py @@ -65,6 +65,7 @@ MantleSepoliaLogIndex, MantleSepoliaReorgs, AbiJobs, + AbiSubscriptions, ) @@ -107,6 +108,7 @@ def include_symbol(tablename, schema): MantleSepoliaLogIndex.__tablename__, MantleSepoliaReorgs.__tablename__, AbiJobs.__tablename__, + AbiSubscriptions.__tablename__, } diff --git a/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/f2c6aa92e5d2_add_v3_subscription.py b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/f2c6aa92e5d2_add_v3_subscription.py new file mode 100644 index 00000000..e1c94285 --- /dev/null +++ b/moonstreamdb-v3/moonstreamdbv3/alembic_indexes/versions/f2c6aa92e5d2_add_v3_subscription.py @@ -0,0 +1,67 @@ +"""Add v3 subscription + +Revision ID: f2c6aa92e5d2 +Revises: 27086791044c +Create Date: 2024-07-11 19:41:49.899157 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "f2c6aa92e5d2" +down_revision: Union[str, None] = "27086791044c" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "abi_subscriptions", + sa.Column("abi_job_id", sa.UUID(), nullable=False), + sa.Column("subscription_id", sa.UUID(), nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("TIMEZONE('utc', statement_timestamp())"), + nullable=False, + ), + sa.ForeignKeyConstraint( + ["abi_job_id"], + ["abi_jobs.id"], + name=op.f("fk_abi_subscriptions_abi_job_id_abi_jobs"), + ), + sa.PrimaryKeyConstraint( + "abi_job_id", "subscription_id", name=op.f("pk_abi_subscriptions") + ), + ) + op.create_index( + op.f("ix_abi_subscriptions_abi_job_id"), + "abi_subscriptions", + ["abi_job_id"], + unique=False, + ) + op.create_index( + op.f("ix_abi_subscriptions_subscription_id"), + "abi_subscriptions", + ["subscription_id"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index( + op.f("ix_abi_subscriptions_subscription_id"), table_name="abi_subscriptions" + ) + op.drop_index( + op.f("ix_abi_subscriptions_abi_job_id"), table_name="abi_subscriptions" + ) + op.drop_table("abi_subscriptions") + # ### end Alembic commands ### diff --git a/moonstreamdb-v3/moonstreamdbv3/models_indexes.py b/moonstreamdb-v3/moonstreamdbv3/models_indexes.py index 254bdba5..439e654b 100644 --- a/moonstreamdb-v3/moonstreamdbv3/models_indexes.py +++ b/moonstreamdb-v3/moonstreamdbv3/models_indexes.py @@ -567,7 +567,11 @@ class AbiJobs(Base): __table_args__ = ( UniqueConstraint( - "chain", "address", "abi_selector", "customer_id", name="uq_abi_jobs" + "chain", + "address", + "abi_selector", + "customer_id", + name="uq_abi_jobs", ), ) @@ -589,3 +593,20 @@ class AbiJobs(Base): updated_at = Column( DateTime(timezone=True), server_default=utcnow(), nullable=False ) + + +class AbiSubscriptions(Base): + __tablename__ = "abi_subscriptions" + + __table_args__ = (PrimaryKeyConstraint("abi_job_id", "subscription_id"),) + + abi_job_id = Column( + UUID(as_uuid=True), + ForeignKey("abi_jobs.id"), + nullable=False, + index=True, + ) + subscription_id = Column(UUID(as_uuid=True), nullable=False, index=True) + created_at = Column( + DateTime(timezone=True), server_default=utcnow(), nullable=False + )