Skip to content

Commit

Permalink
Update default SQL optimization level to use CTEs (#1525)
Browse files Browse the repository at this point in the history
Resolves #1040 

This PR updates the default SQL optimization level to use CTEs. This
results in a number of snapshot changes, as test queries now use CTEs
wherever multi-metric or derived-metric cases share common sources or
common metrics.
  • Loading branch information
plypaul authored Nov 14, 2024
1 parent b5b6d3f commit 0be9c0a
Show file tree
Hide file tree
Showing 92 changed files with 3,181 additions and 3,194 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241112-215817.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Use CTEs instead of sub-queries in generated SQL.
time: 2024-11-12T21:58:17.127471-08:00
custom:
  Author: plypaul
  Issue: "1040"
32 changes: 16 additions & 16 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,21 @@ class MetricFlowQueryRequest:
"""

request_id: MetricFlowRequestId
saved_query_name: Optional[str] = None
metric_names: Optional[Sequence[str]] = None
metrics: Optional[Sequence[MetricQueryParameter]] = None
group_by_names: Optional[Sequence[str]] = None
group_by: Optional[Tuple[GroupByParameter, ...]] = None
limit: Optional[int] = None
time_constraint_start: Optional[datetime.datetime] = None
time_constraint_end: Optional[datetime.datetime] = None
where_constraints: Optional[Sequence[str]] = None
order_by_names: Optional[Sequence[str]] = None
order_by: Optional[Sequence[OrderByQueryParameter]] = None
min_max_only: bool = False
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4
dataflow_plan_optimizations: FrozenSet[DataflowPlanOptimization] = DataflowPlanOptimization.enabled_optimizations()
query_type: MetricFlowQueryType = MetricFlowQueryType.METRIC
saved_query_name: Optional[str]
metric_names: Optional[Sequence[str]]
metrics: Optional[Sequence[MetricQueryParameter]]
group_by_names: Optional[Sequence[str]]
group_by: Optional[Tuple[GroupByParameter, ...]]
limit: Optional[int]
time_constraint_start: Optional[datetime.datetime]
time_constraint_end: Optional[datetime.datetime]
where_constraints: Optional[Sequence[str]]
order_by_names: Optional[Sequence[str]]
order_by: Optional[Sequence[OrderByQueryParameter]]
min_max_only: bool
sql_optimization_level: SqlQueryOptimizationLevel
dataflow_plan_optimizations: FrozenSet[DataflowPlanOptimization]
query_type: MetricFlowQueryType

@staticmethod
def create_with_random_request_id( # noqa: D102
Expand All @@ -129,7 +129,7 @@ def create_with_random_request_id( # noqa: D102
where_constraints: Optional[Sequence[str]] = None,
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4,
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.default_level(),
dataflow_plan_optimizations: FrozenSet[
DataflowPlanOptimization
] = DataflowPlanOptimization.enabled_optimizations(),
Expand Down
2 changes: 1 addition & 1 deletion metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def convert_to_sql_query_plan(
self,
sql_engine_type: SqlEngine,
dataflow_plan_node: DataflowPlanNode,
optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4,
optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.default_level(),
sql_query_plan_id: Optional[DagId] = None,
) -> ConvertToSqlPlanResult:
"""Create an SQL query plan that represents the computation up to the given dataflow plan node."""
Expand Down
2 changes: 1 addition & 1 deletion metricflow/sql/optimizer/optimization_levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SqlQueryOptimizationLevel(Enum):

@staticmethod
def default_level() -> SqlQueryOptimizationLevel: # noqa: D102
return SqlQueryOptimizationLevel.O4
return SqlQueryOptimizationLevel.O5


@dataclass(frozen=True)
Expand Down
3 changes: 0 additions & 3 deletions tests_metricflow/examples/test_node_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from metricflow.dataset.convert_semantic_model import SemanticModelToDataSetConverter
from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter
from metricflow.protocols.sql_client import SqlClient
from metricflow.sql.optimizer.optimization_levels import SqlQueryOptimizationLevel
from metricflow.sql.render.sql_plan_renderer import SqlQueryPlanRenderer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,7 +56,6 @@ def test_view_sql_generated_at_a_node(
conversion_result = to_sql_plan_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=read_source_node,
optimization_level=SqlQueryOptimizationLevel.O4,
)
sql_plan_at_read_node = conversion_result.sql_plan
sql_at_read_node = sql_renderer.render_sql_query_plan(sql_plan_at_read_node).sql
Expand Down Expand Up @@ -86,7 +84,6 @@ def test_view_sql_generated_at_a_node(
conversion_result = to_sql_plan_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=filter_elements_node,
optimization_level=SqlQueryOptimizationLevel.O4,
)
sql_plan_at_filter_elements_node = conversion_result.sql_plan
sql_at_filter_elements_node = sql_renderer.render_sql_query_plan(sql_plan_at_filter_elements_node).sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def make_execution_plan_converter( # noqa: D103
),
sql_plan_renderer=DefaultSqlQueryPlanRenderer(),
sql_client=sql_client,
sql_optimization_level=SqlQueryOptimizationLevel.O4,
sql_optimization_level=SqlQueryOptimizationLevel.default_level(),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ def convert_and_check(
sql_engine_type=sql_client.sql_engine_type,
sql_query_plan_id=DagId.from_str("plan0_optimized"),
dataflow_plan_node=node,
optimization_level=SqlQueryOptimizationLevel.O4,
)
sql_query_plan = conversion_result.sql_plan
display_graph_if_requested(
Expand Down
1 change: 0 additions & 1 deletion tests_metricflow/query_rendering/compare_rendered_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def render_and_check(
conversion_result = dataflow_to_sql_converter.convert_to_sql_query_plan(
sql_engine_type=sql_client.sql_engine_type,
dataflow_plan_node=optimized_plan.sink_node,
optimization_level=SqlQueryOptimizationLevel.O4,
sql_query_plan_id=DagId.from_str("plan0_optimized"),
)
sql_query_plan = conversion_result.sql_plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@ sql_engine: BigQuery
---
-- Combine Aggregated Outputs
-- Compute Metrics via Expressions
SELECT
COALESCE(MAX(subq_28.buys), 0) AS visit_buy_conversions
FROM (
WITH sma_28019_cte AS (
-- Read Elements From Semantic Model 'visits_source'
-- Metric Time Dimension 'ds'
SELECT
DATETIME_TRUNC(ds, day) AS metric_time__day
, user_id AS user
, 1 AS visits
FROM ***************************.fct_visits visits_source_src_28000
)

SELECT
COALESCE(MAX(subq_27.buys), 0) AS visit_buy_conversions
FROM (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits',]
-- Aggregate Measures
SELECT
SUM(1) AS visits
FROM ***************************.fct_visits visits_source_src_28000
SUM(visits) AS visits
FROM sma_28019_cte sma_28019_cte
) subq_18
CROSS JOIN (
-- Find conversions for user within the range of 7 day
Expand All @@ -26,42 +35,33 @@ CROSS JOIN (
FROM (
-- Dedupe the fanout with mf_internal_uuid in the conversion data set
SELECT DISTINCT
FIRST_VALUE(subq_21.visits) OVER (
FIRST_VALUE(sma_28019_cte.visits) OVER (
PARTITION BY
subq_24.user
, subq_24.metric_time__day
, subq_24.mf_internal_uuid
ORDER BY subq_21.metric_time__day DESC
subq_23.user
, subq_23.metric_time__day
, subq_23.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS visits
, FIRST_VALUE(subq_21.metric_time__day) OVER (
, FIRST_VALUE(sma_28019_cte.metric_time__day) OVER (
PARTITION BY
subq_24.user
, subq_24.metric_time__day
, subq_24.mf_internal_uuid
ORDER BY subq_21.metric_time__day DESC
subq_23.user
, subq_23.metric_time__day
, subq_23.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS metric_time__day
, FIRST_VALUE(subq_21.user) OVER (
, FIRST_VALUE(sma_28019_cte.user) OVER (
PARTITION BY
subq_24.user
, subq_24.metric_time__day
, subq_24.mf_internal_uuid
ORDER BY subq_21.metric_time__day DESC
subq_23.user
, subq_23.metric_time__day
, subq_23.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS user
, subq_24.mf_internal_uuid AS mf_internal_uuid
, subq_24.buys AS buys
FROM (
-- Read Elements From Semantic Model 'visits_source'
-- Metric Time Dimension 'ds'
-- Pass Only Elements: ['visits', 'metric_time__day', 'user']
SELECT
DATETIME_TRUNC(ds, day) AS metric_time__day
, user_id AS user
, 1 AS visits
FROM ***************************.fct_visits visits_source_src_28000
) subq_21
, subq_23.mf_internal_uuid AS mf_internal_uuid
, subq_23.buys AS buys
FROM sma_28019_cte sma_28019_cte
INNER JOIN (
-- Read Elements From Semantic Model 'buys_source'
-- Metric Time Dimension 'ds'
Expand All @@ -72,16 +72,16 @@ CROSS JOIN (
, 1 AS buys
, GENERATE_UUID() AS mf_internal_uuid
FROM ***************************.fct_buys buys_source_src_28000
) subq_24
) subq_23
ON
(
subq_21.user = subq_24.user
sma_28019_cte.user = subq_23.user
) AND (
(
subq_21.metric_time__day <= subq_24.metric_time__day
sma_28019_cte.metric_time__day <= subq_23.metric_time__day
) AND (
subq_21.metric_time__day > DATE_SUB(CAST(subq_24.metric_time__day AS DATETIME), INTERVAL 7 day)
sma_28019_cte.metric_time__day > DATE_SUB(CAST(subq_23.metric_time__day AS DATETIME), INTERVAL 7 day)
)
)
) subq_25
) subq_28
) subq_24
) subq_27
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,39 @@ docstring:
sql_engine: BigQuery
---
-- Compute Metrics via Expressions
WITH sma_28019_cte AS (
-- Read Elements From Semantic Model 'visits_source'
-- Metric Time Dimension 'ds'
SELECT
DATETIME_TRUNC(ds, day) AS metric_time__day
, user_id AS user
, 1 AS visits
FROM ***************************.fct_visits visits_source_src_28000
)

SELECT
metric_time__day
metric_time__day AS metric_time__day
, CAST(buys AS FLOAT64) / CAST(NULLIF(visits, 0) AS FLOAT64) AS visit_buy_conversion_rate_7days_fill_nulls_with_0
FROM (
-- Combine Aggregated Outputs
SELECT
COALESCE(subq_27.metric_time__day, subq_40.metric_time__day) AS metric_time__day
COALESCE(subq_27.metric_time__day, subq_39.metric_time__day) AS metric_time__day
, COALESCE(MAX(subq_27.visits), 0) AS visits
, COALESCE(MAX(subq_40.buys), 0) AS buys
, COALESCE(MAX(subq_39.buys), 0) AS buys
FROM (
-- Join to Time Spine Dataset
SELECT
subq_26.ds AS metric_time__day
, subq_24.visits AS visits
FROM ***************************.mf_time_spine subq_26
LEFT OUTER JOIN (
-- Read From CTE For node_id=sma_28019
-- Pass Only Elements: ['visits', 'metric_time__day']
-- Aggregate Measures
SELECT
metric_time__day
, SUM(visits) AS visits
FROM (
-- Read Elements From Semantic Model 'visits_source'
-- Metric Time Dimension 'ds'
-- Pass Only Elements: ['visits', 'metric_time__day']
SELECT
DATETIME_TRUNC(ds, day) AS metric_time__day
, 1 AS visits
FROM ***************************.fct_visits visits_source_src_28000
) subq_23
FROM sma_28019_cte sma_28019_cte
GROUP BY
metric_time__day
) subq_24
Expand All @@ -43,9 +47,9 @@ FROM (
FULL OUTER JOIN (
-- Join to Time Spine Dataset
SELECT
subq_39.ds AS metric_time__day
, subq_37.buys AS buys
FROM ***************************.mf_time_spine subq_39
subq_38.ds AS metric_time__day
, subq_36.buys AS buys
FROM ***************************.mf_time_spine subq_38
LEFT OUTER JOIN (
-- Find conversions for user within the range of 7 day
-- Pass Only Elements: ['buys', 'metric_time__day']
Expand All @@ -56,42 +60,33 @@ FROM (
FROM (
-- Dedupe the fanout with mf_internal_uuid in the conversion data set
SELECT DISTINCT
FIRST_VALUE(subq_30.visits) OVER (
FIRST_VALUE(sma_28019_cte.visits) OVER (
PARTITION BY
subq_33.user
, subq_33.metric_time__day
, subq_33.mf_internal_uuid
ORDER BY subq_30.metric_time__day DESC
subq_32.user
, subq_32.metric_time__day
, subq_32.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS visits
, FIRST_VALUE(subq_30.metric_time__day) OVER (
, FIRST_VALUE(sma_28019_cte.metric_time__day) OVER (
PARTITION BY
subq_33.user
, subq_33.metric_time__day
, subq_33.mf_internal_uuid
ORDER BY subq_30.metric_time__day DESC
subq_32.user
, subq_32.metric_time__day
, subq_32.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS metric_time__day
, FIRST_VALUE(subq_30.user) OVER (
, FIRST_VALUE(sma_28019_cte.user) OVER (
PARTITION BY
subq_33.user
, subq_33.metric_time__day
, subq_33.mf_internal_uuid
ORDER BY subq_30.metric_time__day DESC
subq_32.user
, subq_32.metric_time__day
, subq_32.mf_internal_uuid
ORDER BY sma_28019_cte.metric_time__day DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS user
, subq_33.mf_internal_uuid AS mf_internal_uuid
, subq_33.buys AS buys
FROM (
-- Read Elements From Semantic Model 'visits_source'
-- Metric Time Dimension 'ds'
-- Pass Only Elements: ['visits', 'metric_time__day', 'user']
SELECT
DATETIME_TRUNC(ds, day) AS metric_time__day
, user_id AS user
, 1 AS visits
FROM ***************************.fct_visits visits_source_src_28000
) subq_30
, subq_32.mf_internal_uuid AS mf_internal_uuid
, subq_32.buys AS buys
FROM sma_28019_cte sma_28019_cte
INNER JOIN (
-- Read Elements From Semantic Model 'buys_source'
-- Metric Time Dimension 'ds'
Expand All @@ -102,26 +97,26 @@ FROM (
, 1 AS buys
, GENERATE_UUID() AS mf_internal_uuid
FROM ***************************.fct_buys buys_source_src_28000
) subq_33
) subq_32
ON
(
subq_30.user = subq_33.user
sma_28019_cte.user = subq_32.user
) AND (
(
subq_30.metric_time__day <= subq_33.metric_time__day
sma_28019_cte.metric_time__day <= subq_32.metric_time__day
) AND (
subq_30.metric_time__day > DATE_SUB(CAST(subq_33.metric_time__day AS DATETIME), INTERVAL 7 day)
sma_28019_cte.metric_time__day > DATE_SUB(CAST(subq_32.metric_time__day AS DATETIME), INTERVAL 7 day)
)
)
) subq_34
) subq_33
GROUP BY
metric_time__day
) subq_37
) subq_36
ON
subq_39.ds = subq_37.metric_time__day
) subq_40
subq_38.ds = subq_36.metric_time__day
) subq_39
ON
subq_27.metric_time__day = subq_40.metric_time__day
subq_27.metric_time__day = subq_39.metric_time__day
GROUP BY
metric_time__day
) subq_41
) subq_40
Loading

0 comments on commit 0be9c0a

Please sign in to comment.