diff --git a/examples/LLM_Workflows/scraping_and_chunking/README.md b/examples/LLM_Workflows/scraping_and_chunking/README.md
index 785f25e14..87de23be2 100644
--- a/examples/LLM_Workflows/scraping_and_chunking/README.md
+++ b/examples/LLM_Workflows/scraping_and_chunking/README.md
@@ -1,7 +1,147 @@
 # Scraping and Chunking
-Scraping and chunking are an important part of any RAG dataflow.
+Scraping and chunking are an important part of any RAG dataflow. Typically they're
+the start of your "backend" operations that populate, for example, your vector database.
-Here we show you how you can scale the scraping and chunking dataflow to run in parallel
-locally, as well as with Ray, Dask, and even PySpark.
+## High Level Explanation
+Here we show how to model this process with Hamilton, and also how to avoid
+dealing with executors and control flow logic that can make your code hard to maintain, test, and reuse.
+For the latter, see the example code below. You would typically see code like this in a scraping and chunking workflow to
+parallelize it. `some_func` below would be some large function, or a wrapper around the logic to process each
+URL. The point to grok is that you have to deal with this
+control flow logic yourself to orchestrate your code -- which invariably tightly couples it and
+makes it harder to test and reuse.
+```python
+def scrape(urls: list) -> list:
+    all_data = []
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX) as executor:
+        futures = [
+            executor.submit(
+                some_func, url, func_args...
+            )
+            for url in urls
+        ]
+        with tqdm(total=len(urls)) as pbar:
+            for _ in concurrent.futures.as_completed(futures):
+                pbar.update(1)
+        for future in futures:
+            data = future.result()
+            all_data += data
+    return all_data
+```
+## The Hamilton way
+Instead, with Hamilton, you can write the processing logic INDEPENDENT of having to deal
+with the for loop and control logic to submit to the executor. This is a big win, because
+it means you can easily unit test your code, reuse it, and then scale it to run in parallel without
+coupling it to a specific execution system.
-We'll use creating chunks of text from the Hamilton documentation as an example.
+To start, we can "unravel" `some_func` above into a DAG of operations (a simple linear chain here):
+```python
+
+def article_regex() -> str:
+    """This assumes you're using the furo theme for sphinx"""
+    return r'<article role="main" id="furo-main-content">(.*?)</article>'
+
+
+def article_text(url: str, article_regex: str) -> str:
+    """Pulls the URL and extracts the relevant HTML.
+
+    :param url: the url to pull.
+    :param article_regex: the regex to use to extract the article contents.
+    :return: sub-portion of the HTML.
+    """
+    html = requests.get(url)
+    article = re.findall(article_regex, html.text, re.DOTALL)
+    if not article:
+        raise ValueError(f"No article found in {url}")
+    text = article[0].strip()
+    return text
+
+def processed_article(article_text: str) -> str:
+    """Processes the article text.
+
+    :param article_text: the text to process.
+    :return: the processed text.
+    """
+    # do some processing, even saving it, etc.
+    return article_text
+```
+Next, we "parallelize" & "collect" this over inputs, i.e. "map" over it with various values. To tell Hamilton to
+do that, we'd add the following functions to "sandwich" the code above:
+```python
+def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
+    """
+    Takes in a list of URLs for parallel processing.
+
+    Note: this could be in a separate module, but it's here for simplicity.
+    """
+    for url in urls_from_sitemap[0:max_urls]:
+        yield url
+
+# The previous Hamilton code could live here, or if in another module, Hamilton
+# would stitch the graph together correctly.
+
+def collect_processed_articles(processed_article: Collect[str]) -> list:
+    """Function to collect the results from parallel processing.
+    Note: all `processed_article` results are pulled into memory. So, if you have a lot of results,
+    you may want to write them to a datastore and pass pointers instead.
+    """
+    return list(processed_article)
+```
+The magic is in the `Parallelizable` & `Collect` types. They tell Hamilton to run everything between them
+in parallel, once per yielded value, each as a single task. For more information see the
+[parallel documentation](https://hamilton.dagworks.io/en/latest/concepts/parallel-task/) and
+[examples](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/parallelism).
+
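+To run this "sandwiched" version, you build a Hamilton driver with dynamic execution enabled and choose
+an executor for the parallel block. The sketch below mirrors what `run.py` in this example does;
+`my_scraping_module` is a hypothetical module name holding the functions above:
+```python
+import my_scraping_module  # hypothetical module containing the functions above
+
+from hamilton import driver
+from hamilton.execution import executors
+
+dr = (
+    driver.Builder()
+    .with_modules(my_scraping_module)
+    .enable_dynamic_execution(allow_experimental_mode=True)
+    .with_local_executor(executors.SynchronousLocalTaskExecutor())
+    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
+    .build()
+)
+urls = ["https://hamilton.dagworks.io/en/latest/"]  # any list of URLs works here
+result = dr.execute(["collect_processed_articles"], inputs={"urls_from_sitemap": urls})
+```
+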
+## Let's explain the example
+
+Here is an image of the pipeline when run locally, or via ray or dask:
+![pipeline](pipeline.png)
+
+The pipeline is a simple one that:
+1. Takes in a sitemap.xml file and creates a list of all the URLs in the file. Defaults to Hamilton's documentation sitemap.
+2. For each URL, processing is then parallelized (green border).
+3. Each URL is pulled and stripped down to the relevant body of HTML.
+4. The HTML is then chunked into smaller pieces -- returning LangChain documents.
+5. What this doesn't do is create embeddings -- but that would be easy to extend.
+6. Then all the results are collected (red border) and returned.
+
+What this leaves us with is a general way to then plug in various executors to run the code in parallel.
+This is what the `run.py`, `run_dask.py`, `run_ray.py`, and `spark/spark_pipeline.py` files do. They run the same code, but on different
+execution systems.
+
+### File Structure
+Here we explain the file structure of the example:
+
+ - `doc_pipeline.py` - the main file that contains the Hamilton code that defines the document chunking pipeline.
+ - `run.py` - code that you would invoke to run `doc_pipeline` locally, i.e. in a single Python process.
+ - `run_dask.py` - code that you would invoke to run `doc_pipeline` on a Dask cluster, or with Dask locally.
+ - `run_ray.py` - code that you would invoke to run `doc_pipeline` on a Ray cluster, or with Ray locally.
+ - `spark/doc_pipeline.py` - the main file that contains the Hamilton code that defines the document chunking pipeline,
+but adjusted for PySpark.
+ - `spark/spark_pipeline.py` - code that you would invoke to run `spark/doc_pipeline` on a Spark cluster, or with Spark locally.
+ - `spark/README.md` - more details on running the Spark example and why the code differs slightly.
+
+### Running the example
+Make sure you have the right Python dependencies installed for the execution system you want to use.
+See `requirements.txt` (or `spark/requirements.txt`) for the dependencies you need to install.
+
+Then you can run the example with the following commands:
+```bash
+python run.py
+python run_dask.py
+python run_ray.py
+python spark/spark_pipeline.py
+```
+See `spark/README.md` for more details on running the Spark example and why the code differs slightly.
+
+## Extensions / what to do next
+This example is a simple one, but it's easy to extend. For example, you could:
+
+* add a step to create embeddings from the chunked documents (a sketch is included at the end of this README).
+* add a step to save the results to a database, or to a file system.
+* tune the parallelism to ensure you don't DoS the resource you're hitting.
+
+## Hamilton over LangChain
+Hamilton is a general purpose tool, and what we've described here applies broadly
+to any code that you might write: data, machine learning, LLMs, web processing, etc. You can
+even use it with parts of LangChain!
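+
+## Sketch: adding an embeddings step
+As mentioned under "Extensions / what to do next", adding embeddings is straightforward. The sketch below is
+one possible shape for such a node in `doc_pipeline.py` -- the function name, the use of `sentence-transformers`,
+and the model choice are illustrative assumptions, not part of this example's code:
+```python
+from langchain_core import documents
+from sentence_transformers import SentenceTransformer  # assumed extra dependency
+
+
+def chunk_embeddings(chunked_text: list[documents.Document]) -> list[list[float]]:
+    """Embeds each chunk. Sits inside the parallel block, so it runs once per URL."""
+    model = SentenceTransformer("all-MiniLM-L6-v2")  # or any embedding model / API you prefer
+    vectors = model.encode([chunk.page_content for chunk in chunked_text])
+    return [vector.tolist() for vector in vectors]
+```
+You would then add the result to `url_result` (or write it straight to your vector database) so that it is
+collected alongside the chunks.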
diff --git a/examples/LLM_Workflows/scraping_and_chunking/pipeline.py b/examples/LLM_Workflows/scraping_and_chunking/doc_pipeline.py
similarity index 87%
rename from examples/LLM_Workflows/scraping_and_chunking/pipeline.py
rename to examples/LLM_Workflows/scraping_and_chunking/doc_pipeline.py
index 4b80e6689..f1d3af5f3 100644
--- a/examples/LLM_Workflows/scraping_and_chunking/pipeline.py
+++ b/examples/LLM_Workflows/scraping_and_chunking/doc_pipeline.py
@@ -41,11 +41,18 @@ def urls_from_sitemap(sitemap_text: str) -> list[str]:
 def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
     """
     Takes in a list of URLs for parallel processing.
+
+    Note: this could be in a separate module, but it's here for simplicity.
     """
     for url in urls_from_sitemap[0:max_urls]:
         yield url
+
+# --- Start Parallel Code ---
+# The following code is parallelized, once for each url.
+# This code could be in a separate module, but it's here for simplicity.
+
+
 def article_regex() -> str:
     """This assumes you're using the furo theme for sphinx"""
     return r'<article role="main" id="furo-main-content">(.*?)</article>'
@@ -115,6 +122,8 @@ def chunked_text(
 def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
     """Function to aggregate what we want to return from parallel processing.
 
+    Note: this function is where you could cache the results to a datastore.
+
     :param url:
     :param article_text:
     :param chunked_text:
@@ -123,20 +132,27 @@ def url_result(url: str, article_text: str, chunked_text: list[documents.Documen
     return {"url": url, "article_text": article_text, "chunks": chunked_text}
 
 
+# --- END Parallel Code ---
+
+
 def collect_chunked_url_text(url_result: Collect[dict]) -> list:
-    """Function to collect the results from parallel processing."""
+    """Function to collect the results from parallel processing.
+    Note: All results for `url_result` are pulled into memory here.
+    So, if you have a lot of results, you may want to write them to a datastore and pass pointers.
+    """
     return list(url_result)
 
 
 if __name__ == "__main__":
-    import pipeline
+    # Code for quickly testing that this module builds correctly.
+    import doc_pipeline
 
     from hamilton import driver
     from hamilton.execution import executors
 
     dr = (
         driver.Builder()
-        .with_modules(pipeline)
+        .with_modules(doc_pipeline)
         .enable_dynamic_execution(allow_experimental_mode=True)
         .with_config({})
         .with_local_executor(executors.SynchronousLocalTaskExecutor())
@@ -144,8 +160,3 @@ def collect_chunked_url_text(url_result: Collect[dict]) -> list:
         .build()
     )
     dr.display_all_functions("pipeline.png")
-    result = dr.execute(["collect_chunked_url_text"])
-    import pprint
-
-    for chunk in result["collect_chunked_url_text"]:
-        pprint.pprint(type(chunk["chunks"][0]))
diff --git a/examples/LLM_Workflows/scraping_and_chunking/pipeline.png b/examples/LLM_Workflows/scraping_and_chunking/pipeline.png
index 84ec1a6a7..e56716533 100644
Binary files a/examples/LLM_Workflows/scraping_and_chunking/pipeline.png and b/examples/LLM_Workflows/scraping_and_chunking/pipeline.png differ
diff --git a/examples/LLM_Workflows/scraping_and_chunking/requirements.txt b/examples/LLM_Workflows/scraping_and_chunking/requirements.txt
index 2563024ee..2bbeba39e 100644
--- a/examples/LLM_Workflows/scraping_and_chunking/requirements.txt
+++ b/examples/LLM_Workflows/scraping_and_chunking/requirements.txt
@@ -1,3 +1,6 @@
 langchain
 langchain-core
 sf-hamilton[visualization]
+# optionally install Ray, or Dask, or both
+# sf-hamilton[ray]
+# sf-hamilton[dask]
diff --git a/examples/LLM_Workflows/scraping_and_chunking/run.py b/examples/LLM_Workflows/scraping_and_chunking/run.py
index 591167352..a9942deb4 100644
--- a/examples/LLM_Workflows/scraping_and_chunking/run.py
+++ b/examples/LLM_Workflows/scraping_and_chunking/run.py
@@ -1,29 +1,37 @@
 """
-A basic script to run the pipeline defined by Hamilton.
+A basic script to run the pipeline defined in `doc_pipeline.py`.
+
+By default this runs parts of the pipeline in parallel using threads or processes.
+To choose threads or processes, uncomment the appropriate line in the `Builder` below.
+
+To scale processing here, see `run_ray.py`, `run_dask.py`, and `spark/spark_pipeline.py`.
""" -import pipeline +import doc_pipeline from hamilton import driver from hamilton.execution import executors -dr = ( - driver.Builder() - .with_modules(pipeline) - .enable_dynamic_execution(allow_experimental_mode=True) - .with_config({}) - .with_local_executor(executors.SynchronousLocalTaskExecutor()) - # could be Ray or Dask - .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5)) - .build() -) -dr.display_all_functions("pipeline.png") -result = dr.execute( - ["collect_chunked_url_text"], - inputs={"chunk_size": 256, "chunk_overlap": 32}, -) -# do something with the result... -# import pprint -# -# for chunk in result["collect_chunked_url_text"]: -# pprint.pprint(chunk) +if __name__ == "__main__": + + dr = ( + driver.Builder() + .with_modules(doc_pipeline) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_config({}) + .with_local_executor(executors.SynchronousLocalTaskExecutor()) + # Choose a backend to process the parallel parts of the pipeline + .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5)) + # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5)) + .build() + ) + dr.display_all_functions("pipeline.png") + result = dr.execute( + ["collect_chunked_url_text"], + inputs={"chunk_size": 256, "chunk_overlap": 32}, + ) + # do something with the result... + import pprint + + for chunk in result["collect_chunked_url_text"]: + pprint.pprint(chunk) diff --git a/examples/LLM_Workflows/scraping_and_chunking/run_dask.py b/examples/LLM_Workflows/scraping_and_chunking/run_dask.py new file mode 100644 index 000000000..e8fd2ecf6 --- /dev/null +++ b/examples/LLM_Workflows/scraping_and_chunking/run_dask.py @@ -0,0 +1,42 @@ +""" +Shows how to run document chunking using dask. +""" + +import logging + +import doc_pipeline +from dask import distributed + +from hamilton import driver, log_setup +from hamilton.plugins import h_dask + +log_setup.setup_logging(logging.INFO) + +if __name__ == "__main__": + cluster = distributed.LocalCluster() + client = distributed.Client(cluster) + remote_executor = h_dask.DaskExecutor(client=client) + + dr = ( + driver.Builder() + .with_modules(doc_pipeline) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_config({}) + # Choose a backend to process the parallel parts of the pipeline + # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5)) + # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5)) + .with_remote_executor(h_dask.DaskExecutor(client=client)) + .build() + ) + dr.display_all_functions("pipeline.png") + result = dr.execute( + ["collect_chunked_url_text"], + inputs={"chunk_size": 256, "chunk_overlap": 32}, + ) + # do something with the result... + import pprint + + for chunk in result["collect_chunked_url_text"]: + pprint.pprint(chunk) + + client.shutdown() diff --git a/examples/LLM_Workflows/scraping_and_chunking/run_ray.py b/examples/LLM_Workflows/scraping_and_chunking/run_ray.py new file mode 100644 index 000000000..a85cfc33b --- /dev/null +++ b/examples/LLM_Workflows/scraping_and_chunking/run_ray.py @@ -0,0 +1,41 @@ +""" +Shows how to run document chunking using ray. 
+""" + +import logging + +import doc_pipeline +import ray + +from hamilton import driver, log_setup +from hamilton.plugins import h_ray + +if __name__ == "__main__": + log_setup.setup_logging(logging.INFO) + ray.init() + + dr = ( + driver.Builder() + .with_modules(doc_pipeline) + .enable_dynamic_execution(allow_experimental_mode=True) + .with_config({}) + # Choose a backend to process the parallel parts of the pipeline + # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5)) + # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5)) + .with_remote_executor( + h_ray.RayTaskExecutor() + ) # be sure to run ray.init() or pass in config. + .build() + ) + dr.display_all_functions("pipeline.png") + result = dr.execute( + ["collect_chunked_url_text"], + inputs={"chunk_size": 256, "chunk_overlap": 32}, + ) + # do something with the result... + import pprint + + for chunk in result["collect_chunked_url_text"]: + pprint.pprint(chunk) + + ray.shutdown() diff --git a/examples/LLM_Workflows/scraping_and_chunking/spark/doc_pipeline.py b/examples/LLM_Workflows/scraping_and_chunking/spark/doc_pipeline.py index 6b89dedfe..b02f8cfdc 100644 --- a/examples/LLM_Workflows/scraping_and_chunking/spark/doc_pipeline.py +++ b/examples/LLM_Workflows/scraping_and_chunking/spark/doc_pipeline.py @@ -1,11 +1,17 @@ +""" +This module is a modified version to enable it to be used in a spark job. + +Notably at the end we create a module level variable that contains a list of functions that are spark safe. +This is used by the `@h_spark.with_columns` decorator to tell it which functions define the subdag +and use the dataframe that the function declares a dependency upon. +""" + import json import re import requests from langchain import text_splitter -# from langchain_core import documents - def article_regex() -> str: """This assumes you're using the furo theme for sphinx""" @@ -77,6 +83,8 @@ def chunked_text( return [json.dumps(s.to_json()) for s in splits] +# this is a helper variable that we use to tell `@h_spark.with_columns` decorator which functions we want +# it to create the subdag with that will take in and operate over the dataframe depended on. 
 spark_safe = [
     article_regex,
     article_text,
diff --git a/examples/LLM_Workflows/scraping_and_chunking/spark/requirements.txt b/examples/LLM_Workflows/scraping_and_chunking/spark/requirements.txt
new file mode 100644
index 000000000..42898b943
--- /dev/null
+++ b/examples/LLM_Workflows/scraping_and_chunking/spark/requirements.txt
@@ -0,0 +1,7 @@
+langchain
+langchain-core
+pyspark
+sf-hamilton[visualization]
+# optionally install Ray, or Dask, or both
+# sf-hamilton[ray]
+# sf-hamilton[dask]
diff --git a/examples/LLM_Workflows/scraping_and_chunking/spark/spark_pipeline.py b/examples/LLM_Workflows/scraping_and_chunking/spark/spark_pipeline.py
index daa73dc9a..c78cc32ef 100644
--- a/examples/LLM_Workflows/scraping_and_chunking/spark/spark_pipeline.py
+++ b/examples/LLM_Workflows/scraping_and_chunking/spark/spark_pipeline.py
@@ -48,14 +48,9 @@ def urls_from_sitemap(
     return df
 
 
-"""
-with_columns makes some assumptions:
-(a) that all functions in the with_columns subdag take in the dataframe
-(b) that all intermediate functions in the subdag need to become columns in the dataframe
-(c) it doesn't allow you to wire through inputs to the subdag
-"""
-
-
+# with_columns makes some assumptions:
+# (a) that all functions in the with_columns subdag take in some part of the dataframe
+# (b) that all intermediate functions in the with_columns subdag need to become columns in the dataframe
 @h_spark.with_columns(
     *doc_pipeline.spark_safe,
     select=["article_text", "chunked_text"],
@@ -64,6 +59,10 @@ def urls_from_sitemap(
 def chunked_url_text(urls_from_sitemap: ps.DataFrame) -> ps.DataFrame:
     """Creates dataframe with chunked text from URLs appended as columns.
 
+    I.e. `urls_from_sitemap` declares the dependency; `with_columns` then runs the subdag
+    and appends the resulting columns to that dataframe, before the body of this
+    function is called.
+
     :param urls_from_sitemap:
     :return:
     """
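To make the `with_columns` behavior described above concrete, here is a rough conceptual sketch of what it
amounts to for this example. This is not the actual Hamilton implementation -- it is just the effect described
in the comments above expressed as plain PySpark column operations, and the `url` column name and UDF wiring
are assumptions for illustration:

```python
import doc_pipeline  # the spark-adjusted module from this example
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T


def append_subdag_columns(urls_from_sitemap: DataFrame) -> DataFrame:
    """Roughly what `@h_spark.with_columns(*doc_pipeline.spark_safe, select=[...])` does:
    each function in `spark_safe` is applied over the dataframe and appended as a column,
    in dependency order, before the decorated function's body runs."""
    article_text = F.udf(
        lambda url: doc_pipeline.article_text(url, doc_pipeline.article_regex()),
        T.StringType(),
    )
    df = urls_from_sitemap.withColumn("article_text", article_text(F.col("url")))
    # ... chunked_text (and anything else listed in `select`) would be appended the same way ...
    return df
```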