fix: use dagster retry mechanism #2184

Merged 5 commits · Sep 24, 2024
38 changes: 16 additions & 22 deletions warehouse/oso_dagster/assets/eas_optimism.py
@@ -133,30 +133,23 @@ def get_optimism_eas_data(
     )
 
     for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE):
-        try:
-            query = client.execute(
-                attestations_query,
-                variable_values={
-                    "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE,
-                    "skip": step,
-                    "where": {
-                        "time": {
-                            "gte": date_from,
-                            "lte": date_to,
-                        },
-                    },
-                },
-            )
-            context.log.info(
-                f"Fetching attestation {step}/{total_count}",
-            )
-            yield query["attestations"]
-        except Exception as exception:
-            context.log.warning(
-                f"An error occurred while fetching EAS data: '{exception}'. "
-                "We will stop this materialization instead of retrying for now."
-            )
-            return []
+        query = client.execute(
+            attestations_query,
+            variable_values={
+                "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE,
+                "skip": step,
+                "where": {
+                    "time": {
+                        "gte": date_from,
+                        "lte": date_to,
+                    },
+                },
+            },
+        )
+        context.log.info(
+            f"Fetching attestation {step}/{total_count}",
+        )
+        yield query["attestations"]
 
 
 def get_optimism_eas(context: AssetExecutionContext, client: Client):
@@ -208,4 +201,5 @@ def attestations(context: AssetExecutionContext):
         name="attestations",
         columns=pydantic_to_dlt_nullable_columns(Attestation),
         primary_key="id",
+        write_disposition="merge",
     )
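The eas_optimism change deletes the swallow-and-return error handling so that exceptions propagate to Dagster, whose own retry machinery can reschedule the step. The diff does not show where that retry policy is configured (these assets are built via dlt_factory), so the following is only a minimal, hypothetical sketch of Dagster's built-in RetryPolicy on a plain asset; the asset name and body are illustrative, not from this PR.

from dagster import Backoff, RetryPolicy, asset


# Hypothetical illustration only: the actual retry configuration for
# this PR's assets is not part of this diff.
@asset(
    retry_policy=RetryPolicy(
        max_retries=3,                # re-run the step up to three times
        delay=10,                     # seconds before the first retry
        backoff=Backoff.EXPONENTIAL,  # grow the delay between later attempts
    )
)
def attestations_sketch() -> list:
    # With no try/except here, any transport error raised by the GraphQL
    # client surfaces to Dagster, which applies the policy above.
    raise ConnectionError("simulated transient failure")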
130 changes: 102 additions & 28 deletions warehouse/oso_dagster/assets/open_collective.py
@@ -1,15 +1,17 @@
 from datetime import datetime, timedelta
-from typing import List, Literal, Optional
+from time import sleep
+from typing import Any, Dict, Generator, List, Literal, Optional
 
 import dlt
 from dagster import AssetExecutionContext, WeeklyPartitionsDefinition
 from dlt.destinations.adapters import bigquery_adapter
 from gql import Client, gql
 from gql.transport.requests import RequestsHTTPTransport
+from graphql import DocumentNode
 from oso_dagster import constants
 from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns
 from oso_dagster.utils.secrets import secret_ref_arg
-from pydantic import UUID4, BaseModel
+from pydantic import UUID4, BaseModel, Field
 
 
 class Host(BaseModel):
@@ -197,12 +199,31 @@ class Transaction(BaseModel):
     host: Optional[Host] = None
 
 
+class QueryParameters(BaseModel):
+    limit: int = Field(
+        ...,
+        gt=0,
+        description="Number of nodes to fetch per query.",
+    )
+    offset: int = Field(
+        ...,
+        ge=0,
+        description="Offset for pagination.",
+    )
+    type: str
+    dateFrom: str
+    dateTo: str
+
+
 # The first transaction on Open Collective was on January 23, 2015
 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z"
 
-# The maximum number of nodes that can be retrieved per page
+# The maximum is 1000 nodes per page; we retry with smaller pages until the minimum is reached
 OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000
 
+# The minimum number of nodes per page; if the limit falls below this threshold, the query fails
+OPEN_COLLECTIVE_MIN_NODES_PER_PAGE = 100
+
 # Kubernetes configuration for the asset materialization
 K8S_CONFIG = {
     "merge_behavior": "SHALLOW",
@@ -327,14 +348,79 @@ def open_collective_graphql_payment_method(key: str):
         {key} {{
             id
             type
+            name
+            data
             {open_collective_graphql_amount("balance")}
             {open_collective_graphql_shallow_account("account")}
         }}
     """
 
 
+def query_with_retry(
+    client: Client,
+    context: AssetExecutionContext,
+    expense_query: DocumentNode,
+    total_count: int,
+    kind: Literal["DEBIT", "CREDIT"],
+    date_from: str,
+    date_to: str,
+) -> Generator[Dict[str, Any], None, None]:
+    """
+    Queries Open Collective data with a retry mechanism, halving the
+    page limit on each retry. If the limit falls below the minimum
+    threshold, the query fails.
+
+    Args:
+        client (Client): The client used to execute the GraphQL queries.
+        context (AssetExecutionContext): The execution context of the asset.
+        expense_query (DocumentNode): The GraphQL query document.
+        total_count (int): The total count of transactions.
+        kind (Literal["DEBIT", "CREDIT"]): The transaction type.
+        date_from (str): The start date for the query.
+        date_to (str): The end date for the query.
+
+    Yields:
+        Dict[str, Any]: A dictionary containing the transaction nodes.
+    """
+
+    current_index = 0
+    limit = OPEN_COLLECTIVE_MAX_NODES_PER_PAGE
+    steps = list(generate_steps(total_count, limit))
+
+    while limit >= OPEN_COLLECTIVE_MIN_NODES_PER_PAGE:
+        while current_index < len(steps):
+            offset = steps[current_index]
+            params = QueryParameters(
+                limit=limit,
+                offset=offset,
+                type=kind,
+                dateFrom=date_from,
+                dateTo=date_to,
+            )
+
+            try:
+                query = client.execute(
+                    expense_query, variable_values=params.model_dump()
+                )
+                context.log.info(
+                    f"Fetching transaction {offset}/{total_count} for type '{kind}'"
+                )
+                current_index += 1
+                yield query["transactions"]["nodes"]
+            except Exception as exception:
+                context.log.error(f"Query failed with error: {exception}")
+                if "429" in str(exception):
+                    context.log.info("Got rate limited, retrying in 65 seconds.")
+                    sleep(65)
+                    continue
+                limit //= 2
+                steps = list(generate_steps(total_count, limit))
+                if limit < OPEN_COLLECTIVE_MIN_NODES_PER_PAGE:
+                    raise ValueError(
+                        f"Query failed after reaching the minimum limit of {limit}."
+                    ) from exception
+                context.log.info(f"Retrying with limit: {limit}")
+                break
+
+
 def get_open_collective_data(
     context: AssetExecutionContext,
     client: Client,
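query_with_retry paginates via generate_steps, which is not part of this diff. A minimal sketch of the pagination helper it appears to assume follows; the implementation is a guess, and only the offset sequence matters here.

from typing import Generator


def generate_steps(total: int, step: int) -> Generator[int, None, None]:
    # Offsets 0, step, 2*step, ... covering `total` rows.
    yield from range(0, total, step)


assert list(generate_steps(2500, 1000)) == [0, 1000, 2000]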
@@ -536,29 +622,15 @@ def get_open_collective_data(
 
     context.log.info(f"Total count of transactions: {total_count}")
 
-    for step in generate_steps(total_count, OPEN_COLLECTIVE_MAX_NODES_PER_PAGE):
-        try:
-            query = client.execute(
-                expense_query,
-                variable_values={
-                    "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE,
-                    "offset": step,
-                    "type": kind,
-                    "dateFrom": date_from,
-                    "dateTo": date_to,
-                },
-            )
-            context.log.info(
-                f"Fetching transaction {step}/{total_count} for type '{kind}'"
-            )
-            yield query["transactions"]["nodes"]
-        except Exception as exception:
-            # TODO(jabolo): Implement a retry mechanism
-            context.log.warning(
-                f"An error occurred while fetching Open Collective data: '{exception}'. "
-                "We will stop this materialization instead of retrying for now."
-            )
-            return []
+    yield from query_with_retry(
+        client,
+        context,
+        expense_query,
+        total_count,
+        kind,
+        date_from,
+        date_to,
+    )
 
 
 def get_open_collective_expenses(
@@ -647,6 +719,7 @@ def expenses(
         name="expenses",
         columns=pydantic_to_dlt_nullable_columns(Transaction),
         primary_key="id",
+        write_disposition="merge",
     )
 
     if constants.enable_bigquery:
@@ -691,6 +764,7 @@ def deposits(
         name="deposits",
         columns=pydantic_to_dlt_nullable_columns(Transaction),
         primary_key="id",
+        write_disposition="merge",
    )
 
     if constants.enable_bigquery:
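All three dlt resources also switch from the default append behavior to merge. This pairs with the retry logic: when the page size shrinks mid-run, offsets are recomputed and some rows can be fetched twice, and merge deduplicates them on the declared primary key at load time. A minimal sketch of the same pattern, with an illustrative resource name and rows not taken from this PR:

import dlt


@dlt.resource(name="transactions_sketch", primary_key="id", write_disposition="merge")
def transactions_sketch():
    yield {"id": "tx-1", "amount": 10}
    yield {"id": "tx-1", "amount": 10}  # re-fetched after a retry; upserted on "id", not appended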