Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Dask Jupyter Notebook example for e.g., data scientists #337

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 340 additions & 0 deletions examples/dask/hello_world/notebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"source": [
"Uncomment and run the cell below if you are in a Google Colab environment. It will:\n",
"1. Mount google drive. You will be asked to authenticate and give permissions.\n",
"2. Change directory to google drive.\n",
"3. Make a directory \"hamilton-tutorials\"\n",
"4. Change directory to it.\n",
"5. Clone this repository to your google drive\n",
"6. Move your current directory to the Dask hello_world example\n",
"7. Install requirements.\n",
"\n",
"This means that any modifications will be saved, and you won't lose them if you close your browser."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"## 1. Mount google drive\n",
"# from google.colab import drive\n",
"# drive.mount('/content/drive')\n",
"## 2. Change directory to google drive.\n",
"# %cd /content/drive/MyDrive\n",
"## 3. Make a directory \"hamilton-tutorials\"\n",
"# !mkdir hamilton-tutorials\n",
"## 4. Change directory to it.\n",
"# %cd hamilton-tutorials\n",
"## 5. Clone this repository to your google drive\n",
"# !git clone https://github.com/DAGWorks-Inc/hamilton/\n",
"## 6. Move your current directory to the Dask hello_world example\n",
"# %cd hamilton/examples/dask/hello_world\n",
"## 7. Install requirements.\n",
"# %pip install -r requirements.txt\n",
"# clear_output() # optionally clear outputs\n",
"# To check your current working directory you can type `!pwd` in a cell and run it."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# Cell 2 - import modules to create part of the DAG from\n",
"# We use the autoreload extension that comes with ipython to automatically reload modules when\n",
"# the code in them changes.\n",
"\n",
"# import the jupyter extension\n",
"%load_ext autoreload\n",
"# set it to only reload the modules imported\n",
"%autoreload 1"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# Import modules\n",
"import pandas as pd\n",
"from dask import dataframe\n",
"from dask.distributed import (\n",
" Client,\n",
" LocalCluster,\n",
")\n",
"\n",
"from hamilton import (\n",
" ad_hoc_utils,\n",
" driver,\n",
")\n",
"from hamilton.plugins import h_dask"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# We'll place the data loaders into a new module\n",
"\n",
"def spend(spend_location: str, spend_partitions: int) -> dataframe.Series:\n",
" \"\"\"Dummy function showing how to wire through loading data.\n",
"\n",
" :param spend_location:\n",
" :param spend_partitions: number of partitions to segment the data into\n",
" :return:\n",
" \"\"\"\n",
" return dataframe.from_pandas(\n",
" pd.Series([10, 10, 20, 40, 40, 50]), name=\"spend\", npartitions=spend_partitions\n",
" )\n",
"\n",
"\n",
"def signups(signups_location: str, signups_partitions: int) -> dataframe.Series:\n",
" \"\"\"Dummy function showing how to wire through loading data.\n",
"\n",
" :param signups_location:\n",
" :param signups_partitions: number of partitions to segment the data into\n",
" :return:\n",
" \"\"\"\n",
" return dataframe.from_pandas(\n",
" pd.Series([1, 10, 50, 100, 200, 400]), name=\"signups\", npartitions=signups_partitions\n",
" )\n",
"\n",
"data_loaders = ad_hoc_utils.create_temporary_module(\n",
" spend,\n",
" signups,\n",
" module_name=\"data_loaders\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# We'll place the spend calculations into a new module\n",
"\n",
"def avg_3wk_spend(spend: pd.Series) -> pd.Series:\n",
" \"\"\"Rolling 3 week average spend.\"\"\"\n",
" return spend.rolling(3).mean()\n",
"\n",
"\n",
"def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:\n",
" \"\"\"The cost per signup in relation to spend.\"\"\"\n",
" return spend / signups\n",
"\n",
"\n",
"def spend_mean(spend: pd.Series) -> float:\n",
" \"\"\"Shows function creating a scalar. In this case it computes the mean of the entire column.\"\"\"\n",
" return spend.mean()\n",
"\n",
"\n",
"def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:\n",
" \"\"\"Shows function that takes a scalar. In this case to zero mean spend.\"\"\"\n",
" return spend - spend_mean\n",
"\n",
"\n",
"def spend_std_dev(spend: pd.Series) -> float:\n",
" \"\"\"Function that computes the standard deviation of the spend column.\"\"\"\n",
" return spend.std()\n",
"\n",
"\n",
"def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:\n",
" \"\"\"Function showing one way to make spend have zero mean and unit variance.\"\"\"\n",
" return spend_zero_mean / spend_std_dev\n",
"\n",
"spend_calculations = ad_hoc_utils.create_temporary_module(\n",
" avg_3wk_spend,\n",
" spend_per_signup,\n",
" spend_mean,\n",
" spend_zero_mean,\n",
" spend_std_dev,\n",
" spend_zero_mean_unit_variance,\n",
" module_name=\"spend_calculations\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Note: Hamilton collects completely anonymous data about usage. This will help us improve Hamilton over time. See https://github.com/dagworks-inc/hamilton#usage-analytics--data-privacy for details.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"LocalCluster(c13c9b83, 'tcp://127.0.0.1:56333', workers=4, threads=8, memory=16.00 GiB)\n"
]
}
],
"source": [
"# Set up the driver, input and output columns\n",
"\n",
"cluster = LocalCluster()\n",
"client = Client(cluster)\n",
"\n",
"print(client.cluster)\n",
"\n",
"adapter = h_dask.DaskGraphAdapter(\n",
" client,\n",
" h_dask.DaskDataFrameResult(),\n",
" visualize_kwargs={\"filename\": \"run_dask.png\", \"format\": \"png\"},\n",
" use_delayed=False,\n",
" compute_at_end=False,\n",
")\n",
"\n",
"config = {\n",
" \"spend_location\": \"some file path\",\n",
" \"spend_partitions\": 2,\n",
" \"signups_location\": \"some file path\",\n",
" \"signups_partitions\": 2,\n",
" \"foobar\": \"some_other_data\",\n",
"}\n",
"\n",
"dr = driver.Driver(config, spend_calculations, data_loaders, adapter=adapter)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" spend signups avg_3wk_spend spend_per_signup spend_mean spend_zero_mean_unit_variance foobar\n",
"0 10 1 NaN 10.000 28.333333 -1.064405 some_other_data\n",
"1 10 10 NaN 1.000 28.333333 -1.064405 some_other_data\n",
"2 20 50 13.333333 0.400 28.333333 -0.483821 some_other_data\n",
"3 40 100 23.333333 0.400 28.333333 0.677349 some_other_data\n",
"4 40 200 33.333333 0.200 28.333333 0.677349 some_other_data\n",
"5 50 400 43.333333 0.125 28.333333 1.257934 some_other_data\n"
]
}
],
"source": [
"# Execute the driver.\n",
"\n",
"output_columns = [\n",
" \"spend\",\n",
" \"signups\",\n",
" \"avg_3wk_spend\",\n",
" \"spend_per_signup\",\n",
" \"spend_mean\",\n",
" \"spend_zero_mean_unit_variance\",\n",
" \"foobar\",\n",
"]\n",
"\n",
"dask_df = dr.execute(output_columns) # it's dask dataframe -- it hasn't been evaluated yet.\n",
"df = dask_df.compute()\n",
"\n",
"# To visualize do `pip install \"sf-hamilton[visualization]\"` if you want these to work\n",
"dr.visualize_execution(output_columns, './hello_world_dask', {\"format\": \"png\"})\n",
"dr.display_all_functions(\"./my_full_dag.dot\")\n",
"print(df.to_string())\n",
"client.shutdown()\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}