From c68fc1a976974d92cee6f486b14e41d88841d10d Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Mon, 23 Sep 2024 14:31:31 -0700 Subject: [PATCH 1/6] Fix concurrent runs of `test-deploy` (#2190) --- .github/workflows/test-deploy-owners.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-deploy-owners.yml b/.github/workflows/test-deploy-owners.yml index 05cf7ef7..4820cdfb 100644 --- a/.github/workflows/test-deploy-owners.yml +++ b/.github/workflows/test-deploy-owners.yml @@ -21,7 +21,7 @@ on: # Cancel in progress jobs on new pushes. concurrency: - group: ${{ github.workflow }}-${{ github.ref }} + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true jobs: From 2d3523b11e66a976c356968387a45c10665371fc Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Mon, 23 Sep 2024 14:25:12 -0700 Subject: [PATCH 2/6] Metrics fixes for metrics_v0 (#2188) --- warehouse/metrics_mesh/config.py | 4 ++-- warehouse/metrics_tools/lib/factories/definition.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/warehouse/metrics_mesh/config.py b/warehouse/metrics_mesh/config.py index 1badb602..a227a5b5 100644 --- a/warehouse/metrics_mesh/config.py +++ b/warehouse/metrics_mesh/config.py @@ -45,10 +45,10 @@ def pool_manager_factory(config: ClickhouseConnectionConfig): os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16") ), send_receive_timeout=1800, - connection_settings={"allow_nondeterministic_mutations": 1}, + # connection_settings={"allow_nondeterministic_mutations": 1}, connection_pool_options={ "maxsize": 24, - "retries": 3, + "retries": 0, }, ), state_connection=GCPPostgresConnectionConfig( diff --git a/warehouse/metrics_tools/lib/factories/definition.py b/warehouse/metrics_tools/lib/factories/definition.py index 07f419e3..1380867e 100644 --- a/warehouse/metrics_tools/lib/factories/definition.py +++ b/warehouse/metrics_tools/lib/factories/definition.py @@ -729,7 +729,7 @@ class DailyTimeseriesRollingWindowOptions(t.TypedDict): def join_all_of_entity_type(evaluator: MacroEvaluator, *, db: str, tables: t.List[str], columns: t.List[str]): query = exp.select(*columns).from_(sqlglot.to_table(f"{db}.{tables[0]}")) for table in tables[1:]: - query.union(exp.select(*columns).from_(sqlglot.to_table(f"{db}.{table}")), distinct=False) + query = query.union(exp.select(*columns).from_(sqlglot.to_table(f"{db}.{table}")), distinct=False) return query From 43cf7a21ebbc6a3602e16e207845ee5cf5bb1331 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Mon, 23 Sep 2024 14:33:17 -0700 Subject: [PATCH 3/6] Adds arbitrum transactions (#2189) --- warehouse/oso_dagster/assets/arbitrum.py | 148 ++--------------------- 1 file changed, 8 insertions(+), 140 deletions(-) diff --git a/warehouse/oso_dagster/assets/arbitrum.py b/warehouse/oso_dagster/assets/arbitrum.py index 5c6cecfa..ac66d104 100644 --- a/warehouse/oso_dagster/assets/arbitrum.py +++ b/warehouse/oso_dagster/assets/arbitrum.py @@ -1,145 +1,13 @@ -from oso_dagster.utils import gcs_to_bucket_name, unpack_config +from ..factories.goldsky import goldsky_network_assets +from ..factories.goldsky.config import NetworkAssetSourceConfigDict -from ..factories.common import AssetFactoryResponse, early_resources_asset_factory -from ..factories.goldsky.additional import blocks_extensions, transactions_extensions -from ..factories.goldsky.assets import goldsky_asset -from ..factories.goldsky.config import GoldskyNetworkConfig, NetworkAssetSourceConfig - - 
-@unpack_config(GoldskyNetworkConfig) -def arbitrum_assets(config: GoldskyNetworkConfig): - """TODO THIS IS A COPY PASTE HACK TO HAVE EVERYTHING BUT THE TRANSACTIONS.""" - - @early_resources_asset_factory(caller_depth=2) - def _factory(project_id: str, staging_bucket: str): - staging_bucket_name = gcs_to_bucket_name(staging_bucket) - - blocks_asset_config = NetworkAssetSourceConfig.with_defaults( - NetworkAssetSourceConfig( - source_name=f"{config.network_name}-blocks", - partition_column_name="timestamp", - partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)", - schema_overrides=[], - external_reference="", - ), - config.blocks_config, - ) - - blocks = AssetFactoryResponse([]) - - if config.blocks_enabled: - blocks = goldsky_asset( - key_prefix=config.network_name, - name="blocks", - source_name=blocks_asset_config.source_name, - project_id=project_id, - destination_table_name=f"{config.network_name}_blocks", - working_destination_dataset_name=config.working_destination_dataset_name, - destination_dataset_name=config.destination_dataset_name, - partition_column_name=blocks_asset_config.partition_column_name, - partition_column_transform=blocks_asset_config.partition_column_transform, - source_bucket_name=staging_bucket_name, - destination_bucket_name=staging_bucket_name, - # uncomment the following value to test - # max_objects_to_load=1, - additional_factories=[blocks_extensions()], - ) - blocks_table_fqn = f"{project_id}.{config.destination_dataset_name}.{config.network_name}_blocks" - else: - blocks_table_fqn = blocks_asset_config.external_reference - - transactions_asset_config = NetworkAssetSourceConfig.with_defaults( - NetworkAssetSourceConfig( - source_name=f"{config.network_name}-enriched_transactions", - partition_column_name="block_timestamp", - partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)", - schema_overrides=[{"name": "value", "field_type": "BYTES"}], - external_reference="", - ), - config.transactions_config, - ) - - transactions = AssetFactoryResponse([]) - - if config.transactions_enabled: - assert ( - blocks_table_fqn != "" - ), "blocks table location cannot be derived. 
must not be empty" - - transactions = goldsky_asset( - key_prefix=config.network_name, - name="transactions", - source_name=transactions_asset_config.source_name, - project_id=project_id, - destination_table_name=f"{config.network_name}_transactions", - working_destination_dataset_name=config.working_destination_dataset_name, - destination_dataset_name=config.destination_dataset_name, - partition_column_name=transactions_asset_config.partition_column_name, - partition_column_transform=transactions_asset_config.partition_column_transform, - schema_overrides=transactions_asset_config.schema_overrides, - source_bucket_name=staging_bucket_name, - destination_bucket_name=staging_bucket_name, - # uncomment the following value to test - # max_objects_to_load=1, - deps=blocks.filter_assets_by_name("blocks"), - additional_factories=[ - # transactions_checks(blocks_table_fqn=blocks_table_fqn), - transactions_extensions( - blocks_table_fqn=blocks_table_fqn, - ), - ], - ) - - traces_asset_config = NetworkAssetSourceConfig.with_defaults( - NetworkAssetSourceConfig( - source_name=f"{config.network_name}-traces", - partition_column_name="block_timestamp", - partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)", - schema_overrides=[ - {"name": "value", "field_type": "BYTES"}, - ], - external_reference="", - ), - config.traces_config, - ) - - traces = AssetFactoryResponse([]) - - if config.traces_enabled: - # assert ( - # transactions_table_fqn != "" - # ), "transactions table cannot be derived. must not be empty" - - traces = goldsky_asset( - key_prefix=config.network_name, - name="traces", - source_name=traces_asset_config.source_name, - project_id=project_id, - destination_table_name=f"{config.network_name}_traces", - working_destination_dataset_name=config.working_destination_dataset_name, - destination_dataset_name=config.destination_dataset_name, - partition_column_name=traces_asset_config.partition_column_name, - partition_column_transform=traces_asset_config.partition_column_transform, - source_bucket_name=staging_bucket_name, - destination_bucket_name=staging_bucket_name, - schema_overrides=traces_asset_config.schema_overrides, - # uncomment the following value to test - # max_objects_to_load=1, - deps=transactions.filter_assets_by_name("transactions"), - additional_factories=[ - # traces_checks(transactions_table_fqn=transactions_table_fqn), - # traces_extensions(transactions_table_fqn=transactions_table_fqn), - ], - ) - - return blocks + transactions + traces - - return _factory - - -arbitrum_network = arbitrum_assets( +arbitrum_network = goldsky_network_assets( network_name="arbitrum", destination_dataset_name="arbitrum_one", working_destination_dataset_name="oso_raw_sources", - transactions_enabled=False, + traces_config=NetworkAssetSourceConfigDict( + schema_overrides=[ + {"name": "value", "field_type": "BYTES"}, + ] + ), ) From 316394776c0139a4de2807aa31c6ea0853fdbbe8 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Mon, 23 Sep 2024 15:26:03 -0700 Subject: [PATCH 4/6] Fix arb txs (#2192) Fixes arbitrum transactions column types --- warehouse/oso_dagster/assets/arbitrum.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/warehouse/oso_dagster/assets/arbitrum.py b/warehouse/oso_dagster/assets/arbitrum.py index ac66d104..273b837b 100644 --- a/warehouse/oso_dagster/assets/arbitrum.py +++ b/warehouse/oso_dagster/assets/arbitrum.py @@ -10,4 +10,10 @@ {"name": "value", "field_type": "BYTES"}, ] ), + transactions_config=NetworkAssetSourceConfigDict( + schema_overrides=[ + 
{"name": "value", "field_type": "BYTES"}, + {"name": "gas_price", "field_type": "BYTES"}, + ] + ), ) From 4572f9305fe4338d85c118046dd6a00651b937e8 Mon Sep 17 00:00:00 2001 From: Carl Cervone <42869436+ccerv1@users.noreply.github.com> Date: Tue, 24 Sep 2024 08:32:43 -0400 Subject: [PATCH 5/6] docs: update farcaster sql (#2193) --- apps/docs/docs/integrate/overview/index.mdx | 35 +++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/apps/docs/docs/integrate/overview/index.mdx b/apps/docs/docs/integrate/overview/index.mdx index b91fc102..dad93809 100644 --- a/apps/docs/docs/integrate/overview/index.mdx +++ b/apps/docs/docs/integrate/overview/index.mdx @@ -332,6 +332,41 @@ GROUP BY fid, display_name ORDER BY reaction_count DESC ``` +Here's another use case, showing how to derive all the verified Ethereum addresses owned by a Farcaster user: + +```sql +WITH + profiles AS ( + SELECT + v.fid, + v.address, + p.custody_address, + JSON_VALUE(p.data, "$.username") AS username, + FROM `YOUR_PROJECT_NAME.farcaster.verifications` v + JOIN `YOUR_PROJECT_NAME.farcaster.profiles` p ON v.fid = p.fid + WHERE v.deleted_at IS NULL + ), + eth_addresses AS ( + SELECT + fid, + username, + address + FROM profiles + WHERE LENGTH(address) = 42 + UNION ALL + SELECT + fid, + username, + custody_address AS address + FROM profiles + ) +SELECT DISTINCT + fid, + username, + address +FROM eth_addresses +``` + **Remember to replace 'YOUR_PROJECT_NAME' with the name of your project in the query.** ### Lens Data From c328ee8647b199f46efec19a8092eed41043a83d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20R=C3=ADos?= Date: Tue, 24 Sep 2024 16:26:42 +0200 Subject: [PATCH 6/6] fix: use dagster `retry` mechanism (#2184) * fix: use dagster `retry` mechanism * feat: implement debounced `retry` mechanism * fix: use `retry` mechanism with `open_collective` and `eas` assets * feat: implement `generic` retry mechanism * fix: remove `ShallowAccount` in favor of `Account` --- warehouse/oso_dagster/assets/eas_optimism.py | 80 ++++-- .../oso_dagster/assets/open_collective.py | 236 ++++++++---------- warehouse/oso_dagster/utils/common.py | 151 ++++++++++- 3 files changed, 308 insertions(+), 159 deletions(-) diff --git a/warehouse/oso_dagster/assets/eas_optimism.py b/warehouse/oso_dagster/assets/eas_optimism.py index 576d624f..72f81095 100644 --- a/warehouse/oso_dagster/assets/eas_optimism.py +++ b/warehouse/oso_dagster/assets/eas_optimism.py @@ -1,13 +1,14 @@ from datetime import datetime, timedelta +from typing import Any, Dict import dlt from dagster import AssetExecutionContext, WeeklyPartitionsDefinition from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns -from pydantic import BaseModel +from pydantic import BaseModel, Field -from .open_collective import generate_steps +from ..utils.common import QueryArguments, QueryConfig, query_with_retry class Attestation(BaseModel): @@ -29,12 +30,35 @@ class Attestation(BaseModel): isOffchain: bool +class EASOptimismParameters(QueryArguments): + take: int = Field( + ..., + gt=0, + description="The number of nodes to fetch per query.", + ) + skip: int = Field( + ..., + ge=0, + description="The number of nodes to skip.", + ) + where: Dict[str, Any] = Field( + ..., + description="The where clause for the query.", + ) + + # The first attestation on EAS Optimism was created on the 07/28/2023 9:22:35 am EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755) # A sensible 
limit for the number of nodes to fetch per page EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000 +# Minimum limit for the query, after which the query will fail +EAS_OPTIMISM_MINIMUM_LIMIT = 100 + +# The rate limit wait time in seconds +EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -132,31 +156,32 @@ def get_optimism_eas_data( """ ) - for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE): - try: - query = client.execute( - attestations_query, - variable_values={ - "take": EAS_OPTIMISM_STEP_NODES_PER_PAGE, - "skip": step, - "where": { - "time": { - "gte": date_from, - "lte": date_to, - }, - }, - }, - ) - context.log.info( - f"Fetching attestation {step}/{total_count}", - ) - yield query["attestations"] - except Exception as exception: - context.log.warning( - f"An error occurred while fetching EAS data: '{exception}'. " - "We will stop this materialization instead of retrying for now." - ) - return [] + config = QueryConfig( + limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT, + query_arguments=EASOptimismParameters( + take=EAS_OPTIMISM_STEP_NODES_PER_PAGE, + offset=0, + skip=0, + where={ + "time": { + "gte": date_from, + "lte": date_to, + } + }, + ), + step_key="skip", + extract_fn=lambda data: data["attestations"], + ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS, + ) + + yield from query_with_retry( + client, + context, + attestations_query, + total_count, + config, + ) def get_optimism_eas(context: AssetExecutionContext, client: Client): @@ -208,4 +233,5 @@ def attestations(context: AssetExecutionContext): name="attestations", columns=pydantic_to_dlt_nullable_columns(Attestation), primary_key="id", + write_disposition="merge", ) diff --git a/warehouse/oso_dagster/assets/open_collective.py b/warehouse/oso_dagster/assets/open_collective.py index e4a3fbf7..76b7dbdb 100644 --- a/warehouse/oso_dagster/assets/open_collective.py +++ b/warehouse/oso_dagster/assets/open_collective.py @@ -9,7 +9,9 @@ from oso_dagster import constants from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns from oso_dagster.utils.secrets import secret_ref_arg -from pydantic import UUID4, BaseModel +from pydantic import UUID4, BaseModel, Field + +from ..utils.common import QueryArguments, QueryConfig, query_with_retry class Host(BaseModel): @@ -44,23 +46,6 @@ class Location(BaseModel): long: float -class ShallowAccount(BaseModel): - id: str - slug: str - type: str - name: Optional[str] = None - legalName: Optional[str] = None - - -class PaymentMethod(BaseModel): - id: str - type: str - name: Optional[str] = None - data: Optional[str] = None - balance: Optional[Amount] = None - account: Optional[ShallowAccount] = None - - class SocialLink(BaseModel): type: str url: str @@ -92,6 +77,15 @@ class Account(BaseModel): location: Optional[Location] = None +class PaymentMethod(BaseModel): + id: str + type: str + name: Optional[str] = None + data: Optional[str] = None + balance: Optional[Amount] = None + account: Optional[Account] = None + + class PayoutMethod(BaseModel): id: str type: str @@ -101,7 +95,7 @@ class PayoutMethod(BaseModel): class VirtualCard(BaseModel): id: str - account: Optional[ShallowAccount] = None + account: Optional[Account] = None name: Optional[str] = None last4: Optional[str] = None status: Optional[str] = None @@ -132,13 +126,13 @@ class Expense(BaseModel): currency: Optional[str] = None type: Optional[str] = None 
status: Optional[str] = None - approvedBy: Optional[ShallowAccount] = None - paidBy: Optional[ShallowAccount] = None + approvedBy: Optional[Account] = None + paidBy: Optional[Account] = None onHold: Optional[bool] = None - account: Optional[ShallowAccount] = None - payee: Optional[ShallowAccount] = None + account: Optional[Account] = None + payee: Optional[Account] = None payeeLocation: Optional[Location] = None - createdByAccount: Optional[ShallowAccount] = None + createdByAccount: Optional[Account] = None host: Optional[Host] = None payoutMethod: Optional[PayoutMethod] = None paymentMethod: Optional[PaymentMethod] = None @@ -146,7 +140,7 @@ class Expense(BaseModel): items: Optional[List[Item]] = None invoiceInfo: Optional[str] = None merchantId: Optional[str] = None - requestedByAccount: Optional[ShallowAccount] = None + requestedByAccount: Optional[Account] = None requiredLegalDocuments: Optional[List[str]] = None @@ -179,8 +173,8 @@ class Transaction(BaseModel): hostFee: Optional[Amount] = None paymentProcessorFee: Optional[Amount] = None account: Optional[Account] = None - fromAccount: Optional[ShallowAccount] = None - toAccount: Optional[ShallowAccount] = None + fromAccount: Optional[Account] = None + toAccount: Optional[Account] = None expense: Optional[Expense] = None order: Optional[Order] = None createdAt: Optional[datetime] = None @@ -200,9 +194,15 @@ class Transaction(BaseModel): # The first transaction on Open Collective was on January 23, 2015 OPEN_COLLECTIVE_TX_EPOCH = "2015-01-23T05:00:00.000Z" -# The maximum number of nodes that can be retrieved per page +# The maximum is 1000 nodes per page, we will retry until a minimum of 100 nodes per page is reached OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000 +# The minimum number of nodes per page, if this threshold is reached, the query will fail +OPEN_COLLECTIVE_MIN_NODES_PER_PAGE = 100 + +# The rate limit wait time in seconds +OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS = 65 + # Kubernetes configuration for the asset materialization K8S_CONFIG = { "merge_behavior": "SHALLOW", @@ -228,32 +228,15 @@ class Transaction(BaseModel): } -def generate_steps(total: int, step: int): - """ - Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. - If the total is not divisible by the step, the last iteration will yield the remaining value. - - Args: - total (int): The desired total value. - step (int): The increment value for each iteration. - - Yields: - int: The next number in the sequence. - - Example: - >>> for num in generate_steps(10, 3): - ... 
print(num) - 0 - 3 - 6 - 9 - 10 - """ - - for i in range(0, total, step): - yield i - if total % step != 0: - yield total +class OpenCollectiveParameters(QueryArguments): + limit: int = Field( + ..., + gt=0, + description="Number of nodes to fetch per query.", + ) + type: str + dateFrom: str + dateTo: str def open_collective_graphql_amount(key: str): @@ -293,44 +276,64 @@ def open_collective_graphql_location(key: str): """ -def open_collective_graphql_shallow_account(key: str): - """Returns a GraphQL query string for shallow account information.""" +def open_collective_graphql_host(key: str): + """Returns a GraphQL query string for host information.""" return f""" {key} {{ id - slug type + slug name legalName + description + currency }} """ -def open_collective_graphql_host(key: str): - """Returns a GraphQL query string for host information.""" +def open_collective_graphql_payment_method(key: str): + """Returns a GraphQL query string for payment method information.""" return f""" {key} {{ id type - slug - name - legalName - description - currency + {open_collective_graphql_amount("balance")} + {open_collective_graphql_account("account")} }} """ -def open_collective_graphql_payment_method(key: str): - """Returns a GraphQL query string for payment method information.""" +def open_collective_graphql_account(key: str): + """Returns a GraphQL query string for account information.""" return f""" {key} {{ id + slug type name - data - {open_collective_graphql_amount("balance")} - {open_collective_graphql_shallow_account("account")} + legalName + description + longDescription + tags + socialLinks {{ + type + url + createdAt + updatedAt + }} + expensePolicy + isIncognito + imageUrl + backgroundImageUrl + createdAt + updatedAt + isArchived + isFrozen + isAdmin + isHost + isAdmin + emails + {open_collective_graphql_location("location")} }} """ @@ -416,37 +419,9 @@ def get_open_collective_data( {open_collective_graphql_amount("platformFee")} {open_collective_graphql_amount("hostFee")} {open_collective_graphql_amount("paymentProcessorFee")} - account {{ - id - slug - type - name - legalName - description - longDescription - tags - socialLinks {{ - type - url - createdAt - updatedAt - }} - expensePolicy - isIncognito - imageUrl - backgroundImageUrl - createdAt - updatedAt - isArchived - isFrozen - isAdmin - isHost - isAdmin - emails - {open_collective_graphql_location("location")} - }} - {open_collective_graphql_shallow_account("fromAccount")} - {open_collective_graphql_shallow_account("toAccount")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("fromAccount")} + {open_collective_graphql_account("toAccount")} expense {{ id legacyId @@ -458,13 +433,13 @@ def get_open_collective_data( currency type status - {open_collective_graphql_shallow_account("approvedBy")} - {open_collective_graphql_shallow_account("paidBy")} + {open_collective_graphql_account("approvedBy")} + {open_collective_graphql_account("paidBy")} onHold - {open_collective_graphql_shallow_account("account")} - {open_collective_graphql_shallow_account("payee")} + {open_collective_graphql_account("account")} + {open_collective_graphql_account("payee")} {open_collective_graphql_location("payeeLocation")} - {open_collective_graphql_shallow_account("createdByAccount")} + {open_collective_graphql_account("createdByAccount")} {open_collective_graphql_host("host")} payoutMethod {{ id @@ -476,7 +451,7 @@ def get_open_collective_data( {open_collective_graphql_payment_method("paymentMethod")} virtualCard {{ id - 
{open_collective_graphql_shallow_account("account")} + {open_collective_graphql_account("account")} name last4 status @@ -496,7 +471,7 @@ def get_open_collective_data( }} invoiceInfo merchantId - {open_collective_graphql_shallow_account("requestedByAccount")} + {open_collective_graphql_account("requestedByAccount")} requiredLegalDocuments }} order {{ @@ -536,29 +511,28 @@ def get_open_collective_data( context.log.info(f"Total count of transactions: {total_count}") - for step in generate_steps(total_count, OPEN_COLLECTIVE_MAX_NODES_PER_PAGE): - try: - query = client.execute( - expense_query, - variable_values={ - "limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, - "offset": step, - "type": kind, - "dateFrom": date_from, - "dateTo": date_to, - }, - ) - context.log.info( - f"Fetching transaction {step}/{total_count} for type '{kind}'" - ) - yield query["transactions"]["nodes"] - except Exception as exception: - # TODO(jabolo): Implement a retry mechanism - context.log.warning( - f"An error occurred while fetching Open Collective data: '{exception}'. " - "We will stop this materialization instead of retrying for now." - ) - return [] + config = QueryConfig( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + minimum_limit=OPEN_COLLECTIVE_MIN_NODES_PER_PAGE, + query_arguments=OpenCollectiveParameters( + limit=OPEN_COLLECTIVE_MAX_NODES_PER_PAGE, + offset=0, + type=kind, + dateFrom=date_from, + dateTo=date_to, + ), + step_key="offset", + extract_fn=lambda query: query["transactions"]["nodes"], + ratelimit_wait_seconds=OPEN_COLLECTIVE_RATELIMIT_WAIT_SECONDS, + ) + + yield from query_with_retry( + client, + context, + expense_query, + total_count, + config, + ) def get_open_collective_expenses( @@ -647,6 +621,7 @@ def expenses( name="expenses", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: @@ -691,6 +666,7 @@ def deposits( name="deposits", columns=pydantic_to_dlt_nullable_columns(Transaction), primary_key="id", + write_disposition="merge", ) if constants.enable_bigquery: diff --git a/warehouse/oso_dagster/utils/common.py b/warehouse/oso_dagster/utils/common.py index 6f356ec0..bd74356d 100644 --- a/warehouse/oso_dagster/utils/common.py +++ b/warehouse/oso_dagster/utils/common.py @@ -1,9 +1,17 @@ from enum import Enum -from typing import TypeVar, Never, Optional +from time import sleep +from typing import Any, Callable, Dict, Generator, Never, Optional, TypeVar + +from dagster import AssetExecutionContext +from gql import Client +from graphql import DocumentNode +from pydantic import BaseModel, Field + from .errors import NullOrUndefinedValueError T = TypeVar("T") + # An enum for specifying time intervals class TimeInterval(Enum): Hourly = 0 @@ -11,6 +19,7 @@ class TimeInterval(Enum): Weekly = 2 Monthly = 3 + # Configures how we should handle incoming data class SourceMode(Enum): # Add new time-partitioned data incrementally @@ -18,16 +27,19 @@ class SourceMode(Enum): # Overwrite the entire dataset on each import Overwrite = 1 + # Simple snake case to camel case def to_camel_case(snake_str): return "".join(x.capitalize() for x in snake_str.lower().split("_")) + def to_lower_camel_case(snake_str): # We capitalize the first letter of each component except the first one # with the 'capitalize' method and join them together. camel_string = to_camel_case(snake_str) return snake_str[0].lower() + camel_string[1:] + def safeCast[T](x: T) -> T: """ Explicitly mark that a cast is safe. 
@@ -36,6 +48,7 @@ def safeCast[T](x: T) -> T: y: T = x return y + def assertNever(_x: Never) -> Never: """ Asserts that a branch is never taken. @@ -43,6 +56,7 @@ def assertNever(_x: Never) -> Never: """ raise Exception("unexpected branch taken") + def ensure[T](x: Optional[T], msg: str) -> T: """ Asserts that a value is not null or undefined. @@ -65,4 +79,137 @@ def ensure[T](x: Optional[T], msg: str) -> T: ) else: y: T = x - return y \ No newline at end of file + return y + + +def generate_steps(total: int, step: int): + """ + Generates a sequence of numbers from 0 up to the specified total, incrementing by the specified step. + If the total is not divisible by the step, the last iteration will yield the remaining value. + + Args: + total (int): The desired total value. + step (int): The increment value for each iteration. + + Yields: + int: The next number in the sequence. + + Example: + >>> for num in generate_steps(10, 3): + ... print(num) + 0 + 3 + 6 + 9 + 10 + """ + + for i in range(0, total, step): + yield i + if total % step != 0: + yield total + + +class QueryArguments(BaseModel): + offset: int = Field( + ..., + ge=0, + description="The offset to start fetching nodes from.", + ) + + +class QueryConfig(BaseModel): + limit: int = Field( + ..., + gt=0, + description="Maximum number of nodes to fetch per query.", + ) + minimum_limit: int = Field( + ..., + gt=0, + description="Minimum limit for the query, after which the query will fail.", + ) + query_arguments: QueryArguments = Field( + ..., + description="Arguments for the query to be executed.", + ) + step_key: str = Field( + ..., + description="The key in the query arguments to update with the offset.", + ) + extract_fn: Callable[[Dict[str, Any]], Any] = Field( + ..., + description="Function to extract the data from the query response.", + ) + ratelimit_wait_seconds: int = Field( + ..., + gt=0, + description="Time to wait before retrying the query after being rate limited.", + ) + + +def query_with_retry( + client: Client, + context: AssetExecutionContext, + query_str: DocumentNode, + total_count: int, + config: QueryConfig, +) -> Generator[Dict[str, Any], None, None]: + """ + Queries the GraphQL API with retry logic. The query will be retried + with a lower limit if it fails. If the limit reaches the minimum limit, + the query will fail. It will also handle rate limiting by waiting for + the specified number of seconds before retrying. + + Args: + client (Client): The GraphQL client to execute the query. + context (AssetExecutionContext): The context for the asset. + query_str (DocumentNode): The GraphQL query to execute. + total_count (int): The total number of nodes to fetch. + config (QueryConfig): The configuration for the query. + + Yields: + Dict[str, Any]: A dictionary containing the transaction nodes. + """ + + max_retries = 3 + current_index = 0 + limit = config.limit + steps = list(generate_steps(total_count, limit)) + + while limit >= config.minimum_limit: + while current_index < len(steps): + setattr(config.query_arguments, config.step_key, steps[current_index]) + + if max_retries == 0: + raise ValueError( + "Query failed after reaching the maximum number of retries." 
+ ) + + try: + query = client.execute( + query_str, variable_values=config.query_arguments.model_dump() + ) + context.log.info( + f"Fetching transaction {steps[current_index]}/{total_count}" + ) + current_index += 1 + yield config.extract_fn(query) + except Exception as exception: + context.log.error(f"Query failed with error: {exception}") + if "429" in str(exception): + context.log.info( + f"Got rate-limited, retrying in {config.ratelimit_wait_seconds} seconds." + ) + sleep(config.ratelimit_wait_seconds) + max_retries -= 1 + continue + limit //= 2 + steps = list(generate_steps(total_count, limit)) + current_index *= 2 + if limit < config.minimum_limit: + raise ValueError( + f"Query failed after reaching the minimum limit of {limit}." + ) from exception + context.log.info(f"Retrying with limit: {limit}") + break
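
A minimal usage sketch of the generic retry mechanism introduced in PATCH 6/6 follows. Only `QueryArguments`, `QueryConfig`, and `query_with_retry` come from `warehouse/oso_dagster/utils/common.py` above; the `ExampleParameters` model, the `items` query, the page sizes, and the import path are illustrative assumptions, not part of the patches.

```python
# Hypothetical sketch: only QueryArguments, QueryConfig and query_with_retry are
# defined in the patch (warehouse/oso_dagster/utils/common.py); the query, the
# ExampleParameters model and the numbers below are illustrative placeholders.
from dagster import AssetExecutionContext
from gql import Client, gql
from pydantic import Field

from oso_dagster.utils.common import QueryArguments, QueryConfig, query_with_retry


class ExampleParameters(QueryArguments):
    # Extra variables forwarded to the GraphQL query via model_dump();
    # the inherited `offset` field is the one query_with_retry advances.
    limit: int = Field(..., gt=0, description="Nodes to fetch per page.")


# Placeholder paginated query; the real queries live in the asset modules.
EXAMPLE_QUERY = gql(
    """
    query Items($limit: Int!, $offset: Int!) {
      items(limit: $limit, offset: $offset) {
        nodes {
          id
        }
      }
    }
    """
)


def get_example_data(context: AssetExecutionContext, client: Client, total_count: int):
    """Yield pages of nodes, retrying with a smaller pagination step on
    failures and sleeping when the API responds with HTTP 429."""
    config = QueryConfig(
        limit=1_000,  # initial pagination step
        minimum_limit=100,  # fail once retries shrink the step below this
        query_arguments=ExampleParameters(limit=1_000, offset=0),
        step_key="offset",  # the argument field that query_with_retry updates
        extract_fn=lambda data: data["items"]["nodes"],
        ratelimit_wait_seconds=65,
    )
    yield from query_with_retry(client, context, EXAMPLE_QUERY, total_count, config)
```

Because the pagination field is named by `step_key` and carried on the pydantic arguments model, the same helper drives both the `skip`-based EAS attestations query and the `offset`-based Open Collective queries in the patches above.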