Commit

Various sqlmesh and clickhouse updates for concurrency (#2174)
* Attempt to partition and set sorting keys

* Update batch size and storage format to different tree

* more engine tools

* update deps

* sqlmesh settings tests

* camel case refactor

* ensure we can run many different metrics at once

* update deps again

* sqlfluff upgrade fix
ravenac95 committed Sep 19, 2024
1 parent 3c92ad5 commit 451a97c
Showing 8 changed files with 694 additions and 594 deletions.
2 changes: 1 addition & 1 deletion .sqlfluff
@@ -4,7 +4,7 @@ templater = dbt
 runaway_limit = 10
 max_line_length = 80
 indent_unit = space
-exclude_rules = AL09
+exclude_rules = AL09, RF02

 [sqlfluff:indentation]
 tab_space_size = 4
1,209 changes: 624 additions & 585 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -54,7 +54,7 @@ sqlalchemy = "^2.0.25"
 textual = "^0.52.1"
 redis = "^5.0.7"
 githubkit = "^0.11.6"
-sqlmesh = { git = "https://github.com/opensource-observer/sqlmesh.git", extras = [
+sqlmesh = { git = "https://github.com/opensource-observer/sqlmesh.git", rev = "test-clickhouse-engine-updates", extras = [
     "gcppostgres",
 ] }
 dagster-duckdb = "^0.24.0"
18 changes: 17 additions & 1 deletion warehouse/metrics_mesh/config.py
@@ -14,6 +14,16 @@

 dotenv.load_dotenv()

+
+def pool_manager_factory(config: ClickhouseConnectionConfig):
+    from clickhouse_connect.driver import httputil
+
+    return httputil.get_pool_manager(
+        num_pools=config.concurrent_tasks,
+        max_size=config.concurrent_tasks,
+    )
+
+
 config = Config(
     model_defaults=ModelDefaultsConfig(dialect="clickhouse", start="2024-08-01"),
     gateways={
@@ -32,8 +42,14 @@
                 password=os.environ.get("SQLMESH_CLICKHOUSE_PASSWORD", ""),
                 port=int(os.environ.get("SQLMESH_CLICKHOUSE_PORT", "443")),
                 concurrent_tasks=int(
-                    os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "8")
+                    os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16")
                 ),
+                send_receive_timeout=1800,
+                connection_settings={"allow_nondeterministic_mutations": 1},
+                connection_pool_options={
+                    "maxsize": 24,
+                    "retries": 3,
+                },
             ),
             state_connection=GCPPostgresConnectionConfig(
                 instance_connection_string=os.environ.get(
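The `pool_manager_factory` above builds one urllib3 pool sized to the gateway's `concurrent_tasks`, so parallel sqlmesh batches reuse HTTP connections instead of opening a fresh socket per query. A minimal standalone sketch of the same idea, assuming `clickhouse_connect.get_client` accepts a `pool_mgr` keyword; the host and credentials are placeholders:

```python
# Sketch only: share one HTTP connection pool across ClickHouse clients.
# Assumptions: clickhouse_connect is installed and get_client accepts a
# pool_mgr keyword; host/credentials below are placeholders.
import clickhouse_connect
from clickhouse_connect.driver import httputil

# One pool sized for 16 concurrent tasks, mirroring the config above.
pool_mgr = httputil.get_pool_manager(num_pools=16, maxsize=16)

client = clickhouse_connect.get_client(
    host="localhost",
    username="default",
    password="",
    pool_mgr=pool_mgr,  # clients built this way share the same sockets
)
print(client.query("SELECT 1").result_rows)
```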
8 changes: 4 additions & 4 deletions warehouse/metrics_mesh/macros/time_aggregation_bucket.py
@@ -1,11 +1,11 @@
+from sqlglot import expressions as exp
 from sqlmesh import macro
 from sqlmesh.core.macros import MacroEvaluator
-from sqlglot import expressions as exp


 @macro()
 def time_aggregation_bucket(
-    evaluator: MacroEvaluator, timeExp: exp.Expression, rollup: str
+    evaluator: MacroEvaluator, time_exp: exp.Expression, rollup: str
 ):
     from sqlmesh.core.dialect import parse_one

@@ -25,7 +25,7 @@ def time_aggregation_bucket(
                 unit=exp.Var(this=rollup_to_interval[rollup]),
             ),
             exp.Cast(
-                this=timeExp,
+                this=time_exp,
                 to=exp.DataType(this=exp.DataType.Type.DATE, nested=False),
             ),
         ],
@@ -38,6 +38,6 @@ def time_aggregation_bucket(
     return exp.Anonymous(
         this=rollup_to_clickhouse_function[rollup],
         expressions=[
-            timeExp,
+            time_exp,
         ],
     )
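Neither `rollup_to_interval` nor `rollup_to_clickhouse_function` appears in the hunks above; a sketch of their plausible shape, with the caveat that the exact keys and values are assumptions rather than something shown in this diff:

```python
# Assumed shape of the lookup tables used by time_aggregation_bucket; the
# real definitions live outside the hunks shown here.
rollup_to_interval = {
    "daily": "DAY",
    "weekly": "WEEK",
    "monthly": "MONTH",
}

# toStartOfDay/toStartOfWeek/toStartOfMonth are real ClickHouse functions,
# but their pairing with these keys is an assumption.
rollup_to_clickhouse_function = {
    "daily": "toStartOfDay",
    "weekly": "toStartOfWeek",
    "monthly": "toStartOfMonth",
}
```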
28 changes: 28 additions & 0 deletions warehouse/metrics_mesh/macros/to_unix_timestamp.py
@@ -0,0 +1,28 @@
+from sqlglot import expressions as exp
+from sqlmesh import macro
+from sqlmesh.core.macros import MacroEvaluator
+
+
+@macro()
+def str_to_unix_timestamp(
+    evaluator: MacroEvaluator,
+    time_exp: exp.Expression,
+):
+    from sqlmesh.core.dialect import parse_one
+
+    if evaluator.runtime_stage in ["loading", "creating"]:
+        return parse_one("1::Uint32", dialect="clickhouse")
+
+    if evaluator.engine_adapter.dialect == "duckdb":
+        return exp.TimeToUnix(
+            this=exp.StrToTime(
+                this=time_exp,
+                format=exp.Array(
+                    expressions=[exp.Literal(this="%Y-%m-%d", is_string=True)]
+                ),
+            )
+        )
+    return exp.Anonymous(
+        this="toUnixTimestamp",
+        expressions=[time_exp],
+    )
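During the "loading" and "creating" stages the macro returns a constant placeholder so the column type can be inferred; at run time on ClickHouse it falls through to the final branch and emits a plain `toUnixTimestamp(...)` call. A sketch of that output built with sqlglot directly; the column name `updated_at` is hypothetical:

```python
# Sketch: reproduce the macro's ClickHouse branch outside sqlmesh.
from sqlglot import expressions as exp

node = exp.Anonymous(this="toUnixTimestamp", expressions=[exp.column("updated_at")])

# Renders a toUnixTimestamp(...) call; exact function-name casing in the
# output depends on sqlglot's normalize_functions setting.
print(node.sql(dialect="clickhouse"))
```

In the model below, `@str_to_unix_timestamp(@execution_ds)` stamps each row with the run's execution date as a numeric `_version`, so later runs produce strictly larger versions.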
19 changes: 17 additions & 2 deletions warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -2,11 +2,12 @@ MODEL (
   name metrics.events_daily_to_artifact,
   kind INCREMENTAL_BY_TIME_RANGE (
     time_column bucket_day,
-    batch_size 30
+    batch_size 90
   ),
   start '2015-01-01',
   cron '@daily',
   dialect 'clickhouse',
+  partitioned_by (event_type, DATE_TRUNC('MONTH', bucket_day)),
   grain (
     bucket_day,
     event_type,
@@ -20,8 +21,20 @@ MODEL (
     event_type String,
     from_artifact_id String,
     to_artifact_id String,
+    _sign Int8,
+    _version UInt32,
     amount Float64
-  )
+  ),
+  physical_properties (
+    ORDER_BY = (
+      event_type,
+      event_source,
+      from_artifact_id,
+      to_artifact_id,
+      bucket_day
+    ),
+  ),
+  storage_format "VersionedCollapsingMergeTree(_sign, _version)",
 );
 WITH events AS (
   SELECT DISTINCT from_artifact_id,
@@ -38,6 +51,8 @@ SELECT from_artifact_id,
     event_source,
     event_type,
     DATE_TRUNC('DAY', time::DATE) AS bucket_day,
+    1 as _sign,
+    @str_to_unix_timestamp(@execution_ds) as _version,
     SUM(amount) AS amount
 FROM events
 GROUP BY from_artifact_id,
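The switch to `VersionedCollapsingMergeTree(_sign, _version)` plus the monthly partitioning is what makes re-running a batch safe: a rerun can write `-1` cancel rows for the data it replaces and `+1` rows under a newer `_version`, and ClickHouse collapses matching pairs during background merges. A pure-Python illustration of that collapsing rule (not ClickHouse code; the toy rows are invented):

```python
# Toy illustration of VersionedCollapsingMergeTree semantics: a (+1, v) row
# is cancelled by a (-1, v) row with the same sorting key; reading with
# SUM(amount * _sign) yields only the surviving data.
from collections import defaultdict

rows = [
    # (sorting_key, _sign, _version, amount) -- invented example data
    (("COMMIT_CODE", "artifactA"), +1, 1726000000, 5.0),  # original load
    (("COMMIT_CODE", "artifactA"), -1, 1726000000, 5.0),  # cancel on re-run
    (("COMMIT_CODE", "artifactA"), +1, 1726700000, 7.0),  # newer version
]

totals = defaultdict(float)
for key, sign, _version, amount in rows:
    totals[key] += sign * amount

print(dict(totals))  # {('COMMIT_CODE', 'artifactA'): 7.0}
```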
2 changes: 2 additions & 0 deletions warehouse/metrics_tools/lib/factories/factory.py
@@ -125,6 +125,7 @@ def generate_models_from_query(
             "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
             "time_column": "metrics_sample_date",
             "batch_size": 1,
+            "batch_concurrency": 1,
         },
         dialect="clickhouse",
         columns=columns,
@@ -349,3 +350,4 @@ def timeseries_metrics(
     # raise Exception("no queries generated from the evaluated queries")
     # top_level_select = top_level_select.with_(f"all_{cte_suffix}", union_cte)
     # return top_level_select
+    # return top_level_select
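With `batch_size: 1`, each day of an `INCREMENTAL_BY_TIME_RANGE` backfill becomes its own batch, and `batch_concurrency: 1` keeps a single model's batches serial; the concurrency gain comes from running many generated metric models at once, per the commit's "ensure we can run many different metrics at once". A small sketch of how day-sized batches fall out of a date range (illustration only, not sqlmesh's scheduler):

```python
# Illustration: batch_size=1 turns a backfill window into one-day intervals.
from datetime import date, timedelta
from typing import Iterator

def daily_batches(start: date, end: date) -> Iterator[tuple[date, date]]:
    """Yield one (start, end) interval per day, mirroring batch_size=1."""
    current = start
    while current <= end:
        yield (current, current)
        current += timedelta(days=1)

# Three days -> three batches, run one at a time when batch_concurrency=1.
print(list(daily_batches(date(2024, 9, 17), date(2024, 9, 19))))
```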
