From 0d3770495150660cd9045f3d64ae91d409b5e82f Mon Sep 17 00:00:00 2001 From: bryangalindo Date: Sun, 10 Sep 2023 23:34:11 -0400 Subject: [PATCH 1/2] Adds Dask Jupyter Notebook example for e.g., data scientists --- examples/dask/hello_world/notebook.ipynb | 340 +++++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 examples/dask/hello_world/notebook.ipynb diff --git a/examples/dask/hello_world/notebook.ipynb b/examples/dask/hello_world/notebook.ipynb new file mode 100644 index 000000000..60d6000b6 --- /dev/null +++ b/examples/dask/hello_world/notebook.ipynb @@ -0,0 +1,340 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "Uncomment and run the cell below if you are in a Google Colab environment. It will:\n", + "1. Mount google drive. You will be asked to authenticate and give permissions.\n", + "2. Change directory to google drive.\n", + "3. Make a directory \"hamilton-tutorials\"\n", + "4. Change directory to it.\n", + "5. Clone this repository to your google drive\n", + "6. Move your current directory to the hello_world example\n", + "7. Install requirements.\n", + "\n", + "This means that any modifications will be saved, and you won't lose them if you close your browser." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "## 1. Mount google drive\n", + "# from google.colab import drive\n", + "# drive.mount('/content/drive')\n", + "## 2. Change directory to google drive.\n", + "# %cd /content/drive/MyDrive\n", + "## 3. Make a directory \"hamilton-tutorials\"\n", + "# !mkdir hamilton-tutorials\n", + "## 4. Change directory to it.\n", + "# %cd hamilton-tutorials\n", + "## 5. 
Clone this repository to your google drive\n", + "# !git clone https://github.com/DAGWorks-Inc/hamilton/\n", + "## 6. Move your current directory to the dask hello_world example\n", + "# %cd hamilton/examples/dask/hello_world\n", + "## 7. Install requirements.\n", + "# %pip install -r requirements.txt\n", + "# from IPython.display import clear_output; clear_output() # optionally clear outputs\n", + "# To check your current working directory you can type `!pwd` in a cell and run it." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Cell 2 - set up autoreload for the modules the DAG is built from\n", + "# We use the autoreload extension that comes with ipython to automatically reload modules when\n", + "# the code in them changes.\n", + "\n", + "# import the jupyter extension\n", + "%load_ext autoreload\n", + "# set it to only reload the modules imported with %aimport\n", + "%autoreload 1" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Import modules\n", + "import pandas as pd\n", + "from dask import dataframe\n", + "from dask.distributed import (\n", + " Client,\n", + " LocalCluster,\n", + ")\n", + "\n", + "from hamilton import (\n", + " ad_hoc_utils,\n", + " driver,\n", + ")\n", + "from hamilton.plugins import h_dask" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# We'll place the data loaders into a new module\n", + "\n", + "def spend(spend_location: str, spend_partitions: int) -> dataframe.Series:\n", + " \"\"\"Dummy function showing how to wire through loading data.\n", + "\n", + " :param spend_location:\n", + " :param 
spend_partitions: number of partitions to segment the data into\n", + " :return:\n", + " \"\"\"\n", + " return dataframe.from_pandas(\n", + " pd.Series([10, 10, 20, 40, 40, 50]), name=\"spend\", npartitions=spend_partitions\n", + " )\n", + "\n", + "\n", + "def signups(signups_location: str, signups_partitions: int) -> dataframe.Series:\n", + " \"\"\"Dummy function showing how to wire through loading data.\n", + "\n", + " :param signups_location:\n", + " :param signups_partitions: number of partitions to segment the data into\n", + " :return:\n", + " \"\"\"\n", + " return dataframe.from_pandas(\n", + " pd.Series([1, 10, 50, 100, 200, 400]), name=\"signups\", npartitions=signups_partitions\n", + " )\n", + "\n", + "data_loaders = ad_hoc_utils.create_temporary_module(\n", + " spend,\n", + " signups,\n", + " module_name=\"data_loaders\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# We'll place the spend calculations into a new module\n", + "\n", + "def avg_3wk_spend(spend: pd.Series) -> pd.Series:\n", + " \"\"\"Rolling 3 week average spend.\"\"\"\n", + " return spend.rolling(3).mean()\n", + "\n", + "\n", + "def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:\n", + " \"\"\"The cost per signup in relation to spend.\"\"\"\n", + " return spend / signups\n", + "\n", + "\n", + "def spend_mean(spend: pd.Series) -> float:\n", + " \"\"\"Shows function creating a scalar. In this case it computes the mean of the entire column.\"\"\"\n", + " return spend.mean()\n", + "\n", + "\n", + "def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:\n", + " \"\"\"Shows function that takes a scalar. 
In this case to zero mean spend.\"\"\"\n", + " return spend - spend_mean\n", + "\n", + "\n", + "def spend_std_dev(spend: pd.Series) -> float:\n", + " \"\"\"Function that computes the standard deviation of the spend column.\"\"\"\n", + " return spend.std()\n", + "\n", + "\n", + "def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:\n", + " \"\"\"Function showing one way to make spend have zero mean and unit variance.\"\"\"\n", + " return spend_zero_mean / spend_std_dev\n", + "\n", + "spend_calculations = ad_hoc_utils.create_temporary_module(\n", + " avg_3wk_spend,\n", + " spend_per_signup,\n", + " spend_mean,\n", + " spend_zero_mean,\n", + " spend_std_dev,\n", + " spend_zero_mean_unit_variance,\n", + " module_name=\"spend_calculations\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Note: Hamilton collects completely anonymous data about usage. This will help us improve Hamilton over time. 
See https://github.com/dagworks-inc/hamilton#usage-analytics--data-privacy for details.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "LocalCluster(c13c9b83, 'tcp://127.0.0.1:56333', workers=4, threads=8, memory=16.00 GiB)\n" + ] + } + ], + "source": [ + "# Set up the driver, input and output columns\n", + "\n", + "cluster = LocalCluster()\n", + "client = Client(cluster)\n", + "\n", + "print(client.cluster)\n", + "\n", + "adapter = h_dask.DaskGraphAdapter(\n", + " client,\n", + " h_dask.DaskDataFrameResult(),\n", + " visualize_kwargs={\"filename\": \"run_dask.png\", \"format\": \"png\"},\n", + " use_delayed=False,\n", + " compute_at_end=False,\n", + ")\n", + "\n", + "config = {\n", + " \"spend_location\": \"some file path\",\n", + " \"spend_partitions\": 2,\n", + " \"signups_location\": \"some file path\",\n", + " \"signups_partitions\": 2,\n", + " \"foobar\": \"some_other_data\",\n", + "}\n", + "\n", + "dr = driver.Driver(config, spend_calculations, data_loaders, adapter=adapter)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " spend signups avg_3wk_spend spend_per_signup spend_mean spend_zero_mean_unit_variance foobar\n", + "0 10 1 NaN 10.000 28.333333 -1.064405 some_other_data\n", + "1 10 10 NaN 1.000 28.333333 -1.064405 some_other_data\n", + "2 20 50 13.333333 0.400 28.333333 -0.483821 some_other_data\n", + "3 40 100 23.333333 0.400 28.333333 0.677349 some_other_data\n", + "4 40 200 33.333333 0.200 28.333333 0.677349 some_other_data\n", + "5 50 400 43.333333 0.125 28.333333 1.257934 some_other_data\n" + ] + } + ], + "source": [ + "# Execute the driver.\n", + "\n", + "output_columns = [\n", + " \"spend\",\n", + " \"signups\",\n", + " \"avg_3wk_spend\",\n", + " \"spend_per_signup\",\n", + " \"spend_mean\",\n", + " 
\"spend_zero_mean_unit_variance\",\n", + " \"foobar\",\n", + "]\n", + "\n", + "dask_df = dr.execute(output_columns) # it's dask dataframe -- it hasn't been evaluated yet.\n", + "df = dask_df.compute()\n", + "\n", + "# To visualize do `pip install \"sf-hamilton[visualization]\"` if you want these to work\n", + "dr.visualize_execution(output_columns, './hello_world_dask', {\"format\": \"png\"})\n", + "dr.display_all_functions(\"./my_full_dag.dot\")\n", + "print(df.to_string())\n", + "client.shutdown()\n", + "\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From 9b5214f428081a8f6da27c11b56c82ae0ca50bb2 Mon Sep 17 00:00:00 2001 From: bryangalindo Date: Tue, 12 Sep 2023 00:40:31 -0400 Subject: [PATCH 2/2] Updates temp module with %%writefile to make prod push easier --- examples/dask/hello_world/notebook.ipynb | 286 +++++++++++++++++++---- 1 file changed, 245 insertions(+), 41 deletions(-) diff --git a/examples/dask/hello_world/notebook.ipynb b/examples/dask/hello_world/notebook.ipynb index 60d6000b6..cc36baf1d 100644 --- a/examples/dask/hello_world/notebook.ipynb +++ b/examples/dask/hello_world/notebook.ipynb @@ -93,6 +93,7 @@ "outputs": [], "source": [ "# Import modules\n", + "\n", "import pandas as pd\n", "from dask import dataframe\n", "from dask.distributed import (\n", @@ -100,10 +101,7 @@ " LocalCluster,\n", ")\n", "\n", - "from hamilton import (\n", - " ad_hoc_utils,\n", - " driver,\n", - ")\n", + "from hamilton import driver\n", "from hamilton.plugins import h_dask" ] }, @@ -119,8 +117,21 @@ "name": "#%%\n" } }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + 
"output_type": "stream", + "text": [ + "Overwriting data_loaders.py\n" + ] + } + ], "source": [ + "%%writefile data_loaders.py\n", + "\n", + "import pandas as pd\n", + "from dask import dataframe\n", + "\n", "# We'll place the data loaders into a new module\n", "\n", "def spend(spend_location: str, spend_partitions: int) -> dataframe.Series:\n", @@ -144,13 +155,7 @@ " \"\"\"\n", " return dataframe.from_pandas(\n", " pd.Series([1, 10, 50, 100, 200, 400]), name=\"signups\", npartitions=signups_partitions\n", - " )\n", - "\n", - "data_loaders = ad_hoc_utils.create_temporary_module(\n", - " spend,\n", - " signups,\n", - " module_name=\"data_loaders\",\n", - ")" + " )" ] }, { @@ -165,10 +170,22 @@ "name": "#%%\n" } }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Overwriting spend_calculations.py\n" + ] + } + ], "source": [ + "%%writefile spend_calculations.py\n", + "\n", "# We'll place the spend calculations into a new module\n", "\n", + "import pandas as pd\n", + "\n", "def avg_3wk_spend(spend: pd.Series) -> pd.Series:\n", " \"\"\"Rolling 3 week average spend.\"\"\"\n", " return spend.rolling(3).mean()\n", @@ -196,17 +213,7 @@ "\n", "def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:\n", " \"\"\"Function showing one way to make spend have zero mean and unit variance.\"\"\"\n", - " return spend_zero_mean / spend_std_dev\n", - "\n", - "spend_calculations = ad_hoc_utils.create_temporary_module(\n", - " avg_3wk_spend,\n", - " spend_per_signup,\n", - " spend_mean,\n", - " spend_zero_mean,\n", - " spend_std_dev,\n", - " spend_zero_mean_unit_variance,\n", - " module_name=\"spend_calculations\",\n", - ")" + " return spend_zero_mean / spend_std_dev" ] }, { @@ -233,12 +240,14 @@ "name": "stdout", "output_type": "stream", "text": [ - "LocalCluster(c13c9b83, 'tcp://127.0.0.1:56333', workers=4, threads=8, memory=16.00 GiB)\n" + "LocalCluster(94f8c394, 'tcp://127.0.0.1:50070', 
workers=4, threads=8, memory=16.00 GiB)\n" ] } ], "source": [ - "# Set up the driver, input and output columns\n", + "%aimport data_loaders, spend_calculations\n", + "\n", + "# Set up the local Dask cluster, adapter, and driver.\n", "\n", "cluster = LocalCluster()\n", "client = Client(cluster)\n", @@ -278,17 +287,184 @@ }, "outputs": [ { - "name": "stdout", - "output_type": "stream", - "text": [ - " spend signups avg_3wk_spend spend_per_signup spend_mean spend_zero_mean_unit_variance foobar\n", - "0 10 1 NaN 10.000 28.333333 -1.064405 some_other_data\n", - "1 10 10 NaN 1.000 28.333333 -1.064405 some_other_data\n", - "2 20 50 13.333333 0.400 28.333333 -0.483821 some_other_data\n", - "3 40 100 23.333333 0.400 28.333333 0.677349 some_other_data\n", - "4 40 200 33.333333 0.200 28.333333 0.677349 some_other_data\n", - "5 50 400 43.333333 0.125 28.333333 1.257934 some_other_data\n" - ] + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "signups\n", + "\n", + "signups\n", + "\n", + "\n", + "\n", + "spend_per_signup\n", + "\n", + "spend_per_signup\n", + "\n", + "\n", + "\n", + "signups->spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_partitions\n", + "\n", + "Input: spend_partitions\n", + "\n", + "\n", + "\n", + "spend\n", + "\n", + "spend\n", + "\n", + "\n", + "\n", + "spend_partitions->spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_std_dev\n", + "\n", + "spend_std_dev\n", + "\n", + "\n", + "\n", + "spend_zero_mean_unit_variance\n", + "\n", + "spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "spend_std_dev->spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend->spend_std_dev\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend->spend_per_signup\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_mean\n", + "\n", + "spend_mean\n", + "\n", + "\n", + "\n", + "spend->spend_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "avg_3wk_spend\n", + 
"\n", + "avg_3wk_spend\n", + "\n", + "\n", + "\n", + "spend->avg_3wk_spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_zero_mean\n", + "\n", + "spend_zero_mean\n", + "\n", + "\n", + "\n", + "spend->spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "signups_location\n", + "\n", + "Input: signups_location\n", + "\n", + "\n", + "\n", + "signups_location->signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "foobar\n", + "\n", + "Input: foobar\n", + "\n", + "\n", + "\n", + "spend_mean->spend_zero_mean\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_location\n", + "\n", + "Input: spend_location\n", + "\n", + "\n", + "\n", + "spend_location->spend\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "signups_partitions\n", + "\n", + "Input: signups_partitions\n", + "\n", + "\n", + "\n", + "signups_partitions->signups\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "spend_zero_mean->spend_zero_mean_unit_variance\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ @@ -308,11 +484,39 @@ "df = dask_df.compute()\n", "\n", "# To visualize do `pip install \"sf-hamilton[visualization]\"` if you want these to work\n", - "dr.visualize_execution(output_columns, './hello_world_dask', {\"format\": \"png\"})\n", - "dr.display_all_functions(\"./my_full_dag.dot\")\n", + "dr.visualize_execution(output_columns)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " spend signups avg_3wk_spend spend_per_signup spend_mean spend_zero_mean_unit_variance foobar\n", + "0 10 1 NaN 10.000 28.333333 -1.064405 some_other_data\n", + "1 10 10 NaN 1.000 28.333333 -1.064405 some_other_data\n", + "2 20 50 13.333333 0.400 28.333333 -0.483821 
some_other_data\n", + "3 40 100 23.333333 0.400 28.333333 0.677349 some_other_data\n", + "4 40 200 33.333333 0.200 28.333333 0.677349 some_other_data\n", + "5 50 400 43.333333 0.125 28.333333 1.257934 some_other_data\n" + ] + } + ], + "source": [ "print(df.to_string())\n", - "client.shutdown()\n", - "\n" + "client.shutdown()" ] } ],