Skip to content

Commit

Permalink
Factor shared dbt script logic out into utils module (#590)
Browse files Browse the repository at this point in the history
* Factor out town close reports to their own dedicated script

* Update dbt/README.md with new `export_qc_town_close_reports` interface

* Temp commit: Try moving --refresh-tables to the export_models script

* Revert "Temp commit: Try moving --refresh-tables to the export_models script"

This reverts commit bd7ad27.

* Add output to docs for --refresh-tables flag for export_qc_town_close_reports

* Rename --refresh-tables -> --print-table-refresh-command in export_qc_town_close_reports script

* Use correct filters for sales table when printing table refresh command in export_qc_town_close_reports

* Fix data structure for export_qc_town_close_reports refresh table command filters

* Don't filter asmt_hist by cur = 'Y' in export_qc_town_close_reports, since that value doesn't exist

* Remove unnecessary --no-run-glue-crawler flag in town close QC report refresh command

* Generalize path in output example for --print-table-refresh-command docs

* Ignore mypy warnings for select value we know is not null in export_models

* Natural language fix to dbt/README.md

* Fix import order in socrata_upload script

* Fix typo in dbt/README.md

Co-authored-by: Dan Snow <31494343+dfsnow@users.noreply.github.com>

* Use typing builtin for list in dbt/scripts/export_models.py

* Factor shared script logic out into `utils` module

* Fix typing for kwargs that we unpack to argparse add_arguments

---------

Co-authored-by: Dan Snow <31494343+dfsnow@users.noreply.github.com>
  • Loading branch information
jeancochrane and dfsnow authored Sep 10, 2024
1 parent b93c545 commit 0efd58e
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 244 deletions.
230 changes: 8 additions & 222 deletions dbt/scripts/export_models.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
# Export dbt models to Excel files.
#
# Run `python scripts/export_models.py --help` for details.

import argparse
import contextlib
import io
import json
import os
import pathlib
import shutil

import pandas as pd
import pyathena
from dbt.cli.main import dbtRunner
from openpyxl.styles import Alignment
from openpyxl.utils import column_index_from_string, get_column_letter
from openpyxl.worksheet.table import Table, TableStyleInfo
from utils import constants
from utils.export import export_models

DBT = dbtRunner()
CLI_DESCRIPTION = """Export dbt models to Excel files.
Expects dependencies from requirements.txt (dbt dependencies) and scripts/requirements.export_models.txt (script dependencies) be installed.
Expand Down Expand Up @@ -54,6 +42,12 @@ def parse_args():
# get preserved
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
*constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS
)
parser.add_argument(
*constants.REBUILD_ARGUMENT_ARGS, **constants.REBUILD_ARGUMENT_KWARGS
)
parser.add_argument(
"--select",
required=False,
Expand All @@ -69,18 +63,6 @@ def parse_args():
"but they can't both be set"
),
)
parser.add_argument(
"--target",
required=False,
default="dev",
help="dbt target to use for querying model data, defaults to 'dev'",
)
parser.add_argument(
"--rebuild",
action=argparse.BooleanOptionalAction,
default=False,
help="Rebuild models before exporting",
)
parser.add_argument(
"--where",
required=False,
Expand All @@ -90,202 +72,6 @@ def parse_args():
return parser.parse_args()


def export_models(
    target: str = "dev",
    select: list[str] | None = None,
    selector: str | None = None,
    rebuild: bool = False,
    where: str | None = None,
) -> None:
    """Export the data behind one or more dbt models to Excel workbooks.

    Models are chosen with either dbt ``--select`` expressions or a named
    ``--selector`` (exactly one of the two must be set). For each selected
    model, the full table is queried from Athena and written to
    ``export/output/<export_name>.xlsx``. If a template workbook exists at
    ``export/templates/<template>.xlsx`` it is copied and the data is
    overlaid onto it; otherwise a fresh workbook is created.

    Args:
        target: dbt target to use for querying model data.
        select: List of dbt select expressions identifying models to export.
        selector: Name of a dbt selector identifying models to export.
        rebuild: If True, ``dbt run`` the selection before exporting.
        where: Optional SQL filter (without the ``WHERE`` keyword) applied
            to every model's export query.

    Raises:
        ValueError: If neither or both of ``select``/``selector`` are set,
            if a dbt invocation fails, if no models match the selection, or
            if a model's ``export_format`` config is missing a required
            ``index`` attribute.
    """
    if not select and not selector:
        raise ValueError("One of --select or --selector is required")

    if select and selector:
        raise ValueError("--select and --selector cannot both be set")

    # Exactly one of the two is set at this point, so build the matching
    # dbt CLI selection flags
    select_args = ["--select", *select] if select else ["--selector", selector]  # type: ignore

    if rebuild:
        dbt_run_args = ["run", "--target", target, *select_args]
        print("Rebuilding models")
        print(f"> dbt {' '.join(dbt_run_args)}")
        dbt_run_result = DBT.invoke(dbt_run_args)
        if not dbt_run_result.success:
            print("Encountered error in `dbt run` call")
            raise ValueError(dbt_run_result.exception)

    print("Listing models to select for export")
    dbt_list_args = [
        "--quiet",
        "list",
        "--target",
        target,
        "--resource-types",
        "model",
        "--output",
        "json",
        "--output-keys",
        "name",
        "config",
        "relation_name",
        *select_args,
    ]
    print(f"> dbt {' '.join(dbt_list_args)}")
    # dbtRunner prints the `list` output to stdout rather than returning it,
    # so capture stdout to get at the JSON payload
    dbt_output = io.StringIO()
    with contextlib.redirect_stdout(dbt_output):
        dbt_list_result = DBT.invoke(dbt_list_args)

    if not dbt_list_result.success:
        print("Encountered error in `dbt list` call")
        raise ValueError(dbt_list_result.exception)

    # Output is formatted as a list of newline-separated JSON objects
    models = [
        json.loads(model_dict_str)
        for model_dict_str in dbt_output.getvalue().split("\n")
        # Filter out empty strings caused by trailing newlines
        if model_dict_str
    ]

    if not models:
        raise ValueError(
            f"No models found for the select option '{' '.join(select_args)}'"
        )

    print(
        "The following models will be exported: "
        f"{', '.join(model['name'] for model in models)}"
    )

    # One Athena connection is shared across all model queries; env vars
    # override the default staging dir and region
    conn = pyathena.connect(
        s3_staging_dir=os.getenv(
            "AWS_ATHENA_S3_STAGING_DIR",
            "s3://ccao-dbt-athena-results-us-east-1",
        ),
        region_name=os.getenv("AWS_ATHENA_REGION_NAME", "us-east-1"),
    )

    for model in models:
        # Extract useful model metadata from the columns we queried in
        # the `dbt list` call above
        model_name = model["name"]
        relation_name = model["relation_name"]
        export_name = model["config"]["meta"].get("export_name") or model_name
        template = model["config"]["meta"].get("export_template") or model_name

        # Define inputs and outputs for export based on model metadata
        template_path = os.path.join("export", "templates", f"{template}.xlsx")
        template_exists = os.path.isfile(template_path)
        output_path = os.path.join("export", "output", f"{export_name}.xlsx")

        print(f"Querying data for model {model_name}")
        query = f"SELECT * FROM {relation_name}"
        if where:
            query += f" WHERE {where}"
        print(f"> {query}")
        model_df = pd.read_sql(query, conn)

        # Delete the output file if one already exists
        pathlib.Path(output_path).unlink(missing_ok=True)

        if template_exists:
            print(f"Using template file at {template_path}")
            shutil.copyfile(template_path, output_path)
        else:
            print("No template file exists; creating a workbook from scratch")

        # When writing onto a template, append/overlay so the template's
        # header row and styling are preserved
        writer_kwargs = (
            {"mode": "a", "if_sheet_exists": "overlay"}
            if template_exists
            else {}
        )
        with pd.ExcelWriter(
            output_path, engine="openpyxl", **writer_kwargs
        ) as writer:
            sheet_name = "Sheet1"
            # Skip writing a header row and start below row 1 when a
            # template supplies the header
            model_df.to_excel(
                writer,
                sheet_name=sheet_name,
                header=False if template_exists else True,
                index=False,
                startrow=1 if template_exists else 0,
            )
            sheet = writer.sheets[sheet_name]

            # Add a table for data filtering. Only do this if the result set
            # is not empty, because otherwise the empty table will make
            # the Excel workbook invalid
            if model_df.empty:
                print(
                    "Skipping formatting for output workbook since result set "
                    "is empty"
                )
            else:
                table = Table(
                    displayName="Query_Results",
                    ref=(
                        f"A1:{get_column_letter(sheet.max_column)}"
                        f"{str(sheet.max_row)}"
                    ),
                )
                table.tableStyleInfo = TableStyleInfo(
                    name="TableStyleMedium11", showRowStripes=True
                )
                sheet.add_table(table)

                # If a parid column exists, format it explicitly as a
                # 14-digit number to avoid Excel converting it to scientific
                # notation or stripping out leading zeros
                if "parid" in model_df or "pin" in model_df:
                    parid_field = "parid" if "parid" in model_df else "pin"
                    parid_index = model_df.columns.get_loc(parid_field)
                    # Skip header row when applying formatting. We need to
                    # catch the special case where there is only one row, or
                    # else we will iterate the _cells_ in that row instead of
                    # the row when slicing it from 2 : max_row
                    non_header_rows = (
                        [sheet[2]]
                        if sheet.max_row == 2
                        else sheet[2 : sheet.max_row]
                    )
                    for row in non_header_rows:
                        row[parid_index].number_format = "00000000000000"
                        # Left align since PINs do not actually need to be
                        # compared by order of magnitude the way that numbers
                        # do
                        row[parid_index].alignment = Alignment(
                            horizontal="left"
                        )

                # Apply any column formatting that was configured
                format_config = model["config"]["meta"].get(
                    "export_format", {}
                )
                if column_configs := format_config.get("columns"):
                    for column_config in column_configs:
                        # Set horizontal alignment if config is present
                        if horiz_align_dir := column_config.get(
                            "horizontal_align"
                        ):
                            horizontal_alignment = Alignment(
                                horizontal=horiz_align_dir
                            )
                            # The column letter is required to locate which
                            # cells the alignment applies to
                            col_letter = column_config.get("index")
                            if col_letter is None:
                                raise ValueError(
                                    "'index' attribute is required when "
                                    "'horizontal_align' is set on "
                                    "export_format.columns config for "
                                    f"model {model_name}"
                                )
                            idx = column_index_from_string(col_letter) - 1
                            # Skip header row
                            for row in sheet[2 : sheet.max_row]:
                                row[idx].alignment = horizontal_alignment

        print(f"Exported model {model_name} to {output_path}")


if __name__ == "__main__":
args = parse_args()
export_models(
Expand Down
24 changes: 6 additions & 18 deletions dbt/scripts/export_qc_town_close_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@
import datetime
import io
import json
import os
import sys

from dbt.cli.main import dbtRunner

# Add the parent directory of `scripts` to the module search path
# so that we can import from other modules in the `scripts` directory
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from scripts.export_models import export_models
from utils import constants
from utils.export import export_models

DBT = dbtRunner()

Expand Down Expand Up @@ -106,10 +100,10 @@ def parse_args() -> argparse.Namespace:
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--target",
required=False,
default="dev",
help="dbt target to use for querying model data, defaults to 'dev'",
*constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS
)
parser.add_argument(
*constants.REBUILD_ARGUMENT_ARGS, **constants.REBUILD_ARGUMENT_KWARGS
)
parser.add_argument(
"--township",
Expand All @@ -123,12 +117,6 @@ def parse_args() -> argparse.Namespace:
type=int,
help="Tax year to use in filtering query results. Defaults to the current year",
)
parser.add_argument(
"--rebuild",
action=argparse.BooleanOptionalAction,
default=False,
help="Rebuild models before exporting",
)
parser.add_argument(
"--print-table-refresh-command",
action=argparse.BooleanOptionalAction,
Expand Down
6 changes: 2 additions & 4 deletions dbt/scripts/run_iasworld_data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import yaml
from dbt.artifacts.schemas.results import TestStatus
from dbt.cli.main import dbtRunner
from utils import constants

DBT = dbtRunner()

Expand Down Expand Up @@ -730,10 +731,7 @@ def main() -> None:
),
)
parser.add_argument(
"--target",
required=False,
default="dev",
help="dbt target to use for running tests, defaults to 'dev'",
*constants.TARGET_ARGUMENT_ARGS, **constants.TARGET_ARGUMENT_KWARGS
)

args = parser.parse_args()
Expand Down
Empty file added dbt/scripts/utils/__init__.py
Empty file.
27 changes: 27 additions & 0 deletions dbt/scripts/utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Constant values that are reused across scripts
import argparse
import typing


# Define type for kwargs to argparse's add_argument method, since otherwise mypy
# will be confused by the dict types when we unpack them. See here for details:
# https://stackoverflow.com/a/74316829
class AddArgumentKwargs(typing.TypedDict, total=False):
action: str | type[argparse.Action]
default: typing.Any
help: str


# Definitions for common argparse arguments
TARGET_ARGUMENT_ARGS = ["--target"]
TARGET_ARGUMENT_KWARGS: AddArgumentKwargs = {
"action": "store",
"default": "dev",
"help": "dbt target to use for running commands, defaults to 'dev'",
}
REBUILD_ARGUMENT_ARGS = ["--rebuild"]
REBUILD_ARGUMENT_KWARGS: AddArgumentKwargs = {
"action": argparse.BooleanOptionalAction,
"default": False,
"help": "Rebuild models prior to export",
}
Loading

0 comments on commit 0efd58e

Please sign in to comment.