From a717118068da8c756ccd1454b77a8f5a40ac1c5c Mon Sep 17 00:00:00 2001 From: Javier R Date: Mon, 23 Sep 2024 10:54:03 +0200 Subject: [PATCH 1/5] fix: use dagster `retry` mechanism --- warehouse/oso_dagster/assets/eas_optimism.py | 38 +++++++---------- .../oso_dagster/assets/open_collective.py | 42 +++++++------------ 2 files changed, 32 insertions(+), 48 deletions(-) diff --git a/warehouse/oso_dagster/assets/eas_optimism.py b/warehouse/oso_dagster/assets/eas_optimism.py index 576d624f..771f6d51 100644 --- a/warehouse/oso_dagster/assets/eas_optimism.py +++ b/warehouse/oso_dagster/assets/eas_optimism.py @@ -133,30 +133,23 @@ def get_optimism_eas_data( ) for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE): - try: - query = client.execute( - attestations_query, - variable_values={ - "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE, - "skip": step, - "where": { - "time": { - "gte": date_from, - "lte": date_to, - }, + query = client.execute( + attestations_query, + variable_values={ + "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE, + "skip": step, + "where": { + "time": { + "gte": date_from, + "lte": date_to, }, }, - ) - context.log.info( - f"Fetching attestation {step}/{total_count}", - ) - yield query["attestations"] - except Exception as exception: - context.log.warning( - f"An error occurred while fetching EAS data: '{exception}'. " - "We will stop this materialization instead of retrying for now." - ) - return [] + }, + ) + context.log.info( + f"Fetching attestation {step}/{total_count}", + ) + yield query["attestations"] def get_optimism_eas(context: AssetExecutionContext, client: Client): @@ -208,4 +201,5 @@ def attestations(context: AssetExecutionContext): name="attestations", columns=pydantic_to_dlt_nullable_columns(Attestation), primary_key="id", + write_disposition="merge", ) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index e4a3fbf7..92bae4cb 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -200,8 +200,8 @@ class Transaction(BaseModel): # The first transaction on Open Collective was on January 23, 2015 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z" -# The maximum number of nodes that can be retrieved per page -OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000 +# The maximum are 1000 per page, we will use half since their API throws 503s sporadically +OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 500 # Kubernetes configuration for the asset materialization K8S_CONFIG = { @@ -327,8 +327,6 @@ def open_collective_graphql_payment_method(key: str): {key} {{ id type - name - data {open_collective_graphql_amount("balance")} {open_collective_graphql_shallow_account("account")} }} @@ -537,28 +535,18 @@ def get_open_collective_data( context.log.info(f"Total count of transactions: {total_count}") for step in generate_steps(total_count, OPEN_COLLECTIVE_MAX_NODES_PER_PAGE): - try: - query = client.execute( - expense_query, - variable_values={ - "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, - "offset": step, - "type": kind, - "dateFrom": date_from, - "dateTo": date_to, - }, - ) - context.log.info( - f"Fetching transaction {step}/{total_count} for type '{kind}'" - ) - yield query["transactions"]["nodes"] - except Exception as exception: - # TODO(jabolo): Implement a retry mechanism - context.log.warning( - f"An error occurred while fetching Open Collective data: '{exception}'. " - "We will stop this materialization instead of retrying for now." 
- ) - return [] + query = client.execute( + expense_query, + variable_values={ + "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + "offset": step, + "type": kind, + "dateFrom": date_from, + "dateTo": date_to, + }, + ) + context.log.info(f"Fetching transaction {step}/{total_count} for type '{kind}'") + yield query["transactions"]["nodes"] def get_open_collective_expenses( @@ -647,6 +635,7 @@ def expenses( name="expenses", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: @@ -691,6 +680,7 @@ def deposits( name="deposits", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: From 8b3a1cece3a52e79e22bf30094eedebe935a5611 Mon Sep 17 00:00:00 2001 From: Javier R Date: Mon, 23 Sep 2024 17:55:18 +0200 Subject: [PATCH 2/5] feat: implement debounced `retry` mechanism --- .../oso_dagster/assets/open_collective.py | 118 +++++++++++++++--- 1 file changed, 101 insertions(+), 17 deletions(-) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index 92bae4cb..b48a002b 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -1,15 +1,17 @@ from datetime import datetime, timedelta -from typing import List, Literal, Optional +from time import sleep +from typing import Any, Dict, Generator, List, Literal, Optional import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition from dlt.destinations.adapters import bigquery_adapter from gql import Client, gql from gql.transport.requests import RequestsHTTPTransport +from graphql import DocumentNode from oso_dagster import constants from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns from oso_dagster.utils.secrets import secret_ref_arg -from pydantic import UUID4, BaseModel +from pydantic import UUID4, BaseModel, Field class Host(BaseModel): @@ -197,11 +199,30 @@ class Transaction(BaseModel): host: Optional[Host] = None +class QueryParameters(BaseModel): + limit: int = Field( + ..., + gt=0, + description="Number of nodes to fetch per query.", + ) + offset: int = Field( + ..., + ge=0, + description="Offset for pagination.", + ) + type: str + dateFrom: str + dateTo: str + + # The first transaction on Open Collective was on January 23, 2015 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z" -# The maximum are 1000 per page, we will use half since their API throws 503s sporadically -OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 500 +# The maximum is 1000 nodes per page, we will retry until a minimum of 100 nodes per page is reached +OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000 + +# The minimum number of nodes per page, if this threshold is reached, the query will fail +OPEN_COLLECTIVE_MIN_NODES_PER_PAGE = 100 # Kubernetes configuration for the asset materialization K8S_CONFIG = { @@ -333,6 +354,73 @@ def open_collective_graphql_payment_method(key: str): """ +def query_with_retry( + client: Client, + context: AssetExecutionContext, + expense_query: DocumentNode, + total_count: int, + kind: Literal["DEBIT", "CREDIT"], + date_from: str, + date_to: str, +) -> Generator[Dict[str, Any], None, None]: + """ + Queries Open Collective data with a retry mechanism, reducing the + limit by half on each retry. If the limit reaches the minimum threshold, + the query will fail. + + Args: + client (Client): The client object used to execute the GraphQL queries. 
+ context (AssetExecutionContext): The execution context of the asset. + expense_query (DocumentNode): The GraphQL query document. + total_count (int): The total count of transactions. + kind (str): The transaction type. Either "DEBIT" or "CREDIT". + date_from (str): The start date for the query. + date_to (str): The end date for the query. + + Yields: + Dict[str, Any]: A dictionary containing the transaction nodes. + """ + + current_index = 0 + limit = OPEN_COLLECTIVE_MAX_NODES_PER_PAGE + steps = list(generate_steps(total_count, limit)) + + while limit >= OPEN_COLLECTIVE_MIN_NODES_PER_PAGE: + while current_index < len(steps): + offset = steps[current_index] + params = QueryParameters( + limit=limit, + offset=offset, + type=kind, + dateFrom=date_from, + dateTo=date_to, + ) + + try: + query = client.execute( + expense_query, variable_values=params.model_dump() + ) + context.log.info( + f"Fetching transaction {offset}/{total_count} for type '{kind}'" + ) + current_index += 1 + yield query["transactions"]["nodes"] + except Exception as exception: + context.log.error(f"Query failed with error: {exception}") + if "429" in str(exception): + context.log.info("Got rate limited, retrying in 65 seconds.") + sleep(65) + continue + limit //= 2 + steps = list(generate_steps(total_count, limit)) + if limit < OPEN_COLLECTIVE_MIN_NODES_PER_PAGE: + raise ValueError( + f"Query failed after reaching the minimum limit of {limit}." + ) from exception + context.log.info(f"Retrying with limit: {limit}") + break + + def get_open_collective_data( context: AssetExecutionContext, client: Client, @@ -534,19 +622,15 @@ def get_open_collective_data( context.log.info(f"Total count of transactions: {total_count}") - for step in generate_steps(total_count, OPEN_COLLECTIVE_MAX_NODES_PER_PAGE): - query = client.execute( - expense_query, - variable_values={ - "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, - "offset": step, - "type": kind, - "dateFrom": date_from, - "dateTo": date_to, - }, - ) - context.log.info(f"Fetching transaction {step}/{total_count} for type '{kind}'") - yield query["transactions"]["nodes"] + yield from query_with_retry( + client, + context, + expense_query, + total_count, + kind, + date_from, + date_to, + ) def get_open_collective_expenses( From 37764f2e6971a094dc3b12da43a3f6ebf37f1ba8 Mon Sep 17 00:00:00 2001 From: Javier R Date: Mon, 23 Sep 2024 23:35:01 +0200 Subject: [PATCH 3/5] fix: use `retry` mechanism with `open_collective` and `eas` assets --- warehouse/oso_dagster/assets/eas_optimism.py | 70 ++++++--- .../oso_dagster/assets/open_collective.py | 146 ++++-------------- 2 files changed, 82 insertions(+), 134 deletions(-) diff --git a/warehouse/oso_dagster/assets/eas_optimism.py b/warehouse/oso_dagster/assets/eas_optimism.py index 771f6d51..72f81095 100644 --- a/warehouse/oso_dagster/assets/eas_optimism.py +++ b/warehouse/oso_dagster/assets/eas_optimism.py @@ -1,13 +1,14 @@ from datetime import datetime, timedelta +from typing import Any, Dict import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns -from pydantic import BaseModel +from pydantic import BaseModel, Field -from .open_collective import generate_steps +from ..utils.common import QueryArguments, QueryConfig, query_with_retry class Attestation(BaseModel): @@ -29,12 +30,35 @@ class Attestation(BaseModel): isOffchain: bool +class 
EASOptimismParameters(QueryArguments): + take: int = Field( + ..., + gt=0, + description="The number of nodes to fetch per query.", + ) + skip: int = Field( + ..., + ge=0, + description="The number of nodes to skip.", + ) + where: Dict[str, Any] = Field( + ..., + description="The where clause for the query.", + ) + + # The first attestation on EAS Optimism was created on the 07/28/2023 9:22:35 am EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755) # A sensible limit for the number of nodes to fetch per page EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000 +# Minimum limit for the query, after which the query will fail +EAS_OPTIMISM_MINIMUM_LIMIT = 100 + +# The rate limit wait time in seconds +EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -132,24 +156,32 @@ def get_optimism_eas_data( """ ) - for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE): - query = client.execute( - attestations_query, - variable_values={ - "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE, - "skip": step, - "where": { - "time": { - "gte": date_from, - "lte": date_to, - }, - }, + config = QueryConfig( + limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT, + query_arguments=EASOptimismParameters( + take=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + offset=0, + skip=0, + where={ + "time": { + "gte": date_from, + "lte": date_to, + } }, - ) - context.log.info( - f"Fetching attestation {step}/{total_count}", - ) - yield query["attestations"] + ), + step_key="skip", + extract_fn=lambda data: data["attestations"], + ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS, + ) + + yield from query_with_retry( + client, + context, + attestations_query, + total_count, + config, + ) def get_optimism_eas(context: AssetExecutionContext, client: Client): diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index b48a002b..b7649913 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -1,18 +1,18 @@ from datetime import datetime, timedelta -from time import sleep -from typing import Any, Dict, Generator, List, Literal, Optional +from typing import List, Literal, Optional import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition from dlt.destinations.adapters import bigquery_adapter from gql import Client, gql from gql.transport.requests import RequestsHTTPTransport -from graphql import DocumentNode from oso_dagster import constants from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns from oso_dagster.utils.secrets import secret_ref_arg from pydantic import UUID4, BaseModel, Field +from ..utils.common import QueryArguments, QueryConfig, query_with_retry + class Host(BaseModel): id: UUID4 @@ -199,22 +199,6 @@ class Transaction(BaseModel): host: Optional[Host] = None -class QueryParameters(BaseModel): - limit: int = Field( - ..., - gt=0, - description="Number of nodes to fetch per query.", - ) - offset: int = Field( - ..., - ge=0, - description="Offset for pagination.", - ) - type: str - dateFrom: str - dateTo: str - - # The first transaction on Open Collective was on January 23, 2015 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z" @@ -224,6 +208,9 @@ class QueryParameters(BaseModel): # The minimum number of nodes per page, if this threshold is reached, the query will fail OPEN_COLLECTIVE_MIN_NODES_PER_PAGE = 100 +# 
The rate limit wait time in seconds +OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -249,32 +236,15 @@ class QueryParameters(BaseModel): } -def generate_steps(total: int, step: int): - """ - Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. - If the total is not divisible by the step, the last iteration will yield the remaining value. - - Args: - total (int): The desired total value. - step (int): The increment value for each iteration. - - Yields: - int: The next number in the sequence. - - Example: - >>> for num in generate_steps(10, 3): - ... print(num) - 0 - 3 - 6 - 9 - 10 - """ - - for i in range(0, total, step): - yield i - if total % step != 0: - yield total +class OpenCollectiveParameters(QueryArguments): + limit: int = Field( + ..., + gt=0, + description="Number of nodes to fetch per query.", + ) + type: str + dateFrom: str + dateTo: str def open_collective_graphql_amount(key: str): @@ -354,73 +324,6 @@ def open_collective_graphql_payment_method(key: str): """ -def query_with_retry( - client: Client, - context: AssetExecutionContext, - expense_query: DocumentNode, - total_count: int, - kind: Literal["DEBIT", "CREDIT"], - date_from: str, - date_to: str, -) -> Generator[Dict[str, Any], None, None]: - """ - Queries Open Collective data with a retry mechanism, reducing the - limit by half on each retry. If the limit reaches the minimum threshold, - the query will fail. - - Args: - client (Client): The client object used to execute the GraphQL queries. - context (AssetExecutionContext): The execution context of the asset. - expense_query (DocumentNode): The GraphQL query document. - total_count (int): The total count of transactions. - kind (str): The transaction type. Either "DEBIT" or "CREDIT". - date_from (str): The start date for the query. - date_to (str): The end date for the query. - - Yields: - Dict[str, Any]: A dictionary containing the transaction nodes. - """ - - current_index = 0 - limit = OPEN_COLLECTIVE_MAX_NODES_PER_PAGE - steps = list(generate_steps(total_count, limit)) - - while limit >= OPEN_COLLECTIVE_MIN_NODES_PER_PAGE: - while current_index < len(steps): - offset = steps[current_index] - params = QueryParameters( - limit=limit, - offset=offset, - type=kind, - dateFrom=date_from, - dateTo=date_to, - ) - - try: - query = client.execute( - expense_query, variable_values=params.model_dump() - ) - context.log.info( - f"Fetching transaction {offset}/{total_count} for type '{kind}'" - ) - current_index += 1 - yield query["transactions"]["nodes"] - except Exception as exception: - context.log.error(f"Query failed with error: {exception}") - if "429" in str(exception): - context.log.info("Got rate limited, retrying in 65 seconds.") - sleep(65) - continue - limit //= 2 - steps = list(generate_steps(total_count, limit)) - if limit < OPEN_COLLECTIVE_MIN_NODES_PER_PAGE: - raise ValueError( - f"Query failed after reaching the minimum limit of {limit}." 
- ) from exception - context.log.info(f"Retrying with limit: {limit}") - break - - def get_open_collective_data( context: AssetExecutionContext, client: Client, @@ -622,14 +525,27 @@ def get_open_collective_data( context.log.info(f"Total count of transactions: {total_count}") + config = QueryConfig( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + minimum_limit=OPEN_COLLECTIVE_MIN_NODES_PER_PAGE, + query_arguments=OpenCollectiveParameters( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + offset=0, + type=kind, + dateFrom=date_from, + dateTo=date_to, + ), + step_key="offset", + extract_fn=lambda query: query["transactions"]["nodes"], + ratelimit_wait_seconds=OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS, + ) + yield from query_with_retry( client, context, expense_query, total_count, - kind, - date_from, - date_to, + config, ) From 5296fdf1ac72869633cd08dccfffe6d4b1c14c23 Mon Sep 17 00:00:00 2001 From: Javier R Date: Mon, 23 Sep 2024 23:36:19 +0200 Subject: [PATCH 4/5] feat: implement `generic` retry mechanism --- warehouse/oso_dagster/utils/common.py | 151 +++++++++++++++++++++++++- 1 file changed, 149 insertions(+), 2 deletions(-) diff --git a/warehouse/oso_dagster/utils/common.py b/warehouse/oso_dagster/utils/common.py index 6f356ec0..bd74356d 100644 --- a/warehouse/oso_dagster/utils/common.py +++ b/warehouse/oso_dagster/utils/common.py @@ -1,9 +1,17 @@ from enum import Enum -from typing import TypeVar, Never, Optional +from time import sleep +from typing import Any, Callable, Dict, Generator, Never, Optional, TypeVar + +from dagster import AssetExecutionContext +from gql import Client +from graphql import DocumentNode +from pydantic import BaseModel, Field + from .errors import NullOrUndefinedValueError T = TypeVar("T") + # An enum for specifying time intervals class TimeInterval(Enum): Hourly = 0 @@ -11,6 +19,7 @@ class TimeInterval(Enum): Weekly = 2 Monthly = 3 + # Configures how we should handle incoming data class SourceMode(Enum): # Add new time-partitioned data incrementally @@ -18,16 +27,19 @@ class SourceMode(Enum): # Overwrite the entire dataset on each import Overwrite = 1 + # Simple snake case to camel case def to_camel_case(snake_str): return "".join(x.capitalize() for x in snake_str.lower().split("_")) + def to_lower_camel_case(snake_str): # We capitalize the first letter of each component except the first one # with the 'capitalize' method and join them together. camel_string = to_camel_case(snake_str) return snake_str[0].lower() + camel_string[1:] + def safeCast[T](x: T) -> T: """ Explicitly mark that a cast is safe. @@ -36,6 +48,7 @@ def safeCast[T](x: T) -> T: y: T = x return y + def assertNever(_x: Never) -> Never: """ Asserts that a branch is never taken. @@ -43,6 +56,7 @@ def assertNever(_x: Never) -> Never: """ raise Exception("unexpected branch taken") + def ensure[T](x: Optional[T], msg: str) -> T: """ Asserts that a value is not null or undefined. @@ -65,4 +79,137 @@ def ensure[T](x: Optional[T], msg: str) -> T: ) else: y: T = x - return y \ No newline at end of file + return y + + +def generate_steps(total: int, step: int): + """ + Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. + If the total is not divisible by the step, the last iteration will yield the remaining value. + + Args: + total (int): The desired total value. + step (int): The increment value for each iteration. + + Yields: + int: The next number in the sequence. + + Example: + >>> for num in generate_steps(10, 3): + ... 
print(num) + 0 + 3 + 6 + 9 + 10 + """ + + for i in range(0, total, step): + yield i + if total % step != 0: + yield total + + +class QueryArguments(BaseModel): + offset: int = Field( + ..., + ge=0, + description="The offset to start fetching nodes from.", + ) + + +class QueryConfig(BaseModel): + limit: int = Field( + ..., + gt=0, + description="Maximum number of nodes to fetch per query.", + ) + minimum_limit: int = Field( + ..., + gt=0, + description="Minimum limit for the query, after which the query will fail.", + ) + query_arguments: QueryArguments = Field( + ..., + description="Arguments for the query to be executed.", + ) + step_key: str = Field( + ..., + description="The key in the query arguments to update with the offset.", + ) + extract_fn: Callable[[Dict[str, Any]], Any] = Field( + ..., + description="Function to extract the data from the query response.", + ) + ratelimit_wait_seconds: int = Field( + ..., + gt=0, + description="Time to wait before retrying the query after being rate limited.", + ) + + +def query_with_retry( + client: Client, + context: AssetExecutionContext, + query_str: DocumentNode, + total_count: int, + config: QueryConfig, +) -> Generator[Dict[str, Any], None, None]: + """ + Queries the GraphQL API with retry logic. The query will be retried + with a lower limit if it fails. If the limit reaches the minimum limit, + the query will fail. It will also handle rate limiting by waiting for + the specified number of seconds before retrying. + + Args: + client (Client): The GraphQL client to execute the query. + context (AssetExecutionContext): The context for the asset. + query_str (DocumentNode): The GraphQL query to execute. + total_count (int): The total number of nodes to fetch. + config (QueryConfig): The configuration for the query. + + Yields: + Dict[str, Any]: A dictionary containing the transaction nodes. + """ + + max_retries = 3 + current_index = 0 + limit = config.limit + steps = list(generate_steps(total_count, limit)) + + while limit >= config.minimum_limit: + while current_index < len(steps): + setattr(config.query_arguments, config.step_key, steps[current_index]) + + if max_retries == 0: + raise ValueError( + "Query failed after reaching the maximum number of retries." + ) + + try: + query = client.execute( + query_str, variable_values=config.query_arguments.model_dump() + ) + context.log.info( + f"Fetching transaction {steps[current_index]}/{total_count}" + ) + current_index += 1 + yield config.extract_fn(query) + except Exception as exception: + context.log.error(f"Query failed with error: {exception}") + if "429" in str(exception): + context.log.info( + f"Got rate-limited, retrying in {config.ratelimit_wait_seconds} seconds." + ) + sleep(config.ratelimit_wait_seconds) + max_retries -= 1 + continue + limit //= 2 + steps = list(generate_steps(total_count, limit)) + current_index *= 2 + if limit < config.minimum_limit: + raise ValueError( + f"Query failed after reaching the minimum limit of {limit}." 
+ ) from exception + context.log.info(f"Retrying with limit: {limit}") + break From eb782be7799eeed69dfa544cc697b95a91d16ff3 Mon Sep 17 00:00:00 2001 From: Javier R Date: Tue, 24 Sep 2024 00:02:44 +0200 Subject: [PATCH 5/5] fix: remove `ShallowAccount` in favor of `Account` --- .../oso_dagster/assets/open_collective.py | 142 ++++++++---------- 1 file changed, 64 insertions(+), 78 deletions(-) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index b7649913..76b7dbdb 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -46,23 +46,6 @@ class Location(BaseModel): long: float -class ShallowAccount(BaseModel): - id: str - slug: str - type: str - name: Optional[str] = None - legalName: Optional[str] = None - - -class PaymentMethod(BaseModel): - id: str - type: str - name: Optional[str] = None - data: Optional[str] = None - balance: Optional[Amount] = None - account: Optional[ShallowAccount] = None - - class SocialLink(BaseModel): type: str url: str @@ -94,6 +77,15 @@ class Account(BaseModel): location: Optional[Location] = None +class PaymentMethod(BaseModel): + id: str + type: str + name: Optional[str] = None + data: Optional[str] = None + balance: Optional[Amount] = None + account: Optional[Account] = None + + class PayoutMethod(BaseModel): id: str type: str @@ -103,7 +95,7 @@ class PayoutMethod(BaseModel): class VirtualCard(BaseModel): id: str - account: Optional[ShallowAccount] = None + account: Optional[Account] = None name: Optional[str] = None last4: Optional[str] = None status: Optional[str] = None @@ -134,13 +126,13 @@ class Expense(BaseModel): currency: Optional[str] = None type: Optional[str] = None status: Optional[str] = None - approvedBy: Optional[ShallowAccount] = None - paidBy: Optional[ShallowAccount] = None + approvedBy: Optional[Account] = None + paidBy: Optional[Account] = None onHold: Optional[bool] = None - account: Optional[ShallowAccount] = None - payee: Optional[ShallowAccount] = None + account: Optional[Account] = None + payee: Optional[Account] = None payeeLocation: Optional[Location] = None - createdByAccount: Optional[ShallowAccount] = None + createdByAccount: Optional[Account] = None host: Optional[Host] = None payoutMethod: Optional[PayoutMethod] = None paymentMethod: Optional[PaymentMethod] = None @@ -148,7 +140,7 @@ class Expense(BaseModel): items: Optional[List[Item]] = None invoiceInfo: Optional[str] = None merchantId: Optional[str] = None - requestedByAccount: Optional[ShallowAccount] = None + requestedByAccount: Optional[Account] = None requiredLegalDocuments: Optional[List[str]] = None @@ -181,8 +173,8 @@ class Transaction(BaseModel): hostFee: Optional[Amount] = None paymentProcessorFee: Optional[Amount] = None account: Optional[Account] = None - fromAccount: Optional[ShallowAccount] = None - toAccount: Optional[ShallowAccount] = None + fromAccount: Optional[Account] = None + toAccount: Optional[Account] = None expense: Optional[Expense] = None order: Optional[Order] = None createdAt: Optional[datetime] = None @@ -284,19 +276,6 @@ def open_collective_graphql_location(key: str): """ -def open_collective_graphql_shallow_account(key: str): - """Returns a GraphQL query string for shallow account information.""" - return f""" - {key} {{ - id - slug - type - name - legalName - }} - """ - - def open_collective_graphql_host(key: str): """Returns a GraphQL query string for host information.""" return f""" @@ -319,7 +298,42 @@ def 
open_collective_graphql_payment_method(key: str): id type {open_collective_graphql_amount("balance")} - {open_collective_graphql_shallow_account("account")} + {open_collective_graphql_account("account")} + }} + """ + + +def open_collective_graphql_account(key: str): + """Returns a GraphQL query string for account information.""" + return f""" + {key} {{ + id + slug + type + name + legalName + description + longDescription + tags + socialLinks {{ + type + url + createdAt + updatedAt + }} + expensePolicy + isIncognito + imageUrl + backgroundImageUrl + createdAt + updatedAt + isArchived + isFrozen + isAdmin + isHost + isAdmin + emails + {open_collective_graphql_location("location")} }} """ @@ -405,37 +419,9 @@ def get_open_collective_data( {open_collective_graphql_amount("platformFee")} {open_collective_graphql_amount("hostFee")} {open_collective_graphql_amount("paymentProcessorFee")} - account {{ - id - slug - type - name - legalName - description - longDescription - tags - socialLinks {{ - type - url - createdAt - updatedAt - }} - expensePolicy - isIncognito - imageUrl - backgroundImageUrl - createdAt - updatedAt - isArchived - isFrozen - isAdmin - isHost - isAdmin - emails - {open_collective_graphql_location("location")} - }} - {open_collective_graphql_shallow_account("fromAccount")} - {open_collective_graphql_shallow_account("toAccount")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("fromAccount")} + {open_collective_graphql_account("toAccount")} expense {{ id legacyId @@ -447,13 +433,13 @@ def get_open_collective_data( currency type status - {open_collective_graphql_shallow_account("approvedBy")} - {open_collective_graphql_shallow_account("paidBy")} + {open_collective_graphql_account("approvedBy")} + {open_collective_graphql_account("paidBy")} onHold - {open_collective_graphql_shallow_account("account")} - {open_collective_graphql_shallow_account("payee")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("payee")} {open_collective_graphql_location("payeeLocation")} - {open_collective_graphql_shallow_account("createdByAccount")} + {open_collective_graphql_account("createdByAccount")} {open_collective_graphql_host("host")} payoutMethod {{ id @@ -465,7 +451,7 @@ def get_open_collective_data( {open_collective_graphql_payment_method("paymentMethod")} virtualCard {{ id - {open_collective_graphql_shallow_account("account")} + {open_collective_graphql_account("account")} name last4 status @@ -485,7 +471,7 @@ def get_open_collective_data( }} invoiceInfo merchantId - {open_collective_graphql_shallow_account("requestedByAccount")} + {open_collective_graphql_account("requestedByAccount")} requiredLegalDocuments }} order {{
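
For downstream authors: after patch 4, any GraphQL-backed asset can reuse the same pagination and retry loop by subclassing `QueryArguments` and handing a `QueryConfig` to `query_with_retry`. Below is a minimal sketch of that wiring, assuming a hypothetical endpoint: the URL, the `things`/`nodes` query shape, and `ExampleParameters` are illustrative placeholders, while `QueryArguments`, `QueryConfig`, and `query_with_retry` are the helpers introduced in `warehouse/oso_dagster/utils/common.py`.

from dagster import AssetExecutionContext
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from pydantic import Field

from oso_dagster.utils.common import QueryArguments, QueryConfig, query_with_retry


class ExampleParameters(QueryArguments):
    # `offset` is inherited from QueryArguments and is the field the
    # helper advances between pages (see `step_key` below).
    limit: int = Field(
        ...,
        gt=0,
        description="Number of nodes to fetch per query.",
    )


def get_example_data(context: AssetExecutionContext, total_count: int):
    # Hypothetical endpoint and query. `query_with_retry` sends
    # `query_arguments.model_dump()` as the GraphQL variables, so the
    # model's field names must match the query's variable names.
    transport = RequestsHTTPTransport(url="https://example.com/graphql")
    client = Client(transport=transport, fetch_schema_from_transport=False)

    example_query = gql(
        """
        query ($limit: Int!, $offset: Int!) {
            things(limit: $limit, offset: $offset) {
                nodes {
                    id
                }
            }
        }
        """
    )

    config = QueryConfig(
        limit=1000,  # starting page size
        minimum_limit=100,  # halving below this threshold fails the query
        query_arguments=ExampleParameters(limit=1000, offset=0),
        step_key="offset",  # the argument overwritten with each step value
        extract_fn=lambda data: data["things"]["nodes"],
        ratelimit_wait_seconds=65,  # back-off applied on HTTP 429 responses
    )

    yield from query_with_retry(client, context, example_query, total_count, config)

On a transport error the helper halves the page size, rebuilds the step list, and doubles `current_index` so pages already yielded are not refetched; a rate-limit (429) error instead sleeps for `ratelimit_wait_seconds` and retries the same step, with at most three such waits before the materialization fails.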