Full spark integration #249
Conversation
See the issue for more design discussion. The basic overview is:
1. We use `with_columns` to group together map transforms.
2. Map transforms can be UDFs (normal) or pandas UDFs.
3. These all get run and "linearized" -- this means that we have two sets of edges:
   - edges that form the "physical" dependency -- the dataframe getting passed through and consistently appended to
   - edges that represent logical dependencies -- these are the original edges in the `with_columns` group

While this muddles the edges, it allows us to visualize both the structure and execution of the DAG. We will likely be adding metadata to edges to help with visualization. A rough usage sketch follows.
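As an illustration (not code from the PR itself), here is a minimal sketch of the intended usage. Function and column names are invented, and I'm assuming the decorated function receives the upstream dataframe as its first parameter -- the parameter-passing details evolve over the course of this PR:

```python
import pandas as pd
from pyspark.sql import DataFrame

from hamilton.experimental import h_spark  # module path as of this PR

# Map transforms to group: pandas UDFs operate on pd.Series columns.
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    return spend / signups

def spend_doubled(spend: pd.Series) -> pd.Series:
    return spend * 2

# The group gets "linearized": the dataframe is threaded through each transform
# (physical edges) while the original column dependencies remain as logical edges.
@h_spark.with_columns(
    spend_per_signup,
    spend_doubled,
    initial_schema=["spend", "signups"],  # columns read off the upstream dataframe
)
def final_df(upstream_df: DataFrame) -> DataFrame:
    return upstream_df
```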
This has basic documentation in the README, a notebook, and some simple hello_world code/a script. There are a few caveats that are noted in the code.
Dependencies within a `with_columns` group can come from one of three places:
1. Other columns inside that group
2. The upstream dataframe
3. External places in the DAG

Previously we required users to specify (2) with the `initial_schema` kwarg; we now also allow them to specify (3) with the `external_inputs` kwarg, as sketched below.
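A hypothetical sketch of (2) and (3) side by side, reusing the imports from the sketch above (names invented):

```python
def spend_mean() -> float:
    return 100.0  # (3) a node defined elsewhere in the DAG

def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    return spend - spend_mean  # depends on an upstream column and an external input

@h_spark.with_columns(
    spend_zero_mean,
    initial_schema=["spend"],        # (2) columns from the upstream dataframe
    external_inputs=["spend_mean"],  # (3) dependencies from outside the group
)
def final_df(upstream_df: DataFrame) -> DataFrame:
    return upstream_df
```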
`append` will just append the computed columns to that dataframe; `select` will just select the specified columns from that dataframe.
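For example, continuing the sketches above (I'm assuming `select` is passed as a kwarg on the decorator):

```python
@h_spark.with_columns(
    spend_per_signup,
    initial_schema=["spend", "signups"],
    select=["spend_per_signup"],  # "select": keep only the listed computed columns
    # default ("append") behavior: add all computed columns onto the incoming dataframe
)
def selected_df(upstream_df: DataFrame) -> DataFrame:
    return upstream_df
```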
This enables you to extract columns from a central dataframe. Note there are two available approaches:
1. Specify `initial_schema` to get a set of columns extracted already
2. Specify `dataframe_subdag_param` to extract them yourself
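A hedged sketch of approach (2) -- the central dataframe is injected into the subdag under the given parameter name, and you extract or derive columns yourself (names invented, exact semantics assumed):

```python
def derived(central_df: DataFrame) -> DataFrame:
    # extract/derive columns yourself from the injected dataframe
    return central_df.withColumn("spend_per_signup", central_df.spend / central_df.signups)

@h_spark.with_columns(
    derived,
    dataframe_subdag_param="central_df",  # inject the dataframe under this name
)
def final_df(upstream_df: DataFrame) -> DataFrame:
    return upstream_df
```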
This change:
1. Removes the `dataframe` parameter in favor of using the function's first parameter
2. Renames the `dataframe_subdag_param` parameter to `pass_dataframe_as`
3. Renames the `initial_schema` parameter to `columns_to_pass`

We also update the README to be a little easier to run.
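Putting the renames together, a usage sketch after this change (still hedged; reusing `spend_doubled` from the first sketch):

```python
@h_spark.with_columns(
    spend_doubled,
    columns_to_pass=["spend"],     # formerly initial_schema
    # pass_dataframe_as="raw_df",  # formerly dataframe_subdag_param -- the alternative approach
)
def final_df(upstream_df: DataFrame) -> DataFrame:
    # the upstream dataframe now arrives as the function's first parameter
    return upstream_df
```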
These are in an example. We don't have a notebook yet (getting the data is a pain, and I don't want to link our copy due to licensing), but it's a great demonstration of how it works.
hamilton/experimental/h_spark.py (outdated diff)
```python
if return_type in (int, float, bool, str, bytes):
    return python_to_spark_type(return_type)
elif return_type in (list[int], list[float], list[bool], list[str], list[bytes]):
```
This should reference `_list` above.
Otherwise this will break for Python < 3.9.
Yeah, this was lost in the merge.
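For reference, a hypothetical reconstruction of the fix being discussed -- defining `_list` with `typing.List` so the membership check works on Python < 3.9 (the exact contents of `_list` are assumed):

```python
import sys
from typing import List

# typing.List[...] is valid on all supported versions; list[int] (PEP 585)
# raises a TypeError at runtime before Python 3.9.
_list = (List[int], List[float], List[bool], List[str], List[bytes])
if sys.version_info >= (3, 9):
    _list = _list + (list[int], list[float], list[bool], list[str], list[bytes])
```

The `elif` in the diff above would then read `elif return_type in _list:`.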