Refactor ratio stats Glue job to dbt python model #422

Merged on May 8, 2024 (28 commits)

Commits
06bb719
Ignore dev venv
dfsnow May 1, 2024
63bd8cb
Add initial python model
dfsnow May 1, 2024
f3012e1
Bump dbt-athena version
dfsnow May 1, 2024
1e981ec
Lint ratio stats
dfsnow May 1, 2024
af7f504
Change project name to fix compile issue
dfsnow May 1, 2024
02fab5d
Add Spark workgroup
dfsnow May 1, 2024
eacd40a
Finalize ratio_stats spark job
dfsnow May 1, 2024
26f9013
Convert vw_ratio_stats into table
dfsnow May 1, 2024
3e7f558
Move package zip location
dfsnow May 1, 2024
199c405
Remove deprecated glue job
dfsnow May 1, 2024
f66ed93
Convert spark to pandas df
dfsnow May 2, 2024
8fdc7a4
Drop detect_chasing()
dfsnow May 2, 2024
8dbec57
Fix linting issues
dfsnow May 2, 2024
487b8a7
Rename project (prefix with org name)
dfsnow May 2, 2024
15b9af7
Rename vw_ratio_stats to ratio_stats_input
dfsnow May 2, 2024
8f8df47
Add dbt tag to build daily
dfsnow May 2, 2024
f0dbd9e
Update dbt/models/reporting/docs.md
dfsnow May 2, 2024
1576932
Re-add detect_chasing as stub
dfsnow May 2, 2024
fe1d40d
Move ratio_stats to model (instead of source)
dfsnow May 6, 2024
9434c4e
Merge branch 'master' into dfsnow/spike-ratio-stats-refactor
dfsnow May 6, 2024
5bc9442
Add ratio_stats schema prefix
dfsnow May 7, 2024
0499aed
Alphabetize dbt reporting models
dfsnow May 7, 2024
bd03719
Add explicit Spark output data types
dfsnow May 7, 2024
5f12a41
Update reporting table tags
dfsnow May 7, 2024
f747d7f
Coerce types to int
dfsnow May 7, 2024
0efa325
Update Athena workgroup name
dfsnow May 7, 2024
2e3bbfd
Replicate filtering from prior vw_ratio_stats pull
dfsnow May 8, 2024
7c07280
Update aws-athena/ctas/reporting-ratio_stats_input.sql
dfsnow May 8, 2024
2 changes: 1 addition & 1 deletion .github/workflows/build_daily_dbt_models.yaml
@@ -4,7 +4,7 @@ on:
workflow_dispatch:
schedule:
# Every day at 8am (2pm UTC-5)
- cron: '0 13 * * *'
- cron: '0 12 * * *'

jobs:
build-daily-dbt-models:
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ logs/
# Ignore most CSVs, except those that are used as dbt seeds
*.csv
!dbt/seeds/**/*.csv
venv/
@@ -1,5 +1,10 @@
-- View containing ratios by pin, intended to feed the
-- glue job 'reporting-ratio_stats'.
-- ratio_stats dbt python model
{{
config(
materialized='table'
)
}}

-- Model values for corresponding triads only
WITH model_values AS (
6 changes: 3 additions & 3 deletions dbt/dbt_project.yml
@@ -1,4 +1,4 @@
name: 'athena'
name: 'ccao_data_athena'
version: '0.1.0'

# This setting configures which "profile" dbt uses for this project.
@@ -40,7 +40,7 @@ vars:
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
athena:
ccao_data_athena:
+materialized: view
+write_compression: zstd
+format: parquet
@@ -66,7 +66,7 @@ tests:
+schema: test_failure

seeds:
athena:
ccao_data_athena:
ccao:
+schema: ccao
location:
18 changes: 9 additions & 9 deletions dbt/models/reporting/docs.md
@@ -10,6 +10,15 @@ reporting Tableau dashboards.
`property_group`
{% enddocs %}

# ratio_stats_input

{% docs table_ratio_stats_input %}
Table to feed the Python dbt job that creates the `reporting.ratio_stats` table.
Feeds public reporting assets.

**Primary Key**: `year`, `pin`, `assessment_stage`
{% enddocs %}

# res_report_summary

{% docs table_res_report_summary %}
@@ -88,15 +97,6 @@ The assessment stages are:
**Primary Key**: `year`, `pin`, `stage_name`
{% enddocs %}

# vw_ratio_stats

{% docs view_vw_ratio_stats %}
View to feed the `reporting.ratio_stats` table and Glue job.
Feeds public reporting assets.

**Primary Key**: `year`, `pin`, `assessment_stage`
{% enddocs %}

# vw_res_report_summary

{% docs view_vw_res_report_summary %}

dfsnow (Member Author) commented:
@wrridgeway It looks like these functions are still cutting out the top and bottom 5% of ratios. I can't quite remember where we landed on this, but I thought we were shifting to kicking out sales using the new flags.
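
For reference, a minimal sketch of what quantile-based trimming of ratios typically looks like in pandas. This is illustrative only; the trim_ratios name and the 5%/95% bounds are assumptions for this sketch, not code from the PR, and the actual trimming lives inside the assesspy/ccao helper functions discussed here.

import pandas as pd

def trim_ratios(df: pd.DataFrame, lower: float = 0.05, upper: float = 0.95) -> pd.DataFrame:
    # Keep only rows whose sales ratio falls inside the given quantile bounds
    lo, hi = df["ratio"].quantile([lower, upper])
    return df[df["ratio"].between(lo, hi)]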

@@ -1,125 +1,17 @@
import re
import time

import boto3
import numpy as np
import pandas as pd
from assesspy import boot_ci, cod
from assesspy import cod_ci as cod_boot
from assesspy import cod_met, detect_chasing, mki, mki_met, prb, prb_met, prd
from assesspy import prd_ci as prd_boot
from assesspy import prd_met
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Define AWS boto3 clients
athena_client = boto3.client("athena")
glue_client = boto3.client("glue", region_name="us-east-1")
s3_client = boto3.client("s3")

# Define s3 and Athena paths
athena_db = "iasworld"

s3_bucket = "ccao-data-warehouse-us-east-1"
s3_prefix = "reporting/ratio_stats/"
s3_output = "s3://" + s3_bucket + "/" + s3_prefix
s3_ratio_stats = "s3://" + s3_bucket + "/" + s3_prefix + "ratio_stats.parquet"


# Functions to help with Athena queries ----
def poll_status(athena_client, execution_id):
"""Checks the status of the a query using an
incoming execution id and returns
a 'pass' string value when the status is
either SUCCEEDED, FAILED or CANCELLED."""

result = athena_client.get_query_execution(QueryExecutionId=execution_id)
state = result["QueryExecution"]["Status"]["State"]

if state == "SUCCEEDED":
return "pass"
if state == "FAILED":
return "pass"
if state == "CANCELLED":
return "pass"
else:
return "not pass"


def poll_result(athena_client, execution_id):
"""Gets the query result using an incoming execution id.
This function is run after the poll_status function and
only if we are sure that the query was fully executed."""

result = athena_client.get_query_execution(QueryExecutionId=execution_id)

return result


def run_query_get_result(
athena_client, s3_bucket, query, database, s3_output, s3_prefix
):
"""Runs an incoming query and returns
the output as an s3 file like object."""

response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
ResultConfiguration={
"OutputLocation": s3_output,
},
)

QueryExecutionId = response.get("QueryExecutionId")

# Wait until query is executed
while poll_status(athena_client, QueryExecutionId) != "pass":
time.sleep(2)
pass

result = poll_result(athena_client, QueryExecutionId)

r_file_object = None

# Only return file like object when the query succeeded
if result["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
print("Query SUCCEEDED: {}".format(QueryExecutionId))

s3_key = s3_prefix + QueryExecutionId + ".csv"

r_file_object = boto3.resource("s3").Object(s3_bucket, s3_key)

return r_file_object


# Athena query ----
SQL_QUERY = "SELECT * FROM reporting.vw_ratio_stats;"


# Run run_query_get_result to get file like object ----
r_file_object = run_query_get_result(
athena_client, s3_bucket, SQL_QUERY, athena_db, s3_output, s3_prefix
# type: ignore
# pylint: skip-file
sc.addPyFile( # noqa: F821
Comment on lines +1 to +2
dfsnow (Member Author) commented on May 2, 2024:

Annoyingly, our linters and pre-commit checks absolutely hate this setup. There are two problems:

  1. sc is undefined in the script (it's added when you submit the job).
  2. The imports necessarily come after addPyFile(), which loads the packages from S3.

I'm not sure if there's a prettier way to disable these lint checks.

Contributor commented:

[Question, non-blocking] Unfortunately this is pretty common when 1) writing code for environments with implicit context and 2) writing setup code that needs to run prior to import. I don't think there's an obviously better way to do it, although we might consider switching to a different linter that supports block-level ignores (unlike flake8).

That being said, can you provide some context on why pylint is turned off? Is that just for your editor integration?

dfsnow (Member Author) replied:

Ah, it's because it's also mad about sc not being defined. Pylint in this case is kind of extraneous since flake8 is already running in pre-commit.

"s3://ccao-dbt-athena-dev-us-east-1/packages/spark-packages.zip"
)

# Retrieve s3 location of Athena query result and retrieve it
target = "s3://" + s3_bucket + "/" + r_file_object.key

pull = pd.read_csv(target)
pull = pull[pull.ratio > 0 & pull.ratio.notnull()]

# Delete all query results for this job from s3 bucket
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

for object in response["Contents"]:
if re.search("csv", object["Key"]):
print("Deleting", object["Key"])
s3_client.delete_object(Bucket=s3_bucket, Key=object["Key"])
import numpy as np # noqa: E402
import pandas as pd # noqa: E402
from assesspy import boot_ci # noqa: E402
from assesspy import cod # noqa: E402
from assesspy import prd_met # noqa: E402
from assesspy import cod_ci as cod_boot # noqa: E402
from assesspy import cod_met, mki, mki_met, prb, prb_met, prd # noqa: E402
from assesspy import prd_ci as prd_boot # noqa: E402


def median_boot(ratio, nboot=100, alpha=0.05):
@@ -249,7 +141,9 @@ def ccao_prb(fmv, sale_price):


def report_summarise(df, geography_id, geography_type):
# Aggregates data and calculates summary statistics for given groupings
"""
Aggregates data and calculates summary statistics for given groupings
"""

group_cols = [
"year",
@@ -261,10 +155,11 @@ def report_summarise(df, geography_id, geography_type):
"sale_year",
]

df["geography_id"] = pull[geography_id].astype(str)
df["geography_id"] = df[geography_id].astype(str)
df["geography_type"] = geography_type

# Remove groups with less than three observations
# TODO: Remove/upgrade detect_chasing output
df["n"] = df.groupby(group_cols)["ratio"].transform("count")
df = df[df["n"] > 3]
df = df.groupby(group_cols).apply(
@@ -276,7 +171,7 @@ def report_summarise(df, geography_id, geography_type):
"mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]),
"prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]),
"prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]),
"detect_chasing": detect_chasing(ratio=x["ratio"]),
"detect_chasing": False,
dfsnow (Member Author) commented:

I dropped the detect_chasing() stuff because:

a) It's not actually super effective at detecting sales chasing.
b) It was causing errors in the Spark job that were extremely difficult to debug.

We can work to add it back later if it's truly needed. FYI @ccao-jardine.

"within_20_pct": sum(abs(1 - x["ratio"]) <= 0.20),
"within_10_pct": sum(abs(1 - x["ratio"]) <= 0.10),
"within_05_pct": sum(abs(1 - x["ratio"]) <= 0.05),
@@ -335,13 +230,45 @@ def report_summarise(df, geography_id, geography_type):
return df


# Append and write output to s3 bucket
pd.concat(
[
report_summarise(pull, "triad", "Tri"),
report_summarise(pull, "township_code", "Town"),
]
).reset_index(drop=True).to_parquet(s3_ratio_stats)
def model(dbt, spark_session):
dbt.config(materialized="table")

input = dbt.ref("reporting.ratio_stats_input")

# Convert the Spark input dataframe to Pandas for
# compatibility with assesspy functions
input = input.toPandas()

# Replicate filtering from prior vw_ratio_stats pull
input = input[(input.ratio > 0) & input.ratio.notnull()]

df = pd.concat(
[
report_summarise(input, "triad", "Tri"),
report_summarise(input, "township_code", "Town"),
]
).reset_index(drop=True)

# Coerce certain columns to int to maintain parity with the old version
df[["year", "triad", "sale_year"]] = df[
["year", "triad", "sale_year"]
].astype(int)

# Create a Spark schema to maintain the datatypes of the
# previous output (for Tableau compatibility)
schema = (
"year: bigint, triad: bigint, geography_type: string, "
+ "property_group: string, assessment_stage: string, "
+ "geography_id: string, sale_year: bigint, sale_n: bigint, "
+ "median_ratio: double, median_ratio_ci: string, cod: double, "
+ "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, "
+ "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, "
+ "mki: double, mki_n: bigint, detect_chasing: boolean, "
+ "ratio_met: boolean, cod_met: boolean, prd_met: boolean, "
+ "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, "
+ "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint"
)

Comment on lines +259 to +270
Contributor commented:

[Suggestion, non-blocking] Since this is essentially just a comma-and-space-separated string of key-value pairs, it might make future development more convenient to define it as a dict and then serialize it to this format with a one-liner like so:

schema_dict = { ... }
schema = ", ".join(f"{key}: {val}" for key, val in schema_dict.items())

But I leave it up to you to decide which data structure you find more convenient!

dfsnow (Member Author) replied:

I don't expect that we'll persist this schema definition. It will probably only live until Nicole gets back and can confirm that Tableau is still working fine.
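
For reference, here is the reviewer's dict-based suggestion sketched out in full, populated with the column types from the schema string above. This is an illustrative alternative only; the PR as merged keeps the plain string.

# Dicts preserve insertion order (Python 3.7+), so the join below reproduces
# the exact schema string used in the model above
schema_dict = {
    "year": "bigint",
    "triad": "bigint",
    "geography_type": "string",
    "property_group": "string",
    "assessment_stage": "string",
    "geography_id": "string",
    "sale_year": "bigint",
    "sale_n": "bigint",
    "median_ratio": "double",
    "median_ratio_ci": "string",
    "cod": "double",
    "cod_ci": "string",
    "cod_n": "bigint",
    "prd": "double",
    "prd_ci": "string",
    "prd_n": "bigint",
    "prb": "double",
    "prb_ci": "string",
    "prb_n": "bigint",
    "mki": "double",
    "mki_n": "bigint",
    "detect_chasing": "boolean",
    "ratio_met": "boolean",
    "cod_met": "boolean",
    "prd_met": "boolean",
    "prb_met": "boolean",
    "mki_met": "boolean",
    "vertical_equity_met": "boolean",
    "within_20_pct": "bigint",
    "within_10_pct": "bigint",
    "within_05_pct": "bigint",
}
schema = ", ".join(f"{key}: {val}" for key, val in schema_dict.items())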

spark_df = spark_session.createDataFrame(df, schema=schema)

# Trigger reporting glue crawler
glue_client.start_crawler(Name="ccao-data-warehouse-reporting-crawler")
return spark_df
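
Relatedly, since detect_chasing is stubbed to False in the model above, here is a hedged sketch of the kind of heuristic that could eventually replace the stub: flag a group when an unusually large share of its ratios sits in a tight band around 1.0. This is not assesspy's detect_chasing implementation; the function name, band, and threshold are assumptions for illustration only.

import pandas as pd

def naive_detect_chasing(ratio: pd.Series, band: float = 0.02, threshold: float = 0.25) -> bool:
    # Share of ratios within +/- `band` of 1.0; a large share can indicate sales chasing
    share_near_one = ratio.between(1 - band, 1 + band).mean()
    return bool(share_near_one > threshold)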
1 change: 1 addition & 0 deletions dbt/models/reporting/reporting.ratio_stats_input.sql
1 change: 0 additions & 1 deletion dbt/models/reporting/reporting.vw_ratio_stats.sql

This file was deleted.
