Skip to content

Commit

Permalink
Merge pull request #1108 from moonstream-to/add-v3-subscriptions
Browse files Browse the repository at this point in the history
Add subcription_id to abi_jobs.
  • Loading branch information
Andrei-Dolgolev authored Jul 15, 2024
2 parents 5387eea + 0292b2c commit dcc5eeb
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 11 deletions.
218 changes: 218 additions & 0 deletions moonstreamapi/moonstreamapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.models import EthereumLabel
from moonstreamdb.subscriptions import blockchain_by_subscription_id
from moonstreamdbv3.models_indexes import AbiJobs, AbiSubscriptions
from slugify import slugify # type: ignore
from sqlalchemy import text
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from web3 import Web3
from web3._utils.validation import validate_abi


from . import data
from .admin.subscription_types import CANONICAL_SUBSCRIPTION_TYPES
from .middleware import MoonstreamHTTPException
from .reporter import reporter
from .selectors_storage import selectors
Expand Down Expand Up @@ -559,6 +563,7 @@ def apply_moonworm_tasks(
],
}
)

except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
reporter.error_report(e)
Expand All @@ -576,6 +581,193 @@ def apply_moonworm_tasks(
reporter.error_report(e)


def create_seer_subscription(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
subscription_type: str,
abi: Any,
subscription_id: str,
) -> None:

chain = CANONICAL_SUBSCRIPTION_TYPES[subscription_type].blockchain

add_abi_to_db(
db_session=db_session,
user_id=user_id,
customer_id=customer_id,
address=address,
abis=abi,
chain=chain,
subscription_id=subscription_id,
)


def delete_seer_subscription(
db_session: Session,
subscription_id,
) -> None:
"""
Delete seer subscription from db
If there are no more subscriptions for this address,abi_selector delete all abis
"""

## Delete subscription from db

try:
db_session.query(AbiSubscriptions).filter(
AbiSubscriptions.subscription_id == subscription_id
).delete(synchronize_session=False)
db_session.commit()
except Exception as e:
logger.error(f"Error delete subscription from db: {str(e)}")
db_session.rollback()

not_connected_abi_jobs = (
db_session.query(AbiJobs)
.join(AbiSubscriptions, AbiJobs.id == AbiSubscriptions.abi_job_id, isouter=True)
.filter(AbiSubscriptions.subscription_id == None)
.cte("not_connected_abi_jobs")
)

## Delete abi jobs from db

try:
db_session.query(AbiJobs).filter(
AbiJobs.id.in_(db_session.query(not_connected_abi_jobs.c.id))
).delete(synchronize_session=False)

db_session.commit()
except Exception as e:
logger.error(f"Error delete abi jobs from db: {str(e)}")
db_session.rollback()


def add_abi_to_db(
db_session: Session,
user_id: uuid.UUID,
customer_id: uuid.UUID,
address: str,
abis: List[Dict[str, Any]],
chain: str,
subscription_id: Optional[str] = None,
) -> None:
abis_to_insert = []
subscriptions_to_insert = []

try:
existing_abi_job = (
db_session.query(AbiJobs)
.filter(AbiJobs.chain == chain)
.filter(AbiJobs.address == bytes.fromhex(address[2:]))
.filter(AbiJobs.customer_id == customer_id)
).all()
except Exception as e:
logger.error(f"Error get abi from db: {str(e)}")
db_session.rollback()
raise MoonstreamHTTPException(status_code=500, internal_error=e)

job_by_abi_selector = {abi.abi_selector: abi for abi in existing_abi_job}

for abi in abis:

if abi["type"] not in ("event", "function"):
continue

abi_selector = Web3.keccak(
text=abi["name"]
+ "("
+ ",".join(map(lambda x: x["type"], abi["inputs"]))
+ ")"
)

if abi["type"] == "function":
abi_selector = abi_selector[:4]

abi_selector = abi_selector.hex()

try:

if abi_selector in job_by_abi_selector:
# ABI job already exists, create subscription link
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": job_by_abi_selector[abi_selector].id,
"subscription_id": subscription_id,
}
)
else:
# ABI job does not exist, create new ABI job
abi_job = {
"address": (
bytes.fromhex(address[2:]) if address is not None else None
),
"user_id": user_id,
"customer_id": customer_id,
"abi_selector": abi_selector,
"chain": chain,
"abi_name": abi["name"],
"status": "active",
"historical_crawl_status": "pending",
"progress": 0,
"moonworm_task_pickedup": False,
"abi": json.dumps(abi),
}

try:
abi_job_instance = AbiJobs(**abi_job)
except Exception as e:
logger.error(
f"Error validating abi for address {address}:{abi} {str(e)}"
)
continue

abis_to_insert.append(abi_job_instance)
if subscription_id:
subscriptions_to_insert.append(
{
"abi_job_id": abi_job_instance.id,
"subscription_id": subscription_id,
}
)

except Exception as e:
logger.error(f"Error creating abi for address {address}:{abi} {str(e)}")
continue

try:
# Insert ABI jobs
if abis_to_insert:

insert_stmt = (
insert(AbiJobs)
.values([abi_job.__dict__ for abi_job in abis_to_insert])
.on_conflict_do_nothing(
index_elements=["address", "chain", "abi_selector"]
)
)
db_session.execute(insert_stmt)

# Insert corresponding subscriptions
if subscriptions_to_insert:

insert_stmt = (
insert(AbiSubscriptions)
.values(subscriptions_to_insert)
.on_conflict_do_nothing(
index_elements=["abi_job_id", "subscription_id"]
)
)
db_session.execute(insert_stmt)

db_session.commit()
except Exception as e:
logger.error(f"Error inserting abi to db: {str(e)}")
db_session.rollback()


def name_normalization(query_name: str) -> str:
"""
Sanitize provided query name.
Expand Down Expand Up @@ -992,3 +1184,29 @@ def create_resource_for_user(
raise MoonstreamHTTPException(status_code=500, internal_error=e)

return resource


def chekc_user_resource_access(
customer_id: uuid.UUID,
user_token: uuid.UUID,
) -> bool:
"""
Check if user has access to customer_id
"""

try:
response = bc.get_resource(
token=user_token,
resource_id=str(customer_id),
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)

except BugoutResponseException as e:
if e.status_code == 404:
return False
raise MoonstreamHTTPException(status_code=e.status_code, detail=e.detail)
except Exception as e:
logger.error(f"Error get customer: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)

return str(response.id) == customer_id
2 changes: 2 additions & 0 deletions moonstreamapi/moonstreamapi/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class UpdateSubscriptionRequest(BaseModel):
abi: Optional[str] = Form(None)
description: Optional[str] = Form(None)
tags: Optional[List[Dict[str, str]]] = Form(None)
customer_id: Optional[str] = Form(None)

@validator("tags", pre=True, always=True)
def transform_to_dict(cls, v):
Expand All @@ -263,6 +264,7 @@ class CreateSubscriptionRequest(BaseModel):
abi: Optional[str] = Form(None)
description: Optional[str] = Form(None)
tags: Optional[List[Dict[str, str]]] = Form(None)
customer_id: Optional[str] = Form(None)

@validator("tags", pre=True, always=True)
def transform_to_dict(cls, v):
Expand Down
Loading

0 comments on commit dcc5eeb

Please sign in to comment.