diff --git a/examples/reusing_functions/reusing_functions.ipynb b/examples/reusing_functions/reusing_functions.ipynb new file mode 100644 index 000000000..83bfd42f0 --- /dev/null +++ b/examples/reusing_functions/reusing_functions.ipynb @@ -0,0 +1,1474 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Resusing Functions Example\n", + "\n", + "This notebook demonstrates the use of the subdag operator.\n", + "\n", + "The subdag operator allows you to effectively run a driver within a node.\n", + "In this case, we are calculating unique website visitors from the following set of parameters:\n", + "\n", + "1. Region = CA (canada) or US (United States)\n", + "2. Granularity of data = (day, week, month)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Pre-requisites" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You need to have the following installed and set up:\n", + "- Hamilton\n", + "- Pandas\n", + "\n", + "Execute the code commented below to install the above if you don't have them already installed." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# %pip install sf-hamilton pandas" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Importing all the things you need\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Any, Dict\n", + "import pandas as pd\n", + "from hamilton.base import ResultMixin, SimplePythonGraphAdapter\n", + "from hamilton import driver\n", + "%load_ext hamilton.plugins.jupyter_magic" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`unique_users` module" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "filtered_interactions\n", + "\n", + "filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "unique_users\n", + "\n", + "unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "filtered_interactions->unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_filtered_interactions_inputs\n", + "\n", + "region\n", + "str\n", + "website_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "_filtered_interactions_inputs->filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "_unique_users_inputs\n", + "\n", + "grain\n", + "str\n", + "\n", + "\n", + "\n", + "_unique_users_inputs->unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input\n", + "\n", + "input\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%cell_to_module unique_users --display\n", + "\n", + "import pandas as pd\n", + "\n", + "_grain_mapping = {\"day\": \"D\", \"week\": \"W\", \"month\": \"M\"}\n", + "\n", + "\n", + "def _validate_grain(grain: str):\n", + " assert grain in [\"day\", \"week\", \"month\"]\n", + "\n", + "\n", + "def filtered_interactions(website_interactions: pd.DataFrame, region: str) -> pd.DataFrame:\n", + " return website_interactions[website_interactions.region == region]\n", + "\n", + "\n", + "def unique_users(filtered_interactions: pd.DataFrame, grain: str) -> pd.Series:\n", + " \"\"\"Gives the number of shares traded by the frequency\"\"\"\n", + " _validate_grain(grain)\n", + " return filtered_interactions.resample(_grain_mapping[grain])[\"user_id\"].nunique()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`reusable_subdags` module" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US\n", + "\n", + "weekly_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.grain\n", + "\n", + "monthly_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.unique_users\n", + "\n", + "monthly_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.grain->monthly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.unique_users\n", + "\n", + "daily_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_US\n", + "\n", + "daily_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.unique_users->daily_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.unique_users\n", + "\n", + "monthly_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US\n", + "\n", + "monthly_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.unique_users->monthly_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA\n", + "\n", + "weekly_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "website_interactions\n", + "\n", + "website_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.filtered_interactions\n", + "\n", + "daily_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->daily_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.filtered_interactions\n", + "\n", + "weekly_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->weekly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.filtered_interactions\n", + "\n", + "monthly_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->monthly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.filtered_interactions\n", + "\n", + "monthly_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->monthly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.filtered_interactions\n", + "\n", + "weekly_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->weekly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.filtered_interactions\n", + "\n", + "daily_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->daily_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.grain\n", + "\n", + "weekly_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.unique_users\n", + "\n", + "weekly_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.grain->weekly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.unique_users\n", + "\n", + "daily_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.filtered_interactions->daily_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.region\n", + "\n", + "monthly_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.region->monthly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.unique_users\n", + "\n", + "weekly_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.unique_users->weekly_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA\n", + "\n", + "monthly_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.unique_users->monthly_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.filtered_interactions->weekly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.region\n", + "\n", + "daily_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.region->daily_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.filtered_interactions->monthly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.grain\n", + "\n", + "daily_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.grain->daily_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.filtered_interactions->monthly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.grain\n", + "\n", + "weekly_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.grain->weekly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.region\n", + "\n", + "monthly_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.region->monthly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA\n", + "\n", + "daily_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.unique_users->daily_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.grain\n", + "\n", + "monthly_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.grain->monthly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.grain\n", + "\n", + "daily_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.grain->daily_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.region\n", + "\n", + "weekly_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.region->weekly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.region\n", + "\n", + "weekly_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.region->weekly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.filtered_interactions->weekly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.unique_users->weekly_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.region\n", + "\n", + "daily_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.region->daily_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.filtered_interactions->daily_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%cell_to_module reusable_subdags --display\n", + "import pandas as pd\n", + "import unique_users\n", + "\n", + "from hamilton.function_modifiers import subdag, value\n", + "\n", + "\n", + "def website_interactions() -> pd.DataFrame:\n", + " \"\"\"Gives event-driven data with a series\n", + "\n", + " :return: Some mock event data.\n", + " \"\"\"\n", + " data = [\n", + " (\"20220901-14:00:00\", 1, \"US\"),\n", + " (\"20220901-18:30:00\", 2, \"US\"),\n", + " (\"20220901-19:00:00\", 1, \"US\"),\n", + " (\"20220902-08:00:00\", 3, \"US\"),\n", + " (\"20220903-16:00:00\", 1, \"US\"),\n", + " (\"20220907-13:00:00\", 4, \"US\"),\n", + " (\"20220910-14:00:00\", 1, \"US\"),\n", + " (\"20220911-12:00:00\", 3, \"US\"),\n", + " (\"20220914-11:00:00\", 1, \"US\"),\n", + " (\"20220915-07:30:00\", 2, \"US\"),\n", + " (\"20220916-06:00:00\", 1, \"US\"),\n", + " (\"20220917-16:00:00\", 2, \"US\"),\n", + " (\"20220920-17:00:00\", 5, \"US\"),\n", + " (\"20220922-09:30:00\", 2, \"US\"),\n", + " (\"20220922-10:00:00\", 1, \"US\"),\n", + " (\"20220924-07:00:00\", 6, \"US\"),\n", + " (\"20220924-08:00:00\", 1, \"US\"),\n", + " (\"20220925-21:00:00\", 1, \"US\"),\n", + " (\"20220926-15:30:00\", 2, \"US\"),\n", + " (\"20220901-14:00:00\", 7, \"CA\"),\n", + " (\"20220901-18:30:00\", 8, \"CA\"),\n", + " (\"20220901-19:00:00\", 9, \"CA\"),\n", + " (\"20220902-08:00:00\", 7, \"CA\"),\n", + " (\"20220903-16:00:00\", 10, \"CA\"),\n", + " (\"20220907-13:00:00\", 9, \"CA\"),\n", + " (\"20220910-14:00:00\", 8, \"CA\"),\n", + " (\"20220911-12:00:00\", 11, \"CA\"),\n", + " (\"20220914-11:00:00\", 12, \"CA\"),\n", + " (\"20220915-07:30:00\", 7, \"CA\"),\n", + " (\"20220916-06:00:00\", 9, \"CA\"),\n", + " (\"20220917-16:00:00\", 10, \"CA\"),\n", + " (\"20220920-17:00:00\", 7, \"CA\"),\n", + " (\"20220922-09:30:00\", 11, \"CA\"),\n", + " (\"20220922-10:00:00\", 8, \"CA\"),\n", + " (\"20220924-07:00:00\", 9, \"CA\"),\n", + " (\"20220924-08:00:00\", 10, \"CA\"),\n", + " (\"20220925-21:00:00\", 13, \"CA\"),\n", + " (\"20220926-15:30:00\", 14, \"CA\"),\n", + " ]\n", + " df = (\n", + " pd.DataFrame(data, columns=[\"timestamp\", \"user_id\", \"region\"])\n", + " .set_index(\"timestamp\")\n", + " .sort_index()\n", + " )\n", + " df.index = pd.DatetimeIndex(df.index)\n", + " return df\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"day\")},\n", + " config={\"region\": \"US\"},\n", + ")\n", + "def daily_unique_users_US(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"week\")},\n", + " config={\"region\": \"US\"},\n", + ")\n", + "def weekly_unique_users_US(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"month\")},\n", + " config={\"region\": \"US\"},\n", + ")\n", + "def monthly_unique_users_US(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"day\")},\n", + " config={\"region\": \"CA\"},\n", + ")\n", + "def daily_unique_users_CA(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"week\")},\n", + " config={\"region\": \"CA\"},\n", + ")\n", + "def weekly_unique_users_CA(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users\n", + "\n", + "\n", + "@subdag(\n", + " unique_users,\n", + " inputs={\"grain\": value(\"month\")},\n", + " config={\"region\": \"CA\"},\n", + ")\n", + "def monthly_unique_users_CA(unique_users: pd.Series) -> pd.Series:\n", + " return unique_users" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "C:\\Users\\obeng\\AppData\\Local\\Temp\\tmpvmsp6sci.py:18: FutureWarning: 'M' is deprecated and will be removed in a future version, please use 'ME' instead.\n", + " return filtered_interactions.resample(_grain_mapping[grain])[\"user_id\"].nunique()\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + " daily_unique_users_US daily_unique_users_CA \\\n", + "timestamp \n", + "2022-09-01 2.0 3.0 \n", + "2022-09-02 1.0 1.0 \n", + "2022-09-03 1.0 1.0 \n", + "2022-09-04 0.0 0.0 \n", + "2022-09-05 0.0 0.0 \n", + "2022-09-06 0.0 0.0 \n", + "2022-09-07 1.0 1.0 \n", + "2022-09-08 0.0 0.0 \n", + "2022-09-09 0.0 0.0 \n", + "2022-09-10 1.0 1.0 \n", + "2022-09-11 1.0 1.0 \n", + "2022-09-12 0.0 0.0 \n", + "2022-09-13 0.0 0.0 \n", + "2022-09-14 1.0 1.0 \n", + "2022-09-15 1.0 1.0 \n", + "2022-09-16 1.0 1.0 \n", + "2022-09-17 1.0 1.0 \n", + "2022-09-18 0.0 0.0 \n", + "2022-09-19 0.0 0.0 \n", + "2022-09-20 1.0 1.0 \n", + "2022-09-21 0.0 0.0 \n", + "2022-09-22 2.0 2.0 \n", + "2022-09-23 0.0 0.0 \n", + "2022-09-24 2.0 2.0 \n", + "2022-09-25 1.0 1.0 \n", + "2022-09-26 1.0 1.0 \n", + "2022-09-27 NaN NaN \n", + "2022-09-28 NaN NaN \n", + "2022-09-29 NaN NaN \n", + "2022-09-30 NaN NaN \n", + "2022-10-01 NaN NaN \n", + "2022-10-02 NaN NaN \n", + "\n", + " weekly_unique_users_US weekly_unique_users_CA \\\n", + "timestamp \n", + "2022-09-01 3.0 4.0 \n", + "2022-09-02 3.0 4.0 \n", + "2022-09-03 3.0 4.0 \n", + "2022-09-04 3.0 4.0 \n", + "2022-09-05 3.0 3.0 \n", + "2022-09-06 3.0 3.0 \n", + "2022-09-07 3.0 3.0 \n", + "2022-09-08 3.0 3.0 \n", + "2022-09-09 3.0 3.0 \n", + "2022-09-10 3.0 3.0 \n", + "2022-09-11 3.0 3.0 \n", + "2022-09-12 2.0 4.0 \n", + "2022-09-13 2.0 4.0 \n", + "2022-09-14 2.0 4.0 \n", + "2022-09-15 2.0 4.0 \n", + "2022-09-16 2.0 4.0 \n", + "2022-09-17 2.0 4.0 \n", + "2022-09-18 2.0 4.0 \n", + "2022-09-19 4.0 6.0 \n", + "2022-09-20 4.0 6.0 \n", + "2022-09-21 4.0 6.0 \n", + "2022-09-22 4.0 6.0 \n", + "2022-09-23 4.0 6.0 \n", + "2022-09-24 4.0 6.0 \n", + "2022-09-25 4.0 6.0 \n", + "2022-09-26 1.0 1.0 \n", + "2022-09-27 1.0 1.0 \n", + "2022-09-28 1.0 1.0 \n", + "2022-09-29 1.0 1.0 \n", + "2022-09-30 1.0 1.0 \n", + "2022-10-01 1.0 1.0 \n", + "2022-10-02 1.0 1.0 \n", + "\n", + " monthly_unique_users_US monthly_unique_users_CA \n", + "timestamp \n", + "2022-09-01 6.0 8.0 \n", + "2022-09-02 6.0 8.0 \n", + "2022-09-03 6.0 8.0 \n", + "2022-09-04 6.0 8.0 \n", + "2022-09-05 6.0 8.0 \n", + "2022-09-06 6.0 8.0 \n", + "2022-09-07 6.0 8.0 \n", + "2022-09-08 6.0 8.0 \n", + "2022-09-09 6.0 8.0 \n", + "2022-09-10 6.0 8.0 \n", + "2022-09-11 6.0 8.0 \n", + "2022-09-12 6.0 8.0 \n", + "2022-09-13 6.0 8.0 \n", + "2022-09-14 6.0 8.0 \n", + "2022-09-15 6.0 8.0 \n", + "2022-09-16 6.0 8.0 \n", + "2022-09-17 6.0 8.0 \n", + "2022-09-18 6.0 8.0 \n", + "2022-09-19 6.0 8.0 \n", + "2022-09-20 6.0 8.0 \n", + "2022-09-21 6.0 8.0 \n", + "2022-09-22 6.0 8.0 \n", + "2022-09-23 6.0 8.0 \n", + "2022-09-24 6.0 8.0 \n", + "2022-09-25 6.0 8.0 \n", + "2022-09-26 6.0 8.0 \n", + "2022-09-27 6.0 8.0 \n", + "2022-09-28 6.0 8.0 \n", + "2022-09-29 6.0 8.0 \n", + "2022-09-30 6.0 8.0 \n", + "2022-10-01 NaN NaN \n", + "2022-10-02 NaN NaN \n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.region\n", + "\n", + "daily_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.filtered_interactions\n", + "\n", + "daily_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.region->daily_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.grain\n", + "\n", + "daily_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.unique_users\n", + "\n", + "daily_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.grain->daily_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.grain\n", + "\n", + "weekly_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.unique_users\n", + "\n", + "weekly_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.grain->weekly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.filtered_interactions\n", + "\n", + "monthly_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.unique_users\n", + "\n", + "monthly_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.filtered_interactions->monthly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US\n", + "\n", + "weekly_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.grain\n", + "\n", + "monthly_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.grain->monthly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US\n", + "\n", + "daily_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.unique_users->daily_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.unique_users\n", + "\n", + "monthly_unique_users_US.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US\n", + "\n", + "monthly_unique_users_US\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.unique_users->monthly_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA\n", + "\n", + "weekly_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.region\n", + "\n", + "monthly_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.filtered_interactions\n", + "\n", + "monthly_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.region->monthly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "website_interactions\n", + "\n", + "website_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->monthly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.filtered_interactions\n", + "\n", + "daily_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->daily_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.filtered_interactions\n", + "\n", + "weekly_unique_users_US.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->weekly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.filtered_interactions\n", + "\n", + "weekly_unique_users_CA.filtered_interactions\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "website_interactions->weekly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "website_interactions->monthly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "website_interactions->daily_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.unique_users\n", + "\n", + "daily_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA\n", + "\n", + "daily_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.unique_users->daily_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.grain\n", + "\n", + "weekly_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.unique_users\n", + "\n", + "weekly_unique_users_CA.unique_users\n", + "Series\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.grain->weekly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.grain\n", + "\n", + "monthly_unique_users_US.grain\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.grain->monthly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.filtered_interactions->daily_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.grain\n", + "\n", + "daily_unique_users_CA.grain\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.grain->daily_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.region\n", + "\n", + "monthly_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.region->monthly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.unique_users->weekly_unique_users_US\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA\n", + "\n", + "monthly_unique_users_CA\n", + "Series\n", + "\n", + "\n", + "\n", + "monthly_unique_users_CA.unique_users->monthly_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.region\n", + "\n", + "weekly_unique_users_US.region\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.region->weekly_unique_users_US.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.region\n", + "\n", + "weekly_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.region->weekly_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_US.filtered_interactions->weekly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.filtered_interactions->weekly_unique_users_CA.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.region\n", + "\n", + "daily_unique_users_CA.region\n", + "str\n", + "\n", + "\n", + "\n", + "daily_unique_users_CA.region->daily_unique_users_CA.filtered_interactions\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "weekly_unique_users_CA.unique_users->weekly_unique_users_CA\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "monthly_unique_users_US.filtered_interactions->monthly_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "daily_unique_users_US.filtered_interactions->daily_unique_users_US.unique_users\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "\"\"\"A pretty simple pipeline to demonstrate function reuse.\n", + "Note that this *also* demonstrates building a custom results builder!\n", + "\n", + "Why not use the standard one? Well, because time-series joining is weird.\n", + "In this case, we're running the subdag for different granularities (daily, weekly, and monthly),\n", + "and we want the functions that provide outputs for these granularities to yield one\n", + "row per datapoint. This means that the weekly series will have 7x less much data as the daily\n", + "series. Monthly series will have less than the weekly series, etc...\n", + "\n", + "This results builder handles it by upsampling them all to a specified granularity.\n", + "\n", + "Specifically, it:\n", + "1. Up/down-samples all time-series to the granularity specified in the constructor\n", + "2. Joins them as normal (a full outer join)\n", + "\n", + "Note that pandas also has capabilities for as-of joins, but those tend to be messy and tricky to work\n", + "with. Furthermore, the as-of joins don't usually work with multiple. Upsampling/downsampling does the\n", + "trick quite well and adds more control to the user.\n", + "\n", + "Note that it might be nice to pass in an argument to say which data is the \"spine\" column,\n", + "allowing us to have a basis of data to join with. For now, however, this should be a useful piece\n", + "of code to help with time-series joining!\n", + "\n", + "Furthermore, you could actually include upsampling *in* the DAG -- this has the added feature of\n", + "encoding an index/spine column, and could be run as the final step for each subdag. Doing so is left\n", + "as an exercise to the reader.\n", + "\n", + "As an example, consider the following outputs:\n", + "1. monthly_unique_users_US\n", + "\n", + "timestamp\n", + "2022-09-30 6\n", + "Freq: M, Name: user_id, dtype: int64\n", + "\n", + "2. weekly_unique_users_US\n", + "\n", + "2022-09-04 3\n", + "2022-09-11 3\n", + "2022-09-18 2\n", + "2022-09-25 4\n", + "2022-10-02 1\n", + "Freq: W-SUN, Name: user_id, dtype: int64\n", + "\n", + "3. daily_unique_users_US\n", + "timestamp\n", + "2022-09-01 2\n", + "2022-09-02 1\n", + "2022-09-03 1\n", + "2022-09-04 0\n", + "...\n", + "2022-09-22 2\n", + "2022-09-23 0\n", + "2022-09-24 2\n", + "2022-09-25 1\n", + "2022-09-26 1\n", + "Freq: D, Name: user_id, dtype: int64\n", + "\n", + "Joining these with upsample granularity of \"D\" would produce:\n", + "\n", + " daily_unique_users_US weekly_unique_users_US monthly_unique_users_US\n", + "timestamp\n", + "2022-09-01 2.0 3.0 6.0\n", + "2022-09-02 1.0 3.0 6.0\n", + "2022-09-03 1.0 3.0 6.0\n", + "2022-09-04 0.0 3.0 6.0\n", + "2022-09-05 0.0 3.0 6.0\n", + "2022-09-06 0.0 3.0 6.0\n", + "...\n", + "2022-09-28 NaN 1.0 6.0\n", + "2022-09-29 NaN 1.0 6.0\n", + "2022-09-30 NaN 1.0 6.0\n", + "2022-10-01 NaN 1.0 NaN\n", + "2022-10-02 NaN 1.0 NaN\n", + "\"\"\"\n", + "\n", + "\n", + "class TimeSeriesJoinResultsBuilder(ResultMixin):\n", + " def __init__(self, upsample_frequency: str):\n", + " \"\"\"Initializes a results builder that does a time-series join\n", + " :param upsample_frequency: Argument to pandas sample() function.\n", + " \"\"\"\n", + " self.sampling_methodology = upsample_frequency\n", + "\n", + " def resample(self, time_series: pd.Series):\n", + " return time_series.resample(\n", + " self.sampling_methodology\n", + " ).bfill() # TODO -- think through the right fill -- ffill()/bfill()/whatnot\n", + "\n", + " @staticmethod\n", + " def is_time_series(series: Any):\n", + " if not isinstance(series, pd.Series):\n", + " return False\n", + " if not series.index.inferred_type == \"datetime64\":\n", + " return False\n", + " return True\n", + "\n", + " def build_result(self, **outputs: Dict[str, Any]) -> Any:\n", + " non_ts_output = [\n", + " key\n", + " for key, value in outputs.items()\n", + " if not TimeSeriesJoinResultsBuilder.is_time_series(value)\n", + " ]\n", + " if len(non_ts_output) > 0:\n", + " raise ValueError(\n", + " f\"All outputs from DAG must be time-series -- the following are not: {non_ts_output}\"\n", + " )\n", + " resampled_results = {key: self.resample(value) for key, value in outputs.items()}\n", + " return pd.DataFrame(resampled_results).bfill()\n", + "\n", + "\n", + "\n", + "dr = (driver.Builder()\n", + " .with_config({})\n", + " .with_modules(reusable_subdags)\n", + " .with_adapters(SimplePythonGraphAdapter(\n", + " result_builder=TimeSeriesJoinResultsBuilder(upsample_frequency=\"D\")\n", + " ),\n", + ")\n", + "\n", + "result = dr.execute(\n", + " [\n", + " \"daily_unique_users_US\",\n", + " \"daily_unique_users_CA\",\n", + " \"weekly_unique_users_US\",\n", + " \"weekly_unique_users_CA\",\n", + " \"monthly_unique_users_US\",\n", + " \"monthly_unique_users_CA\",\n", + " ]\n", + ")\n", + "execution_graph = dr.visualize_execution([\n", + " \"daily_unique_users_US\",\n", + " \"daily_unique_users_CA\",\n", + " \"weekly_unique_users_US\",\n", + " \"weekly_unique_users_CA\",\n", + " \"monthly_unique_users_US\",\n", + " \"monthly_unique_users_CA\",\n", + " ], \"./reusable_subdags\", {\"format\": \"png\"})\n", + "\n", + "print(result)\n", + "display(execution_graph)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}