
Commit

Merge branch 'main' into jabolol/open-collective-dbt-models
Jabolol committed Sep 24, 2024
2 parents de3834c + c328ee8 commit a226883
Showing 8 changed files with 361 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-deploy-owners.yml
@@ -21,7 +21,7 @@ on:

# Cancel in progress jobs on new pushes.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
35 changes: 35 additions & 0 deletions apps/docs/docs/integrate/overview/index.mdx
@@ -332,6 +332,41 @@ GROUP BY fid, display_name
ORDER BY reaction_count DESC
```

Here's another use case, showing how to derive all the verified Ethereum addresses owned by a Farcaster user:

```sql
WITH
profiles AS (
SELECT
v.fid,
v.address,
p.custody_address,
JSON_VALUE(p.data, "$.username") AS username,
FROM `YOUR_PROJECT_NAME.farcaster.verifications` v
JOIN `YOUR_PROJECT_NAME.farcaster.profiles` p ON v.fid = p.fid
WHERE v.deleted_at IS NULL
),
eth_addresses AS (
SELECT
fid,
username,
address
FROM profiles
WHERE LENGTH(address) = 42
UNION ALL
SELECT
fid,
username,
custody_address AS address
FROM profiles
)
SELECT DISTINCT
fid,
username,
address
FROM eth_addresses
```

**Remember to replace 'YOUR_PROJECT_NAME' with the name of your project in the query.**
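
As a supplementary illustration (not part of the documentation change itself), here is a minimal sketch of running a query like the one above from Python, assuming the `google-cloud-bigquery` client library and Application Default Credentials are set up:

```python
# Minimal sketch: run a Farcaster verifications query from Python.
# Assumes `pip install google-cloud-bigquery` and default credentials;
# replace YOUR_PROJECT_NAME with your own GCP project.
from google.cloud import bigquery

client = bigquery.Client(project="YOUR_PROJECT_NAME")

sql = """
SELECT v.fid, v.address
FROM `YOUR_PROJECT_NAME.farcaster.verifications` v
WHERE v.deleted_at IS NULL
LIMIT 10
"""

for row in client.query(sql).result():
    print(row.fid, row.address)
```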

### Lens Data
4 changes: 2 additions & 2 deletions warehouse/metrics_mesh/config.py
@@ -45,10 +45,10 @@ def pool_manager_factory(config: ClickhouseConnectionConfig):
os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16")
),
send_receive_timeout=1800,
connection_settings={"allow_nondeterministic_mutations": 1},
# connection_settings={"allow_nondeterministic_mutations": 1},
connection_pool_options={
"maxsize": 24,
"retries": 3,
"retries": 0,
},
),
state_connection=GCPPostgresConnectionConfig(
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/lib/factories/definition.py
@@ -729,7 +729,7 @@ class DailyTimeseriesRollingWindowOptions(t.TypedDict):
def join_all_of_entity_type(evaluator: MacroEvaluator, *, db: str, tables: t.List[str], columns: t.List[str]):
query = exp.select(*columns).from_(sqlglot.to_table(f"{db}.{tables[0]}"))
for table in tables[1:]:
query.union(exp.select(*columns).from_(sqlglot.to_table(f"{db}.{table}")), distinct=False)
query = query.union(exp.select(*columns).from_(sqlglot.to_table(f"{db}.{table}")), distinct=False)
return query
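
The one-line fix above is needed because sqlglot builder methods such as `union` return a new expression rather than mutating the receiver, so the result has to be reassigned. A small self-contained sketch of the corrected pattern (the table and column names are illustrative only):

```python
# Self-contained sketch of the pattern fixed above. sqlglot's `union` returns a
# new expression rather than mutating `query`, so the result must be captured.
import sqlglot
from sqlglot import exp


def join_all(db: str, tables: list[str], columns: list[str]) -> exp.Expression:
    query = exp.select(*columns).from_(sqlglot.to_table(f"{db}.{tables[0]}"))
    for table in tables[1:]:
        # Without the reassignment, the UNION ALL is built and then discarded.
        query = query.union(
            exp.select(*columns).from_(sqlglot.to_table(f"{db}.{table}")),
            distinct=False,
        )
    return query


print(join_all("metrics", ["daily_a", "daily_b"], ["id"]).sql())
# SELECT id FROM metrics.daily_a UNION ALL SELECT id FROM metrics.daily_b
```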


154 changes: 14 additions & 140 deletions warehouse/oso_dagster/assets/arbitrum.py
@@ -1,145 +1,19 @@
from oso_dagster.utils import gcs_to_bucket_name, unpack_config
from ..factories.goldsky import goldsky_network_assets
from ..factories.goldsky.config import NetworkAssetSourceConfigDict

from ..factories.common import AssetFactoryResponse, early_resources_asset_factory
from ..factories.goldsky.additional import blocks_extensions, transactions_extensions
from ..factories.goldsky.assets import goldsky_asset
from ..factories.goldsky.config import GoldskyNetworkConfig, NetworkAssetSourceConfig


@unpack_config(GoldskyNetworkConfig)
def arbitrum_assets(config: GoldskyNetworkConfig):
"""TODO THIS IS A COPY PASTE HACK TO HAVE EVERYTHING BUT THE TRANSACTIONS."""

@early_resources_asset_factory(caller_depth=2)
def _factory(project_id: str, staging_bucket: str):
staging_bucket_name = gcs_to_bucket_name(staging_bucket)

blocks_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-blocks",
partition_column_name="timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[],
external_reference="",
),
config.blocks_config,
)

blocks = AssetFactoryResponse([])

if config.blocks_enabled:
blocks = goldsky_asset(
key_prefix=config.network_name,
name="blocks",
source_name=blocks_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_blocks",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=blocks_asset_config.partition_column_name,
partition_column_transform=blocks_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
# uncomment the following value to test
# max_objects_to_load=1,
additional_factories=[blocks_extensions()],
)
blocks_table_fqn = f"{project_id}.{config.destination_dataset_name}.{config.network_name}_blocks"
else:
blocks_table_fqn = blocks_asset_config.external_reference

transactions_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-enriched_transactions",
partition_column_name="block_timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[{"name": "value", "field_type": "BYTES"}],
external_reference="",
),
config.transactions_config,
)

transactions = AssetFactoryResponse([])

if config.transactions_enabled:
assert (
blocks_table_fqn != ""
), "blocks table location cannot be derived. must not be empty"

transactions = goldsky_asset(
key_prefix=config.network_name,
name="transactions",
source_name=transactions_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_transactions",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=transactions_asset_config.partition_column_name,
partition_column_transform=transactions_asset_config.partition_column_transform,
schema_overrides=transactions_asset_config.schema_overrides,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
# uncomment the following value to test
# max_objects_to_load=1,
deps=blocks.filter_assets_by_name("blocks"),
additional_factories=[
# transactions_checks(blocks_table_fqn=blocks_table_fqn),
transactions_extensions(
blocks_table_fqn=blocks_table_fqn,
),
],
)

traces_asset_config = NetworkAssetSourceConfig.with_defaults(
NetworkAssetSourceConfig(
source_name=f"{config.network_name}-traces",
partition_column_name="block_timestamp",
partition_column_transform=lambda t: f"TIMESTAMP_SECONDS(`{t}`)",
schema_overrides=[
{"name": "value", "field_type": "BYTES"},
],
external_reference="",
),
config.traces_config,
)

traces = AssetFactoryResponse([])

if config.traces_enabled:
# assert (
# transactions_table_fqn != ""
# ), "transactions table cannot be derived. must not be empty"

traces = goldsky_asset(
key_prefix=config.network_name,
name="traces",
source_name=traces_asset_config.source_name,
project_id=project_id,
destination_table_name=f"{config.network_name}_traces",
working_destination_dataset_name=config.working_destination_dataset_name,
destination_dataset_name=config.destination_dataset_name,
partition_column_name=traces_asset_config.partition_column_name,
partition_column_transform=traces_asset_config.partition_column_transform,
source_bucket_name=staging_bucket_name,
destination_bucket_name=staging_bucket_name,
schema_overrides=traces_asset_config.schema_overrides,
# uncomment the following value to test
# max_objects_to_load=1,
deps=transactions.filter_assets_by_name("transactions"),
additional_factories=[
# traces_checks(transactions_table_fqn=transactions_table_fqn),
# traces_extensions(transactions_table_fqn=transactions_table_fqn),
],
)

return blocks + transactions + traces

return _factory


arbitrum_network = arbitrum_assets(
arbitrum_network = goldsky_network_assets(
network_name="arbitrum",
destination_dataset_name="arbitrum_one",
working_destination_dataset_name="oso_raw_sources",
transactions_enabled=False,
traces_config=NetworkAssetSourceConfigDict(
schema_overrides=[
{"name": "value", "field_type": "BYTES"},
]
),
transactions_config=NetworkAssetSourceConfigDict(
schema_overrides=[
{"name": "value", "field_type": "BYTES"},
{"name": "gas_price", "field_type": "BYTES"},
]
),
)
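
A hedged aside on the `schema_overrides` above: landing columns such as `value` and `gas_price` as BYTES is a common way to carry 256-bit EVM integers that can overflow INT64. Assuming the bytes hold a big-endian unsigned integer (worth verifying against the actual data), downstream Python code could recover the number like this:

```python
# Hedged illustration: decode a BYTES-typed EVM amount back into an integer,
# assuming a big-endian unsigned encoding (verify against your actual data).
raw = bytes.fromhex("0de0b6b3a7640000")  # example payload: 1 ETH in wei
print(int.from_bytes(raw, "big"))        # -> 1000000000000000000
```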
80 changes: 53 additions & 27 deletions warehouse/oso_dagster/assets/eas_optimism.py
@@ -1,13 +1,14 @@
from datetime import datetime, timedelta
from typing import Any, Dict

import dlt
from dagster import AssetExecutionContext, WeeklyPartitionsDefinition
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from oso_dagster.factories import dlt_factory, pydantic_to_dlt_nullable_columns
from pydantic import BaseModel
from pydantic import BaseModel, Field

from .open_collective import generate_steps
from ..utils.common import QueryArguments, QueryConfig, query_with_retry


class Attestation(BaseModel):
@@ -29,12 +30,35 @@ class Attestation(BaseModel):
isOffchain: bool


class EASOptimismParameters(QueryArguments):
take: int = Field(
...,
gt=0,
description="The number of nodes to fetch per query.",
)
skip: int = Field(
...,
ge=0,
description="The number of nodes to skip.",
)
where: Dict[str, Any] = Field(
...,
description="The where clause for the query.",
)


# The first attestation on EAS Optimism was created on the 07/28/2023 9:22:35 am
EAS_OPTIMISM_FIRST_ATTESTATION = datetime.fromtimestamp(1690557755)

# A sensible limit for the number of nodes to fetch per page
EAS_OPTIMISM_STEP_NODES_PER_PAGE = 10_000

# Minimum limit for the query, below which the query will fail
EAS_OPTIMISM_MINIMUM_LIMIT = 100

# The rate limit wait time in seconds
EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS = 65

# Kubernetes configuration for the asset materialization
K8S_CONFIG = {
"merge_behavior": "SHALLOW",
@@ -132,31 +156,32 @@ def get_optimism_eas_data(
"""
)

for step in generate_steps(total_count, EAS_OPTIMISM_STEP_NODES_PER_PAGE):
try:
query = client.execute(
attestations_query,
variable_values={
"take": EAS_OPTIMISM_STEP_NODES_PER_PAGE,
"skip": step,
"where": {
"time": {
"gte": date_from,
"lte": date_to,
},
},
},
)
context.log.info(
f"Fetching attestation {step}/{total_count}",
)
yield query["attestations"]
except Exception as exception:
context.log.warning(
f"An error occurred while fetching EAS data: '{exception}'. "
"We will stop this materialization instead of retrying for now."
)
return []
config = QueryConfig(
limit=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
minimum_limit=EAS_OPTIMISM_MINIMUM_LIMIT,
query_arguments=EASOptimismParameters(
take=EAS_OPTIMISM_STEP_NODES_PER_PAGE,
offset=0,
skip=0,
where={
"time": {
"gte": date_from,
"lte": date_to,
}
},
),
step_key="skip",
extract_fn=lambda data: data["attestations"],
ratelimit_wait_seconds=EAS_OPTIMISM_RATELIMIT_WAIT_SECONDS,
)

yield from query_with_retry(
client,
context,
attestations_query,
total_count,
config,
)


def get_optimism_eas(context: AssetExecutionContext, client: Client):
@@ -208,4 +233,5 @@ def attestations(context: AssetExecutionContext):
name="attestations",
columns=pydantic_to_dlt_nullable_columns(Attestation),
primary_key="id",
write_disposition="merge",
)
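
The constants and `QueryConfig` above replace the hand-rolled pagination loop that previously lived in this file. As a rough conceptual sketch only, and not the actual `query_with_retry` implementation in `oso_dagster.utils.common`, the config fields suggest behaviour along these lines: page through results via `skip`, shrink the page size on failures (never below `minimum_limit`), and wait out rate limits before retrying.

```python
# Conceptual sketch only -- NOT the actual query_with_retry implementation.
# It illustrates the behaviour suggested by the QueryConfig fields above:
# paginate with `skip`, shrink the page size on errors (never below the
# minimum), and sleep when the API rate-limits us.
import time
from typing import Any, Callable, Iterator


def paged_query(
    run_query: Callable[[int, int], dict],  # (take, skip) -> raw response
    extract: Callable[[dict], Any],         # e.g. lambda data: data["attestations"]
    total_count: int,
    limit: int,
    minimum_limit: int,
    ratelimit_wait_seconds: float,
) -> Iterator[Any]:
    skip = 0
    while skip < total_count:
        try:
            yield extract(run_query(limit, skip))
            skip += limit
        except Exception:
            limit //= 2  # back off the page size before retrying
            if limit < minimum_limit:
                raise
            time.sleep(ratelimit_wait_seconds)
```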