diff --git a/.github/workflows/build_daily_dbt_models.yaml b/.github/workflows/build_daily_dbt_models.yaml
index 516f133a9..a47488786 100644
--- a/.github/workflows/build_daily_dbt_models.yaml
+++ b/.github/workflows/build_daily_dbt_models.yaml
@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
   schedule:
     # Every day at 8am (2pm UTC-5)
-    - cron: '0 13 * * *'
+    - cron: '0 12 * * *'
 
 jobs:
   build-daily-dbt-models:
diff --git a/.gitignore b/.gitignore
index a96ad46f5..a2f8aa654 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,4 @@ logs/
 # Ignore most CSVs, except those that are used as dbt seeds
 *.csv
 !dbt/seeds/**/*.csv
+venv/
diff --git a/aws-athena/views/reporting-vw_ratio_stats.sql b/aws-athena/ctas/reporting-ratio_stats_input.sql
similarity index 94%
rename from aws-athena/views/reporting-vw_ratio_stats.sql
rename to aws-athena/ctas/reporting-ratio_stats_input.sql
index 45587c654..7a77a72db 100644
--- a/aws-athena/views/reporting-vw_ratio_stats.sql
+++ b/aws-athena/ctas/reporting-ratio_stats_input.sql
@@ -1,5 +1,10 @@
--- View containing ratios by pin, intended to feed the
--- glue job 'reporting-ratio_stats'.
+-- Table containing ratios by pin, intended to feed the
+-- ratio_stats dbt python model
+{{
+    config(
+        materialized='table'
+    )
+}}
 
 -- Model values for corresponding triads only
 WITH model_values AS (
diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml
index 9d08e0323..aa085ef32 100644
--- a/dbt/dbt_project.yml
+++ b/dbt/dbt_project.yml
@@ -1,4 +1,4 @@
-name: 'athena'
+name: 'ccao_data_athena'
 version: '0.1.0'
 
 # This setting configures which "profile" dbt uses for this project.
@@ -40,7 +40,7 @@ vars:
 # Configuring models
 # Full documentation: https://docs.getdbt.com/docs/configuring-models
 models:
-  athena:
+  ccao_data_athena:
     +materialized: view
     +write_compression: zstd
     +format: parquet
@@ -66,7 +66,7 @@ tests:
       +schema: test_failure
 
 seeds:
-  athena:
+  ccao_data_athena:
     ccao:
       +schema: ccao
     location:
diff --git a/dbt/models/reporting/docs.md b/dbt/models/reporting/docs.md
index 58d2bbcd7..542eae5df 100644
--- a/dbt/models/reporting/docs.md
+++ b/dbt/models/reporting/docs.md
@@ -10,6 +10,15 @@ reporting Tableau dashboards.
 `property_group`
 {% enddocs %}
 
+# ratio_stats_input
+
+{% docs table_ratio_stats_input %}
+Table to feed the Python dbt job that creates the `reporting.ratio_stats` table.
+Feeds public reporting assets.
+
+**Primary Key**: `year`, `pin`, `assessment_stage`
+{% enddocs %}
+
 # res_report_summary
 
 {% docs table_res_report_summary %}
@@ -88,15 +97,6 @@ The assessment stages are:
 **Primary Key**: `year`, `pin`, `stage_name`
 {% enddocs %}
 
-# vw_ratio_stats
-
-{% docs view_vw_ratio_stats %}
-View to feed the `reporting.ratio_stats` table and Glue job.
-Feeds public reporting assets.
-
-**Primary Key**: `year`, `pin`, `assessment_stage`
-{% enddocs %}
-
 # vw_res_report_summary
 
 {% docs view_vw_res_report_summary %}
diff --git a/aws-glue/jobs/reporting-ratio_stats.py b/dbt/models/reporting/reporting.ratio_stats.py
similarity index 57%
rename from aws-glue/jobs/reporting-ratio_stats.py
rename to dbt/models/reporting/reporting.ratio_stats.py
index 170c7e5c6..7aac1aa76 100644
--- a/aws-glue/jobs/reporting-ratio_stats.py
+++ b/dbt/models/reporting/reporting.ratio_stats.py
@@ -1,125 +1,17 @@
-import re
-import time
-
-import boto3
-import numpy as np
-import pandas as pd
-from assesspy import boot_ci, cod
-from assesspy import cod_ci as cod_boot
-from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd
-from assesspy import prd_ci as prd_boot
-from assesspy import prd_met
-from awsglue.context import GlueContext
-from awsglue.job import Job
-from pyspark.context import SparkContext
-
-sc = SparkContext.getOrCreate()
-glueContext = GlueContext(sc)
-spark = glueContext.spark_session
-job = Job(glueContext)
-
-# Define AWS boto3 clients
-athena_client = boto3.client("athena")
-glue_client = boto3.client("glue", region_name="us-east-1")
-s3_client = boto3.client("s3")
-
-# Define s3 and Athena paths
-athena_db = "iasworld"
-
-s3_bucket = "ccao-data-warehouse-us-east-1"
-s3_prefix = "reporting/ratio_stats/"
-s3_output = "s3://" + s3_bucket + "/" + s3_prefix
-s3_ratio_stats = "s3://" + s3_bucket + "/" + s3_prefix + "ratio_stats.parquet"
-
-
-# Functions to help with Athena queries ---
-def poll_status(athena_client, execution_id):
-    """Checks the status of the a query using an
-    incoming execution id and returns
-    a 'pass' string value when the status is
-    either SUCCEEDED, FAILED or CANCELLED."""
-
-    result = athena_client.get_query_execution(QueryExecutionId=execution_id)
-    state = result["QueryExecution"]["Status"]["State"]
-
-    if state == "SUCCEEDED":
-        return "pass"
-    if state == "FAILED":
-        return "pass"
-    if state == "CANCELLED":
-        return "pass"
-    else:
-        return "not pass"
-
-
-def poll_result(athena_client, execution_id):
-    """Gets the query result using an incoming execution id.
-    This function is ran after the poll_status function and
-    only if we are sure that the query was fully executed."""
-
-    result = athena_client.get_query_execution(QueryExecutionId=execution_id)
-
-    return result
-
-
-def run_query_get_result(
-    athena_client, s3_bucket, query, database, s3_output, s3_prefix
-):
-    """Runs an incoming query and returns
-    the output as an s3 file like object."""
-
-    response = athena_client.start_query_execution(
-        QueryString=query,
-        QueryExecutionContext={"Database": database},
-        ResultConfiguration={
-            "OutputLocation": s3_output,
-        },
-    )
-
-    QueryExecutionId = response.get("QueryExecutionId")
-
-    # Wait until query is executed
-    while poll_status(athena_client, QueryExecutionId) != "pass":
-        time.sleep(2)
-        pass
-
-    result = poll_result(athena_client, QueryExecutionId)
-
-    r_file_object = None
-
-    # Only return file like object when the query succeeded
-    if result["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
-        print("Query SUCCEEDED: {}".format(QueryExecutionId))
-
-        s3_key = s3_prefix + QueryExecutionId + ".csv"
-
-        r_file_object = boto3.resource("s3").Object(s3_bucket, s3_key)
-
-    return r_file_object
-
-
-# Athena query ---
-SQL_QUERY = "SELECT * FROM reporting.vw_ratio_stats;"
-
-
-# Run run_query_get_result to get file like object ---
-r_file_object = run_query_get_result(
-    athena_client, s3_bucket, SQL_QUERY, athena_db, s3_output, s3_prefix
+# type: ignore
+# pylint: skip-file
+sc.addPyFile(  # noqa: F821
+    "s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip"
 )
 
-# Retrieve s3 location of Athena query result and retrieve it
-target = "s3://" + s3_bucket + "/" + r_file_object.key
-
-pull = pd.read_csv(target)
-pull = pull[pull.ratio > 0 & pull.ratio.notnull()]
-
-# Delete all query results for this job from s3 bucket
-response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
-
-for object in response["Contents"]:
-    if re.search("csv", object["Key"]):
-        print("Deleting", object["Key"])
-        s3_client.delete_object(Bucket=s3_bucket, Key=object["Key"])
+import numpy as np  # noqa: E402
+import pandas as pd  # noqa: E402
+from assesspy import boot_ci  # noqa: E402
+from assesspy import cod  # noqa: E402
+from assesspy import prd_met  # noqa: E402
+from assesspy import cod_ci as cod_boot  # noqa: E402
+from assesspy import cod_met, mki, mki_met, prb, prb_met, prd  # noqa: E402
+from assesspy import prd_ci as prd_boot  # noqa: E402
 
 
 def median_boot(ratio, nboot=100, alpha=0.05):
@@ -249,7 +141,9 @@ def ccao_prb(fmv, sale_price):
 
 
 def report_summarise(df, geography_id, geography_type):
-    # Aggregates data and calculates summary statistics for given groupings
+    """
+    Aggregates data and calculates summary statistics for given groupings
+    """
 
     group_cols = [
         "year",
@@ -261,10 +155,11 @@
         "sale_year",
     ]
 
-    df["geography_id"] = pull[geography_id].astype(str)
+    df["geography_id"] = df[geography_id].astype(str)
     df["geography_type"] = geography_type
 
     # Remove groups with less than three observations
+    # TODO: Remove/upgrade detect_chasing output
     df["n"] = df.groupby(group_cols)["ratio"].transform("count")
     df = df[df["n"] > 3]
     df = df.groupby(group_cols).apply(
@@ -276,7 +171,7 @@
                 "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]),
                 "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]),
                 "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]),
-                "detect_chasing": detect_chasing(ratio=x["ratio"]),
+                "detect_chasing": False,
                 "within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20),
                 "within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10),
                 "within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05),
@@ -335,13 +230,45 @@
     return df
 
 
-# Append and write output to s3 bucket
-pd.concat(
-    [
-        report_summarise(pull, "triad", "Tri"),
-        report_summarise(pull, "township_code", "Town"),
-    ]
-).reset_index(drop=True).to_parquet(s3_ratio_stats)
+def model(dbt, spark_session):
+    dbt.config(materialized="table")
+
+    input = dbt.ref("reporting.ratio_stats_input")
+
+    # Convert the Spark input dataframe to Pandas for
+    # compatibility with assesspy functions
+    input = input.toPandas()
+
+    # Replicate filtering from prior vw_ratio_stats pull
+    input = input[input.ratio > 0 & input.ratio.notnull()]
+
+    df = pd.concat(
+        [
+            report_summarise(input, "triad", "Tri"),
+            report_summarise(input, "township_code", "Town"),
+        ]
+    ).reset_index(drop=True)
+
+    # Force certain columns to datatype to maintain parity with old version
+    df[["year", "triad", "sale_year"]] = df[
+        ["year", "triad", "sale_year"]
+    ].astype(int)
+
+    # Create a Spark schema to maintain the datatypes of the
+    # previous output (for Tableau compatibility)
+    schema = (
+        "year: bigint, triad: bigint, geography_type: string, "
+        + "property_group: string, assessment_stage: string, "
+        + "geography_id: string, sale_year: bigint, sale_n: bigint, "
+        + "median_ratio: double, median_ratio_ci: string, cod: double, "
+        + "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, "
+        + "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, "
+        + "mki: double, mki_n: bigint, detect_chasing: boolean, "
+        + "ratio_met: boolean, cod_met: boolean, prd_met: boolean, "
+        + "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, "
+        + "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint"
+    )
+
+    spark_df = spark_session.createDataFrame(df, schema=schema)
 
-# Trigger reporting glue crawler
-glue_client.start_crawler(Name="ccao-data-warehouse-reporting-crawler")
+    return spark_df
diff --git a/dbt/models/reporting/reporting.ratio_stats_input.sql b/dbt/models/reporting/reporting.ratio_stats_input.sql
new file mode 120000
index 000000000..109698b5d
--- /dev/null
+++ b/dbt/models/reporting/reporting.ratio_stats_input.sql
@@ -0,0 +1 @@
+../../../aws-athena/ctas/reporting-ratio_stats_input.sql
\ No newline at end of file
diff --git a/dbt/models/reporting/reporting.vw_ratio_stats.sql b/dbt/models/reporting/reporting.vw_ratio_stats.sql
deleted file mode 120000
index f50270b1a..000000000
--- a/dbt/models/reporting/reporting.vw_ratio_stats.sql
+++ /dev/null
@@ -1 +0,0 @@
-../../../aws-athena/views/reporting-vw_ratio_stats.sql
\ No newline at end of file
diff --git a/dbt/models/reporting/schema.yml b/dbt/models/reporting/schema.yml
index becb94d8e..ff7c70f04 100644
--- a/dbt/models/reporting/schema.yml
+++ b/dbt/models/reporting/schema.yml
@@ -1,39 +1,73 @@
-sources:
-  - name: reporting
-    tags:
-      - load_auto
-    tables:
-      - name: ratio_stats
-        description: '{{ doc("table_ratio_stats") }}'
-        tests:
-          - expression_is_true:
-              name: reporting_ratio_stats_no_nulls
-              expression: |
-                year IS NOT NULL
-                AND triad IS NOT NULL
-                AND geography_type IS NOT NULL
-                AND property_group IS NOT NULL
-                AND assessment_stage IS NOT NULL
-                AND sale_year IS NOT NULL
-          - expression_is_true:
-              name: reporting_ratio_stats_metrics_are_sensible
-              expression: |
-                cod >= 0
-                AND prd >= 0
-                AND prb BETWEEN -1 AND 1
-                AND mki >= 0
-                AND triad IS NOT NULL
-                AND geography_type IS NOT NULL
-                AND property_group IS NOT NULL
-                AND assessment_stage IS NOT NULL
-                AND sale_year IS NOT NULL
-          - expression_is_true:
-              name: reporting_ratio_stats_within_directionality
-              expression: |
-                within_20_pct >= within_10_pct
-                AND within_10_pct >= within_05_pct
-
 models:
+  - name: reporting.ratio_stats
+    description: '{{ doc("table_ratio_stats") }}'
+    config:
+      tags:
+        - daily
+      packages:
+        - "assesspy==1.1.*"
+        - "numpy==1.26.*"
+        - "pandas==2.*"
+    tests:
+      - expression_is_true:
+          name: reporting_ratio_stats_no_nulls
+          expression: |
+            year IS NOT NULL
+            AND triad IS NOT NULL
+            AND geography_type IS NOT NULL
+            AND property_group IS NOT NULL
+            AND assessment_stage IS NOT NULL
+            AND sale_year IS NOT NULL
+      - expression_is_true:
+          name: reporting_ratio_stats_metrics_are_sensible
+          expression: |
+            cod >= 0
+            AND prd >= 0
+            AND prb BETWEEN -1 AND 1
+            AND mki >= 0
+            AND triad IS NOT NULL
+            AND geography_type IS NOT NULL
+            AND property_group IS NOT NULL
+            AND assessment_stage IS NOT NULL
+            AND sale_year IS NOT NULL
+      - expression_is_true:
+          name: reporting_ratio_stats_within_directionality
+          expression: |
+            within_20_pct >= within_10_pct
+            AND within_10_pct >= within_05_pct
+
+  - name: reporting.ratio_stats_input
+    description: '{{ doc("table_ratio_stats_input") }}'
+    config:
+      tags:
+        - daily
+    tests:
+      - unique_combination_of_columns:
+          name: reporting_ratio_stats_input_unique_by_keys
+          combination_of_columns:
+            - pin
+            - year
+            - assessment_stage
+            - sale_price
+      - expression_is_true:
+          name: reporting_ratio_stats_input_no_nulls
+          expression: |
+            property_group IS NOT NULL
+            AND assessment_stage IS NOT NULL
+            AND triad IS NOT NULL
+            AND township_code IS NOT NULL
+      - expression_is_true:
+          name: reporting_ratio_stats_input_sale_year_equals_year_minus_one
+          expression: CAST(sale_year AS INTEGER) = CAST(year AS INTEGER) - 1
+          additional_select_columns:
+            - sale_year
+            - year
+      - expression_is_true:
+          name: reporting_ratio_stats_input_ratio_greater_than_zero
+          expression: ratio >= 0
+          additional_select_columns:
+            - ratio
+
   - name: reporting.res_report_summary
     description: '{{ doc("table_res_report_summary") }}'
     config:
@@ -177,35 +211,6 @@ models:
           where: CAST(year AS int) < {{ var('test_qc_year_start') }}
           error_if: ">21" # as of 2024-04-01
 
-  - name: reporting.vw_ratio_stats
-    description: '{{ doc("view_vw_ratio_stats") }}'
-    tests:
-      - unique_combination_of_columns:
-          name: reporting_vw_ratio_stats_unique_by_keys
-          combination_of_columns:
-            - pin
-            - year
-            - assessment_stage
-            - sale_price
-      - expression_is_true:
-          name: reporting_vw_ratio_stats_no_nulls
-          expression: |
-            property_group IS NOT NULL
-            AND assessment_stage IS NOT NULL
-            AND triad IS NOT NULL
-            AND township_code IS NOT NULL
-      - expression_is_true:
-          name: reporting_vw_ratio_stats_sale_year_equals_year_minus_one
-          expression: CAST(sale_year AS INTEGER) = CAST(year AS INTEGER) - 1
-          additional_select_columns:
-            - sale_year
-            - year
-      - expression_is_true:
-          name: reporting_vw_ratio_stats_ratio_greater_than_zero
-          expression: ratio >= 0
-          additional_select_columns:
-            - ratio
-
   - name: reporting.vw_top_5
     description: '{{ doc("view_vw_top_5") }}'
     tests:
diff --git a/dbt/profiles.yml b/dbt/profiles.yml
index 1ab4d7962..c7edf7866 100644
--- a/dbt/profiles.yml
+++ b/dbt/profiles.yml
@@ -11,6 +11,7 @@ athena:
       schema: z_static_unused_dbt_stub_database
       # "database" here corresponds to a Glue data catalog
       database: awsdatacatalog
+      spark_work_group: primary-spark
       threads: 5
       num_retries: 1
     ci:
@@ -21,6 +22,7 @@ athena:
       region_name: us-east-1
       schema: z_static_unused_dbt_stub_database
       database: awsdatacatalog
+      spark_work_group: primary-spark
       threads: 5
       num_retries: 1
     prod:
@@ -31,5 +33,6 @@ athena:
       region_name: us-east-1
       schema: default
       database: awsdatacatalog
+      spark_work_group: primary-spark
       threads: 5
       num_retries: 1
diff --git a/dbt/requirements.txt b/dbt/requirements.txt
index 81d7fc03d..e44ad7a0d 100644
--- a/dbt/requirements.txt
+++ b/dbt/requirements.txt
@@ -1 +1 @@
-dbt-athena-community==1.7.1
+dbt-athena-community==1.7.2
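The new `reporting.ratio_stats` file above relies on dbt's Python model contract: dbt imports the file, calls its `model(dbt, session)` entrypoint on the Spark workgroup configured via `spark_work_group`, and persists the returned dataframe as the model's table. A minimal sketch of that contract follows; the model and column names (`example_input`, `metric`) are illustrative placeholders, not objects defined in this patch.

```python
import pandas as pd


def model(dbt, session):
    # Materialize the result as a table, as reporting.ratio_stats does
    dbt.config(materialized="table")

    # dbt.ref() resolves an upstream model; on Spark it returns a Spark
    # DataFrame, so convert to pandas for pandas-based helpers
    upstream = dbt.ref("example_input").toPandas()

    # Any pandas transformation can happen here (placeholder aggregation)
    summary = upstream.groupby("year", as_index=False)["metric"].median()

    # Return a Spark DataFrame; dbt writes it to the warehouse
    return session.createDataFrame(summary)
```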