diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/README.md b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/README.md
new file mode 100644
index 000000000..3f3ca3fb8
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/README.md
@@ -0,0 +1,50 @@
+# Purpose of this module
+
+The purpose of this module is to take Sphinx Furo themed documentation, pull the pages, and chunk the text
+for further processing, e.g. creating embeddings. This is fairly generic code that is easy to change
+and extend for your purposes. It runs anywhere that python runs, and can be extended to run on Ray, Dask,
+and even PySpark.
+
+```python
+# import sphinx_doc_chunking via the means that you want. See above code.
+
+from hamilton import driver
+
+from hamilton.execution import executors
+
+dr = (
+ driver.Builder()
+ .with_modules(sphinx_doc_chunking)
+ .enable_dynamic_execution(allow_experimental_mode=True)
+ .with_config({})
+ # defaults to multi-threading -- and tasks control max concurrency
+ .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
+ .build()
+)
+```
+
+## What you should modify
+
+You'll likely want to:
+
+1. play with what does the chunking and settings for that.
+2. change how URLs are sourced.
+3. change how text is extracted from a page.
+4. extend the code to hit an API to get embeddings.
+5. extend the code to push data to a vector database.
+
+# Configuration Options
+There is no configuration required for this module.
+
+# Limitations
+
+You general multiprocessing caveats apply if you choose an executor other than MultiThreading. For example:
+
+1. Serialization -- objects need to be serializable between processes.
+2. Concurrency/parallelism -- you're in control of this.
+3. Failures -- you'll need to make your code do the right thing here.
+4. Memory requirements -- the "collect" (or reduce) step pulls things into memory. If you hit this, this just
+means you need to redesign your code a little, e.g. write large things to a store and pass pointers.
+
+To extend this to [PySpark see the examples folder](https://github.com/dagworks-inc/hamilton/tree/main/examples/LLM_Workflows/scraping_and_chunking/spark)
+for the changes required to adjust the code to handle PySpark.
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/__init__.py b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/__init__.py
new file mode 100644
index 000000000..7f841a97e
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/__init__.py
@@ -0,0 +1,162 @@
+"""
+Things this module does.
+
+ 1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
+ 2. takes in a list of URLs and pulls the HTML from each URL.
+ 3. it then strips the HTML to the relevant body of HTML. We assume `furo themed sphinx docs`.
+ html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article
+ 4. it then chunks the HTML into smaller pieces -- returning langchain documents
+ 5. what this doesn't do is create embeddings -- but that would be easy to extend.
+"""
+
+import re
+
+import requests
+from langchain import text_splitter
+from langchain_core import documents
+
+from hamilton.htypes import Collect, Parallelizable
+
+
+def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
+ """Takes in a sitemap URL and returns the sitemap.xml file.
+
+ :param sitemap_url: the URL of sitemap.xml file
+ :return:
+ """
+ sitemap = requests.get(sitemap_url)
+ return sitemap.text
+
+
+def urls_from_sitemap(sitemap_text: str) -> list[str]:
+ """Takes in a sitemap.xml file contents and creates a list of all the URLs in the file.
+
+ :param sitemap_text: the contents of a sitemap.xml file
+ :return: list of URLs
+ """
+ urls = re.findall(r"(.*?)", sitemap_text)
+ return urls
+
+
+def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
+ """
+ Takes in a list of URLs for parallel processing.
+
+ Note: this could be in a separate module, but it's here for simplicity.
+ """
+ for url in urls_from_sitemap[0:max_urls]:
+ yield url
+
+
+# --- Start Parallel Code ---
+# The following code is parallelized, once for each url.
+# This code could be in a separate module, but it's here for simplicity.
+
+
+def article_regex() -> str:
+ """This assumes you're using the furo theme for sphinx"""
+ return r'(.*?)'
+
+
+def article_text(url: str, article_regex: str) -> str:
+ """Pulls URL and takes out relevant HTML.
+
+ :param url: the url to pull.
+ :param article_regex: the regext to use to get the contents out of.
+ :return: sub-portion of the HTML
+ """
+ html = requests.get(url)
+ article = re.findall(article_regex, html.text, re.DOTALL)
+ if not article:
+ raise ValueError(f"No article found in {url}")
+ text = article[0].strip()
+ return text
+
+
+def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
+ """Return HTML chunker object.
+
+ :return:
+ """
+ headers_to_split_on = [
+ ("h1", "Header 1"),
+ ("h2", "Header 2"),
+ ("h3", "Header 3"),
+ ]
+ return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
+
+
+def text_chunker(
+ chunk_size: int = 256, chunk_overlap: int = 32
+) -> text_splitter.RecursiveCharacterTextSplitter:
+ """Returns the text chunker object.
+
+ :param chunk_size:
+ :param chunk_overlap:
+ :return:
+ """
+ return text_splitter.RecursiveCharacterTextSplitter(
+ chunk_size=chunk_size, chunk_overlap=chunk_overlap
+ )
+
+
+def chunked_text(
+ article_text: str,
+ html_chunker: text_splitter.HTMLHeaderTextSplitter,
+ text_chunker: text_splitter.RecursiveCharacterTextSplitter,
+) -> list[documents.Document]:
+ """This function takes in HTML, chunks it, and then chunks it again.
+
+ It then outputs a list of langchain "documents". Multiple documents for one HTML header section is possible.
+
+ :param article_text:
+ :param html_chunker:
+ :param text_chunker:
+ :return:
+ """
+ header_splits = html_chunker.split_text(article_text)
+ splits = text_chunker.split_documents(header_splits)
+ return splits
+
+
+def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
+ """Function to aggregate what we want to return from parallel processing.
+
+ Note: this function is where you could cache the results to a datastore.
+
+ :param url:
+ :param article_text:
+ :param chunked_text:
+ :return:
+ """
+ return {"url": url, "article_text": article_text, "chunks": chunked_text}
+
+
+# --- END Parallel Code ---
+
+
+def collect_chunked_url_text(url_result: Collect[dict]) -> list:
+ """Function to collect the results from parallel processing.
+ Note: All results for `url_result` are pulled into memory here.
+ So, if you have a lot of results, you may want to write them to a datastore and pass pointers.
+ """
+ return list(url_result)
+
+
+if __name__ == "__main__":
+ # code here for quickly testing the build of the code here.
+ import __main__ as sphinx_doc_chunking
+
+ from hamilton import driver
+ from hamilton.execution import executors
+
+ dr = (
+ driver.Builder()
+ .with_modules(sphinx_doc_chunking)
+ .enable_dynamic_execution(allow_experimental_mode=True)
+ .with_config({})
+ .with_local_executor(executors.SynchronousLocalTaskExecutor())
+ .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
+ .build()
+ )
+ dr.display_all_functions("dag.png")
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/dag.png b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/dag.png
new file mode 100644
index 000000000..54b69ba8a
Binary files /dev/null and b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/dag.png differ
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/requirements.txt b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/requirements.txt
new file mode 100644
index 000000000..27dae66ce
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/requirements.txt
@@ -0,0 +1,6 @@
+langchain
+langchain-core
+sf-hamilton[dask]
+# optionally install Ray, or Dask, or both
+sf-hamilton[ray]
+sf-hamilton[visualization]
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/tags.json b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/tags.json
new file mode 100644
index 000000000..607c32605
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/tags.json
@@ -0,0 +1,7 @@
+{
+ "schema": "1.0",
+ "use_case_tags": ["data processing", "document chunking", "chunking", "langchain"],
+ "secondary_tags": {
+ "language": "English"
+ }
+}
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/test.ipynb b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/test.ipynb
new file mode 100644
index 000000000..d60262dc3
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/test.ipynb
@@ -0,0 +1,193 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## A basic notebook to run the pipeline defined in `doc_pipeline.py`.\n",
+ "\n",
+ "By default this runs parts of the pipeline in parallel using threads or processes.\n",
+ "\n",
+ "To scale processing here look at all the subsequent cells that show how to run on \n",
+ " ray or dask. For spark see spark/notebook.ipynb."
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "c174ce5a23eed9a1"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import doc_pipeline\n",
+ "\n",
+ "from hamilton import driver\n",
+ "from hamilton.execution import executors\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " .with_local_executor(executors.SynchronousLocalTaskExecutor())\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": true
+ },
+ "id": "initial_id",
+ "execution_count": 0
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Ray"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "7bc40e6914aed330"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import logging\n",
+ "\n",
+ "import doc_pipeline\n",
+ "import ray\n",
+ "\n",
+ "from hamilton import driver, log_setup\n",
+ "from hamilton.plugins import h_ray\n",
+ "\n",
+ "log_setup.setup_logging(logging.INFO)\n",
+ "ray.init()\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .with_remote_executor(\n",
+ " h_ray.RayTaskExecutor()\n",
+ " ) # be sure to run ray.init() or pass in config.\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "\n",
+ "ray.shutdown()\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "a4df6e50283f68ab"
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Dask"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "46aa4763a337dcb1"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import logging\n",
+ "\n",
+ "import doc_pipeline\n",
+ "from dask import distributed\n",
+ "\n",
+ "from hamilton import driver, log_setup\n",
+ "from hamilton.plugins import h_dask\n",
+ "\n",
+ "log_setup.setup_logging(logging.INFO)\n",
+ "\n",
+ "cluster = distributed.LocalCluster()\n",
+ "client = distributed.Client(cluster)\n",
+ "remote_executor = h_dask.DaskExecutor(client=client)\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .with_remote_executor(h_dask.DaskExecutor(client=client))\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "\n",
+ "client.shutdown()\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "103824eec22810fe"
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 2
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython2",
+ "version": "2.7.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/valid_configs.jsonl b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/valid_configs.jsonl
new file mode 100644
index 000000000..b8a6704f8
--- /dev/null
+++ b/contrib/hamilton/contrib/dagworks/sphinx_doc_chunking/valid_configs.jsonl
@@ -0,0 +1 @@
+{"description": "Default", "name": "default", "config": {}}
diff --git a/examples/LLM_Workflows/scraping_and_chunking/notebook.ipynb b/examples/LLM_Workflows/scraping_and_chunking/notebook.ipynb
new file mode 100644
index 000000000..d60262dc3
--- /dev/null
+++ b/examples/LLM_Workflows/scraping_and_chunking/notebook.ipynb
@@ -0,0 +1,193 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## A basic notebook to run the pipeline defined in `doc_pipeline.py`.\n",
+ "\n",
+ "By default this runs parts of the pipeline in parallel using threads or processes.\n",
+ "\n",
+ "To scale processing here look at all the subsequent cells that show how to run on \n",
+ " ray or dask. For spark see spark/notebook.ipynb."
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "c174ce5a23eed9a1"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import doc_pipeline\n",
+ "\n",
+ "from hamilton import driver\n",
+ "from hamilton.execution import executors\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " .with_local_executor(executors.SynchronousLocalTaskExecutor())\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": true
+ },
+ "id": "initial_id",
+ "execution_count": 0
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Ray"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "7bc40e6914aed330"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import logging\n",
+ "\n",
+ "import doc_pipeline\n",
+ "import ray\n",
+ "\n",
+ "from hamilton import driver, log_setup\n",
+ "from hamilton.plugins import h_ray\n",
+ "\n",
+ "log_setup.setup_logging(logging.INFO)\n",
+ "ray.init()\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .with_remote_executor(\n",
+ " h_ray.RayTaskExecutor()\n",
+ " ) # be sure to run ray.init() or pass in config.\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "\n",
+ "ray.shutdown()\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "a4df6e50283f68ab"
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Dask"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "46aa4763a337dcb1"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "import logging\n",
+ "\n",
+ "import doc_pipeline\n",
+ "from dask import distributed\n",
+ "\n",
+ "from hamilton import driver, log_setup\n",
+ "from hamilton.plugins import h_dask\n",
+ "\n",
+ "log_setup.setup_logging(logging.INFO)\n",
+ "\n",
+ "cluster = distributed.LocalCluster()\n",
+ "client = distributed.Client(cluster)\n",
+ "remote_executor = h_dask.DaskExecutor(client=client)\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline)\n",
+ " .enable_dynamic_execution(allow_experimental_mode=True)\n",
+ " .with_config({})\n",
+ " # Choose a backend to process the parallel parts of the pipeline\n",
+ " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
+ " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
+ " .with_remote_executor(h_dask.DaskExecutor(client=client))\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.display_all_functions()\n",
+ "result = dr.execute(\n",
+ " [\"collect_chunked_url_text\"],\n",
+ " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
+ ")\n",
+ "# do something with the result...\n",
+ "import pprint\n",
+ "\n",
+ "for chunk in result[\"collect_chunked_url_text\"]:\n",
+ " pprint.pprint(chunk)\n",
+ "\n",
+ "client.shutdown()\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "103824eec22810fe"
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 2
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython2",
+ "version": "2.7.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/examples/LLM_Workflows/scraping_and_chunking/spark/notebook.ipynb b/examples/LLM_Workflows/scraping_and_chunking/spark/notebook.ipynb
new file mode 100644
index 000000000..78e5f4c1e
--- /dev/null
+++ b/examples/LLM_Workflows/scraping_and_chunking/spark/notebook.ipynb
@@ -0,0 +1,66 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Shows how to run the spark pipeline."
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "5e46bd52497eadfc"
+ },
+ {
+ "cell_type": "code",
+ "outputs": [],
+ "source": [
+ "from hamilton import driver\n",
+ "\n",
+ "import doc_pipeline\n",
+ "import spark_pipeline\n",
+ "\n",
+ "dr = (\n",
+ " driver.Builder()\n",
+ " .with_modules(doc_pipeline, spark_pipeline)\n",
+ " .with_config({})\n",
+ " .build()\n",
+ ")\n",
+ "dag = dr.visualize_execution(\n",
+ " [\"chunked_url_text\"],\n",
+ " inputs={\"app_name\": \"chunking_spark_job\", \"num_partitions\": 4},\n",
+ ")\n",
+ "result = dr.execute(\n",
+ " [\"chunked_url_text\"],\n",
+ " inputs={\"app_name\": \"chunking_spark_job\", \"num_partitions\": 4},\n",
+ ")\n",
+ "print(result[\"chunked_url_text\"].show())\n",
+ "dag"
+ ],
+ "metadata": {
+ "collapsed": false
+ },
+ "id": "71e6f0f5208c02db"
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 2
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython2",
+ "version": "2.7.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}