From 8c6633d0d1426cbba61645b588b6e31982dadfa0 Mon Sep 17 00:00:00 2001
From: wagnerlmichael <93889413+wagnerlmichael@users.noreply.github.com>
Date: Tue, 26 Nov 2024 16:30:36 -0600
Subject: [PATCH] Refactor ratio stats for build speed increase (#521)

* Move functions from assesspy into file and run spark groupby
* Try to add numba as dependency
* Add numba dependency
* Revert added packages
* Confirm numpy speed up
* Confirm numpy strat without spark
* Try importing dask
* Add multiprocessing code
* Move parallel functions inside boot_ci function
* Fix linter errors
* Fix linter errors
* Fix linter errors
* Fix linter errors
* Fix linter errors
* Remove duplicate function
* Remove comment
* Clean up
* Change random seeds and add logs
* Add checkpoint for functioning reduced report_summarise()
* Add checks and successfully add column to final df
* Working spark code with pd sampling
* Update column names
* Test PySpark applyInPandas
* Fix col orders and types
* Add working mostly-Spark implementation
* Bump max DPUs
* Get only the first value from each group col
* Bump nboot to 1000
* Refactor ratio_stats for assesspy 2.0.0
* Update types
* Update med ratio col names
* Check that median sample is gte 2
* Fix sample constants
* Add Athena logging
* Condense Spark ratio_stats code
* Reduce nboot to 300
* Add sales chasing check
* Swap bool to Spark data type
* Add sample size check for is_sales_chased
* Replace calced ratio column
* Ignore E402 only for Spark python models

---------

Co-authored-by: Dan Snow
Co-authored-by: Dan Snow
---
 dbt/models/reporting/reporting.ratio_stats.py | 380 +++++++-----------
 dbt/models/reporting/schema.yml               |   2 +-
 pyproject.toml                                |   5 +
 3 files changed, 152 insertions(+), 235 deletions(-)

diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py
index 06c755ef9..d9799e3eb 100644
--- a/dbt/models/reporting/reporting.ratio_stats.py
+++ b/dbt/models/reporting/reporting.ratio_stats.py
@@ -1,157 +1,129 @@
 # pylint: skip-file
 # type: ignore
-sc.addPyFile(  # noqa: F821
-    "s3://ccao-athena-dependencies-us-east-1/assesspy==1.2.0.zip"
+sc.addPyFile("s3://ccao-athena-dependencies-us-east-1/assesspy==2.0.0.zip")
+
+from typing import Union
+
+import assesspy as ap
+import pandas as pd
+from pyspark.sql.functions import col, lit
+
+CCAO_LOWER_QUANTILE = 0.05
+CCAO_UPPER_QUANTILE = 0.95
+CCAO_MIN_SAMPLE_SIZE = 20.0
+
+# The schema definition used within applyInPandas (where it is required) and
+# to determine the final column order
+SPARK_SCHEMA = (
+    "year string, triad string, geography_type string, property_group string, "
+    "assessment_stage string, geography_id string, sale_year string, sale_n bigint, "
+    "med_ratio double, med_ratio_ci_l double, med_ratio_ci_u double, med_ratio_n bigint, "
+    "cod double, cod_ci_l double, cod_ci_u double, cod_met boolean, cod_n bigint, "
+    "prd double, prd_ci_l double, prd_ci_u double, prd_met boolean, prd_n bigint, "
+    "prb double, prb_ci_l double, prb_ci_u double, prb_met boolean, prb_n bigint, "
+    "mki double, mki_ci_l double, mki_ci_u double, mki_met boolean, mki_n bigint, "
+    "is_sales_chased boolean, within_20_pct bigint, within_10_pct bigint, within_05_pct bigint"
 )
-import numpy as np  # noqa: I001 E402
-import pandas as pd  # noqa: E402
-from assesspy import (  # noqa: E402
-    boot_ci,  # noqa: E402
-    cod,  # noqa: E402
-    cod_ci as cod_boot,  # noqa: E402
-    cod_met,  # noqa: E402
-    mki,  # noqa: E402
-    mki_met,  # noqa: E402
-    prb,  # noqa: E402
-    prb_met,  # noqa: E402
-    prd,  # noqa: E402
-    prd_met,  # noqa: E402
-    prd_ci as prd_boot,  # noqa: E402
-)
-
-
-def median_boot(ratio, nboot=100, alpha=0.05):
-    return boot_ci(np.median, nboot=nboot, alpha=alpha, ratio=ratio)
-
-
-def ccao_median(x):
-    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
-    no_outliers = x.between(
-        x.quantile(0.05), x.quantile(0.95), inclusive="neither"
-    )
-
-    x_no_outliers = x[no_outliers]
-
-    median_n = sum(no_outliers)
-
-    median_val = np.median(x_no_outliers)
-    median_ci = median_boot(x_no_outliers, nboot=1000)
-    median_ci = f"{median_ci[0]}, {median_ci[1]}"
-
-    out = [median_val, median_ci, median_n]
-
-    return out
-
-
-def ccao_mki(fmv, sale_price):
-    ratio = fmv / sale_price
-
-    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
-    no_outliers = ratio.between(
-        ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither"
-    )
-
-    fmv_no_outliers = fmv[no_outliers]
-    sale_price_no_outliers = sale_price[no_outliers]
-
-    mki_n = sum(no_outliers)
-
-    if mki_n >= 20:
-        mki_val = mki(fmv_no_outliers, sale_price_no_outliers)
-        met = mki_met(mki_val)
-
-        out = [mki_val, met, mki_n]
-
-    else:
-        out = [None, None, mki_n]
-
-    return out
+def ccao_drop_outliers(
+    estimate: Union[list[int], list[float], pd.Series],
+    sale_price: Union[list[int], list[float], pd.Series],
+) -> tuple[pd.Series, pd.Series, float]:
+    """
+    Helper function to drop the top and bottom N% (usually 5%) of the input
+    ratios, per CCAO SOPs and IAAO recommendation.
+    """
+    ratio: pd.Series = estimate / sale_price
-def ccao_cod(ratio):
-    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
-    no_outliers = ratio[
-        ratio.between(
-            ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither"
-        )
-    ]
+    ratio_not_outlier = ratio.between(
+        ratio.quantile(CCAO_LOWER_QUANTILE),
+        ratio.quantile(CCAO_UPPER_QUANTILE),
+        inclusive="neither",
+    ).reset_index(drop=True)
-    cod_n = no_outliers.size
+    estimate_no_outliers = estimate[ratio_not_outlier]
+    sale_price_no_outliers = sale_price[ratio_not_outlier]
+    n: float = float(estimate_no_outliers.size)
-    if cod_n >= 20:
-        cod_val = cod(no_outliers)
-        cod_ci = cod_boot(no_outliers, nboot=1000)
-        cod_ci = f"{cod_ci[0]}, {cod_ci[1]}"
-        met = cod_met(cod_val)
+    return estimate_no_outliers, sale_price_no_outliers, n
-        out = [cod_val, cod_ci, met, cod_n]
+def ccao_metric(
+    fun: callable,
+    estimate: Union[list[int], list[float], pd.Series],
+    sale_price: Union[list[int], list[float], pd.Series],
+) -> dict:
+    """
+    Helper function to calculate a metric, its confidence interval, and
+    whether the metric meets the IAAO/Quintos standard. Also checks if the
+    number of samples is large enough to calculate a metric. Per CCAO
+    standards, the sample size must be at least 20.
+    """
+    est_no_out, sale_no_out, n = ccao_drop_outliers(estimate, sale_price)
+
+    if n >= CCAO_MIN_SAMPLE_SIZE:
+        val = getattr(ap, fun)(est_no_out, sale_no_out)
+        # MKI doesn't have a _ci function, so we need a check here to return
+        # None if it is called
+        try:
+            ci_l, ci_u = getattr(ap, f"{fun}_ci")(
+                est_no_out, sale_no_out, nboot=300
+            )
+        except AttributeError:
+            ci_l, ci_u = None, None
+        met = getattr(ap, f"{fun}_met")(val)
+        out = [val, ci_l, ci_u, met, n]
     else:
-        out = [None, None, None, cod_n]
-
-    return out
-
+        out = [None, None, None, None, n]
-def ccao_prd(fmv, sale_price):
-    ratio = fmv / sale_price
-
-    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
-    no_outliers = ratio.between(
-        ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither"
+    # Zip into a dictionary for use with the calc_summary function below,
+    # which expects a dict to expand into a DataFrame
+    out_dict = dict(
+        zip([fun, f"{fun}_ci_l", f"{fun}_ci_u", f"{fun}_met", f"{fun}_n"], out)
     )
-    fmv_no_outliers = fmv[no_outliers]
-    sale_price_no_outliers = sale_price[no_outliers]
+    return out_dict
-    prd_n = sum(no_outliers)
-    if prd_n >= 20:
-        prd_val = prd(fmv_no_outliers, sale_price_no_outliers)
-        prd_ci = prd_boot(fmv_no_outliers, sale_price_no_outliers, nboot=1000)
-        prd_ci = f"{prd_ci[0]}, {prd_ci[1]}"
-        met = prd_met(prd_val)
+def ccao_median(
+    estimate: Union[list[int], list[float], pd.Series],
+    sale_price: Union[list[int], list[float], pd.Series],
+) -> dict:
+    """
+    Calculates the median ratio of estimate to sale price, excluding outliers.
+    Ignores the CCAO minimum sample size requirement (only needs 2 values).
+    """
+    est_no_out, sale_no_out, n = ccao_drop_outliers(estimate, sale_price)
-        out = [prd_val, prd_ci, met, prd_n]
+    def median_val(estimate, sale_price):
+        ratio = estimate / sale_price
+        return ratio.median()
+    if n >= 2:
+        val = median_val(est_no_out, sale_no_out)
+        ci_l, ci_u = ap.boot_ci(
+            median_val, estimate=est_no_out, sale_price=sale_no_out, nboot=300
+        )
+        out = [val, ci_l, ci_u, n]
     else:
-        out = [None, None, None, prd_n]
+        val = median_val(est_no_out, sale_no_out)
+        out = [val, None, None, n]
-    return out
-
-
-def ccao_prb(fmv, sale_price):
-    ratio = fmv / sale_price
-
-    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
-    no_outliers = ratio.between(
-        ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither"
+    out_dict = dict(
+        zip(
+            ["med_ratio", "med_ratio_ci_l", "med_ratio_ci_u", "med_ratio_n"],
+            out,
+        )
     )
-    fmv_no_outliers = fmv[no_outliers]
-    sale_price_no_outliers = sale_price[no_outliers]
+    return out_dict
-    prb_n = sum(no_outliers)
-    if prb_n >= 20:
-        prb_model = prb(fmv_no_outliers, sale_price_no_outliers)
-        prb_val = prb_model["prb"]
-        prb_ci = prb_model["95% ci"]
-        prb_ci = f"{prb_ci[0]}, {prb_ci[1]}"
-        met = prb_met(prb_val)
-
-        out = [prb_val, prb_ci, met, prb_n]
-
-    else:
-        out = [None, None, None, prb_n]
-
-    return out
-
-
-def report_summarise(df, geography_id, geography_type):
+def calc_summary(df: pd.Series, geography_id: str, geography_type: str):
     """
-    Aggregates data and calculates summary statistics for given groupings
+    Calculate ratio summary statistics for a given geography and geography type.
+    Takes a DataFrame as input and returns a single row DataFrame of stats.
     """
-
     group_cols = [
         "year",
         "triad",
@@ -162,120 +134,60 @@ def report_summarise(df, geography_id, geography_type):
         "sale_year",
     ]
-    df["geography_id"] = df[geography_id].astype(str)
-    df["geography_type"] = geography_type
-
-    # Remove groups with less than three observations
-    # TODO: Remove/upgrade detect_chasing output
-    df["n"] = df.groupby(group_cols)["ratio"].transform("count")
-    df = df[df["n"] > 3]
-    df = df.groupby(group_cols).apply(
-        lambda x: pd.Series(
-            {
-                "sale_n": np.size(x["triad"]),
-                "ratio": ccao_median(x["ratio"]),
-                "cod": ccao_cod(ratio=x["ratio"]),
-                "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]),
-                "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]),
-                "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]),
-                "detect_chasing": False,
-                "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20),
-                "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10),
-                "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05),
-            }
+    out = (
+        df.withColumn("geography_id", col(geography_id).cast("string"))
+        .withColumn("geography_type", lit(geography_type))
+        .groupby(group_cols)
+        .applyInPandas(
+            lambda x: pd.DataFrame(
+                [
+                    {
+                        # Include the grouping column values in the output
+                        **dict(
+                            zip(group_cols, [x[c].iloc[0] for c in group_cols])
+                        ),
+                        "sale_n": x["triad"].size,
+                        **ccao_median(x["fmv"], x["sale_price"]),
+                        **ccao_metric("cod", x["fmv"], x["sale_price"]),
+                        **ccao_metric("prd", x["fmv"], x["sale_price"]),
+                        **ccao_metric("prb", x["fmv"], x["sale_price"]),
+                        **ccao_metric("mki", x["fmv"], x["sale_price"]),
+                        "is_sales_chased": ap.is_sales_chased(x["ratio"])
+                        if x["ratio"].size >= CCAO_MIN_SAMPLE_SIZE
+                        else None,
+                        "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20),
+                        "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10),
+                        "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05),
+                    }
+                ]
+            ),
+            schema=SPARK_SCHEMA,
         )
     )
-    df[["median_ratio", "median_ratio_ci", "median_ratio_n"]] = pd.DataFrame(
-        df.ratio.tolist(), index=df.index
-    )
-    df[["cod", "cod_ci", "cod_met", "cod_n"]] = pd.DataFrame(
-        df.cod.tolist(), index=df.index
-    )
-    df[["mki", "mki_met", "mki_n"]] = pd.DataFrame(
-        df.mki.tolist(), index=df.index
-    )
-    df[["prd", "prd_ci", "prd_met", "prd_n"]] = pd.DataFrame(
-        df.prd.tolist(), index=df.index
-    )
-    df[["prb", "prb_ci", "prb_met", "prb_n"]] = pd.DataFrame(
-        df.prb.tolist(), index=df.index
-    )
-    df["ratio_met"] = abs(1 - df["median_ratio"]) <= 0.05
-    df["vertical_equity_met"] = df.prd_met | df.prb_met
-
-    # Arrange output columns
-    df = df[
-        [
-            "sale_n",
-            "median_ratio",
-            "median_ratio_ci",
-            "cod",
-            "cod_ci",
-            "cod_n",
-            "prd",
-            "prd_ci",
-            "prd_n",
-            "prb",
-            "prb_ci",
-            "prb_n",
-            "mki",
-            "mki_n",
-            "detect_chasing",
-            "ratio_met",
-            "cod_met",
-            "prd_met",
-            "prb_met",
-            "mki_met",
-            "vertical_equity_met",
-            "within_20_pct",
-            "within_10_pct",
-            "within_05_pct",
-        ]
-    ].reset_index()
-
-    return df
+    return out
 def model(dbt, spark_session):
-    dbt.config(materialized="table")
+    dbt.config(materialized="table", engine_config={"MaxConcurrentDpus": 40})
+    athena_user_logger.info("Loading ratio stats input table")
     input = dbt.ref("reporting.ratio_stats_input")
+    input = input.filter(input.ratio.isNotNull()).filter(input.ratio > 0)
-    # Convert the Spark input dataframe to Pandas for
-    # compatibility with assesspy functions
-    input = input.toPandas()
-
-    # Replicate filtering from prior vw_ratio_stats pull
-    input = input[input.ratio > 0 & input.ratio.notnull()]
-
-    df = pd.concat(
-        [
-            report_summarise(input, "triad", "Tri"),
-            report_summarise(input, "township_code", "Town"),
-        ]
-    ).reset_index(drop=True)
-
-    # Force certain columns to datatype to maintain parity with old version
-    df[["year", "triad", "sale_year"]] = df[
-        ["year", "triad", "sale_year"]
-    ].astype(int)
+    athena_user_logger.info("Calculating group-level statistics")
+    df_tri = calc_summary(input, "triad", "Tri")
+    df_town = calc_summary(input, "township_code", "Town")
+    df = df_tri.unionByName(df_town)
-    # Create a Spark schema to maintain the datatypes of the
-    # previous output (for Tableau compatibility)
-    schema = (
-        "year: bigint, triad: bigint, geography_type: string, "
-        + "property_group: string, assessment_stage: string, "
-        + "geography_id: string, sale_year: bigint, sale_n: bigint, "
-        + "median_ratio: double, median_ratio_ci: string, cod: double, "
-        + "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, "
-        + "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, "
-        + "mki: double, mki_n: bigint, detect_chasing: boolean, "
-        + "ratio_met: boolean, cod_met: boolean, prd_met: boolean, "
-        + "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, "
-        + "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint"
-    )
+    def schema_to_spark_cols(schema: str):
+        columns = []
+        for col_def in schema.split(", "):
+            col_name = col_def.split(" ")[0]
+            columns.append(col(col_name))
+        return columns
-    spark_df = spark_session.createDataFrame(df, schema=schema)
+    athena_user_logger.info("Cleaning up and arranging columns")
+    df = df.select(*schema_to_spark_cols(SPARK_SCHEMA))
-    return spark_df
+    return df
diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml
index f7a666bf0..e27e85594 100644
--- a/dbt/models/reporting/schema.yml
+++ b/dbt/models/reporting/schema.yml
@@ -30,7 +30,7 @@ models:
       tags:
         - daily
       packages:
-        - "assesspy==1.2.0"
+        - "assesspy==2.0.0"
     data_tests:
       - expression_is_true:
           name: reporting_ratio_stats_metrics_are_sensible
diff --git a/pyproject.toml b/pyproject.toml
index 022b0583a..55188f807 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,8 @@ reportUnusedImport = false
 
 [tool.ruff]
 line-length = 79
+# Ignores objects created by Spark on Athena
+builtins = ["_", "sc", "athena_user_logger"]
 
 [tool.ruff.lint]
 extend-select = ["I"]
@@ -21,6 +23,9 @@ extend-select = ["I"]
 # decide we want to import code from dbt/ to a context outside of it
 known-third-party = ["dbt"]
 
+[tool.ruff.lint.per-file-ignores]
+"dbt/models/**.py" = ["E402"]
+
 [tool.sqlfluff.core]
 dialect = "athena"
 exclude_rules = "ambiguous.column_count, structure.column_order, RF04, ST05"