Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ratio stats for build speed increase #521

Merged
merged 42 commits into from
Nov 26, 2024

Conversation

wagnerlmichael
Copy link
Member

@wagnerlmichael wagnerlmichael commented Jun 25, 2024

Summary

This is not a final PR for review, but a progress update to determine next steps.

Currently all of the assesspy functions used in this script are copied in. If we were to move forward with this solution, they would need to be refactored in the actual package rather than copy/pasted and changed in this script.

Currently with these changes the build time for reporting.ratio_stats table is ~15 minutes, a large improvement over the previous ~1 hour. All of this speed up came from editing the boot_ci function. I'm not sure how much speed we could get from editing the other functions. Changing sampling from pandas to numpy index sampling contributed to about ~10% of the speed up whereas parallel processing contributed to ~90% of the speed up.

Dev table here: "z_ci_436_refactor_ratio_stats_job_to_use_pyspark_reporting"."ratio_stats"

Other strategies tried

Spark

I tried for a while with different spark strategies. First I attempted to convert the data frame to a spark data frame and sample on that, but that didn't work. It was extremely slow, I'm assuming this was the case due to computationally intensive transformations from pandas to spark to pandas.

I tried to get around this issue by using a pandas udf. Supposedly, this allows the spark api to operate on the pandas data frame in a columnar format, maintaining speed increases from distributed processing. This also resulted in much longer build times or errors I couldn't work through.

I also tried a single pandas df conversion to spark, and then edit the remaining data structures in boot_ci so that they were all spark compatible, I also could not get this speed up working.

I am new to spark, so it is very possible I missed something obvious or there are remaining workable solutions.

Numba and Dask

I tried basic numba parallelization and Dask parallelization, but neither were able to be imported in properly. I because this is because they both have C bindings and Athena doesn't allow for this with third-party package additions.

concurrent.futures

I tried using this built-in python func but the parallelization was failing due to a pickling error, I switched to multiprocessing and that finally worked.

Considerations on current strategy

If we were to move forward with this solution, we would need to decide how to reconcile the changed boot_ci function with the assesspy build. One option is to edit the package itself and include a boolean param that turns parallel processing on/off. Another option is too just keep the copy pasted functions in this script, but that creates two sources of truth for the assesspy functions which isn't ideal.

One potential upside of not using spark is that we can potentially maintain these functions in assesspy rather than building out an entirely new set of spark assesspy functions.

Other ways forward

We could also continue to develop here. Two other paths forward for me could be:

  • Try to spend more time figuring out spark
  • Try further non-spark speed up in other functions

@wagnerlmichael wagnerlmichael linked an issue Jun 25, 2024 that may be closed by this pull request
Comment on lines 159 to 176
def bootstrap_worker(
data_array, fun, num_kwargs, n, nboot, start, end, result_queue
):
ests = []
for _ in range(start, end):
sample_indices = np.random.choice(
data_array.shape[0], size=n, replace=True
)
sample_array = data_array[sample_indices]
if fun.__name__ == "cod" or num_kwargs == 1:
ests.append(fun(sample_array[:, 0]))
elif fun.__name__ == "prd":
ests.append(fun(sample_array[:, 0], sample_array[:, 1]))
else:
raise Exception(
"Input function should require 1 argument or be assesspy.prd." # noqa
)
result_queue.put(ests)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function sets up for our parallel processing, it is the unit of work that a single core will be doing. It randomly samples just like the prior code.

Comment on lines 163 to 166
for _ in range(start, end):
sample_indices = np.random.choice(
data_array.shape[0], size=n, replace=True
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We substitute the old pandas sampling for a faster np.random.choice() sampling.

Comment on lines 178 to 189
def parallel_bootstrap(
data_array, fun, num_kwargs, n, nboot, num_processes=4
):
processes = []
result_queue = mp.Queue()
chunk_size = nboot // num_processes

for i in range(num_processes):
start = i * chunk_size
end = start + chunk_size if i < num_processes - 1 else nboot
p = mp.Process(
target=bootstrap_worker,
Copy link
Member Author

@wagnerlmichael wagnerlmichael Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function allocates the size of each process, and divides the ests bootstrap calculation into n_processes. This conditional if i < num_processes - 1 else nboot handles the case in which the nboot number isn't cleanly divisble by num_processes

Comment on lines 204 to 211
results = []
for _ in range(num_processes):
results.extend(result_queue.get())

for p in processes:
p.join()

return results
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grabs the data from all the processes and combines them at the end

result_queue.put(ests)

def parallel_bootstrap(
data_array, fun, num_kwargs, n, nboot, num_processes=4
Copy link
Member Author

@wagnerlmichael wagnerlmichael Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_processes=4 is the optimal number of cores here. 3 to 4 is a big speed increase but 4, 8, and 16 all give similar times. I'm guessing this is because of the data transfer bottleneck between cores.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Question, non-blocking] I'm guessing you checked this, but do we know for sure that the machine that this was tested on had more than 4 cores available?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this in person and there were only 4 cores available, but currently aws only allows a single 4 core DPU for processing. If they change this we could probably get much faster speeds with more cores.

Copy link
Contributor

@jeancochrane jeancochrane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice work! It's a bummer to hear that the Spark code either didn't work or wasn't faster, but I don't have enough Spark experience to advise on a path forward at this point. Maybe it would make sense to take another crack at it in the future, but in the meantime I like the improvements you've made here, and I'm on board with the plan to make these changes to the assesspy package.

My recommended path forward would be to recreate these changes in a branch of assesspy, bundle and push the code from that branch to S3 as a .zip file, and then test it out by updating the sc.addPyFile() call in this model definition to point to the new version of the package. Then once we get the assesspy branch merged and released, we can update the ratio_stats model to depend on the new version. Does that sound reasonable to everyone else?

result_queue.put(ests)

def parallel_bootstrap(
data_array, fun, num_kwargs, n, nboot, num_processes=4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Question, non-blocking] I'm guessing you checked this, but do we know for sure that the machine that this was tested on had more than 4 cores available?

@dfsnow
Copy link
Member

dfsnow commented Jul 2, 2024

@wagnerlmichael I mocked up a working set of functions using only Spark-compatible abstractions and dropped the result in a gist. The result seems to be pretty fast, at least on the limited subset of columns I calculated.

You should be able to drop that code into an Athena Spark notebook and run it without modification. It runs the COD stat calculations at the township_code level in 1m45s. I didn't mock up the other functions but it should be fairly straightforward to build out from here.

Can you take another crack at this when you get some free time, building off the linked gist? I'm happy to walk through what I did in the functions/how I figured them out. If the gist is stuff you've already tried then let me know and I'll think about another way forward.

@dfsnow dfsnow force-pushed the 436-refactor-ratio_stats-job-to-use-pyspark branch from 611789d to e55970d Compare November 19, 2024 22:16
@dfsnow dfsnow force-pushed the 436-refactor-ratio_stats-job-to-use-pyspark branch from 89edc2e to 4ae9362 Compare November 25, 2024 22:26
Copy link
Member

@dfsnow dfsnow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • @jeancochrane This is ready for your review.
  • @wagnerlmichael You should take a look at the changes here.
  • @ccao-jardine I added back a sales chasing stat, but I also changed the column order (and some names). Is that okay?

This PR is full refactor of the ratio_stats Python model. It simplifies the code by abstracting assessment metrics to the new AssessPy 2.0.0 release. It also significantly decreases the run time of the model by taking advantage of Spark parallelism (from ~15 min to ~3 min).

The refactors made in AssessPy to accommodate the needs of this model should be reusable in other contexts, and this model can serve as a template for future Python models in our dbt stack.


def ccao_metric(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I condensed the individual metric functions into this wrapper. It:

  • Drops any outliers, per ccao_drop_outliers
  • Checks the min sample size requirement
  • Calculates the stat (COD, PRD, PRB, or MKI)
  • Calculates the CI for the stat (except for MKI)
  • Calculates the post-outlier-drop sample size (n)
  • Returns a dictionary for use in calc_summary

We could move this and the other ccao_ prefixed functions into the ccao package, but I'm honestly not sure it's necessary. This function and the others are sufficiently short that it doesn't seem like a big deal to have them in here.

prd_ci = prd_boot(fmv_no_outliers, sale_price_no_outliers, nboot=1000)
prd_ci = f"{prd_ci[0]}, {prd_ci[1]}"
met = prd_met(prd_val)
def ccao_median(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Median gets its own function because it's not included as a stat in assesspy and takes different inputs (ratio, as opposed to estimate and sale_price).

Comment on lines +138 to +141
df.withColumn("geography_id", col(geography_id).cast("string"))
.withColumn("geography_type", lit(geography_type))
.groupby(group_cols)
.applyInPandas(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key to making things faster was just using native PySpark abstractions. Here df is a Spark table, and applyInPandas does all the heavy lifting of running a lambda function across all the specified grouping columns.

@@ -21,6 +23,9 @@ extend-select = ["I"]
# decide we want to import code from dbt/ to a context outside of it
known-third-party = ["dbt"]

[tool.ruff.lint.per-file-ignores]
"dbt/models/**.py" = ["E402"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignores import order errors only for Python dbt models.

@dfsnow dfsnow requested a review from jeancochrane November 26, 2024 19:59
Copy link
Contributor

@jeancochrane jeancochrane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work 🚀

@dfsnow dfsnow marked this pull request as ready for review November 26, 2024 22:29
@dfsnow dfsnow requested a review from a team as a code owner November 26, 2024 22:29
@dfsnow dfsnow merged commit 8c6633d into master Nov 26, 2024
7 checks passed
@dfsnow dfsnow deleted the 436-refactor-ratio_stats-job-to-use-pyspark branch November 26, 2024 22:30
@ccao-jardine
Copy link
Member

I also changed the column order (and some names). Is that okay?

Should be fine, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor ratio_stats job to use Pyspark
4 participants