Skip to content

Commit

Permalink
Adds arbitrum transactions (#2189)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 committed Sep 23, 2024
1 parent 2d3523b commit 43cf7a2
Showing 1 changed file with 8 additions and 140 deletions.
148 changes: 8 additions & 140 deletions warehouse/oso_dagster/assets/arbitrum.py
Original file line number Diff line number Diff line change
@@ -1,145 +1,13 @@
from oso_dagster.utils import gcs_to_bucket_name, unpack_config
from ..factories.goldsky import goldsky_network_assets
from ..factories.goldsky.config import NetworkAssetSourceConfigDict

from ..factories.common import AssetFactoryResponse, early_resources_asset_factory
from ..factories.goldsky.additional import blocks_extensions, transactions_extensions
from ..factories.goldsky.assets import goldsky_asset
from ..factories.goldsky.config import GoldskyNetworkConfig, NetworkAssetSourceConfig


@unpack_config(GoldskyNetworkConfig)
def arbitrum_assets(config: GoldskyNetworkConfig):
"""TODO THIS IS A COPY PASTE HACK TO HAVE EVERYTHING BUT THE TRANSACTIONS."""

@early_resources_asset_factory(caller_depth=2)
def _factory(project_id: str, staging_bucket: str):
staging_bucket_name = gcs_to_bucket_name(staging_bucket)

blocks_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-blocks",
partition_column_name="timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[],
external_reference="",
),
config.blocks_config,
)

blocks = AssetFactoryResponse([])

if config.blocks_enabled:
blocks = goldsky_asset(
key_prefix=config.network_name,
name="blocks",
source_name=blocks_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_blocks",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=blocks_asset_config.partition_column_name,
partition_column_transform=blocks_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
# uncomment the following value to test
# max_objects_to_load=1,
additional_factories=[blocks_extensions()],
)
blocks_table_fqn = f"{project_id}.{config.destination_dataset_name}.{config.network_name}_blocks"
else:
blocks_table_fqn = blocks_asset_config.external_reference

transactions_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-enriched_transactions",
partition_column_name="block_timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[{"name": "value", "field_type": "BYTES"}],
external_reference="",
),
config.transactions_config,
)

transactions = AssetFactoryResponse([])

if config.transactions_enabled:
assert (
blocks_table_fqn != ""
), "blocks table location cannot be derived. must not be empty"

transactions = goldsky_asset(
key_prefix=config.network_name,
name="transactions",
source_name=transactions_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_transactions",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=transactions_asset_config.partition_column_name,
partition_column_transform=transactions_asset_config.partition_column_transform,
schema_overrides=transactions_asset_config.schema_overrides,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
# uncomment the following value to test
# max_objects_to_load=1,
deps=blocks.filter_assets_by_name("blocks"),
additional_factories=[
# transactions_checks(blocks_table_fqn=blocks_table_fqn),
transactions_extensions(
blocks_table_fqn=blocks_table_fqn,
),
],
)

traces_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-traces",
partition_column_name="block_timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[
{"name": "value", "field_type": "BYTES"},
],
external_reference="",
),
config.traces_config,
)

traces = AssetFactoryResponse([])

if config.traces_enabled:
# assert (
# transactions_table_fqn != ""
# ), "transactions table cannot be derived. must not be empty"

traces = goldsky_asset(
key_prefix=config.network_name,
name="traces",
source_name=traces_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_traces",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=traces_asset_config.partition_column_name,
partition_column_transform=traces_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=traces_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
deps=transactions.filter_assets_by_name("transactions"),
additional_factories=[
# traces_checks(transactions_table_fqn=transactions_table_fqn),
# traces_extensions(transactions_table_fqn=transactions_table_fqn),
],
)

return blocks + transactions + traces

return _factory


arbitrum_network = arbitrum_assets(
arbitrum_network = goldsky_network_assets(
network_name="arbitrum",
destination_dataset_name="arbitrum_one",
working_destination_dataset_name="oso_raw_sources",
transactions_enabled=False,
traces_config=NetworkAssetSourceConfigDict(
schema_overrides=[
{"name": "value", "field_type": "BYTES"},
]
),
)

0 comments on commit 43cf7a2

Please sign in to comment.