Add spark pdf summarizer example #268

Merged
3 commits merged on Aug 19, 2023
4 changes: 4 additions & 0 deletions examples/LLM_Workflows/pdf_summarizer/README.md
@@ -35,3 +35,7 @@ or you can do `docker compose logs -f` to tail the logs (ctrl+c to stop tailing
3. Uncomment dagworks-sdk in `requirements.txt`.
4. Uncomment the lines in server.py to replace `sync_dr` with the DAGWorks Driver.
5. Rebuild the docker images `docker compose up -d --build`.

# Running on Spark!
Yes, that's right, you can also run the exact same code on Spark! It's just a one-line
code change. See the [run_on_spark README](run_on_spark/README.md) for more details.
4 changes: 1 addition & 3 deletions examples/LLM_Workflows/pdf_summarizer/backend/server.py
@@ -12,9 +12,7 @@


# define constants for Hamilton driver
driver_config = dict(
file_type="pdf",
)
driver_config = dict(file_type="pdf")

# instantiate the Hamilton driver; it will power all API endpoints
# async driver for use with async functions
20 changes: 11 additions & 9 deletions examples/LLM_Workflows/pdf_summarizer/backend/summarization.py
@@ -1,6 +1,6 @@
import concurrent
import tempfile
from typing import Generator
from typing import Generator, Union

import openai
import tiktoken
@@ -26,10 +26,10 @@ def summarize_text_from_summaries_prompt(content_type: str = "an academic paper"


@config.when(file_type="pdf")
def raw_text(pdf_source: str | bytes | tempfile.SpooledTemporaryFile) -> str:
def raw_text(pdf_source: Union[str, bytes, tempfile.SpooledTemporaryFile]) -> str:
"""Takes a filepath to a PDF and returns a string of the PDF's contents
:param pdf_source: Series of filepaths to PDFs
:return: Series of strings of the PDFs' contents
:param pdf_source: the path, or the temporary file, to the PDF.
:return: the text of the PDF.
"""
reader = PdfReader(pdf_source)
_pdf_text = ""
@@ -67,10 +67,10 @@ def _create_chunks(text: str, n: int, tokenizer: tiktoken.Encoding) -> Generator


def chunked_text(
raw_text: str, max_token_length: int = 1500, tokenizer_encoding: str = "cl100k_base"
raw_text: str, tokenizer_encoding: str = "cl100k_base", max_token_length: int = 1500
) -> list[str]:
"""Chunks the pdf text into smaller chunks of size max_token_length.
:param pdf_text: the Series of individual pdf texts to chunk.
:param raw_text: the Series of individual pdf texts to chunk.
:param max_token_length: the maximum length of tokens in each chunk.
:param tokenizer_encoding: the encoding to use for the tokenizer.
:return: Series of chunked pdf text. Each element is a list of chunks.
@@ -102,7 +102,7 @@ def summarized_chunks(
"""Summarizes a series of chunks of text.
Note: this takes the first result from the top_n_related_articles series and summarizes it. This is because
the top_n_related_articles series is sorted by relatedness, so the first result is the most related.
:param top_n_related_articles: series with each entry being a list of chunks of text for an article.
:param chunked_text: a list of chunks of text for an article.
:param summarize_chunk_of_text_prompt: the prompt to use to summarize each chunk of text.
:param openai_gpt_model: the openai gpt model to use.
:return: a single string of each chunk of text summarized, concatenated together.
@@ -125,12 +125,14 @@ def summarized_chunks(


def prompt_and_text_content(
summarize_text_from_summaries_prompt: str, user_query: str, summarized_chunks: str
summarized_chunks: str,
summarize_text_from_summaries_prompt: str,
user_query: str,
) -> str:
"""Creates the prompt for summarizing the text from the summarized chunks of the pdf.
:param summarized_chunks: a long string of chunked summaries of a file.
:param summarize_text_from_summaries_prompt: the template to use to summarize the chunks.
:param user_query: the original user query.
:param summarized_chunks: a long string of chunked summaries of a file.
:return: the prompt to use to summarize the chunks.
"""
return summarize_text_from_summaries_prompt.format(query=user_query, results=summarized_chunks)
65 changes: 65 additions & 0 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/README.md
@@ -0,0 +1,65 @@
# PDF Summarizer on Spark

Here we show how you can run the same Hamilton dataflow that we defined in the backend
folder on Spark. This is useful if you want to run the same dataflow over a larger dataset,
or need to run it on a cluster. Importantly, this means you don't have to rewrite your
code or change where/how you develop!

![Summarization dataflow](spark_summarization.dot.png)

# File organization
- `summarization.py`: this should be a carbon copy of the one in the backend folder.
- `run.py`: this contains the code to create a Spark job and run the summarization code.

# How this works
We take the dataflow defined by `summarization.py` and execute it as a set
of row-based UDFs on Spark. The magic that makes this possible is the Hamilton PySparkUDFGraphAdapter.

The premise is that there is a central dataframe
that contains a column mapping to the required input, which in this example
is `pdf_source`. You can then request whatever intermediate outputs you want as columns, which
in this example we do with `["raw_text", "chunked_text", "summarized_text"]`.
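
To make this concrete, here is a minimal sketch of what driving this from Spark could look like. The adapter import path, the way the central dataframe and the primitive inputs are passed to `execute()`, and the PDF paths are assumptions for illustration; `run.py` in this folder is the source of truth.

```python
import pandas as pd
from pyspark.sql import SparkSession

from hamilton import driver
from hamilton.experimental import h_spark  # assumed import path for the adapter

import summarization

spark = SparkSession.builder.appName("pdf_summarizer").getOrCreate()

# The central dataframe: one row per PDF, with the required input column `pdf_source`.
# These file paths are hypothetical placeholders.
df = spark.createDataFrame(pd.DataFrame({"pdf_source": ["paper_1.pdf", "paper_2.pdf"]}))

dr = driver.Driver(
    dict(file_type="pdf"),   # same config as the backend
    summarization,           # same dataflow module as the backend
    adapter=h_spark.PySparkUDFGraphAdapter(),
)

# Request intermediate outputs as columns; Hamilton runs the corresponding
# functions as row-based UDFs over the dataframe. How the dataframe and the
# primitive inputs (model name, user query, etc.) are keyed in `inputs` is an
# assumption here; check run.py for the real invocation.
result_df = dr.execute(
    ["raw_text", "chunked_text", "summarized_text"],
    inputs={
        "df": df,
        "openai_gpt_model": "gpt-3.5-turbo",
        "user_query": "Summarize this PDF.",
    },
)
result_df.show()
```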

## Running the code

1. Make sure you have the right dependencies installed. You can do this by running
`pip install -r requirements.txt`.
2. Download some PDFs, and then update `run.py` with the paths to them.
3. Then you can run the code with `python run.py`. Be sure to have your `OPENAI_API_KEY` set in the
environment.

# Sharing `summarization.py` in real life
In this example we just copied `summarization.py` to share the code. In real life
you would most likely:

1. Create a Python package with your dataflows and publish it that way.
2. Or, in lieu of publishing a package, share the code via the repository and augment the Python path /
zip the code up to share it between the FastAPI service and Spark.

# Errors you might encounter:
## Fork error on macOS
If you are running Spark on macOS, you might get the following error:

```
objc[95025]: +[__NSCFConstantString initialize] may have been in progress in another thread when fork() was called.
objc[95025]: +[__NSCFConstantString initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
```
Export the following environment variable to fix it before running the code:

```bash
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
```

## PySpark error "got multiple values for argument"
You should not get this error, but you might if you adjust the code.

E.g.
```python
TypeError: prompt_and_text_content() got multiple values for argument 'summarize_text_from_summaries_prompt'
```
Solution: ensure that the arguments that end up being columns in the dataframe are the leftmost
arguments to each function, and do not come after any "primitive" arguments. This is because we
bind primitive arguments with a kwargs call, but we pass columns in as positional arguments.
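
To illustrate the ordering rule with the function from this example (bodies elided; the `_bad` name is just for contrast and is not code from the repo):

```python
# Problematic ordering: the column-backed parameter comes after the primitives,
# so when the adapter passes the column positionally and binds the primitives
# via kwargs, the same parameter gets a value twice.
def prompt_and_text_content_bad(
    summarize_text_from_summaries_prompt: str,  # primitive, bound via kwargs
    user_query: str,                            # primitive, bound via kwargs
    summarized_chunks: str,                     # column, passed positionally
) -> str:
    ...


# Correct ordering: column-backed parameters are leftmost, primitives follow.
def prompt_and_text_content(
    summarized_chunks: str,                     # column, passed positionally
    summarize_text_from_summaries_prompt: str,  # primitive, bound via kwargs
    user_query: str,                            # primitive, bound via kwargs
) -> str:
    ...
```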
8 changes: 8 additions & 0 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/requirements.txt
@@ -0,0 +1,8 @@
openai
PyPDF2
pyspark
sf-hamilton[visualization]
tenacity
tiktoken
tqdm
# dagworks-sdk>=0.0.14 # uncomment to use DAGWorks driver.