Refactor ratio stats Glue job to dbt python model (#422)
* Ignore dev venv

* Add initial python model

* Bump dbt-athena version

* Lint ratio stats

* Change project name to fix compile issue

* Add Spark workgroup

* Finalize ratio_stats spark job

* Convert vw_ratio_stats into table

* Move package zip location

* Remove deprecated glue job

* Convert spark to pandas df

* Drop detect_chasing()

* Fix linting issues

* Rename project (prefix with org name)

* Rename vw_ratio_stats to ratio_stats_input

* Add dbt tag to build daily

* Update dbt/models/reporting/docs.md

Co-authored-by: Jean Cochrane <jeancochrane@users.noreply.github.com>

* Re-add detect_chasing as stub

* Move ratio_stats to model (instead of source)

* Add ratio_stats schema prefix

* Alphabetize dbt reporting models

* Add explicit Spark output data types

* Update reporting table tags

* Coerce types to int

* Update Athena workgroup name

* Replicate filtering from prior vw_ratio_stats pull

* Update aws-athena/ctas/reporting-ratio_stats_input.sql

Co-authored-by: William Ridgeway <10358980+wrridgeway@users.noreply.github.com>

---------

Co-authored-by: Jean Cochrane <jeancochrane@users.noreply.github.com>
Co-authored-by: William Ridgeway <10358980+wrridgeway@users.noreply.github.com>
3 people authored May 8, 2024
1 parent af87014 commit b13870e
Showing 11 changed files with 154 additions and 213 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_daily_dbt_models.yaml
@@ -4,7 +4,7 @@ on:
workflow_dispatch:
schedule:
# Every day at 7am (12pm UTC-5)
- cron: '0 13 * * *'
- cron: '0 12 * * *'

jobs:
build-daily-dbt-models:
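GitHub Actions evaluates cron schedules in UTC, so the new '0 12 * * *' fires at 12:00 UTC. A quick sanity check of the Chicago-local equivalent, using only the standard library:

from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# '0 12 * * *' fires at 12:00 UTC; convert an example run to Chicago time
run_utc = datetime(2024, 5, 8, 12, 0, tzinfo=timezone.utc)
print(run_utc.astimezone(ZoneInfo("America/Chicago")))  # 2024-05-08 07:00:00-05:00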
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ logs/
# Ignore most CSVs, except those that are used as dbt seeds
*.csv
!dbt/seeds/**/*.csv
venv/
@@ -1,5 +1,10 @@
-- View containing ratios by pin, intended to feed the
-- glue job 'reporting-ratio_stats'.
-- Table containing ratios by pin, intended to feed the
-- ratio_stats dbt python model
{{
config(
materialized='table'
)
}}

-- Model values for corresponding triads only
WITH model_values AS (
6 changes: 3 additions & 3 deletions dbt/dbt_project.yml
@@ -1,4 +1,4 @@
name: 'athena'
name: 'ccao_data_athena'
version: '0.1.0'

# This setting configures which "profile" dbt uses for this project.
@@ -40,7 +40,7 @@ vars:
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
athena:
ccao_data_athena:
+materialized: view
+write_compression: zstd
+format: parquet
@@ -66,7 +66,7 @@ tests:
+schema: test_failure

seeds:
athena:
ccao_data_athena:
ccao:
+schema: ccao
location:
18 changes: 9 additions & 9 deletions dbt/models/reporting/docs.md
@@ -10,6 +10,15 @@ reporting Tableau dashboards.
`property_group`
{% enddocs %}

# ratio_stats_input

{% docs table_ratio_stats_input %}
Table to feed the dbt Python model that creates the `reporting.ratio_stats` table.
Feeds public reporting assets.

**Primary Key**: `year`, `pin`, `assessment_stage`
{% enddocs %}
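The stated primary key can be sanity-checked with a pandas uniqueness assertion; a minimal sketch with a toy stand-in frame (names illustrative, not part of the repo):

import pandas as pd

# Toy stand-in for reporting.ratio_stats_input
df = pd.DataFrame({
    "year": [2023, 2023],
    "pin": ["1001", "1002"],
    "assessment_stage": ["mailed", "mailed"],
})
key = ["year", "pin", "assessment_stage"]
assert not df.duplicated(subset=key).any(), "primary key is not unique"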

# res_report_summary

{% docs table_res_report_summary %}
@@ -88,15 +97,6 @@ The assessment stages are:
**Primary Key**: `year`, `pin`, `stage_name`
{% enddocs %}

# vw_ratio_stats

{% docs view_vw_ratio_stats %}
View to feed the `reporting.ratio_stats` table and Glue job.
Feeds public reporting assets.

**Primary Key**: `year`, `pin`, `assessment_stage`
{% enddocs %}

# vw_res_report_summary

{% docs view_vw_res_report_summary %}
@@ -1,125 +1,17 @@
import re
import time

import boto3
import numpy as np
import pandas as pd
from assesspy import boot_ci, cod
from assesspy import cod_ci as cod_boot
from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd
from assesspy import prd_ci as prd_boot
from assesspy import prd_met
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Define AWS boto3 clients
athena_client = boto3.client("athena")
glue_client = boto3.client("glue", region_name="us-east-1")
s3_client = boto3.client("s3")

# Define s3 and Athena paths
athena_db = "iasworld"

s3_bucket = "ccao-data-warehouse-us-east-1"
s3_prefix = "reporting/ratio_stats/"
s3_output = "s3://" + s3_bucket + "/" + s3_prefix
s3_ratio_stats = "s3://" + s3_bucket + "/" + s3_prefix + "ratio_stats.parquet"


# Functions to help with Athena queries ----
def poll_status(athena_client, execution_id):
"""Checks the status of the a query using an
incoming execution id and returns
a 'pass' string value when the status is
either SUCCEEDED, FAILED or CANCELLED."""

result = athena_client.get_query_execution(QueryExecutionId=execution_id)
state = result["QueryExecution"]["Status"]["State"]

if state == "SUCCEEDED":
return "pass"
if state == "FAILED":
return "pass"
if state == "CANCELLED":
return "pass"
else:
return "not pass"


def poll_result(athena_client, execution_id):
"""Gets the query result using an incoming execution id.
This function is ran after the poll_status function and
only if we are sure that the query was fully executed."""

result = athena_client.get_query_execution(QueryExecutionId=execution_id)

return result


def run_query_get_result(
athena_client, s3_bucket, query, database, s3_output, s3_prefix
):
"""Runs an incoming query and returns
the output as an s3 file like object."""

response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
ResultConfiguration={
"OutputLocation": s3_output,
},
)

QueryExecutionId = response.get("QueryExecutionId")

# Wait until query is executed
while poll_status(athena_client, QueryExecutionId) != "pass":
time.sleep(2)
pass

result = poll_result(athena_client, QueryExecutionId)

r_file_object = None

# Only return file like object when the query succeeded
if result["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
print("Query SUCCEEDED: {}".format(QueryExecutionId))

s3_key = s3_prefix + QueryExecutionId + ".csv"

r_file_object = boto3.resource("s3").Object(s3_bucket, s3_key)

return r_file_object
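poll_status above returns the same 'pass' string for SUCCEEDED, FAILED, and CANCELLED, which is why run_query_get_result has to re-fetch the execution to learn the real outcome. A sketch of an equivalent loop that returns the terminal state directly, using the same boto3 call (illustrative, not the committed code):

import time

def wait_for_query(athena_client, execution_id, poll_seconds=2):
    # Poll get_query_execution until the query reaches a terminal state
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=execution_id
        )["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status["State"]
        time.sleep(poll_seconds)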


# Athena query ----
SQL_QUERY = "SELECT * FROM reporting.vw_ratio_stats;"


# Run run_query_get_result to get file like object ----
r_file_object = run_query_get_result(
    athena_client, s3_bucket, SQL_QUERY, athena_db, s3_output, s3_prefix
)

# Retrieve s3 location of Athena query result and retrieve it
target = "s3://" + s3_bucket + "/" + r_file_object.key

pull = pd.read_csv(target)
pull = pull[(pull.ratio > 0) & pull.ratio.notnull()]

# Delete all query results for this job from s3 bucket
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

for object in response["Contents"]:
    if re.search("csv", object["Key"]):
        print("Deleting", object["Key"])
        s3_client.delete_object(Bucket=s3_bucket, Key=object["Key"])

# type: ignore
# pylint: skip-file
sc.addPyFile(  # noqa: F821
    "s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip"
)

import boto3  # noqa: E402
import numpy as np  # noqa: E402
import pandas as pd  # noqa: E402
from assesspy import boot_ci  # noqa: E402
from assesspy import cod  # noqa: E402
from assesspy import prd_met  # noqa: E402
from assesspy import cod_ci as cod_boot  # noqa: E402
from assesspy import cod_met, mki, mki_met, prb, prb_met, prd  # noqa: E402
from assesspy import prd_ci as prd_boot  # noqa: E402
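The E402 suppressions exist because these imports must run after sc.addPyFile puts the package zip on the Python path. The underlying mechanism is ordinary zip-on-sys.path importing, which can be demonstrated locally without Spark (all names below are illustrative):

import os
import sys
import tempfile
import zipfile

# Build a tiny package zip, then import from it -- the same basic
# mechanism addPyFile relies on (zip archives on the import path)
tmp = tempfile.mkdtemp()
zip_path = os.path.join(tmp, "deps.zip")
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("hello_pkg.py", "GREETING = 'hi'\n")

sys.path.insert(0, zip_path)
import hello_pkg  # noqa: E402  (importable only after the path insert)

print(hello_pkg.GREETING)  # hi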


def median_boot(ratio, nboot=100, alpha=0.05):
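The body of median_boot is collapsed in this view. For orientation, a bootstrapped median confidence interval can be sketched with numpy alone, assuming a simple percentile method (assesspy's boot_ci is the implementation actually imported here and may differ in detail):

import numpy as np

def median_boot_sketch(ratio, nboot=100, alpha=0.05):
    # Resample with replacement and take each resample's median
    rng = np.random.default_rng(0)
    ratio = np.asarray(ratio, dtype=float)
    medians = [
        np.median(rng.choice(ratio, size=ratio.size, replace=True))
        for _ in range(nboot)
    ]
    # Two-sided percentile interval
    return np.quantile(medians, [alpha / 2, 1 - alpha / 2])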
@@ -249,7 +141,9 @@ def ccao_prb(fmv, sale_price):


def report_summarise(df, geography_id, geography_type):
# Aggregates data and calculates summary statistics for given groupings
"""
Aggregates data and calculates summary statistics for given groupings
"""

group_cols = [
"year",
@@ -261,10 +155,11 @@ def report_summarise(df, geography_id, geography_type):
"sale_year",
]

df["geography_id"] = pull[geography_id].astype(str)
df["geography_id"] = df[geography_id].astype(str)
df["geography_type"] = geography_type

# Remove groups with three or fewer observations
# TODO: Remove/upgrade detect_chasing output
df["n"] = df.groupby(group_cols)["ratio"].transform("count")
df = df[df["n"] > 3]
df = df.groupby(group_cols).apply(
Expand All @@ -276,7 +171,7 @@ def report_summarise(df, geography_id, geography_type):
"mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]),
"prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]),
"prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]),
"detect_chasing": detect_chasing(ratio=x["ratio"]),
"detect_chasing": False,
"within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20),
"within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10),
"within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05),
@@ -335,13 +230,45 @@ def report_summarise(df, geography_id, geography_type):
return df
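For context, COD (coefficient of dispersion), the headline statistic computed for each group, is the mean absolute deviation from the median ratio expressed as a percentage of the median; a plain-numpy sketch (assesspy's cod is the authoritative implementation):

import numpy as np

def cod_sketch(ratio):
    med = np.median(ratio)
    return 100 * np.mean(np.abs(ratio - med)) / med

print(cod_sketch(np.array([0.8, 0.9, 1.0, 1.1, 1.2])))  # ~12.0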


# Append and write output to s3 bucket
pd.concat(
[
report_summarise(pull, "triad", "Tri"),
report_summarise(pull, "township_code", "Town"),
]
).reset_index(drop=True).to_parquet(s3_ratio_stats)
def model(dbt, spark_session):
dbt.config(materialized="table")

input = dbt.ref("reporting.ratio_stats_input")

# Convert the Spark input dataframe to Pandas for
# compatibility with assesspy functions
input = input.toPandas()

# Replicate filtering from the prior vw_ratio_stats pull. Parentheses
# matter here: & binds tighter than >, so the comparisons must be grouped
input = input[(input.ratio > 0) & input.ratio.notnull()]

df = pd.concat(
[
report_summarise(input, "triad", "Tri"),
report_summarise(input, "township_code", "Town"),
]
).reset_index(drop=True)

# Force certain columns to int to maintain parity with the old version
df[["year", "triad", "sale_year"]] = df[
["year", "triad", "sale_year"]
].astype(int)
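One caveat on the cast above: astype(int) raises if any of these columns contains nulls. If nulls were ever possible upstream, pandas' nullable integer dtype would be a safer intermediate, though whether Spark's createDataFrame accepts the extension dtype cleanly would need checking (sketch only):

import pandas as pd

cols = ["year", "triad", "sale_year"]
nullable = pd.DataFrame({c: [2023.0, None] for c in cols})
nullable[cols] = nullable[cols].astype("Int64")  # nulls become <NA> instead of raising
print(nullable.dtypes)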

# Create a Spark schema to maintain the datatypes of the
# previous output (for Tableau compatibility)
schema = (
"year: bigint, triad: bigint, geography_type: string, "
+ "property_group: string, assessment_stage: string, "
+ "geography_id: string, sale_year: bigint, sale_n: bigint, "
+ "median_ratio: double, median_ratio_ci: string, cod: double, "
+ "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, "
+ "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, "
+ "mki: double, mki_n: bigint, detect_chasing: boolean, "
+ "ratio_met: boolean, cod_met: boolean, prd_met: boolean, "
+ "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, "
+ "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint"
)

spark_df = spark_session.createDataFrame(df, schema=schema)

# Trigger the reporting Glue crawler so the new table is cataloged.
# The client is created here because the refactored model no longer
# has the old job's module-level Glue client
glue_client = boto3.client("glue", region_name="us-east-1")
glue_client.start_crawler(Name="ccao-data-warehouse-reporting-crawler")
return spark_df
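The schema argument above is Spark's DDL-string shorthand ('col: type, ...'), which pins the output types regardless of what pandas inferred. A self-contained demonstration of the same mechanism, assuming a local pyspark installation (toy columns, not the model's real schema):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
pdf = pd.DataFrame({"year": [2023], "cod": [12.3], "cod_met": [True]})
sdf = spark.createDataFrame(pdf, schema="year: bigint, cod: double, cod_met: boolean")
sdf.printSchema()  # types come from the DDL string, not pandas inference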
1 change: 1 addition & 0 deletions dbt/models/reporting/reporting.ratio_stats_input.sql
1 change: 0 additions & 1 deletion dbt/models/reporting/reporting.vw_ratio_stats.sql

This file was deleted.
