Skip to content

Commit

Permalink
Merge branch 'main' into ravenac95/sqlmesh-clickhouse-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 authored Sep 19, 2024
2 parents 7eab76e + 3c92ad5 commit 0b2eca0
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 129 deletions.
86 changes: 47 additions & 39 deletions warehouse/oso_dagster/assets/open_collective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import Optional
from typing import Optional, Literal

import dlt
from dagster import AssetExecutionContext, WeeklyPartitionsDefinition
Expand Down Expand Up @@ -47,6 +47,30 @@ class Transaction(BaseModel):
# The maximum number of nodes that can be retrieved per page
OPEN_COLLECTIVE_MAX_NODES_PER_PAGE = 1000

# Kubernetes configuration for the asset materialization.
#
# Passed as the "dagster-k8s/config" op tag by the assets in this module.
# The CPU/memory figures are applied identically as requests and limits,
# and the pod is both selected onto and allowed to tolerate nodes carrying
# the `pool_type=spot` label/taint.
_K8S_CPU = "2000m"
_K8S_MEMORY = "3584Mi"
_K8S_POOL_KEY = "pool_type"
_K8S_POOL_VALUE = "spot"

K8S_CONFIG = {
    "merge_behavior": "SHALLOW",
    "container_config": {
        "resources": {
            # Separate dict objects on purpose: mutating one must not
            # affect the other.
            "requests": {"cpu": _K8S_CPU, "memory": _K8S_MEMORY},
            "limits": {"cpu": _K8S_CPU, "memory": _K8S_MEMORY},
        },
    },
    "pod_spec_config": {
        "node_selector": {_K8S_POOL_KEY: _K8S_POOL_VALUE},
        "tolerations": [
            {
                "key": _K8S_POOL_KEY,
                "operator": "Equal",
                "value": _K8S_POOL_VALUE,
                "effect": "NoSchedule",
            },
        ],
    },
}


def generate_steps(total: int, step: int):
"""
Expand Down Expand Up @@ -79,19 +103,19 @@ def generate_steps(total: int, step: int):
def get_open_collective_data(
context: AssetExecutionContext,
client: Client,
type: str,
dateFrom: str,
dateTo: str,
kind: Literal["DEBIT", "CREDIT"],
date_from: str,
date_to: str,
):
"""
Retrieves Open Collective data using the provided client and query parameters.
Args:
context (AssetExecutionContext): The execution context of the asset.
client (Client): The client object used to execute the GraphQL queries.
type (str): The transaction type. Either "DEBIT" or "CREDIT".
dateFrom (str): The start date for the query.
dateTo (str): The end date for the query.
kind (str): The transaction type. Either "DEBIT" or "CREDIT".
date_from (str): The start date for the query.
date_to (str): The end date for the query.
Yields:
list: A list of transaction nodes retrieved from Open Collective.
Expand All @@ -117,9 +141,9 @@ def get_open_collective_data(
total = client.execute(
total_query,
variable_values={
"type": type,
"dateFrom": dateFrom,
"dateTo": dateTo,
"type": kind,
"dateFrom": date_from,
"dateTo": date_to,
},
)

Expand Down Expand Up @@ -176,13 +200,13 @@ def get_open_collective_data(
variable_values={
"limit": OPEN_COLLECTIVE_MAX_NODES_PER_PAGE,
"offset": step,
"type": type,
"dateFrom": dateFrom,
"dateTo": dateTo,
"type": kind,
"dateFrom": date_from,
"dateTo": date_to,
},
)
context.log.info(
f"Fetching transaction {step}/{total_count} for type '{type}'"
f"Fetching transaction {step}/{total_count} for type '{kind}'"
)
yield query["transactions"]["nodes"]
except Exception as exception:
Expand All @@ -197,7 +221,7 @@ def get_open_collective_data(
def get_open_collective_expenses(
context: AssetExecutionContext,
client: Client,
kind: str,
kind: Literal["DEBIT", "CREDIT"],
):
"""
Get open collective expenses.
Expand Down Expand Up @@ -250,9 +274,12 @@ def base_open_collective_client(personal_token: str):
@dlt_factory(
key_prefix="open_collective",
partitions_def=WeeklyPartitionsDefinition(
start_date=OPEN_COLLECTIVE_TX_EPOCH.split("T")[0],
start_date=OPEN_COLLECTIVE_TX_EPOCH.split("T", maxsplit=1)[0],
end_date=(datetime.now()).isoformat().split("T")[0],
),
op_tags={
"dagster-k8s/config": K8S_CONFIG,
},
)
def expenses(
context: AssetExecutionContext,
Expand Down Expand Up @@ -291,31 +318,12 @@ def expenses(
@dlt_factory(
key_prefix="open_collective",
partitions_def=WeeklyPartitionsDefinition(
start_date=OPEN_COLLECTIVE_TX_EPOCH.split("T")[0],
start_date=OPEN_COLLECTIVE_TX_EPOCH.split("T", maxsplit=1)[0],
end_date=(datetime.now()).isoformat().split("T")[0],
),
op_tags={
"dagster-k8s/config": {
"merge_behavior": "SHALLOW",
"container_config": {
"resources": {
"requests": {"cpu": "2000m", "memory": "3584Mi"},
"limits": {"cpu": "2000m", "memory": "3584Mi"},
},
},
"pod_spec_config": {
"node_selector": {"pool_type": "spot",},
"tolerations": [
{
"key": "pool_type",
"operator": "Equal",
"value": "spot",
"effect": "NoSchedule",
}
],
},
}
}
"dagster-k8s/config": K8S_CONFIG,
},
)
def deposits(
context: AssetExecutionContext,
Expand All @@ -337,7 +345,7 @@ def deposits(
client = base_open_collective_client(personal_token)
resource = dlt.resource(
get_open_collective_expenses(context, client, "CREDIT"),
name="funds",
name="deposits",
columns=pydantic_to_dlt_nullable_columns(Transaction),
primary_key="id",
)
Expand Down
Loading

0 comments on commit 0b2eca0

Please sign in to comment.