
Moves all scaling-related constructs out of experimental
- h_spark
- h_dask
- h_ray

All move to plugins. We preserve the `h_...` prefix to avoid import-name
collisions with the libraries themselves. Note that we also deprecate the
idea of having "_implementations" modules: people will be importing these, so we
want the names to be easy to refer to and remember. This is why
polars_implementations is deprecated, and we instead use h_polars.

We leave in references to all previously released constructs (note that
the new pyspark API has not been released, so it's OK to remove it from
the experimental.h_spark file).
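
For anyone upgrading, a minimal sketch of the import change this commit makes (the module names stay the same; only the package they live in moves):

    # Before this commit (now deprecated): scaling adapters lived under hamilton.experimental
    # from hamilton.experimental import h_dask, h_ray, h_spark

    # After this commit: the same modules live under hamilton.plugins
    from hamilton.plugins import h_dask, h_ray, h_spark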
elijahbenizzy committed Aug 22, 2023
1 parent 71ca9d1 commit c2720cd
Showing 69 changed files with 1,961 additions and 1,918 deletions.
2 changes: 1 addition & 1 deletion .ci/setup.sh
@@ -23,7 +23,7 @@ fi

if [[ ${TASK} == "async" ]]; then
pip install \
-    -r graph_adapter_tests/h_async/requirements-test.txt
+    -r plugin_tests/h_async/requirements-test.txt
fi

if [[ ${TASK} == "pyspark" ]]; then
8 changes: 4 additions & 4 deletions .ci/test.sh
@@ -15,13 +15,13 @@ source "${HOME}/venvs/hamilton-venv/bin/activate"

if [[ ${TASK} == "async" ]]; then
pip install .
-pytest graph_adapter_tests/h_async
+pytest plugin_tests/h_async
exit 0
fi

if [[ ${TASK} == "dask" ]]; then
pip install -e '.[dask]'
-pytest graph_adapter_tests/h_dask
+pytest plugin_tests/h_dask
exit 0
fi

@@ -33,14 +33,14 @@ fi

if [[ ${TASK} == "ray" ]]; then
pip install -e '.[ray]'
-pytest graph_adapter_tests/h_ray
+pytest plugin_tests/h_ray
exit 0
fi

if [[ ${TASK} == "pyspark" ]]; then
pip install -e '.[pyspark]'
pip install 'numpy<1.24.0' # downgrade until spark fixes their bug
-pytest graph_adapter_tests/h_spark
+pytest plugin_tests/h_spark
exit 0
fi

1 change: 1 addition & 0 deletions docs/data_adapters_extension.py
@@ -25,6 +25,7 @@
"hamilton.io.default_data_loaders",
"hamilton.plugins.pandas_extensions",
"hamilton.plugins.polars_extensions",
"hamilton.plugins.spark_extensions",
]

for module in MODULES_TO_IMPORT:
23 changes: 7 additions & 16 deletions docs/extensions.rst
@@ -33,7 +33,7 @@ All that's needed is to:
.. code-block:: python
from hamilton import driver
-from hamilton.experimental import h_dask  # import the correct module
+from hamilton.plugins import h_dask  # import the correct module
from dask.distributed import Client # import the distributed system of choice
client = Client(...) # instantiate the specific client
@@ -50,17 +50,8 @@ All that's needed is to:
See :doc:`reference/graph-adapters/index` and :doc:`reference/api-extensions/custom-graph-adapters`
for options.

-A note on the definition of `Experimental`
-==========================================
-
-TL;DR: the code is stable, but it needs more bake time & feedback!
-
-The following implementations are considered experimental because they require more production bake time. Anything in
-Hamilton in the ``experimental`` package, should be considered changeable, i.e. their APIs might change, but we'll
-endeavor to ensure backwards compatible changes when they can be accommodated.
-
-Ray - Experimental!
-===================
+Ray
+===

`Ray <https://ray.io>`_ is a system to scale python workloads. Hamilton makes it very easy for you to use Ray.

@@ -82,8 +73,8 @@ If you have a Ray cluster setup, then you can farm out Hamilton computation to i
compute, and the potential to scale to large data set sizes, however, you'll be limited to the size of a single machine
in terms of the amount of data it can process.

-Dask - Experimental!
-====================
+Dask
+====

`Dask <https://dask.org>`_ is a system to scale python workloads. Hamilton makes it very easy for you to use Dask.

@@ -103,8 +94,8 @@ Distributed Computation:
If you have a Dask cluster setup, then you can farm out Hamilton computation to it. This enables lots of parallel
compute, and the ability to scale to petabyte scale data set sizes.

-Koalas on Spark, a.k.a. Pandas API on Spark - Experimental!
-===========================================================
+Koalas on Spark, a.k.a. Pandas API on Spark
+===========================================

`Spark <https://spark.apache.org/>`_ is a scalable data processing framework. `Koalas <https://koalas.readthedocs.io/en/latest>`_
was the project code name to implement the \
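
To make the relocated import path concrete, here is a minimal sketch of running a Hamilton DAG on Dask after this change. It assumes a local cluster; `my_functions` and `some_output` are hypothetical placeholders, and the constructor arguments follow the pattern used in the repo's dask hello_world example:

    # Minimal sketch: running Hamilton on Dask via the relocated h_dask module.
    from dask.distributed import Client, LocalCluster

    from hamilton import base, driver
    from hamilton.plugins import h_dask  # lived under hamilton.experimental before this commit

    import my_functions  # hypothetical module containing Hamilton functions

    cluster = LocalCluster()  # spin up a local Dask cluster
    client = Client(cluster)
    adapter = h_dask.DaskGraphAdapter(client, base.PandasDataFrameResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])  # nodes execute as Dask tasks
    client.close()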
5 changes: 4 additions & 1 deletion docs/reference/api-extensions/index.rst
@@ -10,7 +10,7 @@ API Extensions
custom-drivers
custom-decorators

-Currently the API extensions are all experimental. Note this doesn't mean they're not well-tested or thought out --
+Currently a few of the API extensions are still experimental. Note this doesn't mean they're not well-tested or thought out --
rather that we're actively looking for feedback. More docs upcoming, but for now fish around the
`experimental package <https://github.com/dagworks-inc/hamilton/tree/main/hamilton/experimental>`_, and give the
extensions a try!

+The other extensions live within `plugins <https://github.com/dagworks-inc/hamilton/tree/main/hamilton/plugins>`_. These are
+fully supported and will be backwards compatible across major versions.
7 changes: 3 additions & 4 deletions docs/reference/graph-adapters/DaskGraphAdapter.rst
@@ -1,12 +1,11 @@
=======================
h_dask.DaskGraphAdapter
=======================
-This is an experimental GraphAdapter; there is a possibility of their API changing. That said, the code is stable,
-and you should feel comfortable giving the code for a spin - let us know how it goes, and what the rough edges are if
-you find any. We'd love feedback if you are using these to know how to improve them or graduate them.

Runs the entire Hamilton DAG on dask.

-.. autoclass:: hamilton.experimental.h_dask.DaskGraphAdapter

+.. autoclass:: hamilton.plugins.h_dask.DaskGraphAdapter
:special-members: __init__
:members:
:inherited-members:
2 changes: 1 addition & 1 deletion docs/reference/graph-adapters/PySparkUDFGraphAdapter.rst
@@ -7,6 +7,6 @@ you find any. We'd love feedback if you are using these to know how to improve t



-.. autoclass:: hamilton.experimental.h_spark.PySparkUDFGraphAdapter
+.. autoclass:: hamilton.plugins.h_spark.PySparkUDFGraphAdapter
:special-members: __init__
:members:
6 changes: 2 additions & 4 deletions docs/reference/graph-adapters/RayGraphAdapter.rst
@@ -1,12 +1,10 @@
=======================
h_ray.RayGraphAdapter
=======================
-This is an experimental GraphAdapter; there is a possibility of their API changing. That said, the code is stable,
-and you should feel comfortable giving the code for a spin - let us know how it goes, and what the rough edges are if
-you find any. We'd love feedback if you are using these to know how to improve them or graduate them.

The graph adapter to delegate execution of the individual nodes in a Hamilton graph to Ray.

-.. autoclass:: hamilton.experimental.h_ray.RayGraphAdapter
+.. autoclass:: hamilton.plugins.h_ray.RayGraphAdapter
:special-members: __init__
:members:
:inherited-members:
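
As with Dask, a minimal usage sketch under the new import path; `my_functions` and `some_output` are hypothetical placeholders, and the constructor call mirrors the repo's ray hello_world example:

    # Minimal sketch: delegating Hamilton node execution to Ray.
    import ray

    from hamilton import base, driver
    from hamilton.plugins import h_ray  # lived under hamilton.experimental before this commit

    import my_functions  # hypothetical module containing Hamilton functions

    ray.init()
    adapter = h_ray.RayGraphAdapter(result_builder=base.PandasDataFrameResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])  # each node runs as a Ray task
    ray.shutdown()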
6 changes: 2 additions & 4 deletions docs/reference/graph-adapters/RayWorkflowGraphAdapter.rst
@@ -1,12 +1,10 @@
=============================
h_ray.RayWorkflowGraphAdapter
=============================
-This is an experimental GraphAdapter; there is a possibility of their API changing. That said, the code is stable,
-and you should feel comfortable giving the code for a spin - let us know how it goes, and what the rough edges are if
-you find any. We'd love feedback if you are using these to know how to improve them or graduate them.
A Graph Adapter for delegating the execution of hamilton nodes to Ray.


-.. autoclass:: hamilton.experimental.h_ray.RayWorkflowGraphAdapter
+.. autoclass:: hamilton.plugins.h_ray.RayWorkflowGraphAdapter
:special-members: __init__
:members:
:inherited-members:
2 changes: 1 addition & 1 deletion docs/reference/graph-adapters/SparkKoalasGraphAdapter.rst
@@ -7,7 +7,7 @@ and you should feel comfortable giving the code for a spin - let us know how it
you find any. We'd love feedback if you are using these to know how to improve them or graduate them.


-.. autoclass:: hamilton.experimental.h_spark.SparkKoalasGraphAdapter
+.. autoclass:: hamilton.plugins.h_spark.SparkKoalasGraphAdapter
:special-members: __init__
:members:
:inherited-members:
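
A hedged sketch of the Koalas (pandas-on-Spark) adapter under the new path; `my_functions`, `some_output`, and the spine_column value are placeholders, and the constructor arguments follow the repo's pandas_on_spark example:

    # Minimal sketch: running Hamilton over pandas-on-Spark via h_spark.
    from pyspark.sql import SparkSession

    from hamilton import base, driver
    from hamilton.plugins import h_spark  # lived under hamilton.experimental before this commit

    import my_functions  # hypothetical module containing Hamilton functions

    spark = SparkSession.builder.getOrCreate()
    adapter = h_spark.SparkKoalasGraphAdapter(
        spark_session=spark,
        result_builder=base.PandasDataFrameResult(),
        spine_column="some_index_column",  # placeholder: column the results are joined on
    )
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])
    spark.stop()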
2 changes: 1 addition & 1 deletion docs/reference/result-builders/Dask.rst
@@ -3,5 +3,5 @@ Dask
=======================


-.. autoclass:: hamilton.experimental.h_dask.DaskDataFrameResult
+.. autoclass:: hamilton.plugins.h_dask.DaskDataFrameResult
:members: build_result
@@ -41,7 +41,7 @@
"from pyspark.sql import SparkSession\n",
"\n",
"from hamilton import driver, log_setup\n",
"from hamilton.experimental import h_spark"
"from hamilton.plugins import h_spark"
]
},
{
2 changes: 1 addition & 1 deletion examples/LLM_Workflows/pdf_summarizer/run_on_spark/run.py
@@ -5,7 +5,7 @@
from pyspark.sql import SparkSession

from hamilton import driver, log_setup
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark


def my_spark_job(spark: SparkSession, openai_gpt_model: str, content_type: str, user_query: str):
2 changes: 1 addition & 1 deletion examples/dask/community_demo/demo_day_notebook.ipynb
@@ -679,7 +679,7 @@
"# Dask graph adapter -- let's distribute the functions!\n",
"import logging\n",
"from hamilton import base, driver\n",
"from hamilton.experimental import h_dask\n",
"from hamilton.plugins import h_dask\n",
"from dask.distributed import Client, LocalCluster\n",
"logger = logging.getLogger(\"notebook_logger\")\n",
"# Setup a local cluster.\n",
2 changes: 1 addition & 1 deletion examples/dask/hello_world/run.py
@@ -3,7 +3,7 @@
import sys

from hamilton import driver
-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion examples/dask/hello_world/run_with_delayed.py
@@ -5,7 +5,7 @@
import pandas as pd

from hamilton import base, driver
-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

logger = logging.getLogger(__name__)

@@ -3,7 +3,7 @@
import sys

from hamilton import driver
-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion examples/data_quality/pandera/run_dask.py
@@ -26,7 +26,7 @@
from dask.distributed import Client, LocalCluster

from hamilton import base, driver
-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

if __name__ == "__main__":
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
2 changes: 1 addition & 1 deletion examples/data_quality/pandera/run_ray.py
@@ -23,7 +23,7 @@
import ray

from hamilton import base, driver
-from hamilton.experimental import h_ray
+from hamilton.plugins import h_ray

if __name__ == "__main__":

2 changes: 1 addition & 1 deletion examples/data_quality/pandera/run_spark.py
@@ -22,7 +22,7 @@
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark

if __name__ == "__main__":
log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
2 changes: 1 addition & 1 deletion examples/data_quality/simple/run_dask.py
@@ -26,7 +26,7 @@
from dask.distributed import Client, LocalCluster

from hamilton import base, driver
-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

if __name__ == "__main__":
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
2 changes: 1 addition & 1 deletion examples/data_quality/simple/run_ray.py
@@ -23,7 +23,7 @@
import ray

from hamilton import base, driver
-from hamilton.experimental import h_ray
+from hamilton.plugins import h_ray

if __name__ == "__main__":

2 changes: 1 addition & 1 deletion examples/data_quality/simple/run_spark.py
@@ -22,7 +22,7 @@
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark

if __name__ == "__main__":
log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
2 changes: 1 addition & 1 deletion examples/parallelism/file_processing/run.py
@@ -9,7 +9,7 @@

from hamilton import driver, log_setup
from hamilton.execution import executors
-from hamilton.experimental import h_dask, h_ray
+from hamilton.plugins import h_dask, h_ray

log_setup.setup_logging(logging.INFO)

4 changes: 2 additions & 2 deletions examples/parallelism/star_counting/run.py
@@ -20,7 +20,7 @@ def _get_executor(mode: str):
elif mode == "dask":
from dask import distributed

-from hamilton.experimental import h_dask
+from hamilton.plugins import h_dask

cluster = distributed.LocalCluster()
client = distributed.Client(cluster)
@@ -29,7 +29,7 @@ def _get_executor(mode: str):
else:
import ray

-from hamilton.experimental import h_ray
+from hamilton.plugins import h_ray

remote_executor = h_ray.RayTaskExecutor(num_cpus=4)
shutdown = ray.shutdown
2 changes: 1 addition & 1 deletion examples/ray/hello_world/run.py
@@ -3,7 +3,7 @@
import ray

from hamilton import base, driver, log_setup
-from hamilton.experimental import h_ray
+from hamilton.plugins import h_ray

if __name__ == "__main__":
log_setup.setup_logging()
2 changes: 1 addition & 1 deletion examples/ray/hello_world/run_rayworkflow.py
@@ -2,7 +2,7 @@
from ray import workflow

from hamilton import base, driver, log_setup
-from hamilton.experimental import h_ray
+from hamilton.plugins import h_ray

if __name__ == "__main__":
log_setup.setup_logging()
2 changes: 1 addition & 1 deletion examples/spark/pandas_on_spark/run.py
@@ -4,7 +4,7 @@
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark

if __name__ == "__main__":
log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
2 changes: 1 addition & 1 deletion examples/spark/pyspark/dataflow.py
@@ -5,8 +5,8 @@
import pyspark.sql as ps
from pyspark.sql.functions import col, mean, stddev

-from hamilton.experimental import h_spark
from hamilton.function_modifiers import extract_fields
+from hamilton.plugins import h_spark


def spark_session() -> ps.SparkSession:
2 changes: 1 addition & 1 deletion examples/spark/pyspark_udfs/run.py
@@ -5,7 +5,7 @@
from pyspark.sql import functions as F

from hamilton import driver, log_setup
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark


def create_hamilton_driver(config: dict, modules: list) -> driver.Driver:
2 changes: 1 addition & 1 deletion examples/spark/tpc-h/query_1.py
@@ -5,7 +5,7 @@
from pyspark.sql import functions as F

from hamilton import htypes
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark


# See https://github.com/dragansah/tpch-dbgen/blob/master/tpch-queries/1.sql
2 changes: 1 addition & 1 deletion examples/spark/tpc-h/query_12.py
@@ -3,7 +3,7 @@
from pyspark.sql import functions as F

from hamilton import htypes
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark

# See https://github.com/dragansah/tpch-dbgen/blob/master/tpch-queries/12.sql

2 changes: 1 addition & 1 deletion examples/spark/tpc-h/query_8.py
@@ -4,7 +4,7 @@

# See https://github.com/dragansah/tpch-dbgen/blob/master/tpch-queries/8.sql
from hamilton import htypes
-from hamilton.experimental import h_spark
+from hamilton.plugins import h_spark


def start_date() -> str:
2 changes: 1 addition & 1 deletion examples/spark/tpc-h/run.py
@@ -1,6 +1,6 @@
import click

exec("from hamilton.experimental import h_spark")
exec("from hamilton.plugins import h_spark")
import csv_data_loaders
import pyspark
import query_1