From 06bb7192a7d3ff519f47b3d7e03fe986dc2c0311 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 16:59:08 +0000 Subject: [PATCH 01/27] Ignore dev venv --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a96ad46f5..a2f8aa654 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ logs/ # Ignore most CSVs, except those that are used as dbt seeds *.csv !dbt/seeds/**/*.csv +venv/ From 63bd8cb1ccba56b62ded10967e67c197b07040a6 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 17:00:24 +0000 Subject: [PATCH 02/27] Add initial python model --- dbt/models/reporting/reporting.ratio_stats.py | 235 ++++++++++++++++++ dbt/models/reporting/schema.yml | 5 + 2 files changed, 240 insertions(+) create mode 100644 dbt/models/reporting/reporting.ratio_stats.py diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py new file mode 100644 index 000000000..9382427e2 --- /dev/null +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -0,0 +1,235 @@ +import numpy as np +import pandas as pd +from assesspy import boot_ci, cod +from assesspy import cod_ci as cod_boot +from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd +from assesspy import prd_ci as prd_boot +from assesspy import prd_met + + +def median_boot(ratio, nboot=100, alpha=0.05): + return boot_ci(np.median, nboot=nboot, alpha=alpha, ratio=ratio) + + +def ccao_median(x): + # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs + no_outliers = x.between( + x.quantile(0.05), x.quantile(0.95), inclusive="neither" + ) + + x_no_outliers = x[no_outliers] + + median_n = sum(no_outliers) + + median_val = np.median(x_no_outliers) + median_ci = median_boot(x_no_outliers, nboot=1000) + median_ci = f"{median_ci[0]}, {median_ci[1]}" + + out = [median_val, median_ci, median_n] + + return out + + +def ccao_mki(fmv, sale_price): + ratio = fmv / sale_price + + # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs + no_outliers = ratio.between( + ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" + ) + + fmv_no_outliers = fmv[no_outliers] + sale_price_no_outliers = sale_price[no_outliers] + + mki_n = sum(no_outliers) + + if mki_n >= 20: + mki_val = mki(fmv_no_outliers, sale_price_no_outliers) + met = mki_met(mki_val) + + out = [mki_val, met, mki_n] + + else: + out = [None, None, mki_n] + + return out + + +def ccao_cod(ratio): + # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs + no_outliers = ratio[ + ratio.between( + ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" + ) + ] + + cod_n = no_outliers.size + + if cod_n >= 20: + cod_val = cod(no_outliers) + cod_ci = cod_boot(no_outliers, nboot=1000) + cod_ci = f"{cod_ci[0]}, {cod_ci[1]}" + met = cod_met(cod_val) + out = [cod_val, cod_ci, met, cod_n] + + else: + out = [None, None, None, cod_n] + + return out + + +def ccao_prd(fmv, sale_price): + ratio = fmv / sale_price + + # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs + no_outliers = ratio.between( + ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" + ) + + fmv_no_outliers = fmv[no_outliers] + sale_price_no_outliers = sale_price[no_outliers] + + prd_n = sum(no_outliers) + + if prd_n >= 20: + prd_val = prd(fmv_no_outliers, sale_price_no_outliers) + prd_ci = prd_boot(fmv_no_outliers, sale_price_no_outliers, nboot=1000) + prd_ci = f"{prd_ci[0]}, {prd_ci[1]}" + met = prd_met(prd_val) + + out = [prd_val, prd_ci, met, prd_n] + + 
else: + out = [None, None, None, prd_n] + + return out + + +def ccao_prb(fmv, sale_price): + ratio = fmv / sale_price + + # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs + no_outliers = ratio.between( + ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" + ) + + fmv_no_outliers = fmv[no_outliers] + sale_price_no_outliers = sale_price[no_outliers] + + prb_n = sum(no_outliers) + + if prb_n >= 20: + prb_model = prb(fmv_no_outliers, sale_price_no_outliers) + prb_val = prb_model["prb"] + prb_ci = prb_model["95% ci"] + prb_ci = f"{prb_ci[0]}, {prb_ci[1]}" + met = prb_met(prb_val) + + out = [prb_val, prb_ci, met, prb_n] + + else: + out = [None, None, None, prb_n] + + return out + + +def report_summarise(df, geography_id, geography_type): + # Aggregates data and calculates summary statistics for given groupings + + group_cols = [ + "year", + "triad", + "geography_type", + "property_group", + "assessment_stage", + "geography_id", + "sale_year", + ] + + df["geography_id"] = df[geography_id].astype(str) + df["geography_type"] = geography_type + + # Remove groups with less than three observations + df["n"] = df.groupby(group_cols)["ratio"].transform("count") + df = df[df["n"] > 3] + df = df.groupby(group_cols).apply( + lambda x: pd.Series( + { + "sale_n": np.size(x["triad"]), + "ratio": ccao_median(x["ratio"]), + "cod": ccao_cod(ratio=x["ratio"]), + "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]), + "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]), + "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]), + "detect_chasing": detect_chasing(ratio=x["ratio"]), + "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20), + "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10), + "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05), + } + ) + ) + + df[["median_ratio", "median_ratio_ci", "median_ratio_n"]] = pd.DataFrame( + df.ratio.tolist(), index=df.index + ) + df[["cod", "cod_ci", "cod_met", "cod_n"]] = pd.DataFrame( + df.cod.tolist(), index=df.index + ) + df[["mki", "mki_met", "mki_n"]] = pd.DataFrame( + df.mki.tolist(), index=df.index + ) + df[["prd", "prd_ci", "prd_met", "prd_n"]] = pd.DataFrame( + df.prd.tolist(), index=df.index + ) + df[["prb", "prb_ci", "prb_met", "prb_n"]] = pd.DataFrame( + df.prb.tolist(), index=df.index + ) + df["ratio_met"] = abs(1 - df["median_ratio"]) <= 0.05 + df["vertical_equity_met"] = df.prd_met | df.prb_met + + # Arrange output columns + df = df[ + [ + "sale_n", + "median_ratio", + "median_ratio_ci", + "cod", + "cod_ci", + "cod_n", + "prd", + "prd_ci", + "prd_n", + "prb", + "prb_ci", + "prb_n", + "mki", + "mki_n", + "detect_chasing", + "ratio_met", + "cod_met", + "prd_met", + "prb_met", + "mki_met", + "vertical_equity_met", + "within_20_pct", + "within_10_pct", + "within_05_pct", + ] + ].reset_index() + + return df + + +def model(dbt, session): + dbt.config(packages=["assesspy==1.1.0", "numpy==1.26.*", "pandas==2.*"]) + + input = dbt.ref("reporting.vw_ratio_stats") + + final_df = pd.concat( + [ + report_summarise(input, "triad", "Tri"), + report_summarise(input, "township_code", "Town"), + ] + ).reset_index(drop=True) + + return final_df diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index efef1e78a..b433dc2b9 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -5,6 +5,11 @@ sources: tables: - name: ratio_stats description: '{{ doc("table_ratio_stats") }}' + config: + packages: + - "assesspy==1.1.*" + - "numpy==1.26.*" + - "pandas==2.*" tests: - expression_is_true: 
name: reporting_ratio_stats_no_nulls From f3012e1ff4acd05c45ff2138164af27f9517619b Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 17:05:13 +0000 Subject: [PATCH 03/27] Bump dbt-athena version --- dbt/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/requirements.txt b/dbt/requirements.txt index 81d7fc03d..e44ad7a0d 100644 --- a/dbt/requirements.txt +++ b/dbt/requirements.txt @@ -1 +1 @@ -dbt-athena-community==1.7.1 +dbt-athena-community==1.7.2 From 1e981ec445d3c540cc0841f4919cb70738dcbb35 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 17:13:52 +0000 Subject: [PATCH 04/27] Lint ratio stats --- dbt/models/reporting/reporting.ratio_stats.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 9382427e2..68d449db4 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -221,8 +221,6 @@ def report_summarise(df, geography_id, geography_type): def model(dbt, session): - dbt.config(packages=["assesspy==1.1.0", "numpy==1.26.*", "pandas==2.*"]) - input = dbt.ref("reporting.vw_ratio_stats") final_df = pd.concat( From af7f5046501eb1a0d83c283bd65e63816701d3a0 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 17:55:34 +0000 Subject: [PATCH 05/27] Change project name to fix compile issue --- dbt/dbt_project.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml index 9d08e0323..c9783b8f9 100644 --- a/dbt/dbt_project.yml +++ b/dbt/dbt_project.yml @@ -1,4 +1,4 @@ -name: 'athena' +name: 'ccaodata' version: '0.1.0' # This setting configures which "profile" dbt uses for this project. 
@@ -40,7 +40,7 @@ vars: # Configuring models # Full documentation: https://docs.getdbt.com/docs/configuring-models models: - athena: + ccaodata: +materialized: view +write_compression: zstd +format: parquet @@ -66,7 +66,7 @@ tests: +schema: test_failure seeds: - athena: + ccaodata: ccao: +schema: ccao location: From 02fab5d21286f26a2e969e4efbc94b04fde2f824 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 18:02:32 +0000 Subject: [PATCH 06/27] Add Spark workgroup --- dbt/profiles.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/profiles.yml b/dbt/profiles.yml index 62779df09..4165e563c 100644 --- a/dbt/profiles.yml +++ b/dbt/profiles.yml @@ -11,6 +11,7 @@ athena: schema: z_static_unused-dbt-stub-database # "database" here corresponds to a Glue data catalog database: awsdatacatalog + spark_work_group: Spark_primary threads: 5 num_retries: 1 ci: @@ -21,6 +22,7 @@ athena: region_name: us-east-1 schema: z_static_unused-dbt-stub-database database: awsdatacatalog + spark_work_group: Spark_primary threads: 5 num_retries: 1 prod: @@ -31,5 +33,6 @@ athena: region_name: us-east-1 schema: default database: awsdatacatalog + spark_work_group: Spark_primary threads: 5 num_retries: 1 From eacd40a5d31e1a34acfbbe4a46638e6d8393adaf Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 22:21:56 +0000 Subject: [PATCH 07/27] Finalize ratio_stats spark job --- dbt/models/reporting/reporting.ratio_stats.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 68d449db4..3b4dab92f 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -1,3 +1,5 @@ +sc.addPyFile("s3://ccao-athena-results-us-east-1/packages/assesspy.zip") + import numpy as np import pandas as pd from assesspy import boot_ci, cod @@ -220,14 +222,18 @@ def report_summarise(df, geography_id, geography_type): return df -def model(dbt, session): +def model(dbt, spark_session): + dbt.config(materialized="table") + input = dbt.ref("reporting.vw_ratio_stats") - final_df = pd.concat( + df = pd.concat( [ report_summarise(input, "triad", "Tri"), report_summarise(input, "township_code", "Town"), ] ).reset_index(drop=True) - return final_df + spark_df = spark_session.createDataFrame(df) + + return spark_df From 26f9013f78a80551cfc5aa80cd77610b52f9a403 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 22:22:21 +0000 Subject: [PATCH 08/27] Convert vw_ratio_stats into table --- aws-athena/views/reporting-vw_ratio_stats.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aws-athena/views/reporting-vw_ratio_stats.sql b/aws-athena/views/reporting-vw_ratio_stats.sql index 45587c654..cedb918bd 100644 --- a/aws-athena/views/reporting-vw_ratio_stats.sql +++ b/aws-athena/views/reporting-vw_ratio_stats.sql @@ -1,5 +1,10 @@ -- View containing ratios by pin, intended to feed the -- glue job 'reporting-ratio_stats'. 
+{{ + config( + materialized='table' + ) +}} -- Model values for corresponding triads only WITH model_values AS ( From 3e7f558f582c0d9016037ded7135fdc85aea66e2 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 22:40:45 +0000 Subject: [PATCH 09/27] Move package zip location --- dbt/models/reporting/reporting.ratio_stats.py | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 3b4dab92f..1464dbeb0 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -1,12 +1,16 @@ -sc.addPyFile("s3://ccao-athena-results-us-east-1/packages/assesspy.zip") - -import numpy as np -import pandas as pd -from assesspy import boot_ci, cod -from assesspy import cod_ci as cod_boot -from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd -from assesspy import prd_ci as prd_boot -from assesspy import prd_met +sc.addPyFile( # noqa: F821 + "s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip" +) + +import numpy as np # noqa: E402 +import pandas as pd # noqa: E402 +from assesspy import boot_ci # noqa: E402 +from assesspy import cod # noqa: E402 +from assesspy import detect_chasing # noqa: E402 +from assesspy import prd_met # noqa: E402 +from assesspy import cod_ci as cod_boot # noqa: E402 +from assesspy import cod_met, mki, mki_met, prb, prb_met, prd # noqa: E402 +from assesspy import prd_ci as prd_boot # noqa: E402 def median_boot(ratio, nboot=100, alpha=0.05): From 199c405ba0af8d57602b9151c97c7d1c6db8e467 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 1 May 2024 22:41:05 +0000 Subject: [PATCH 10/27] Remove deprecated glue job --- aws-glue/jobs/reporting-ratio_stats.py | 347 ------------------------- 1 file changed, 347 deletions(-) delete mode 100644 aws-glue/jobs/reporting-ratio_stats.py diff --git a/aws-glue/jobs/reporting-ratio_stats.py b/aws-glue/jobs/reporting-ratio_stats.py deleted file mode 100644 index 170c7e5c6..000000000 --- a/aws-glue/jobs/reporting-ratio_stats.py +++ /dev/null @@ -1,347 +0,0 @@ -import re -import time - -import boto3 -import numpy as np -import pandas as pd -from assesspy import boot_ci, cod -from assesspy import cod_ci as cod_boot -from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd -from assesspy import prd_ci as prd_boot -from assesspy import prd_met -from awsglue.context import GlueContext -from awsglue.job import Job -from pyspark.context import SparkContext - -sc = SparkContext.getOrCreate() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) - -# Define AWS boto3 clients -athena_client = boto3.client("athena") -glue_client = boto3.client("glue", region_name="us-east-1") -s3_client = boto3.client("s3") - -# Define s3 and Athena paths -athena_db = "iasworld" - -s3_bucket = "ccao-data-warehouse-us-east-1" -s3_prefix = "reporting/ratio_stats/" -s3_output = "s3://" + s3_bucket + "/" + s3_prefix -s3_ratio_stats = "s3://" + s3_bucket + "/" + s3_prefix + "ratio_stats.parquet" - - -# Functions to help with Athena queries ---- -def poll_status(athena_client, execution_id): - """Checks the status of the a query using an - incoming execution id and returns - a 'pass' string value when the status is - either SUCCEEDED, FAILED or CANCELLED.""" - - result = athena_client.get_query_execution(QueryExecutionId=execution_id) - state = result["QueryExecution"]["Status"]["State"] - - if state == 
"SUCCEEDED": - return "pass" - if state == "FAILED": - return "pass" - if state == "CANCELLED": - return "pass" - else: - return "not pass" - - -def poll_result(athena_client, execution_id): - """Gets the query result using an incoming execution id. - This function is ran after the poll_status function and - only if we are sure that the query was fully executed.""" - - result = athena_client.get_query_execution(QueryExecutionId=execution_id) - - return result - - -def run_query_get_result( - athena_client, s3_bucket, query, database, s3_output, s3_prefix -): - """Runs an incoming query and returns - the output as an s3 file like object.""" - - response = athena_client.start_query_execution( - QueryString=query, - QueryExecutionContext={"Database": database}, - ResultConfiguration={ - "OutputLocation": s3_output, - }, - ) - - QueryExecutionId = response.get("QueryExecutionId") - - # Wait until query is executed - while poll_status(athena_client, QueryExecutionId) != "pass": - time.sleep(2) - pass - - result = poll_result(athena_client, QueryExecutionId) - - r_file_object = None - - # Only return file like object when the query succeeded - if result["QueryExecution"]["Status"]["State"] == "SUCCEEDED": - print("Query SUCCEEDED: {}".format(QueryExecutionId)) - - s3_key = s3_prefix + QueryExecutionId + ".csv" - - r_file_object = boto3.resource("s3").Object(s3_bucket, s3_key) - - return r_file_object - - -# Athena query ---- -SQL_QUERY = "SELECT * FROM reporting.vw_ratio_stats;" - - -# Run run_query_get_result to get file like object ---- -r_file_object = run_query_get_result( - athena_client, s3_bucket, SQL_QUERY, athena_db, s3_output, s3_prefix -) - -# Retrieve s3 location of Athena query result and retrieve it -target = "s3://" + s3_bucket + "/" + r_file_object.key - -pull = pd.read_csv(target) -pull = pull[pull.ratio > 0 & pull.ratio.notnull()] - -# Delete all query results for this job from s3 bucket -response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix) - -for object in response["Contents"]: - if re.search("csv", object["Key"]): - print("Deleting", object["Key"]) - s3_client.delete_object(Bucket=s3_bucket, Key=object["Key"]) - - -def median_boot(ratio, nboot=100, alpha=0.05): - return boot_ci(np.median, nboot=nboot, alpha=alpha, ratio=ratio) - - -def ccao_median(x): - # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs - no_outliers = x.between( - x.quantile(0.05), x.quantile(0.95), inclusive="neither" - ) - - x_no_outliers = x[no_outliers] - - median_n = sum(no_outliers) - - median_val = np.median(x_no_outliers) - median_ci = median_boot(x_no_outliers, nboot=1000) - median_ci = f"{median_ci[0]}, {median_ci[1]}" - - out = [median_val, median_ci, median_n] - - return out - - -def ccao_mki(fmv, sale_price): - ratio = fmv / sale_price - - # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs - no_outliers = ratio.between( - ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" - ) - - fmv_no_outliers = fmv[no_outliers] - sale_price_no_outliers = sale_price[no_outliers] - - mki_n = sum(no_outliers) - - if mki_n >= 20: - mki_val = mki(fmv_no_outliers, sale_price_no_outliers) - met = mki_met(mki_val) - - out = [mki_val, met, mki_n] - - else: - out = [None, None, mki_n] - - return out - - -def ccao_cod(ratio): - # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs - no_outliers = ratio[ - ratio.between( - ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" - ) - ] - - cod_n = no_outliers.size - - if cod_n 
>= 20: - cod_val = cod(no_outliers) - cod_ci = cod_boot(no_outliers, nboot=1000) - cod_ci = f"{cod_ci[0]}, {cod_ci[1]}" - met = cod_met(cod_val) - out = [cod_val, cod_ci, met, cod_n] - - else: - out = [None, None, None, cod_n] - - return out - - -def ccao_prd(fmv, sale_price): - ratio = fmv / sale_price - - # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs - no_outliers = ratio.between( - ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" - ) - - fmv_no_outliers = fmv[no_outliers] - sale_price_no_outliers = sale_price[no_outliers] - - prd_n = sum(no_outliers) - - if prd_n >= 20: - prd_val = prd(fmv_no_outliers, sale_price_no_outliers) - prd_ci = prd_boot(fmv_no_outliers, sale_price_no_outliers, nboot=1000) - prd_ci = f"{prd_ci[0]}, {prd_ci[1]}" - met = prd_met(prd_val) - - out = [prd_val, prd_ci, met, prd_n] - - else: - out = [None, None, None, prd_n] - - return out - - -def ccao_prb(fmv, sale_price): - ratio = fmv / sale_price - - # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs - no_outliers = ratio.between( - ratio.quantile(0.05), ratio.quantile(0.95), inclusive="neither" - ) - - fmv_no_outliers = fmv[no_outliers] - sale_price_no_outliers = sale_price[no_outliers] - - prb_n = sum(no_outliers) - - if prb_n >= 20: - prb_model = prb(fmv_no_outliers, sale_price_no_outliers) - prb_val = prb_model["prb"] - prb_ci = prb_model["95% ci"] - prb_ci = f"{prb_ci[0]}, {prb_ci[1]}" - met = prb_met(prb_val) - - out = [prb_val, prb_ci, met, prb_n] - - else: - out = [None, None, None, prb_n] - - return out - - -def report_summarise(df, geography_id, geography_type): - # Aggregates data and calculates summary statistics for given groupings - - group_cols = [ - "year", - "triad", - "geography_type", - "property_group", - "assessment_stage", - "geography_id", - "sale_year", - ] - - df["geography_id"] = pull[geography_id].astype(str) - df["geography_type"] = geography_type - - # Remove groups with less than three observations - df["n"] = df.groupby(group_cols)["ratio"].transform("count") - df = df[df["n"] > 3] - df = df.groupby(group_cols).apply( - lambda x: pd.Series( - { - "sale_n": np.size(x["triad"]), - "ratio": ccao_median(x["ratio"]), - "cod": ccao_cod(ratio=x["ratio"]), - "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]), - "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]), - "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]), - "detect_chasing": detect_chasing(ratio=x["ratio"]), - "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20), - "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10), - "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05), - } - ) - ) - - df[["median_ratio", "median_ratio_ci", "median_ratio_n"]] = pd.DataFrame( - df.ratio.tolist(), index=df.index - ) - df[["cod", "cod_ci", "cod_met", "cod_n"]] = pd.DataFrame( - df.cod.tolist(), index=df.index - ) - df[["mki", "mki_met", "mki_n"]] = pd.DataFrame( - df.mki.tolist(), index=df.index - ) - df[["prd", "prd_ci", "prd_met", "prd_n"]] = pd.DataFrame( - df.prd.tolist(), index=df.index - ) - df[["prb", "prb_ci", "prb_met", "prb_n"]] = pd.DataFrame( - df.prb.tolist(), index=df.index - ) - df["ratio_met"] = abs(1 - df["median_ratio"]) <= 0.05 - df["vertical_equity_met"] = df.prd_met | df.prb_met - - # Arrange output columns - df = df[ - [ - "sale_n", - "median_ratio", - "median_ratio_ci", - "cod", - "cod_ci", - "cod_n", - "prd", - "prd_ci", - "prd_n", - "prb", - "prb_ci", - "prb_n", - "mki", - "mki_n", - "detect_chasing", - "ratio_met", - "cod_met", - "prd_met", - "prb_met", 
- "mki_met", - "vertical_equity_met", - "within_20_pct", - "within_10_pct", - "within_05_pct", - ] - ].reset_index() - - return df - - -# Append and write output to s3 bucket -pd.concat( - [ - report_summarise(pull, "triad", "Tri"), - report_summarise(pull, "township_code", "Town"), - ] -).reset_index(drop=True).to_parquet(s3_ratio_stats) - -# Trigger reporting glue crawler -glue_client.start_crawler(Name="ccao-data-warehouse-reporting-crawler") From f66ed93e640f309d9e2ba591dd85e174ef5e987b Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 04:43:13 +0000 Subject: [PATCH 11/27] Convert spark to pandas df --- dbt/models/reporting/reporting.ratio_stats.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 1464dbeb0..85c8f7021 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -140,7 +140,13 @@ def ccao_prb(fmv, sale_price): def report_summarise(df, geography_id, geography_type): - # Aggregates data and calculates summary statistics for given groupings + """ + Aggregates data and calculates summary statistics for given groupings + """ + + # Convert the Spark input dataframes to Pandas for compatibility + # with assesspy functions + df = df.toPandas() group_cols = [ "year", From 8fdc7a4b2545918af6df766f18c8e26fdce334ff Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 04:43:55 +0000 Subject: [PATCH 12/27] Drop detect_chasing() --- dbt/models/reporting/reporting.ratio_stats.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 85c8f7021..5b4715a67 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -6,7 +6,6 @@ import pandas as pd # noqa: E402 from assesspy import boot_ci # noqa: E402 from assesspy import cod # noqa: E402 -from assesspy import detect_chasing # noqa: E402 from assesspy import prd_met # noqa: E402 from assesspy import cod_ci as cod_boot # noqa: E402 from assesspy import cod_met, mki, mki_met, prb, prb_met, prd # noqa: E402 @@ -173,7 +172,6 @@ def report_summarise(df, geography_id, geography_type): "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]), "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]), "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]), - "detect_chasing": detect_chasing(ratio=x["ratio"]), "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20), "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10), "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05), @@ -216,7 +214,6 @@ def report_summarise(df, geography_id, geography_type): "prb_n", "mki", "mki_n", - "detect_chasing", "ratio_met", "cod_met", "prd_met", From 8dbec57bd04696c4781102ac9bffe44c0ce267fa Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 14:22:49 +0000 Subject: [PATCH 13/27] Fix linting issues --- dbt/models/reporting/reporting.ratio_stats.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 5b4715a67..191513d82 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -1,3 +1,5 @@ +# type: ignore +# pylint: skip-file sc.addPyFile( # noqa: F821 "s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip" ) From 487b8a70954552676daa54f8a4b2572f4b1c22f0 Mon Sep 17 
00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 14:34:24 +0000 Subject: [PATCH 14/27] Rename project (prefix with org name) --- dbt/dbt_project.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml index c9783b8f9..aa085ef32 100644 --- a/dbt/dbt_project.yml +++ b/dbt/dbt_project.yml @@ -1,4 +1,4 @@ -name: 'ccaodata' +name: 'ccao_data_athena' version: '0.1.0' # This setting configures which "profile" dbt uses for this project. @@ -40,7 +40,7 @@ vars: # Configuring models # Full documentation: https://docs.getdbt.com/docs/configuring-models models: - ccaodata: + ccao_data_athena: +materialized: view +write_compression: zstd +format: parquet @@ -66,7 +66,7 @@ tests: +schema: test_failure seeds: - ccaodata: + ccao_data_athena: ccao: +schema: ccao location: From 15b9af7af5fafeb4b8868270bf65fb424f4a3cdf Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 14:39:50 +0000 Subject: [PATCH 15/27] Rename vw_ratio_stats to ratio_stats_input --- .../reporting-ratio_stats_input.sql} | 2 +- dbt/models/reporting/docs.md | 18 +++++++++--------- dbt/models/reporting/reporting.ratio_stats.py | 2 +- .../reporting/reporting.ratio_stats_input.sql | 1 + .../reporting/reporting.vw_ratio_stats.sql | 1 - dbt/models/reporting/schema.yml | 12 ++++++------ 6 files changed, 18 insertions(+), 18 deletions(-) rename aws-athena/{views/reporting-vw_ratio_stats.sql => ctas/reporting-ratio_stats_input.sql} (98%) create mode 120000 dbt/models/reporting/reporting.ratio_stats_input.sql delete mode 120000 dbt/models/reporting/reporting.vw_ratio_stats.sql diff --git a/aws-athena/views/reporting-vw_ratio_stats.sql b/aws-athena/ctas/reporting-ratio_stats_input.sql similarity index 98% rename from aws-athena/views/reporting-vw_ratio_stats.sql rename to aws-athena/ctas/reporting-ratio_stats_input.sql index cedb918bd..03fa2e49b 100644 --- a/aws-athena/views/reporting-vw_ratio_stats.sql +++ b/aws-athena/ctas/reporting-ratio_stats_input.sql @@ -1,5 +1,5 @@ -- View containing ratios by pin, intended to feed the --- glue job 'reporting-ratio_stats'. +-- ratio_stats dbt python model {{ config( materialized='table' diff --git a/dbt/models/reporting/docs.md b/dbt/models/reporting/docs.md index 57330d4ba..ede496ab8 100644 --- a/dbt/models/reporting/docs.md +++ b/dbt/models/reporting/docs.md @@ -10,6 +10,15 @@ reporting Tableau dashboards. `property_group` {% enddocs %} +# ratio_stats_input + +{% docs table_ratio_stats_input %} +View to feed the Python dbt job that creates the `reporting.ratio_stats` table. +Feeds public reporting assets. + +**Primary Key**: `year`, `pin`, `assessment_stage` +{% enddocs %} + # res_report_summary {% docs table_res_report_summary %} @@ -79,15 +88,6 @@ The assessment stages are: **Primary Key**: `year`, `pin`, `stage_name` {% enddocs %} -# vw_ratio_stats - -{% docs view_vw_ratio_stats %} -View to feed the `reporting.ratio_stats` table and Glue job. -Feeds public reporting assets. 
- -**Primary Key**: `year`, `pin`, `assessment_stage` -{% enddocs %} - # vw_res_report_summary {% docs view_vw_res_report_summary %} diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 191513d82..8956d3f19 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -234,7 +234,7 @@ def report_summarise(df, geography_id, geography_type): def model(dbt, spark_session): dbt.config(materialized="table") - input = dbt.ref("reporting.vw_ratio_stats") + input = dbt.ref("reporting.ratio_stats_input") df = pd.concat( [ diff --git a/dbt/models/reporting/reporting.ratio_stats_input.sql b/dbt/models/reporting/reporting.ratio_stats_input.sql new file mode 120000 index 000000000..109698b5d --- /dev/null +++ b/dbt/models/reporting/reporting.ratio_stats_input.sql @@ -0,0 +1 @@ +../../../aws-athena/ctas/reporting-ratio_stats_input.sql \ No newline at end of file diff --git a/dbt/models/reporting/reporting.vw_ratio_stats.sql b/dbt/models/reporting/reporting.vw_ratio_stats.sql deleted file mode 120000 index f50270b1a..000000000 --- a/dbt/models/reporting/reporting.vw_ratio_stats.sql +++ /dev/null @@ -1 +0,0 @@ -../../../aws-athena/views/reporting-vw_ratio_stats.sql \ No newline at end of file diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index b433dc2b9..1dada74b7 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -165,31 +165,31 @@ models: where: CAST(year AS int) < {{ var('test_qc_year_start') }} error_if: ">21" # as of 2024-04-01 - - name: reporting.vw_ratio_stats - description: '{{ doc("view_vw_ratio_stats") }}' + - name: reporting.ratio_stats_input + description: '{{ doc("table_ratio_stats_input") }}' tests: - unique_combination_of_columns: - name: reporting_vw_ratio_stats_unique_by_keys + name: reporting_ratio_stats_input_unique_by_keys combination_of_columns: - pin - year - assessment_stage - sale_price - expression_is_true: - name: reporting_vw_ratio_stats_no_nulls + name: reporting_ratio_stats_input_no_nulls expression: | property_group IS NOT NULL AND assessment_stage IS NOT NULL AND triad IS NOT NULL AND township_code IS NOT NULL - expression_is_true: - name: reporting_vw_ratio_stats_sale_year_equals_year_minus_one + name: reporting_ratio_stats_input_sale_year_equals_year_minus_one expression: CAST(sale_year AS INTEGER) = CAST(year AS INTEGER) - 1 additional_select_columns: - sale_year - year - expression_is_true: - name: reporting_vw_ratio_stats_ratio_greater_than_zero + name: reporting_ratio_stats_input_ratio_greater_than_zero expression: ratio >= 0 additional_select_columns: - ratio From 8f8df475e5b1ea4abde25998c8eff9936c65a076 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 14:49:36 +0000 Subject: [PATCH 16/27] Add dbt tag to build daily --- .github/workflows/build_daily_dbt_models.yaml | 2 +- dbt/models/reporting/schema.yml | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_daily_dbt_models.yaml b/.github/workflows/build_daily_dbt_models.yaml index 516f133a9..a47488786 100644 --- a/.github/workflows/build_daily_dbt_models.yaml +++ b/.github/workflows/build_daily_dbt_models.yaml @@ -4,7 +4,7 @@ on: workflow_dispatch: schedule: # Every day at 8am (2pm UTC-5) - - cron: '0 13 * * *' + - cron: '0 12 * * *' jobs: build-daily-dbt-models: diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index 1dada74b7..b3176fc86 100644 --- 
a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -1,6 +1,7 @@ sources: - name: reporting tags: + - daily - load_auto tables: - name: ratio_stats @@ -167,6 +168,10 @@ models: - name: reporting.ratio_stats_input description: '{{ doc("table_ratio_stats_input") }}' + config: + tags: + - daily + - load_auto tests: - unique_combination_of_columns: name: reporting_ratio_stats_input_unique_by_keys From f0dbd9e87ab38aa894c668a809832496a46ee5bb Mon Sep 17 00:00:00 2001 From: Dan Snow <31494343+dfsnow@users.noreply.github.com> Date: Thu, 2 May 2024 13:14:57 -0500 Subject: [PATCH 17/27] Update dbt/models/reporting/docs.md Co-authored-by: Jean Cochrane --- dbt/models/reporting/docs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/models/reporting/docs.md b/dbt/models/reporting/docs.md index ede496ab8..ffa8529ea 100644 --- a/dbt/models/reporting/docs.md +++ b/dbt/models/reporting/docs.md @@ -13,7 +13,7 @@ reporting Tableau dashboards. # ratio_stats_input {% docs table_ratio_stats_input %} -View to feed the Python dbt job that creates the `reporting.ratio_stats` table. +Table to feed the Python dbt job that creates the `reporting.ratio_stats` table. Feeds public reporting assets. **Primary Key**: `year`, `pin`, `assessment_stage` From 1576932011fac02dd43ae4a1fde6cf07db4f35bd Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Thu, 2 May 2024 20:17:57 +0000 Subject: [PATCH 18/27] Re-add detect_chasing as stub --- dbt/models/reporting/reporting.ratio_stats.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 8956d3f19..52b950101 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -163,6 +163,7 @@ def report_summarise(df, geography_id, geography_type): df["geography_type"] = geography_type # Remove groups with less than three observations + # TODO: Remove/upgrade detect_chasing output df["n"] = df.groupby(group_cols)["ratio"].transform("count") df = df[df["n"] > 3] df = df.groupby(group_cols).apply( @@ -174,6 +175,7 @@ def report_summarise(df, geography_id, geography_type): "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]), "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]), "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]), + "detect_chasing": False, "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20), "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10), "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05), @@ -216,6 +218,7 @@ def report_summarise(df, geography_id, geography_type): "prb_n", "mki", "mki_n", + "detect_chasing", "ratio_met", "cod_met", "prd_met", From fe1d40dfbd5eb13de158b26d97c1e57d11f3f828 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Mon, 6 May 2024 15:22:42 +0000 Subject: [PATCH 19/27] Move ratio_stats to model (instead of source) --- dbt/models/reporting/schema.yml | 76 +++++++++++++++------------------ 1 file changed, 35 insertions(+), 41 deletions(-) diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index b3176fc86..3a20b3670 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -1,45 +1,39 @@ -sources: - - name: reporting - tags: - - daily - - load_auto - tables: - - name: ratio_stats - description: '{{ doc("table_ratio_stats") }}' - config: - packages: - - "assesspy==1.1.*" - - "numpy==1.26.*" - - "pandas==2.*" - tests: - - expression_is_true: - name: reporting_ratio_stats_no_nulls - expression: | - 
year IS NOT NULL - AND triad IS NOT NULL - AND geography_type IS NOT NULL - AND property_group IS NOT NULL - AND assessment_stage IS NOT NULL - AND sale_year IS NOT NULL - - expression_is_true: - name: reporting_ratio_stats_metrics_are_sensible - expression: | - cod >= 0 - AND prd >= 0 - AND prb BETWEEN -1 AND 1 - AND mki >= 0 - AND triad IS NOT NULL - AND geography_type IS NOT NULL - AND property_group IS NOT NULL - AND assessment_stage IS NOT NULL - AND sale_year IS NOT NULL - - expression_is_true: - name: reporting_ratio_stats_within_directionality - expression: | - within_20_pct >= within_10_pct - AND within_10_pct >= within_05_pct - models: + - name: ratio_stats + description: '{{ doc("table_ratio_stats") }}' + config: + packages: + - "assesspy==1.1.*" + - "numpy==1.26.*" + - "pandas==2.*" + tests: + - expression_is_true: + name: reporting_ratio_stats_no_nulls + expression: | + year IS NOT NULL + AND triad IS NOT NULL + AND geography_type IS NOT NULL + AND property_group IS NOT NULL + AND assessment_stage IS NOT NULL + AND sale_year IS NOT NULL + - expression_is_true: + name: reporting_ratio_stats_metrics_are_sensible + expression: | + cod >= 0 + AND prd >= 0 + AND prb BETWEEN -1 AND 1 + AND mki >= 0 + AND triad IS NOT NULL + AND geography_type IS NOT NULL + AND property_group IS NOT NULL + AND assessment_stage IS NOT NULL + AND sale_year IS NOT NULL + - expression_is_true: + name: reporting_ratio_stats_within_directionality + expression: | + within_20_pct >= within_10_pct + AND within_10_pct >= within_05_pct + - name: reporting.res_report_summary description: '{{ doc("table_res_report_summary") }}' config: From 5bc9442896eed287eecf147472bb897c5086679f Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 02:30:03 +0000 Subject: [PATCH 20/27] Add ratio_stats schema prefix --- dbt/models/reporting/schema.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index 0a9bbaf10..cf8bcd3f7 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -1,5 +1,5 @@ models: - - name: ratio_stats + - name: reporting.ratio_stats description: '{{ doc("table_ratio_stats") }}' config: packages: From 0499aed5db6e427bef1a1423a919467f5b326e11 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 17:50:16 +0000 Subject: [PATCH 21/27] Alphabetize dbt reporting models --- dbt/models/reporting/schema.yml | 66 ++++++++++++++++----------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index cf8bcd3f7..a4b7a3a77 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -34,6 +34,39 @@ models: within_20_pct >= within_10_pct AND within_10_pct >= within_05_pct + - name: reporting.ratio_stats_input + description: '{{ doc("table_ratio_stats_input") }}' + config: + tags: + - daily + - load_auto + tests: + - unique_combination_of_columns: + name: reporting_ratio_stats_input_unique_by_keys + combination_of_columns: + - pin + - year + - assessment_stage + - sale_price + - expression_is_true: + name: reporting_ratio_stats_input_no_nulls + expression: | + property_group IS NOT NULL + AND assessment_stage IS NOT NULL + AND triad IS NOT NULL + AND township_code IS NOT NULL + - expression_is_true: + name: reporting_ratio_stats_input_sale_year_equals_year_minus_one + expression: CAST(sale_year AS INTEGER) = CAST(year AS INTEGER) - 1 + additional_select_columns: + - sale_year + - year + - 
expression_is_true: + name: reporting_ratio_stats_input_ratio_greater_than_zero + expression: ratio >= 0 + additional_select_columns: + - ratio + - name: reporting.res_report_summary description: '{{ doc("table_res_report_summary") }}' config: @@ -177,39 +210,6 @@ models: where: CAST(year AS int) < {{ var('test_qc_year_start') }} error_if: ">21" # as of 2024-04-01 - - name: reporting.ratio_stats_input - description: '{{ doc("table_ratio_stats_input") }}' - config: - tags: - - daily - - load_auto - tests: - - unique_combination_of_columns: - name: reporting_ratio_stats_input_unique_by_keys - combination_of_columns: - - pin - - year - - assessment_stage - - sale_price - - expression_is_true: - name: reporting_ratio_stats_input_no_nulls - expression: | - property_group IS NOT NULL - AND assessment_stage IS NOT NULL - AND triad IS NOT NULL - AND township_code IS NOT NULL - - expression_is_true: - name: reporting_ratio_stats_input_sale_year_equals_year_minus_one - expression: CAST(sale_year AS INTEGER) = CAST(year AS INTEGER) - 1 - additional_select_columns: - - sale_year - - year - - expression_is_true: - name: reporting_ratio_stats_input_ratio_greater_than_zero - expression: ratio >= 0 - additional_select_columns: - - ratio - - name: reporting.vw_top_5 description: '{{ doc("view_vw_top_5") }}' tests: From bd03719bd27554230b13d212375d1d288d4eed83 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 18:15:48 +0000 Subject: [PATCH 22/27] Add explicit Spark output data types --- dbt/models/reporting/reporting.ratio_stats.py | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 52b950101..7defc38f3 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -145,10 +145,6 @@ def report_summarise(df, geography_id, geography_type): Aggregates data and calculates summary statistics for given groupings """ - # Convert the Spark input dataframes to Pandas for compatibility - # with assesspy functions - df = df.toPandas() - group_cols = [ "year", "triad", @@ -239,6 +235,10 @@ def model(dbt, spark_session): input = dbt.ref("reporting.ratio_stats_input") + # Convert the Spark input dataframe to Pandas for + # compatibility with assesspy functions + input = input.toPandas() + df = pd.concat( [ report_summarise(input, "triad", "Tri"), @@ -246,6 +246,21 @@ def model(dbt, spark_session): ] ).reset_index(drop=True) - spark_df = spark_session.createDataFrame(df) + # Create a Spark schema to maintain the datatypes of the + # previous output (for Tableau compatibility) + schema = ( + "year: bigint, triad: bigint, geography_type: string, " + + "property_group: string, assessment_stage: string, " + + "geography_id: string, sale_year: bigint, sale_n: bigint, " + + "median_ratio: double, median_ratio_ci: string, cod: double, " + + "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, " + + "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, " + + "mki: double, mki_n: bigint, detect_chasing: boolean, " + + "ratio_met: boolean, cod_met: boolean, prd_met: boolean, " + + "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, " + + "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint" + ) + + spark_df = spark_session.createDataFrame(df, schema=schema) return spark_df From 5f12a41a58f322d5c89181e3ffb2c33c93de1e73 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 18:21:33 +0000 
Subject: [PATCH 23/27] Update reporting table tags --- dbt/models/reporting/schema.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml index a4b7a3a77..ff7c70f04 100644 --- a/dbt/models/reporting/schema.yml +++ b/dbt/models/reporting/schema.yml @@ -2,6 +2,8 @@ models: - name: reporting.ratio_stats description: '{{ doc("table_ratio_stats") }}' config: + tags: + - daily packages: - "assesspy==1.1.*" - "numpy==1.26.*" @@ -39,7 +41,6 @@ models: config: tags: - daily - - load_auto tests: - unique_combination_of_columns: name: reporting_ratio_stats_input_unique_by_keys From f747d7fb9ad2ca2d83bdfd2f2fa5ce3a934115b9 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 19:19:44 +0000 Subject: [PATCH 24/27] Coerce types to int --- dbt/models/reporting/reporting.ratio_stats.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index 7defc38f3..cf2030056 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -246,6 +246,11 @@ def model(dbt, spark_session): ] ).reset_index(drop=True) + # Force certain columns to datatype to maintain parity with old version + df[["year", "triad", "sale_year"]] = df[ + ["year", "triad", "sale_year"] + ].astype(int) + # Create a Spark schema to maintain the datatypes of the # previous output (for Tableau compatibility) schema = ( From 0efa32506c42705ac1c77c04cbc668582cd935fb Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Tue, 7 May 2024 22:13:01 +0000 Subject: [PATCH 25/27] Update Athena workgroup name --- dbt/profiles.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt/profiles.yml b/dbt/profiles.yml index e6a8d721a..c7edf7866 100644 --- a/dbt/profiles.yml +++ b/dbt/profiles.yml @@ -11,7 +11,7 @@ athena: schema: z_static_unused_dbt_stub_database # "database" here corresponds to a Glue data catalog database: awsdatacatalog - spark_work_group: Spark_primary + spark_work_group: primary-spark threads: 5 num_retries: 1 ci: @@ -22,7 +22,7 @@ athena: region_name: us-east-1 schema: z_static_unused_dbt_stub_database database: awsdatacatalog - spark_work_group: Spark_primary + spark_work_group: primary-spark threads: 5 num_retries: 1 prod: @@ -33,6 +33,6 @@ athena: region_name: us-east-1 schema: default database: awsdatacatalog - spark_work_group: Spark_primary + spark_work_group: primary-spark threads: 5 num_retries: 1 From 2e3bbfd9bc78f521d7aded6b7ce16a065c70e6a3 Mon Sep 17 00:00:00 2001 From: Dan Snow Date: Wed, 8 May 2024 03:41:32 +0000 Subject: [PATCH 26/27] Replicate filtering from prior vw_ratio_stats pull --- dbt/models/reporting/reporting.ratio_stats.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/models/reporting/reporting.ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py index cf2030056..7aac1aa76 100644 --- a/dbt/models/reporting/reporting.ratio_stats.py +++ b/dbt/models/reporting/reporting.ratio_stats.py @@ -239,6 +239,9 @@ def model(dbt, spark_session): # compatibility with assesspy functions input = input.toPandas() + # Replicate filtering from prior vw_ratio_stats pull + input = input[input.ratio > 0 & input.ratio.notnull()] + df = pd.concat( [ report_summarise(input, "triad", "Tri"), From 7c0728052caf5933d44c9afa23317cf88825cd39 Mon Sep 17 00:00:00 2001 From: Dan Snow <31494343+dfsnow@users.noreply.github.com> Date: Wed, 8 May 2024 12:54:57 -0500 Subject: [PATCH 
27/27] Update aws-athena/ctas/reporting-ratio_stats_input.sql Co-authored-by: William Ridgeway <10358980+wrridgeway@users.noreply.github.com> --- aws-athena/ctas/reporting-ratio_stats_input.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-athena/ctas/reporting-ratio_stats_input.sql b/aws-athena/ctas/reporting-ratio_stats_input.sql index 03fa2e49b..7a77a72db 100644 --- a/aws-athena/ctas/reporting-ratio_stats_input.sql +++ b/aws-athena/ctas/reporting-ratio_stats_input.sql @@ -1,4 +1,4 @@ --- View containing ratios by pin, intended to feed the +-- Table containing ratios by pin, intended to feed the -- ratio_stats dbt python model {{ config(
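
For reference, below is a minimal sketch of the dbt Python model pattern that the patches above converge on. The S3 zip path, the ref to reporting.ratio_stats_input, and the colon-separated schema string are taken directly from the diffs; the groupby/median body is only a stand-in for the real assesspy-based summary statistics, and the column names and types in that placeholder aggregation are illustrative assumptions.

    # Sketch only; the real model lives at
    # dbt/models/reporting/reporting.ratio_stats.py.
    # `sc` is injected by the Athena Spark runtime; addPyFile ships pure-Python
    # dependencies (assesspy in this PR) to the Spark executors.
    sc.addPyFile(  # noqa: F821
        "s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip"
    )


    def model(dbt, spark_session):
        # Python models declare their materialization in code
        dbt.config(materialized="table")

        # dbt.ref() returns a Spark DataFrame; convert it to pandas so that
        # pandas/assesspy-style functions can operate on it
        df = dbt.ref("reporting.ratio_stats_input").toPandas()

        # Placeholder aggregation (the real model computes median/COD/PRD/PRB/MKI
        # with confidence intervals per geography and assessment stage)
        out = (
            df.groupby(["year", "township_code"], as_index=False)["ratio"]
            .median()
            .rename(columns={"ratio": "median_ratio"})
        )

        # Coerce types, then hand back a Spark DataFrame with an explicit schema
        # so downstream consumers (e.g. Tableau) see stable column types
        out["year"] = out["year"].astype(int)
        out["township_code"] = out["township_code"].astype(str)
        schema = "year: bigint, township_code: string, median_ratio: double"
        return spark_session.createDataFrame(out, schema=schema)

The explicit schema string is the design choice worth noting: createDataFrame would otherwise infer types from pandas, and patches 22 and 24 exist specifically to pin the bigint and boolean columns so the new table keeps parity with the output of the deleted Glue job.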