Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: use dagster retry mechanism #2184

Merged
merged 5 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions warehouse/oso_dagster/assets/eas_optimism.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from datetime import datetime, timedelta
from typing import Any, Dict

import dlt
from dagster import AssetExecutionContext, WeeklyPartitionsDefinition
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns
from pydantic import BaseModel
from pydantic import BaseModel, Field

from .open_collective import generate_steps
from ..utils.common import QueryArguments, QueryConfig, query_with_retry


class Attestation(BaseModel):
Expand All @@ -29,12 +30,35 @@ class Attestation(BaseModel):
isOffchain: bool


class EASOptimismParameters(QueryArguments):
    """Arguments for the EAS Optimism attestations query.

    Adds three required fields on top of ``QueryArguments``: the page size
    (``take``), the pagination offset (``skip``) and the filter (``where``).
    """

    # Page size; validated to be strictly greater than zero.
    take: int = Field(..., gt=0, description="The number of nodes to fetch per query.")
    # Number of nodes already consumed; validated to be zero or greater.
    skip: int = Field(..., ge=0, description="The number of nodes to skip.")
    # Free-form filter mapping handed to the query as its where clause.
    where: Dict[str, Any] = Field(..., description="The where clause for the query.")


# The first attestation on EAS Optimism was created on 07/28/2023 at
# 9:22:35 am (UTC-6); the Unix timestamp below corresponds to 15:22:35 UTC.
# NOTE(review): datetime.fromtimestamp() without a tz argument yields a naive
# datetime in the machine's local timezone — confirm that is intended.
EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755)

# A sensible limit for the number of nodes to fetch per page
EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000

# Minimum limit for the query, after which the query will fail
# NOTE(review): presumably the page size is reduced on errors and the query
# gives up once it would drop below this floor — confirm in query_with_retry.
EAS_OPTIMISM_MINIMUM_LIMIT = 100

# The rate limit wait time in seconds
EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65

# Kubernetes configuration for the asset materialization
K8S_CONFIG = {
"merge_behavior": "SHALLOW",
Expand Down Expand Up @@ -132,31 +156,32 @@ def get_optimism_eas_data(
"""
)

for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE):
try:
query = client.execute(
attestations_query,
variable_values={
"take": EAS_OPTIMISM_STEP_NODES_PER_PAGE,
"skip": step,
"where": {
"time": {
"gte": date_from,
"lte": date_to,
},
},
},
)
context.log.info(
f"Fetching attestation {step}/{total_count}",
)
yield query["attestations"]
except Exception as exception:
context.log.warning(
f"An error occurred while fetching EAS data: '{exception}'. "
"We will stop this materialization instead of retrying for now."
)
return []
config = QueryConfig(
limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT,
query_arguments=EASOptimismParameters(
take=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
offset=0,
skip=0,
where={
"time": {
"gte": date_from,
"lte": date_to,
}
},
),
step_key="skip",
extract_fn=lambda data: data["attestations"],
ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS,
)

yield from query_with_retry(
client,
context,
attestations_query,
total_count,
config,
)


def get_optimism_eas(context: AssetExecutionContext, client: Client):
Expand Down Expand Up @@ -208,4 +233,5 @@ def attestations(context: AssetExecutionContext):
name="attestations",
columns=pydantic_to_dlt_nullable_columns(Attestation),
primary_key="id",
write_disposition="merge",
)
Loading