Adds RAG example on how to chunk Hamilton documentation (#721)
This is the first part of a RAG pipeline: document ingestion.
This uses langchain's text splitting functionality, but you
could easily replace it with your own logic. The
DAG is defined and run in Hamilton, and the processing
of each URL is done in parallel. You could easily farm
this out to Ray, Dask, or even PySpark. See the README
for more info.
skrawcz authored Mar 5, 2024
1 parent 02120ae commit 08e9f28
Showing 10 changed files with 459 additions and 10 deletions.
147 changes: 147 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/README.md
@@ -0,0 +1,147 @@
# Scraping and Chunking
Scraping and chunking are an important part of any RAG dataflow. They are typically
the first "backend" operations you run, for example to populate your vector database.

## High Level Explanation
Here we show how to model this process with Hamilton, and how doing so lets you avoid
the executor and control flow logic that can make your code hard to maintain, test, and reuse.
The example below shows the kind of code you would typically write to parallelize a scraping and chunking
workflow. `some_func` would be some large function, or a wrapper around the logic that processes each
URL. The point to grok is that you have to write this
control flow logic yourself to orchestrate your code, which invariably couples it tightly to the executor and
makes it harder to test and reuse.
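```python
import concurrent.futures

from tqdm import tqdm

MAX = 5  # illustrative value: max worker threads


def scrape(urls: list) -> list:
    # `some_func` and `func_args` are placeholders for your per-URL logic and its arguments.
    all_data = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX) as executor:
        futures = [executor.submit(some_func, url, *func_args) for url in urls]
        # advance the progress bar as each future finishes
        with tqdm(total=len(urls)) as pbar:
            for _ in concurrent.futures.as_completed(futures):
                pbar.update(1)
        # gather the results in submission order
        for future in futures:
            data = future.result()
            all_data += data
    return all_data
```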
Instead, with Hamilton, you write the processing logic independently of the for loop
and the control logic that submits work to the executor. This is a big win:
you can unit test your code, reuse it, and then scale it to run in parallel without
coupling it to a specific execution system.

To start, we can "unravel" `some_func` above into a DAG of operations (a simple linear chain here):
```python
import re

import requests


def article_regex() -> str:
    """This assumes you're using the furo theme for sphinx"""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
    """Pulls URL and takes out relevant HTML.
    :param url: the url to pull.
    :param article_regex: the regex used to extract the article contents.
    :return: sub-portion of the HTML
    """
    html = requests.get(url)
    article = re.findall(article_regex, html.text, re.DOTALL)
    if not article:
        raise ValueError(f"No article found in {url}")
    text = article[0].strip()
    return text


def processed_article(article_text: str) -> list:
    """Processes the article text.
    :param article_text: the text to process.
    :return: the processed text, as a list.
    """
    # do some processing, even saving it, etc.
    # placeholder: wrap the text in a list to match the declared return type
    return [article_text]
```
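Because each function above is plain Python, it can be tested without any executor in the loop. The block below is a minimal sketch of that idea, not code from this example: `extract_article` is a hypothetical helper that holds only the regex step of `article_text`, so the test needs no network call, and the sample HTML is made up.

```python
import re


def article_regex() -> str:
    """Same pattern as above; assumes the furo theme for sphinx."""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def extract_article(html: str, article_regex: str) -> str:
    """Hypothetical helper: the regex step of `article_text`, minus the HTTP call."""
    article = re.findall(article_regex, html, re.DOTALL)
    if not article:
        raise ValueError("No article found")
    return article[0].strip()


def test_extract_article() -> None:
    # made-up page containing the furo article wrapper
    html = (
        '<html><article role="main" id="furo-main-content">\n'
        "  <h1>Hi</h1>\n"
        "</article></html>"
    )
    assert extract_article(html, article_regex()) == "<h1>Hi</h1>"


def test_extract_article_missing() -> None:
    # a page without the wrapper should raise rather than return junk
    try:
        extract_article("<html></html>", article_regex())
    except ValueError:
        return
    raise AssertionError("expected ValueError")
```

Both tests are ordinary functions, so they would run under pytest or by calling them directly.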
Next, we can "parallelize" and "collect" it over inputs, i.e. "map" over it with various values. To tell Hamilton to
do that, we add the following functions to "sandwich" the code above:
```python
from hamilton.htypes import Collect, Parallelizable


def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
    """
    Takes in a list of URLs for parallel processing.
    Note: this could be in a separate module, but it's here for simplicity.
    """
    for url in urls_from_sitemap[0:max_urls]:
        yield url


# The previous Hamilton code could live here, or if in another module, Hamilton
# would stitch the graph together correctly.


def collect_processed_articles(processed_article: Collect[list]) -> list:
    """Function to collect the results from parallel processing.
    Note: all `processed_article` results are pulled into memory. So, if you have a lot of results,
    you may want to write them to a datastore and pass pointers instead.
    """
    return list(processed_article)
```
The magic is in the `Parallelizable` and `Collect` types. They tell Hamilton to run the functions between them
once per yielded value, with each run treated as a single task that can execute in parallel. For more information see the
[parallel documentation](https://hamilton.dagworks.io/en/latest/concepts/parallel-task/) and
[examples](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/parallelism).
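
As a rough mental model only, the "sandwich" behaves like a map over the yielded values followed by a gather. The sketch below uses only the standard library and is an analogy, not how Hamilton implements it: the function bodies are stand-ins, and `run` is a hypothetical name for the part Hamilton's driver and executors take care of.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator


def url(urls: list[str]) -> Iterator[str]:
    """Plays the role of the `Parallelizable` function: yields one item per task."""
    yield from urls


def processed_article(url: str) -> str:
    """Stand-in for the per-URL work that sits between the two types."""
    return url.upper()


def collect_processed_articles(results: Iterator[str]) -> list[str]:
    """Plays the role of the `Collect` function: gathers every per-item result."""
    return list(results)


def run(urls: list[str], max_workers: int = 2) -> list[str]:
    """Hypothetical orchestration step; with Hamilton you do not write this part."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map fans the items out to workers and returns results in input order
        return collect_processed_articles(pool.map(processed_article, url(urls)))
```

The difference with Hamilton is that `run` disappears from your code: the type annotations carry that information, and the executor is chosen when the driver is built.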

## Let's explain the example

Here is an image of the pipeline when run locally, or via ray or dask:
![pipeline](pipeline.png)

The pipeline is a simple one that:
1. takes in a sitemap.xml file and creates a list of all the URLs in the file. It defaults to the sitemap of Hamilton's documentation.
2. parallelizes the processing of each URL (green border).
3. pulls each URL and strips it down to the relevant body of HTML.
4. chunks the HTML into smaller pieces, returning langchain documents.
5. collects all the results (red border) and returns them.

What this doesn't do is create embeddings, but that would be easy to extend.
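
The first step can be tried in isolation. This is a small sketch that uses the same regex as `urls_from_sitemap` in `doc_pipeline.py`; the `SAMPLE_SITEMAP` contents and the example.com URLs are made up for illustration.

```python
import re

# Made-up sitemap contents, standing in for what `sitemap_text` would download.
SAMPLE_SITEMAP = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/en/latest/</loc></url>
  <url><loc>https://example.com/en/latest/concepts/</loc></url>
</urlset>"""


def urls_from_sitemap(sitemap_text: str) -> list[str]:
    """Creates a list of all the URLs found in <loc> tags."""
    return re.findall(r"<loc>(.*?)</loc>", sitemap_text)


urls = urls_from_sitemap(SAMPLE_SITEMAP)
```

Each entry in `urls` is what the parallelized part of the pipeline receives as its `url` input.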

What this leaves us with is a general way to then plug in various executors to run the code in parallel.
This is what the `run.py`, `run_dask.py`, `run_ray.py`, and `spark/spark_pipeline.py` files do. They run the same code, but on different
execution systems.

### File Structure
Here we explain the file structure of the example:

- `doc_pipeline.py` - the main file that contains the Hamilton code that defines the document chunking pipeline.
- `run.py` - code that you would invoke to run `doc_pipeline` locally, using threads or processes.
- `run_dask.py` - code that you would invoke to run `doc_pipeline` on a Dask cluster, or with Dask locally.
- `run_ray.py` - code that you would invoke to run `doc_pipeline` on a Ray cluster, or with Ray locally.
- `spark/doc_pipeline.py` - the main file that contains the Hamilton code that defines the document chunking pipeline,
but adjusted for PySpark.
- `spark/spark_pipeline.py` - code that you would invoke to run `spark/doc_pipeline` on a Spark cluster, or with Spark locally.
- `spark/README.md` - more details on running the Spark example and why the code differs slightly.

### Running the example
Make sure you have the right python dependencies installed for the execution system you want to use.
See `requirements.txt` (or `spark/requirements.txt`) for the dependencies you need to install.

Then you can run the example with the following commands:
```bash
python run.py
python run_dask.py
python run_ray.py
python spark/spark_pipeline.py
```
See `spark/README.md` for more details on running the Spark example and why the code differs slightly.

## Extensions / what to do next
This example is a simple one, but it's easy to extend. For example, you could:

* add a step to create embeddings from the chunked documents.
* add a step to save the results to a database or a file system.
* tune the parallelism to ensure you don't DoS the resource you're hitting.
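
As one possible shape for the "save the results" extension, the sketch below writes each URL's result to a JSON file and returns the path, so that only pointers are collected rather than the data itself. It is an assumption-laden illustration, not part of this example: `saved_url_result` and `output_dir` are hypothetical names, and the JSON layout is made up.

```python
import hashlib
import json
import pathlib


def saved_url_result(url_result: dict, output_dir: str) -> str:
    """Hypothetical node: writes one URL's result to disk and returns the file path."""
    out = pathlib.Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Hash the URL to get a stable, filesystem-safe file name.
    name = hashlib.sha256(url_result["url"].encode("utf-8")).hexdigest()[:16]
    # langchain documents expose their text as `page_content`; fall back to str() otherwise.
    chunks = [getattr(chunk, "page_content", str(chunk)) for chunk in url_result["chunks"]]
    path = out / f"{name}.json"
    path.write_text(json.dumps({"url": url_result["url"], "chunks": chunks}), encoding="utf-8")
    return str(path)
```

If a function like this were added inside the parallel block, the collecting function would presumably take `Collect[str]` over `saved_url_result` instead of `Collect[dict]` over `url_result`, and `output_dir` would be passed in as an input.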

## Hamilton over Langchain
Hamilton is a general purpose tool, and what we've described here applies broadly
to any code that you might write: data, machine learning, LLMs, web processing, etc. You can
even use it with parts of LangChain!
162 changes: 162 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/doc_pipeline.py
@@ -0,0 +1,162 @@
"""
Things this module does.
1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
2. takes in a list of URLs and pulls the HTML from each URL.
3. it then strips the HTML to the relevant body of HTML.
html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article
4. it then chunks the HTML into smaller pieces -- returning langchain documents
5. what this doesn't do is create embeddings -- but that would be easy to extend.
"""

import re

import requests
from langchain import text_splitter
from langchain_core import documents

from hamilton.htypes import Collect, Parallelizable


def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
    """Takes in a sitemap URL and returns the sitemap.xml file.
    :param sitemap_url: the URL of sitemap.xml file
    :return:
    """
    sitemap = requests.get(sitemap_url)
    return sitemap.text


def urls_from_sitemap(sitemap_text: str) -> list[str]:
    """Takes in a sitemap.xml file contents and creates a list of all the URLs in the file.
    :param sitemap_text: the contents of a sitemap.xml file
    :return: list of URLs
    """
    urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
    return urls


def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
    """
    Takes in a list of URLs for parallel processing.
    Note: this could be in a separate module, but it's here for simplicity.
    """
    for url in urls_from_sitemap[0:max_urls]:
        yield url


# --- Start Parallel Code ---
# The following code is parallelized, once for each url.
# This code could be in a separate module, but it's here for simplicity.


def article_regex() -> str:
    """This assumes you're using the furo theme for sphinx"""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
    """Pulls URL and takes out relevant HTML.
    :param url: the url to pull.
    :param article_regex: the regex used to extract the article contents.
    :return: sub-portion of the HTML
    """
    html = requests.get(url)
    article = re.findall(article_regex, html.text, re.DOTALL)
    if not article:
        raise ValueError(f"No article found in {url}")
    text = article[0].strip()
    return text


def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
    """Return HTML chunker object.
    :return:
    """
    headers_to_split_on = [
        ("h1", "Header 1"),
        ("h2", "Header 2"),
        ("h3", "Header 3"),
    ]
    return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


def text_chunker(
    chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
    """Returns the text chunker object.
    :param chunk_size:
    :param chunk_overlap:
    :return:
    """
    return text_splitter.RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )


def chunked_text(
    article_text: str,
    html_chunker: text_splitter.HTMLHeaderTextSplitter,
    text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[documents.Document]:
    """This function takes in HTML, chunks it, and then chunks it again.
    It then outputs a list of langchain "documents". Multiple documents for one HTML header section is possible.
    :param article_text:
    :param html_chunker:
    :param text_chunker:
    :return:
    """
    header_splits = html_chunker.split_text(article_text)
    splits = text_chunker.split_documents(header_splits)
    return splits


def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
    """Function to aggregate what we want to return from parallel processing.
    Note: this function is where you could cache the results to a datastore.
    :param url:
    :param article_text:
    :param chunked_text:
    :return:
    """
    return {"url": url, "article_text": article_text, "chunks": chunked_text}


# --- END Parallel Code ---


def collect_chunked_url_text(url_result: Collect[dict]) -> list:
    """Function to collect the results from parallel processing.
    Note: All results for `url_result` are pulled into memory here.
    So, if you have a lot of results, you may want to write them to a datastore and pass pointers.
    """
    return list(url_result)


if __name__ == "__main__":
    # code here for quickly testing the build of the code here.
    import doc_pipeline

    from hamilton import driver
    from hamilton.execution import executors

    dr = (
        driver.Builder()
        .with_modules(doc_pipeline)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        .build()
    )
    dr.display_all_functions("pipeline.png")
6 changes: 6 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/requirements.txt
@@ -0,0 +1,6 @@
langchain
langchain-core
sf-hamilton[visualization]
# optionally install Ray, or Dask, or both
# sf-hamilton[ray]
# sf-hamilton[dask]
37 changes: 37 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/run.py
@@ -0,0 +1,37 @@
"""
A basic script to run the pipeline defined in `doc_pipeline.py`.
By default this runs parts of the pipeline in parallel using threads or processes.
To choose threads or processes, uncomment the appropriate line in the `Builder` below.
To scale processing here, see `run_ray.py`, `run_dask.py`, and `spark/spark_pipeline.py`.
"""

import doc_pipeline

from hamilton import driver
from hamilton.execution import executors

if __name__ == "__main__":

    dr = (
        driver.Builder()
        .with_modules(doc_pipeline)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        # Choose a backend to process the parallel parts of the pipeline
        .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
        # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        .build()
    )
    dr.display_all_functions("pipeline.png")
    result = dr.execute(
        ["collect_chunked_url_text"],
        inputs={"chunk_size": 256, "chunk_overlap": 32},
    )
    # do something with the result...
    import pprint

    for chunk in result["collect_chunked_url_text"]:
        pprint.pprint(chunk)
42 changes: 42 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/run_dask.py
@@ -0,0 +1,42 @@
"""
Shows how to run document chunking using dask.
"""

import logging

import doc_pipeline
from dask import distributed

from hamilton import driver, log_setup
from hamilton.plugins import h_dask

log_setup.setup_logging(logging.INFO)

if __name__ == "__main__":
    cluster = distributed.LocalCluster()
    client = distributed.Client(cluster)
    remote_executor = h_dask.DaskExecutor(client=client)

    dr = (
        driver.Builder()
        .with_modules(doc_pipeline)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
        # Choose a backend to process the parallel parts of the pipeline
        # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
        # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        # reuse the executor created above instead of building a second one
        .with_remote_executor(remote_executor)
        .build()
    )
    dr.display_all_functions("pipeline.png")
    result = dr.execute(
        ["collect_chunked_url_text"],
        inputs={"chunk_size": 256, "chunk_overlap": 32},
    )
    # do something with the result...
    import pprint

    for chunk in result["collect_chunked_url_text"]:
        pprint.pprint(chunk)

    client.shutdown()