Refactor ratio stats Glue job to dbt python model #422
Conversation
# type: ignore
# pylint: skip-file
Annoyingly, our linters and pre-commit checks absolutely hate this setup. There are two problems:

- sc is undefined in the script (it's added when you submit the job).
- The imports necessarily come after addPyFile(), which loads the packages from S3.

I'm not sure if there's a way to disable these linting toggles in a prettier way.
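For anyone reading along, a minimal sketch of the kind of setup being described might look like the following (the S3 path and the imported function names are illustrative, not the actual job code):

# type: ignore
# pylint: skip-file
# sc (the SparkContext) is injected by the runtime when the job is submitted,
# so it looks undefined to static analysis
sc.addPyFile("s3://example-bucket/packages/assesspy.zip")  # noqa: F821

# The imports can only happen after addPyFile() has pulled the package from S3,
# which trips flake8's module-level import rule (E402)
from assesspy import cod, prd, prb  # noqa: E402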
[Question, non-blocking] Unfortunately this is pretty common when 1) writing code for environments with implicit context and 2) writing setup code that needs to run prior to import. I don't think there's an obviously better way to do it, although we might consider switching to a different linter that supports block-level ignores (unlike flake8).
That being said, can you provide some context on why pylint is turned off? Is that just for your editor integration?
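On the block-level ignores point: pylint itself can scope suppressions to a block with disable/enable pairs, so one lighter-touch alternative to the file-wide skip (just a sketch, with a made-up S3 path) could be:

# pylint: disable=undefined-variable
sc.addPyFile("s3://example-bucket/packages/assesspy.zip")
# pylint: enable=undefined-variable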
Ah, it's because it's also mad about sc not being defined. Pylint in this case is kind of extraneous, since flake8 is already running in pre-commit.
# Convert the Spark input dataframes to Pandas for compatibility
# with assesspy functions
df = df.toPandas()
This line is absolutely critical to making this work. It converts the Spark table returned by dbt-athena (in this case, all of reporting.ratio_stats_input) to a pandas data frame. IMO, we should refactor this to use pyspark, since it will probably be much faster.
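For readers less familiar with dbt Python models, a rough sketch of the pattern being discussed is below; the upstream model name comes from this thread, but the body is illustrative rather than the PR's actual code:

def model(dbt, session):
    # dbt-athena's Spark runtime hands back the upstream model as a Spark DataFrame
    spark_df = dbt.ref("ratio_stats_input")

    # Convert to pandas for compatibility with the assesspy functions;
    # a later refactor could keep this in pyspark for speed
    df = spark_df.toPandas()

    # ... compute the ratio statistics with assesspy here ...

    # Hand a Spark DataFrame back to dbt so it can materialize the result table
    return session.createDataFrame(df)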
@@ -276,7 +174,6 @@ def report_summarise(df, geography_id, geography_type):
    "mki": ccao_mki(fmv=x["fmv"], sale_price=x["sale_price"]),
    "prd": ccao_prd(fmv=x["fmv"], sale_price=x["sale_price"]),
    "prb": ccao_prb(fmv=x["fmv"], sale_price=x["sale_price"]),
    "detect_chasing": detect_chasing(ratio=x["ratio"]),
I dropped the detect_chasing() stuff because:

a) It's not actually super effective at detecting sales chasing.
b) It was causing extremely difficult-to-debug errors in the Spark job.

We can work to add it back later if it's truly needed. FYI @ccao-jardine.
@jeancochrane Hit another small roadblock on this after a bunch of permissions debugging. It seems like Spark can't write tables with a dash in the name. Not an issue in prod, but it doesn't work with our kebab-case namespacing scheme. Do you think it's worth changing the macro to force everything to snake_case?
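(For what it's worth, the conversion the macro would need to do is trivial; in Python terms it amounts to something like the following, where the schema name is made up:)

ci_schema = "ci-my-feature-branch"            # hypothetical kebab-case CI schema name
spark_safe_schema = ci_schema.replace("-", "_")  # "ci_my_feature_branch", safe for Spark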
@dfsnow It's a little bit annoying to have to change our naming scheme, but there's not actually a huge difference between the hyphenated vs. underscored versions, so I don't mind. Were you able to find documentation confirming this limitation with Athena PySpark? I couldn't find anything, so I'm wondering if the root cause is actually an error in the underlying plugin implementation (e.g. not using backticks to escape special characters).
Sadly, there are official docs supporting this. I also forked [...]
@jeancochrane @wrridgeway This could probably use one more look before merging. I'll post the diff between the current [...]
A quick diff shows that the old [...]

All that said, I think this is ready to merge. I've kept it as close as I can to the original [...]
Assuming testing comes back fine, this looks good to me, though it all begs the question: is there any point in keeping the input table around, rather than just doing that pull in the Python script that generates the stats, now that both processes are handled by dbt on a daily basis and the input table serves no function outside of feeding the script? It probably didn't need to exist in the first place. This seems similar to the EI issues, where we have a separate SQL script and read it in as a pull in the R script.
Probably not tbh, we can likely ditch the input table. However, I'm going to punt that to a follow-up issue since I want to get this merged.
Awesome! I can't really speak to the accuracy of the underlying table, but the model definition portion of the PR looks good to go.
schema = (
    "year: bigint, triad: bigint, geography_type: string, "
    + "property_group: string, assessment_stage: string, "
    + "geography_id: string, sale_year: bigint, sale_n: bigint, "
    + "median_ratio: double, median_ratio_ci: string, cod: double, "
    + "cod_ci: string, cod_n: bigint, prd: double, prd_ci: string, "
    + "prd_n: bigint, prb: double, prb_ci: string, prb_n: bigint, "
    + "mki: double, mki_n: bigint, detect_chasing: boolean, "
    + "ratio_met: boolean, cod_met: boolean, prd_met: boolean, "
    + "prb_met: boolean, mki_met: boolean, vertical_equity_met: boolean, "
    + "within_20_pct: bigint, within_10_pct: bigint, within_05_pct: bigint"
)
[Suggestion, non-blocking] Since this is essentially just a comma-and-space-separated string of key-value pairs, it might make future development more convenient to define it as a dict and then serialize it to this format with a one-liner like so:
schema_dict = { ... }
schema = ", ".join(f"{key}: {val}" for key, val in schema_dict.items())
But I leave it up to you to decide which data structure you find more convenient!
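For concreteness, a sketch of the dict version using a few of the columns from the string above (truncated here; the full column list would carry over as-is):

# Keep the schema as a dict so individual columns/types are easier to edit
schema_dict = {
    "year": "bigint",
    "triad": "bigint",
    "geography_type": "string",
    "median_ratio": "double",
    "cod": "double",
    # ... remaining columns ...
}
schema = ", ".join(f"{key}: {val}" for key, val in schema_dict.items())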
I don't expect that we'll persist this schema definition. It will probably only live until Nicole gets back and can confirm that Tableau is still working fine.
This PR migrates the ratio_stats Glue job to a dbt Python model that uses Athena's Spark integration as the Python runtime. There are some tradeoffs here:

Pros

- Builds the ratio_stats table with the DAG, meaning it will get rebuilt on upstream changes AND it works with dbt profiles (so changes can first be built in CI buckets).

Cons

- [...] dbt-athena.

IMO the pros are worth it here, but it's probably worth it to merge this and keep it up as a test for a month or so before integrating it more deeply.