Add support for Python UDFs in distributed queries #173
Comments
Fancy!
Would the plan be to focus on Python UDFs that don't have dependencies? I don't think I've ever seen a good story for taking Python code from one place and executing it in another if there are 3rd party dependencies involved. Would you folks consider some sort of HTTP UDF as well? BigQuery supports a similar feature. It could probably be done better here, e.g. by transmitting over the wire in a more efficient format and providing tooling to deserialize on the other end (maybe DataFusion?).
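For illustration only, a minimal sketch of what such an HTTP UDF endpoint could look like if the payload were an Arrow IPC stream rather than JSON. Flask, the `/udf/double` route, and the single-int64-column assumption are all made up for this example, not part of any existing design:

```python
# Hypothetical HTTP UDF endpoint: accepts an Arrow IPC stream, applies a toy
# "double the value" function to each batch, and streams Arrow IPC back.
import io

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.ipc as ipc
from flask import Flask, Response, request

app = Flask(__name__)

@app.post("/udf/double")
def double() -> Response:
    reader = ipc.open_stream(request.data)           # deserialize incoming batches
    out_schema = pa.schema([("value", pa.int64())])  # assume a single int64 column
    sink = io.BytesIO()
    with ipc.new_stream(sink, out_schema) as writer:
        for batch in reader:
            doubled = pc.multiply(batch.column(0), 2)
            writer.write_batch(pa.record_batch([doubled], schema=out_schema))
    return Response(sink.getvalue(), content_type="application/vnd.apache.arrow.stream")
```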
Unfortunately, I do not have the Python skills to answer the question. I think we can learn from how Dask does this. I wonder if @jdye64 can offer any high-level guidance.
Another approach here is to implement a custom executor process that wraps Ballista and takes care of registering custom Python code as UDFs so that we don't have to worry about trying to send code over the network. |
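Purely as a strawman, that wrapper could look something like the snippet below; `BallistaPyExecutor`, `register_udf`, and `serve` are made-up names for this sketch, not an existing Ballista API:

```python
# Hypothetical "wrapper executor": the UDFs are baked into the executor process
# itself, so no Python code ever has to be shipped over the network.
import pyarrow as pa
import pyarrow.compute as pc
from ballista import BallistaPyExecutor  # made-up binding, for illustration only

def plus_one(values: pa.Array) -> pa.Array:
    return pc.add(values, 1)

if __name__ == "__main__":
    executor = BallistaPyExecutor(scheduler="localhost:50050")
    executor.register_udf("plus_one", plus_one,
                          input_types=[pa.int64()], return_type=pa.int64())
    executor.serve()  # join the cluster and start accepting tasks
```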
Yes, @adriangb is right. Much pain comes from trying to serialize and execute Python code on remote nodes when dependencies are involved. This has been the case going back even to Hive UDFs years ago. The Python ecosystem as a whole relies heavily on existing dependencies. Therefore, I think if we can come up with a straightforward method for ensuring all of the executors have a valid virtual environment with all the dependencies required by the UDF installed, we should be good. This is the approach we take in some parts of Dask, for example.

So maybe as part of the Python UDF registration we require a "list" of dependencies that the UDF needs. When the executor server starts up, it could create that virtual env, through pip or conda or whatever is chosen, and install those dependencies. Think of it like an executor server bootstrapping process. Then when any SQL queries are submitted, the UDF can be serialized and sent to the executor; once there, the UDF can be executed in that virtual environment. A couple of thoughts:
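For illustration, a rough sketch of what that executor-side bootstrapping step might look like; the function name, the environment root, and the example requirements list are all made up for this sketch:

```python
# Hypothetical executor bootstrap: given the dependency "list" attached to a
# UDF registration, build a dedicated virtual environment before any queries
# that use the UDF are accepted. Assumes a POSIX bin/ layout.
import subprocess
import venv
from pathlib import Path

def bootstrap_udf_env(udf_name: str, requirements: list[str],
                      root: Path = Path("/tmp/ballista-udf-envs")) -> Path:
    env_dir = root / udf_name
    if not env_dir.exists():
        venv.EnvBuilder(with_pip=True).create(env_dir)
        pip = env_dir / "bin" / "pip"
        subprocess.run([str(pip), "install", *requirements], check=True)
    return env_dir

# e.g. driven by a (hypothetical) registration payload such as
# {"name": "my_udf", "requirements": ["numpy==1.26.4", "scipy"]}
bootstrap_udf_env("my_udf", ["numpy==1.26.4", "scipy"])
```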
Agreed that if you want to allow custom Python code to run you need to allow 3rd party dependencies. But 3rd party dependencies are a whole big can of worms in Python, to the point where I would avoid opening it if you can. Hence the suggestion for HTTP UDFs, which also have other benefits/use cases. I think a key aspect of this is to allow users to stick to the workflows they know instead of having to build a new one. For example, as a data engineer/backend dev I manage a large project with multiple deployable artifacts that get bundled into containers and deployed on k8s. I have the knowledge and infrastructure in place to handle all of the complexities involved in this (e.g. locking dependencies across deployables à la Cargo workspaces). Any sort of new dependency management paradigm that does not fit in with this is extra work and possibly a source of bugs. That includes the SSH-into-a-node model and Airflow's same-dependency-everywhere model 🙃. I think a good model would be a sort of "UDF executor web framework" that does some hand-holding but ultimately leaves packaging and dependencies up to users:

```python
from typing import Iterable

from pyarrow import RecordBatch, RecordBatchReader
from ballista import UDFExecutorApp, udf

@udf.aggregator(name="override_the_name")
def some_aggregation_function(reader: RecordBatchReader) -> Iterable[RecordBatch]:
    ...

def main() -> None:
    app = UDFExecutorApp([some_aggregation_function])
    app.serve(scheduler_host=...)
```

I'm just making something up here, but the point is to keep a relatively familiar Flask-style API while abstracting away the fiddly bits. This would register itself with the scheduler as being able to execute
I have no hands-on experience with the HTTP UDF model but am intrigued by the approach. Thank you for laying out your thoughts. I had some thoughts and questions. Things I really like:
Concerns/Questions (mostly because of my lack of experience here)
I was thinking "entirely new process".
With the example above, I'd say it can be Arrow Flight or any other communication protocol; I'm not constraining it to HTTP.
Yeah, this I don't know about. Would the data currently be residing in the Executor? If so, the only way to do everything in memory would be to run these UDFs on the Executor itself, which gets into the complication of dependency management. Could users wrap the executor itself? Maybe when
In Apache Spark, we utilize Python PEX executables that are built with all necessary dependencies. When an executor is launched, it downloads the PEX file during initialization, which is then used to execute Python UDF code.
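For reference, the PySpark side of that workflow looks roughly like the snippet below (following Spark's PEX-based package-management approach); the `pyspark_pex_env.pex` file name is just an example and is assumed to have been built from the project's requirements beforehand:

```python
# Roughly the PySpark PEX workflow: ship a self-contained PEX to the executors
# and point the Python workers at it. The .pex file name here is an example.
import os

from pyspark.sql import SparkSession

os.environ["PYSPARK_PYTHON"] = "./pyspark_pex_env.pex"
spark = (
    SparkSession.builder
    .config("spark.files", "pyspark_pex_env.pex")  # distribute the PEX to executors
    .getOrCreate()
)
```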
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I would like to be able to execute queries containing custom Python code. This is already supported in DataFusion but we need to add the serde aspect in Ballista.
Describe the solution you'd like
It looks like we can serialize Python functions with https://docs.rs/pyo3/0.7.0/pyo3/marshal/index.html
We then need to store the serialized Python functions in protobuf and deserialize them in the executors.
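As a Python-level illustration of what that marshal round trip involves (the linked pyo3 module wraps the same format): only the function's code object is serialized, so closures, globals, and third-party dependencies do not travel with it. The `my_udf` function below is just a toy:

```python
# Toy round trip using the marshal format: the serialized bytes could be stored
# in a protobuf field and deserialized on the executor side.
import marshal
import types

def my_udf(x: int) -> int:
    return x * 2

payload = marshal.dumps(my_udf.__code__)            # scheduler side: bytes for protobuf
restored_code = marshal.loads(payload)              # executor side
restored_udf = types.FunctionType(restored_code, globals(), "my_udf")
assert restored_udf(21) == 42
```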
Describe alternatives you've considered
None
Additional context
None