From c328ee8647b199f46efec19a8092eed41043a83d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20R=C3=ADos?= Date: Tue, 24 Sep 2024 16:26:42 +0200 Subject: [PATCH] fix: use dagster `retry` mechanism (#2184) * fix: use dagster `retry` mechanism * feat: implement debounced `retry` mechanism * fix: use `retry` mechanism with `open_collective` and `eas` assets * feat: implement `generic` retry mechanism * fix: remove `ShallowAccount` in favor of `Account` --- warehouse/oso_dagster/assets/eas_optimism.py | 80 ++++-- .../oso_dagster/assets/open_collective.py | 236 ++++++++---------- warehouse/oso_dagster/utils/common.py | 151 ++++++++++- 3 files changed, 308 insertions(+), 159 deletions(-) diff --git a/warehouse/oso_dagster/assets/eas_optimism.py b/warehouse/oso_dagster/assets/eas_optimism.py index 576d624f4..72f81095b 100644 --- a/warehouse/oso_dagster/assets/eas_optimism.py +++ b/warehouse/oso_dagster/assets/eas_optimism.py @@ -1,13 +1,14 @@ from datetime import datetime, timedelta +from typing import Any, Dict import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns -from pydantic import BaseModel +from pydantic import BaseModel, Field -from .open_collective import generate_steps +from ..utils.common import QueryArguments, QueryConfig, query_with_retry class Attestation(BaseModel): @@ -29,12 +30,35 @@ class Attestation(BaseModel): isOffchain: bool +class EASOptimismParameters(QueryArguments): + take: int = Field( + ..., + gt=0, + description="The number of nodes to fetch per query.", + ) + skip: int = Field( + ..., + ge=0, + description="The number of nodes to skip.", + ) + where: Dict[str, Any] = Field( + ..., + description="The where clause for the query.", + ) + + # The first attestation on EAS Optimism was created on the 07/28/2023 9:22:35 am EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755) # A sensible limit for the number of nodes to fetch per page EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000 +# Minimum limit for the query, after which the query will fail +EAS_OPTIMISM_MINIMUM_LIMIT = 100 + +# The rate limit wait time in seconds +EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -132,31 +156,32 @@ def get_optimism_eas_data( """ ) - for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE): - try: - query = client.execute( - attestations_query, - variable_values={ - "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE, - "skip": step, - "where": { - "time": { - "gte": date_from, - "lte": date_to, - }, - }, - }, - ) - context.log.info( - f"Fetching attestation {step}/{total_count}", - ) - yield query["attestations"] - except Exception as exception: - context.log.warning( - f"An error occurred while fetching EAS data: '{exception}'. " - "We will stop this materialization instead of retrying for now." - ) - return [] + config = QueryConfig( + limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT, + query_arguments=EASOptimismParameters( + take=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + offset=0, + skip=0, + where={ + "time": { + "gte": date_from, + "lte": date_to, + } + }, + ), + step_key="skip", + extract_fn=lambda data: data["attestations"], + ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS, + ) + + yield from query_with_retry( + client, + context, + attestations_query, + total_count, + config, + ) def get_optimism_eas(context: AssetExecutionContext, client: Client): @@ -208,4 +233,5 @@ def attestations(context: AssetExecutionContext): name="attestations", columns=pydantic_to_dlt_nullable_columns(Attestation), primary_key="id", + write_disposition="merge", ) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index e4a3fbf7d..76b7dbdb2 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -9,7 +9,9 @@ from oso_dagster import constants from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns from oso_dagster.utils.secrets import secret_ref_arg -from pydantic import UUID4, BaseModel +from pydantic import UUID4, BaseModel, Field + +from ..utils.common import QueryArguments, QueryConfig, query_with_retry class Host(BaseModel): @@ -44,23 +46,6 @@ class Location(BaseModel): long: float -class ShallowAccount(BaseModel): - id: str - slug: str - type: str - name: Optional[str] = None - legalName: Optional[str] = None - - -class PaymentMethod(BaseModel): - id: str - type: str - name: Optional[str] = None - data: Optional[str] = None - balance: Optional[Amount] = None - account: Optional[ShallowAccount] = None - - class SocialLink(BaseModel): type: str url: str @@ -92,6 +77,15 @@ class Account(BaseModel): location: Optional[Location] = None +class PaymentMethod(BaseModel): + id: str + type: str + name: Optional[str] = None + data: Optional[str] = None + balance: Optional[Amount] = None + account: Optional[Account] = None + + class PayoutMethod(BaseModel): id: str type: str @@ -101,7 +95,7 @@ class PayoutMethod(BaseModel): class VirtualCard(BaseModel): id: str - account: Optional[ShallowAccount] = None + account: Optional[Account] = None name: Optional[str] = None last4: Optional[str] = None status: Optional[str] = None @@ -132,13 +126,13 @@ class Expense(BaseModel): currency: Optional[str] = None type: Optional[str] = None status: Optional[str] = None - approvedBy: Optional[ShallowAccount] = None - paidBy: Optional[ShallowAccount] = None + approvedBy: Optional[Account] = None + paidBy: Optional[Account] = None onHold: Optional[bool] = None - account: Optional[ShallowAccount] = None - payee: Optional[ShallowAccount] = None + account: Optional[Account] = None + payee: Optional[Account] = None payeeLocation: Optional[Location] = None - createdByAccount: Optional[ShallowAccount] = None + createdByAccount: Optional[Account] = None host: Optional[Host] = None payoutMethod: Optional[PayoutMethod] = None paymentMethod: Optional[PaymentMethod] = None @@ -146,7 +140,7 @@ class Expense(BaseModel): items: Optional[List[Item]] = None invoiceInfo: Optional[str] = None merchantId: Optional[str] = None - requestedByAccount: Optional[ShallowAccount] = None + requestedByAccount: Optional[Account] = None requiredLegalDocuments: Optional[List[str]] = None @@ -179,8 +173,8 @@ class Transaction(BaseModel): hostFee: Optional[Amount] = None paymentProcessorFee: Optional[Amount] = None account: Optional[Account] = None - fromAccount: Optional[ShallowAccount] = None - toAccount: Optional[ShallowAccount] = None + fromAccount: Optional[Account] = None + toAccount: Optional[Account] = None expense: Optional[Expense] = None order: Optional[Order] = None createdAt: Optional[datetime] = None @@ -200,9 +194,15 @@ class Transaction(BaseModel): # The first transaction on Open Collective was on January 23, 2015 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z" -# The maximum number of nodes that can be retrieved per page +# The maximum is 1000 nodes per page, we will retry until a minimum of 100 nodes per page is reached OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000 +# The minimum number of nodes per page, if this threshold is reached, the query will fail +OPEN_COLLECTIVE_MIN_NODES_PER_PAGE = 100 + +# The rate limit wait time in seconds +OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -228,32 +228,15 @@ class Transaction(BaseModel): } -def generate_steps(total: int, step: int): - """ - Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. - If the total is not divisible by the step, the last iteration will yield the remaining value. - - Args: - total (int): The desired total value. - step (int): The increment value for each iteration. - - Yields: - int: The next number in the sequence. - - Example: - >>> for num in generate_steps(10, 3): - ... print(num) - 0 - 3 - 6 - 9 - 10 - """ - - for i in range(0, total, step): - yield i - if total % step != 0: - yield total +class OpenCollectiveParameters(QueryArguments): + limit: int = Field( + ..., + gt=0, + description="Number of nodes to fetch per query.", + ) + type: str + dateFrom: str + dateTo: str def open_collective_graphql_amount(key: str): @@ -293,44 +276,64 @@ def open_collective_graphql_location(key: str): """ -def open_collective_graphql_shallow_account(key: str): - """Returns a GraphQL query string for shallow account information.""" +def open_collective_graphql_host(key: str): + """Returns a GraphQL query string for host information.""" return f""" {key} {{ id - slug type + slug name legalName + description + currency }} """ -def open_collective_graphql_host(key: str): - """Returns a GraphQL query string for host information.""" +def open_collective_graphql_payment_method(key: str): + """Returns a GraphQL query string for payment method information.""" return f""" {key} {{ id type - slug - name - legalName - description - currency + {open_collective_graphql_amount("balance")} + {open_collective_graphql_account("account")} }} """ -def open_collective_graphql_payment_method(key: str): - """Returns a GraphQL query string for payment method information.""" +def open_collective_graphql_account(key: str): + """Returns a GraphQL query string for account information.""" return f""" {key} {{ id + slug type name - data - {open_collective_graphql_amount("balance")} - {open_collective_graphql_shallow_account("account")} + legalName + description + longDescription + tags + socialLinks {{ + type + url + createdAt + updatedAt + }} + expensePolicy + isIncognito + imageUrl + backgroundImageUrl + createdAt + updatedAt + isArchived + isFrozen + isAdmin + isHost + isAdmin + emails + {open_collective_graphql_location("location")} }} """ @@ -416,37 +419,9 @@ def get_open_collective_data( {open_collective_graphql_amount("platformFee")} {open_collective_graphql_amount("hostFee")} {open_collective_graphql_amount("paymentProcessorFee")} - account {{ - id - slug - type - name - legalName - description - longDescription - tags - socialLinks {{ - type - url - createdAt - updatedAt - }} - expensePolicy - isIncognito - imageUrl - backgroundImageUrl - createdAt - updatedAt - isArchived - isFrozen - isAdmin - isHost - isAdmin - emails - {open_collective_graphql_location("location")} - }} - {open_collective_graphql_shallow_account("fromAccount")} - {open_collective_graphql_shallow_account("toAccount")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("fromAccount")} + {open_collective_graphql_account("toAccount")} expense {{ id legacyId @@ -458,13 +433,13 @@ def get_open_collective_data( currency type status - {open_collective_graphql_shallow_account("approvedBy")} - {open_collective_graphql_shallow_account("paidBy")} + {open_collective_graphql_account("approvedBy")} + {open_collective_graphql_account("paidBy")} onHold - {open_collective_graphql_shallow_account("account")} - {open_collective_graphql_shallow_account("payee")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("payee")} {open_collective_graphql_location("payeeLocation")} - {open_collective_graphql_shallow_account("createdByAccount")} + {open_collective_graphql_account("createdByAccount")} {open_collective_graphql_host("host")} payoutMethod {{ id @@ -476,7 +451,7 @@ def get_open_collective_data( {open_collective_graphql_payment_method("paymentMethod")} virtualCard {{ id - {open_collective_graphql_shallow_account("account")} + {open_collective_graphql_account("account")} name last4 status @@ -496,7 +471,7 @@ def get_open_collective_data( }} invoiceInfo merchantId - {open_collective_graphql_shallow_account("requestedByAccount")} + {open_collective_graphql_account("requestedByAccount")} requiredLegalDocuments }} order {{ @@ -536,29 +511,28 @@ def get_open_collective_data( context.log.info(f"Total count of transactions: {total_count}") - for step in generate_steps(total_count, OPEN_COLLECTIVE_MAX_NODES_PER_PAGE): - try: - query = client.execute( - expense_query, - variable_values={ - "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, - "offset": step, - "type": kind, - "dateFrom": date_from, - "dateTo": date_to, - }, - ) - context.log.info( - f"Fetching transaction {step}/{total_count} for type '{kind}'" - ) - yield query["transactions"]["nodes"] - except Exception as exception: - # TODO(jabolo): Implement a retry mechanism - context.log.warning( - f"An error occurred while fetching Open Collective data: '{exception}'. " - "We will stop this materialization instead of retrying for now." - ) - return [] + config = QueryConfig( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + minimum_limit=OPEN_COLLECTIVE_MIN_NODES_PER_PAGE, + query_arguments=OpenCollectiveParameters( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + offset=0, + type=kind, + dateFrom=date_from, + dateTo=date_to, + ), + step_key="offset", + extract_fn=lambda query: query["transactions"]["nodes"], + ratelimit_wait_seconds=OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS, + ) + + yield from query_with_retry( + client, + context, + expense_query, + total_count, + config, + ) def get_open_collective_expenses( @@ -647,6 +621,7 @@ def expenses( name="expenses", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: @@ -691,6 +666,7 @@ def deposits( name="deposits", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: diff --git a/warehouse/oso_dagster/utils/common.py b/warehouse/oso_dagster/utils/common.py index 6f356ec07..bd74356d6 100644 --- a/warehouse/oso_dagster/utils/common.py +++ b/warehouse/oso_dagster/utils/common.py @@ -1,9 +1,17 @@ from enum import Enum -from typing import TypeVar, Never, Optional +from time import sleep +from typing import Any, Callable, Dict, Generator, Never, Optional, TypeVar + +from dagster import AssetExecutionContext +from gql import Client +from graphql import DocumentNode +from pydantic import BaseModel, Field + from .errors import NullOrUndefinedValueError T = TypeVar("T") + # An enum for specifying time intervals class TimeInterval(Enum): Hourly = 0 @@ -11,6 +19,7 @@ class TimeInterval(Enum): Weekly = 2 Monthly = 3 + # Configures how we should handle incoming data class SourceMode(Enum): # Add new time-partitioned data incrementally @@ -18,16 +27,19 @@ class SourceMode(Enum): # Overwrite the entire dataset on each import Overwrite = 1 + # Simple snake case to camel case def to_camel_case(snake_str): return "".join(x.capitalize() for x in snake_str.lower().split("_")) + def to_lower_camel_case(snake_str): # We capitalize the first letter of each component except the first one # with the 'capitalize' method and join them together. camel_string = to_camel_case(snake_str) return snake_str[0].lower() + camel_string[1:] + def safeCast[T](x: T) -> T: """ Explicitly mark that a cast is safe. @@ -36,6 +48,7 @@ def safeCast[T](x: T) -> T: y: T = x return y + def assertNever(_x: Never) -> Never: """ Asserts that a branch is never taken. @@ -43,6 +56,7 @@ def assertNever(_x: Never) -> Never: """ raise Exception("unexpected branch taken") + def ensure[T](x: Optional[T], msg: str) -> T: """ Asserts that a value is not null or undefined. @@ -65,4 +79,137 @@ def ensure[T](x: Optional[T], msg: str) -> T: ) else: y: T = x - return y \ No newline at end of file + return y + + +def generate_steps(total: int, step: int): + """ + Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. + If the total is not divisible by the step, the last iteration will yield the remaining value. + + Args: + total (int): The desired total value. + step (int): The increment value for each iteration. + + Yields: + int: The next number in the sequence. + + Example: + >>> for num in generate_steps(10, 3): + ... print(num) + 0 + 3 + 6 + 9 + 10 + """ + + for i in range(0, total, step): + yield i + if total % step != 0: + yield total + + +class QueryArguments(BaseModel): + offset: int = Field( + ..., + ge=0, + description="The offset to start fetching nodes from.", + ) + + +class QueryConfig(BaseModel): + limit: int = Field( + ..., + gt=0, + description="Maximum number of nodes to fetch per query.", + ) + minimum_limit: int = Field( + ..., + gt=0, + description="Minimum limit for the query, after which the query will fail.", + ) + query_arguments: QueryArguments = Field( + ..., + description="Arguments for the query to be executed.", + ) + step_key: str = Field( + ..., + description="The key in the query arguments to update with the offset.", + ) + extract_fn: Callable[[Dict[str, Any]], Any] = Field( + ..., + description="Function to extract the data from the query response.", + ) + ratelimit_wait_seconds: int = Field( + ..., + gt=0, + description="Time to wait before retrying the query after being rate limited.", + ) + + +def query_with_retry( + client: Client, + context: AssetExecutionContext, + query_str: DocumentNode, + total_count: int, + config: QueryConfig, +) -> Generator[Dict[str, Any], None, None]: + """ + Queries the GraphQL API with retry logic. The query will be retried + with a lower limit if it fails. If the limit reaches the minimum limit, + the query will fail. It will also handle rate limiting by waiting for + the specified number of seconds before retrying. + + Args: + client (Client): The GraphQL client to execute the query. + context (AssetExecutionContext): The context for the asset. + query_str (DocumentNode): The GraphQL query to execute. + total_count (int): The total number of nodes to fetch. + config (QueryConfig): The configuration for the query. + + Yields: + Dict[str, Any]: A dictionary containing the transaction nodes. + """ + + max_retries = 3 + current_index = 0 + limit = config.limit + steps = list(generate_steps(total_count, limit)) + + while limit >= config.minimum_limit: + while current_index < len(steps): + setattr(config.query_arguments, config.step_key, steps[current_index]) + + if max_retries == 0: + raise ValueError( + "Query failed after reaching the maximum number of retries." + ) + + try: + query = client.execute( + query_str, variable_values=config.query_arguments.model_dump() + ) + context.log.info( + f"Fetching transaction {steps[current_index]}/{total_count}" + ) + current_index += 1 + yield config.extract_fn(query) + except Exception as exception: + context.log.error(f"Query failed with error: {exception}") + if "429" in str(exception): + context.log.info( + f"Got rate-limited, retrying in {config.ratelimit_wait_seconds} seconds." + ) + sleep(config.ratelimit_wait_seconds) + max_retries -= 1 + continue + limit //= 2 + steps = list(generate_steps(total_count, limit)) + current_index *= 2 + if limit < config.minimum_limit: + raise ValueError( + f"Query failed after reaching the minimum limit of {limit}." + ) from exception + context.log.info(f"Retrying with limit: {limit}") + break