Add retries for SQL generation using lower optimization levels #1569

Merged
merged 1 commit on Dec 14, 2024
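For context on the change below: SQL plan generation is wrapped in a fallback loop that attempts the requested optimization level first and, if conversion raises, retries at progressively lower levels before propagating the error. The following is a minimal standalone sketch of that retry pattern only — OptLevel, generate_with_fallback, and flaky_generate are illustrative stand-ins, not MetricFlow's actual API.

from enum import IntEnum
from typing import Callable, Sequence


class OptLevel(IntEnum):
    """Stand-in for SqlQueryOptimizationLevel; real member names and values may differ."""

    O1 = 1
    O2 = 2
    O3 = 3
    O4 = 4


def generate_with_fallback(levels: Sequence[OptLevel], generate: Callable[[OptLevel], str]) -> str:
    """Attempt each level in the given (highest-first) order; re-raise only after the last one fails."""
    for index, level in enumerate(levels):
        try:
            return generate(level)
        except Exception:
            if index == len(levels) - 1:
                raise  # No lower level left to try; propagate the most recent error.
            # A warning log would go here before falling through to the next (lower) level.
    raise RuntimeError("Unreachable for a non-empty level list: the loop either returns or raises.")


def flaky_generate(level: OptLevel) -> str:
    # Pretend the optimizer has a bug above O2.
    if level > OptLevel.O2:
        raise ValueError("optimizer bug at high optimization level")
    return "SELECT 1"


print(generate_with_fallback([OptLevel.O4, OptLevel.O3, OptLevel.O2], flaky_generate))  # SELECT 1

In this sketch a generator that only succeeds at O2 or below still produces SQL, which is the user-facing behavior the diff is after: a bug at a higher optimization level degrades to a less-optimized query rather than an error.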
95 changes: 78 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
@@ -187,26 +187,85 @@ def convert_to_sql_query_plan(
         sql_query_plan_id: Optional[DagId] = None,
     ) -> ConvertToSqlPlanResult:
         """Create an SQL query plan that represents the computation up to the given dataflow plan node."""
-        # TODO: Make this a more generally accessible attribute instead of checking against the
-        # BigQuery-ness of the engine
-        use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY
-
-        option_set = SqlGenerationOptionSet.options_for_level(
-            optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
+        # In case there are bugs that raise exceptions at higher optimization levels, retry generation at a lower
+        # optimization level. Generally skip O0 (unless requested) as that level does not include the column pruner.
+        # Without that, the generated SQL can be enormous.
+        optimization_levels_to_attempt: Sequence[SqlQueryOptimizationLevel] = sorted(
+            # Union handles case if O0 was specifically requested.
+            set(
+                possible_level
+                for possible_level in SqlQueryOptimizationLevel
+                if SqlQueryOptimizationLevel.O1 <= possible_level <= optimization_level
+            ).union({optimization_level}),
+            reverse=True,
         )
+        retried_at_lower_optimization_level = False
+        logger.debug(
+            LazyFormat(
+                "Attempting to convert to a SQL plan with optimization levels:",
+                optimization_levels_to_attempt=optimization_levels_to_attempt,
+            )
+        )
+        for attempted_optimization_level in optimization_levels_to_attempt:
+            try:
+                # TODO: Make this a more generally accessible attribute instead of checking against the
+                # BigQuery-ness of the engine
+                use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY

-        logger.info(LazyFormat("Using option set:", option_set=option_set))
+                option_set = SqlGenerationOptionSet.options_for_level(
+                    attempted_optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
+                )

-        nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
-        if option_set.allow_cte:
-            nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
+                logger.info(
+                    LazyFormat(
+                        "Using option set for SQL generation:",
+                        optimization_level=optimization_level,
+                        option_set=option_set,
+                    )
+                )

-        return self.convert_using_specifics(
-            dataflow_plan_node=dataflow_plan_node,
-            sql_query_plan_id=sql_query_plan_id,
-            nodes_to_convert_to_cte=nodes_to_convert_to_cte,
-            optimizers=option_set.optimizers,
-        )
+                nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
+                if option_set.allow_cte:
+                    nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
+
+                result = self.convert_using_specifics(
+                    dataflow_plan_node=dataflow_plan_node,
+                    sql_query_plan_id=sql_query_plan_id,
+                    nodes_to_convert_to_cte=nodes_to_convert_to_cte,
+                    optimizers=option_set.optimizers,
+                )
+
+                if retried_at_lower_optimization_level:
+                    logger.error(
+                        LazyFormat(
+                            "Successfully generated the SQL plan using an optimization level lower than the"
+                            " requested one. A lower one was used due to an exception using the requested one. Please "
+                            "investigate the cause for the exception.",
+                            requested_optimization_level=optimization_level,
+                            successful_optimization_level=attempted_optimization_level,
+                        )
+                    )
+
+                return result
+
+            except Exception as e:
+                if attempted_optimization_level is optimization_levels_to_attempt[-1]:
+                    logger.error(
+                        "Exhausted attempts to generate the SQL without exceptions."
+                        " Propagating the most recent exception."
+                    )
+                    raise e
+                retried_at_lower_optimization_level = True
+                logger.exception(
+                    LazyFormat(
+                        "Got an exception while generating the SQL plan. This indicates a bug that should be"
+                        " investigated, but retrying at a different optimization level to potentially avoid a"
+                        " user-facing error.",
+                        attempted_optimization_level=attempted_optimization_level,
+                    )
+                )
+
+        raise RuntimeError("Should have returned a result or raised an exception in the loop.")

     def convert_using_specifics(
         self,
@@ -216,7 +275,9 @@ def convert_using_specifics(
         optimizers: Sequence[SqlQueryPlanOptimizer],
     ) -> ConvertToSqlPlanResult:
         """Helper method to convert using specific options. Main use case are tests."""
-        logger.debug(LazyFormat("Converting to SQL", nodes_to_convert_to_cte=nodes_to_convert_to_cte))
+        logger.debug(
+            LazyFormat("Converting to SQL", nodes_to_convert_to_cte=[node.node_id for node in nodes_to_convert_to_cte])
+        )

         if len(nodes_to_convert_to_cte) == 0:
             # Avoid `DataflowNodeToSqlCteVisitor` code path for better isolation during rollout.
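As a quick sanity check on the level-selection expression in the first hunk, the same computation can be run in isolation. The enum below is a stand-in — this diff does not show the real SqlQueryOptimizationLevel members, so O0–O3 and their ordering are assumptions.

from enum import IntEnum
from typing import Sequence


class Level(IntEnum):
    """Stand-in enum; assumes levels are ordered O0 < O1 < O2 < O3 like SqlQueryOptimizationLevel."""

    O0 = 0
    O1 = 1
    O2 = 2
    O3 = 3


def levels_to_attempt(requested: Level) -> Sequence[Level]:
    # Mirrors the diff: every level from O1 up to the requested one, plus the requested level itself
    # (the union matters only when O0 is explicitly requested), sorted from highest to lowest.
    return sorted(
        {level for level in Level if Level.O1 <= level <= requested} | {requested},
        reverse=True,
    )


print([level.name for level in levels_to_attempt(Level.O3)])  # ['O3', 'O2', 'O1']
print([level.name for level in levels_to_attempt(Level.O0)])  # ['O0'] -- O0 only when explicitly requested

This matches the intent stated in the diff comment: lower levels are retried down to O1, and O0 is used only when it was the explicitly requested level.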