From 0efd58e466a3d289c58aacb77f65f05ec909b787 Mon Sep 17 00:00:00 2001 From: Jean Cochrane Date: Tue, 10 Sep 2024 12:19:49 -0500 Subject: [PATCH] Factor shared dbt script logic out into `utils` module (#590) * Factor out town close reports to their own dedicated script * Update dbt/README.md with new `export_qc_town_close_reports` interface * Temp commit: Try moving --refresh-tables to the export_models script * Revert "Temp commit: Try moving --refresh-tables to the export_models script" This reverts commit bd7ad275f0dd30beead6aa5860147553c8bf81ff. * Add output to docs for --refresh-tables flag for export_qc_town_close_reports * Rename --refresh-tables -> --print-table-refresh-command in export_qc_town_close_reports script * Use correct filters for sales table when printing table refresh command in export_qc_town_close_reports * Fix data structure for export_qc_town_close_reports refresh table command filters * Don't filter asmt_hist by cur = 'Y' in export_qc_town_close_reports, since that value doesn't exist * Remove unnecessary --no-run-glue-crawler flag in town close QC report refresh command * Generalize path in output example for --print-table-refresh-command docs * Ignore mypy warnings for select value we know is not null in export_models * Natural language fix to dbt/README.md * Fix import order in socrata_upload script * Fix typo in dbt/README.md Co-authored-by: Dan Snow <31494343+dfsnow@users.noreply.github.com> * Use typing builtin for list in dbt/scripts/export_models.py * Factor shared script logic out into `utils` module * Fix typing for kwargs that we unpack to argparse add_arguments --------- Co-authored-by: Dan Snow <31494343+dfsnow@users.noreply.github.com> --- dbt/scripts/export_models.py | 230 +------------------- dbt/scripts/export_qc_town_close_reports.py | 24 +- dbt/scripts/run_iasworld_data_tests.py | 6 +- dbt/scripts/utils/__init__.py | 0 dbt/scripts/utils/constants.py | 27 +++ dbt/scripts/utils/export.py | 230 ++++++++++++++++++++ 6 files changed, 273 insertions(+), 244 deletions(-) create mode 100644 dbt/scripts/utils/__init__.py create mode 100644 dbt/scripts/utils/constants.py create mode 100644 dbt/scripts/utils/export.py diff --git a/dbt/scripts/export_models.py b/dbt/scripts/export_models.py index d59c6a960..357803e34 100644 --- a/dbt/scripts/export_models.py +++ b/dbt/scripts/export_models.py @@ -1,23 +1,11 @@ # Export dbt models to Excel files. # # Run `python scripts/export_models.py --help` for details. - import argparse -import contextlib -import io -import json -import os -import pathlib -import shutil -import pandas as pd -import pyathena -from dbt.cli.main import dbtRunner -from openpyxl.styles import Alignment -from openpyxl.utils import column_index_from_string, get_column_letter -from openpyxl.worksheet.table import Table, TableStyleInfo +from utils import constants +from utils.export import export_models -DBT = dbtRunner() CLI_DESCRIPTION = """Export dbt models to Excel files. Expects dependencies from requirements.txt (dbt dependencies) and scripts/requirements.export_models.txt (script dependencies) be installed. @@ -54,6 +42,12 @@ def parse_args(): # get preserved formatter_class=argparse.RawTextHelpFormatter, ) + parser.add_argument( + *constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS + ) + parser.add_argument( + *constants.REBUILD_ARGUMENT_ARGS, **constants.REBUILD_ARGUMENT_KWARGS + ) parser.add_argument( "--select", required=False, @@ -69,18 +63,6 @@ def parse_args(): "but they can't both be set" ), ) - parser.add_argument( - "--target", - required=False, - default="dev", - help="dbt target to use for querying model data, defaults to 'dev'", - ) - parser.add_argument( - "--rebuild", - action=argparse.BooleanOptionalAction, - default=False, - help="Rebuild models before exporting", - ) parser.add_argument( "--where", required=False, @@ -90,202 +72,6 @@ def parse_args(): return parser.parse_args() -def export_models( - target: str = "dev", - select: list[str] | None = None, - selector: str | None = None, - rebuild: bool = False, - where: str | None = None, -): - if not select and not selector: - raise ValueError("One of --select or --selector is required") - - if select and selector: - raise ValueError("--select and --selector cannot both be set") - - select_args = ["--select", *select] if select else ["--selector", selector] # type: ignore - - if rebuild: - dbt_run_args = ["run", "--target", target, *select_args] - print("Rebuilding models") - print(f"> dbt {' '.join(dbt_run_args)}") - dbt_run_result = DBT.invoke(dbt_run_args) - if not dbt_run_result.success: - print("Encountered error in `dbt run` call") - raise ValueError(dbt_run_result.exception) - - print("Listing models to select for export") - dbt_list_args = [ - "--quiet", - "list", - "--target", - target, - "--resource-types", - "model", - "--output", - "json", - "--output-keys", - "name", - "config", - "relation_name", - *select_args, - ] - print(f"> dbt {' '.join(dbt_list_args)}") - dbt_output = io.StringIO() - with contextlib.redirect_stdout(dbt_output): - dbt_list_result = DBT.invoke(dbt_list_args) - - if not dbt_list_result.success: - print("Encountered error in `dbt list` call") - raise ValueError(dbt_list_result.exception) - - # Output is formatted as a list of newline-separated JSON objects - models = [ - json.loads(model_dict_str) - for model_dict_str in dbt_output.getvalue().split("\n") - # Filter out empty strings caused by trailing newlines - if model_dict_str - ] - - if not models: - raise ValueError( - f"No models found for the select option '{' '.join(select_args)}'" - ) - - print( - "The following models will be exported: " - f"{', '.join(model['name'] for model in models)}" - ) - - conn = pyathena.connect( - s3_staging_dir=os.getenv( - "AWS_ATHENA_S3_STAGING_DIR", - "s3://ccao-dbt-athena-results-us-east-1", - ), - region_name=os.getenv("AWS_ATHENA_REGION_NAME", "us-east-1"), - ) - - for model in models: - # Extract useful model metadata from the columns we queried in - # the `dbt list` call above - model_name = model["name"] - relation_name = model["relation_name"] - export_name = model["config"]["meta"].get("export_name") or model_name - template = model["config"]["meta"].get("export_template") or model_name - - # Define inputs and outputs for export based on model metadata - template_path = os.path.join("export", "templates", f"{template}.xlsx") - template_exists = os.path.isfile(template_path) - output_path = os.path.join("export", "output", f"{export_name}.xlsx") - - print(f"Querying data for model {model_name}") - query = f"SELECT * FROM {relation_name}" - if where: - query += f" WHERE {where}" - print(f"> {query}") - model_df = pd.read_sql(query, conn) - - # Delete the output file if one already exists - pathlib.Path(output_path).unlink(missing_ok=True) - - if template_exists: - print(f"Using template file at {template_path}") - shutil.copyfile(template_path, output_path) - else: - print("No template file exists; creating a workbook from scratch") - - writer_kwargs = ( - {"mode": "a", "if_sheet_exists": "overlay"} - if template_exists - else {} - ) - with pd.ExcelWriter( - output_path, engine="openpyxl", **writer_kwargs - ) as writer: - sheet_name = "Sheet1" - model_df.to_excel( - writer, - sheet_name=sheet_name, - header=False if template_exists else True, - index=False, - startrow=1 if template_exists else 0, - ) - sheet = writer.sheets[sheet_name] - - # Add a table for data filtering. Only do this if the result set - # is not empty, because otherwise the empty table will make - # the Excel workbook invalid - if model_df.empty: - print( - "Skipping formatting for output workbook since result set " - "is empty" - ) - else: - table = Table( - displayName="Query_Results", - ref=( - f"A1:{get_column_letter(sheet.max_column)}" - f"{str(sheet.max_row)}" - ), - ) - table.tableStyleInfo = TableStyleInfo( - name="TableStyleMedium11", showRowStripes=True - ) - sheet.add_table(table) - - # If a parid column exists, format it explicitly as a - # 14-digit number to avoid Excel converting it to scientific - # notation or stripping out leading zeros - if "parid" in model_df or "pin" in model_df: - parid_field = "parid" if "parid" in model_df else "pin" - parid_index = model_df.columns.get_loc(parid_field) - # Skip header row when applying formatting. We need to - # catch the special case where there is only one row, or - # else we will iterate the _cells_ in that row instead of - # the row when slicing it from 2 : max_row - non_header_rows = ( - [sheet[2]] - if sheet.max_row == 2 - else sheet[2 : sheet.max_row] - ) - for row in non_header_rows: - row[parid_index].number_format = "00000000000000" - # Left align since PINs do not actually need to be - # compared by order of magnitude the way that numbers - # do - row[parid_index].alignment = Alignment( - horizontal="left" - ) - - # Apply any column formatting that was configured - format_config = model["config"]["meta"].get( - "export_format", {} - ) - if column_configs := format_config.get("columns"): - for column_config in column_configs: - # Set horizontal alignment if config is present - if horiz_align_dir := column_config.get( - "horizontal_align" - ): - horizontal_alignment = Alignment( - horizontal=horiz_align_dir - ) - col_letter = column_config.get("index") - if col_letter is None: - raise ValueError( - "'index' attribute is required when " - "'horizontal_align' is set on " - "export_format.columns config for " - f"model {model_name}" - ) - idx = column_index_from_string(col_letter) - 1 - # Skip header row - for row in sheet[2 : sheet.max_row]: - row[idx].alignment = horizontal_alignment - - print(f"Exported model {model_name} to {output_path}") - - if __name__ == "__main__": args = parse_args() export_models( diff --git a/dbt/scripts/export_qc_town_close_reports.py b/dbt/scripts/export_qc_town_close_reports.py index 1f0bab5be..e1a638a1c 100644 --- a/dbt/scripts/export_qc_town_close_reports.py +++ b/dbt/scripts/export_qc_town_close_reports.py @@ -6,16 +6,10 @@ import datetime import io import json -import os -import sys from dbt.cli.main import dbtRunner - -# Add the parent directory of `scripts` to the module search path -# so that we can import from other modules in the `scripts` directory -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from scripts.export_models import export_models +from utils import constants +from utils.export import export_models DBT = dbtRunner() @@ -106,10 +100,10 @@ def parse_args() -> argparse.Namespace: formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( - "--target", - required=False, - default="dev", - help="dbt target to use for querying model data, defaults to 'dev'", + *constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS + ) + parser.add_argument( + *constants.REBUILD_ARGUMENT_ARGS, **constants.REBUILD_ARGUMENT_KWARGS ) parser.add_argument( "--township", @@ -123,12 +117,6 @@ def parse_args() -> argparse.Namespace: type=int, help="Tax year to use in filtering query results. Defaults to the current year", ) - parser.add_argument( - "--rebuild", - action=argparse.BooleanOptionalAction, - default=False, - help="Rebuild models before exporting", - ) parser.add_argument( "--print-table-refresh-command", action=argparse.BooleanOptionalAction, diff --git a/dbt/scripts/run_iasworld_data_tests.py b/dbt/scripts/run_iasworld_data_tests.py index 4d6a11a78..38bd711c0 100644 --- a/dbt/scripts/run_iasworld_data_tests.py +++ b/dbt/scripts/run_iasworld_data_tests.py @@ -33,6 +33,7 @@ import yaml from dbt.artifacts.schemas.results import TestStatus from dbt.cli.main import dbtRunner +from utils import constants DBT = dbtRunner() @@ -730,10 +731,7 @@ def main() -> None: ), ) parser.add_argument( - "--target", - required=False, - default="dev", - help="dbt target to use for running tests, defaults to 'dev'", + *constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS ) args = parser.parse_args() diff --git a/dbt/scripts/utils/__init__.py b/dbt/scripts/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbt/scripts/utils/constants.py b/dbt/scripts/utils/constants.py new file mode 100644 index 000000000..dcf3d0b63 --- /dev/null +++ b/dbt/scripts/utils/constants.py @@ -0,0 +1,27 @@ +# Constant values that are reused across scripts +import argparse +import typing + + +# Define type for kwargs to argparse's add_argument method, since otherwise mypy +# will be confused by the dict types when we unpack them. See here for details: +# https://stackoverflow.com/a/74316829 +class AddArgumentKwargs(typing.TypedDict, total=False): + action: str | type[argparse.Action] + default: typing.Any + help: str + + +# Definitions for common argparse arguments +TARGET_ARGUMENT_ARGS = ["--target"] +TARGET_ARGUMENT_KWARGS: AddArgumentKwargs = { + "action": "store", + "default": "dev", + "help": "dbt target to use for running commands, defaults to 'dev'", +} +REBUILD_ARGUMENT_ARGS = ["--rebuild"] +REBUILD_ARGUMENT_KWARGS: AddArgumentKwargs = { + "action": argparse.BooleanOptionalAction, + "default": False, + "help": "Rebuild models prior to export", +} diff --git a/dbt/scripts/utils/export.py b/dbt/scripts/utils/export.py new file mode 100644 index 000000000..38e669eb9 --- /dev/null +++ b/dbt/scripts/utils/export.py @@ -0,0 +1,230 @@ +# Shared utilities for exporting models +import contextlib +import io +import json +import os +import pathlib +import shutil + +import pandas as pd +import pyathena +from dbt.cli.main import dbtRunner +from openpyxl.styles import Alignment +from openpyxl.utils import column_index_from_string, get_column_letter +from openpyxl.worksheet.table import Table, TableStyleInfo + +# Shared object for running dbt CLI commands +DBT = dbtRunner() + + +def export_models( + target: str = "dev", + select: list[str] | None = None, + selector: str | None = None, + rebuild: bool = False, + where: str | None = None, +): + """ + Export a group of models to Excel workbooks in the output directory + `export/output/`. + + Arguments: + + * target (str): dbt target to use for querying model data, defaults to + "dev" + * select (list[str]): One or more dbt --select statements to + use for filtering models + * selector (str): A selector name to use for filtering + models, as defined in selectors.yml. One of `select` or `selector` + must be set, but they can't both be set + * rebuild (bool): Rebuild models before exporting, defaults to False + * where (str): Optional SQL expression representing a WHERE clause to + filter models + """ + if not select and not selector: + raise ValueError("One of --select or --selector is required") + + if select and selector: + raise ValueError("--select and --selector cannot both be set") + + select_args = ["--select", *select] if select else ["--selector", selector] # type: ignore + + if rebuild: + dbt_run_args = ["run", "--target", target, *select_args] + print("Rebuilding models") + print(f"> dbt {' '.join(dbt_run_args)}") + dbt_run_result = DBT.invoke(dbt_run_args) + if not dbt_run_result.success: + print("Encountered error in `dbt run` call") + raise ValueError(dbt_run_result.exception) + + print("Listing models to select for export") + dbt_list_args = [ + "--quiet", + "list", + "--target", + target, + "--resource-types", + "model", + "--output", + "json", + "--output-keys", + "name", + "config", + "relation_name", + *select_args, + ] + print(f"> dbt {' '.join(dbt_list_args)}") + dbt_output = io.StringIO() + with contextlib.redirect_stdout(dbt_output): + dbt_list_result = DBT.invoke(dbt_list_args) + + if not dbt_list_result.success: + print("Encountered error in `dbt list` call") + raise ValueError(dbt_list_result.exception) + + # Output is formatted as a list of newline-separated JSON objects + models = [ + json.loads(model_dict_str) + for model_dict_str in dbt_output.getvalue().split("\n") + # Filter out empty strings caused by trailing newlines + if model_dict_str + ] + + if not models: + raise ValueError( + f"No models found for the select option '{' '.join(select_args)}'" + ) + + print( + "The following models will be exported: " + f"{', '.join(model['name'] for model in models)}" + ) + + conn = pyathena.connect( + s3_staging_dir=os.getenv( + "AWS_ATHENA_S3_STAGING_DIR", + "s3://ccao-dbt-athena-results-us-east-1", + ), + region_name=os.getenv("AWS_ATHENA_REGION_NAME", "us-east-1"), + ) + + for model in models: + # Extract useful model metadata from the columns we queried in + # the `dbt list` call above + model_name = model["name"] + relation_name = model["relation_name"] + export_name = model["config"]["meta"].get("export_name") or model_name + template = model["config"]["meta"].get("export_template") or model_name + + # Define inputs and outputs for export based on model metadata + template_path = os.path.join("export", "templates", f"{template}.xlsx") + template_exists = os.path.isfile(template_path) + output_path = os.path.join("export", "output", f"{export_name}.xlsx") + + print(f"Querying data for model {model_name}") + query = f"SELECT * FROM {relation_name}" + if where: + query += f" WHERE {where}" + print(f"> {query}") + model_df = pd.read_sql(query, conn) + + # Delete the output file if one already exists + pathlib.Path(output_path).unlink(missing_ok=True) + + if template_exists: + print(f"Using template file at {template_path}") + shutil.copyfile(template_path, output_path) + else: + print("No template file exists; creating a workbook from scratch") + + writer_kwargs = ( + {"mode": "a", "if_sheet_exists": "overlay"} + if template_exists + else {} + ) + with pd.ExcelWriter( + output_path, engine="openpyxl", **writer_kwargs + ) as writer: + sheet_name = "Sheet1" + model_df.to_excel( + writer, + sheet_name=sheet_name, + header=False if template_exists else True, + index=False, + startrow=1 if template_exists else 0, + ) + sheet = writer.sheets[sheet_name] + + # Add a table for data filtering. Only do this if the result set + # is not empty, because otherwise the empty table will make + # the Excel workbook invalid + if model_df.empty: + print( + "Skipping formatting for output workbook since result set " + "is empty" + ) + else: + table = Table( + displayName="Query_Results", + ref=( + f"A1:{get_column_letter(sheet.max_column)}" + f"{str(sheet.max_row)}" + ), + ) + table.tableStyleInfo = TableStyleInfo( + name="TableStyleMedium11", showRowStripes=True + ) + sheet.add_table(table) + + # If a parid column exists, format it explicitly as a + # 14-digit number to avoid Excel converting it to scientific + # notation or stripping out leading zeros + if "parid" in model_df or "pin" in model_df: + parid_field = "parid" if "parid" in model_df else "pin" + parid_index = model_df.columns.get_loc(parid_field) + # Skip header row when applying formatting. We need to + # catch the special case where there is only one row, or + # else we will iterate the _cells_ in that row instead of + # the row when slicing it from 2 : max_row + non_header_rows = ( + [sheet[2]] + if sheet.max_row == 2 + else sheet[2 : sheet.max_row] + ) + for row in non_header_rows: + row[parid_index].number_format = "00000000000000" + # Left align since PINs do not actually need to be + # compared by order of magnitude the way that numbers + # do + row[parid_index].alignment = Alignment( + horizontal="left" + ) + + # Apply any column formatting that was configured + format_config = model["config"]["meta"].get( + "export_format", {} + ) + if column_configs := format_config.get("columns"): + for column_config in column_configs: + # Set horizontal alignment if config is present + if horiz_align_dir := column_config.get( + "horizontal_align" + ): + horizontal_alignment = Alignment( + horizontal=horiz_align_dir + ) + col_letter = column_config.get("index") + if col_letter is None: + raise ValueError( + "'index' attribute is required when " + "'horizontal_align' is set on " + "export_format.columns config for " + f"model {model_name}" + ) + idx = column_index_from_string(col_letter) - 1 + # Skip header row + for row in sheet[2 : sheet.max_row]: + row[idx].alignment = horizontal_alignment + + print(f"Exported model {model_name} to {output_path}")