From 79f931edac21ff89cdad0cb329a48a282e963275 Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 14:51:20 +0200 Subject: [PATCH 1/8] refactor[summary]: clean up code to remove deprecated processes --- backend/app/processing/file_preprocessing.py | 28 ++-- backend/app/processing/process_queue.py | 21 +-- .../app/{requests.py => requests/__init__.py} | 123 ++---------------- backend/app/requests/schemas.py | 26 ++++ 4 files changed, 65 insertions(+), 133 deletions(-) rename backend/app/{requests.py => requests/__init__.py} (64%) create mode 100644 backend/app/requests/schemas.py diff --git a/backend/app/processing/file_preprocessing.py b/backend/app/processing/file_preprocessing.py index 514877b..c4ac821 100644 --- a/backend/app/processing/file_preprocessing.py +++ b/backend/app/processing/file_preprocessing.py @@ -27,22 +27,22 @@ def process_segmentation(project_id: int, asset_id: int, asset_file_name: str): with SessionLocal() as db: asset_content = project_repository.get_asset_content(db, asset_id) - # segmentation = extract_file_segmentation( - # api_token=api_key, pdf_content=asset_content.content - # ) - vectorstore = ChromaDB(f"panda-etl-{project_id}") - vectorstore.add_docs( - docs=asset_content.content["content"], - metadatas=[ - { + + docs = [] + metadatas = [] + for content in asset_content.content["content"]: + docs.append(content["text"]) + metadatas.append({ "asset_id": asset_id, "filename": asset_file_name, "project_id": project_id, - "page_number": asset_content.content["page_number_data"][index], - } - for index, _ in enumerate(asset_content.content["content"]) - ], + **(content["metadata"] if content.get("metadata") else {"page_number": 1}), # Unpack all metadata or default to page_number: 1 + }) + + vectorstore.add_docs( + docs=docs, + metadatas=metadatas ) project_repository.update_asset_content_status( @@ -88,7 +88,7 @@ def preprocess_file(asset_id: int): while retries < settings.max_retries and not success: try: # Perform the expensive operation here, without holding the DB connection - pdf_content = extract_text_from_file(api_key, asset.path, asset.type) + pdf_content = extract_text_from_file(api_key, asset.path) success = True @@ -111,7 +111,7 @@ def preprocess_file(asset_id: int): if success and pdf_content: with SessionLocal() as db: asset_content = project_repository.update_or_add_asset_content( - db, asset_id, pdf_content + db, asset_id, pdf_content.model_dump() ) # Submit the segmentation task once the content is saved file_segmentation_executor.submit( diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py index 453e6c4..a03112f 100644 --- a/backend/app/processing/process_queue.py +++ b/backend/app/processing/process_queue.py @@ -11,11 +11,9 @@ from app.models import ProcessStatus from app.requests import ( extract_data, - extract_summary_of_summaries, highlight_sentences_in_pdf, ) from datetime import datetime -from app.requests import extract_summary from app.models.process_step import ProcessStepStatus from app.repositories import user_repository from app.config import settings @@ -198,6 +196,7 @@ def process_task(process_id: int): if not all_process_steps_ready: logger.info(f"Process id: [{process.id}] some steps preprocessing is missing moving to waiting queue") process_execution_scheduler.add_process_to_queue(process.id) + db.commit() # Skip status update since not all steps are ready return @@ -345,10 +344,10 @@ def extract_process(api_key, process, process_step, asset_content): vectorstore = 
ChromaDB(f"panda-etl-{process.project_id}", similarity_threshold=3) all_relevant_docs = [] - for context in data["context"]: - for sources in context: + for references in data.references: + for reference in references: page_numbers = [] - for source_index, source in enumerate(sources["sources"]): + for source_index, source in enumerate(reference.sources): if len(source) < 30: best_match = find_best_match_for_short_reference( source, @@ -357,7 +356,7 @@ def extract_process(api_key, process, process_step, asset_content): process.project_id ) if best_match: - sources["sources"][source_index] = best_match["text"] + reference.sources[source_index] = best_match["text"] page_numbers.append(best_match["page_number"]) else: relevant_docs = vectorstore.get_relevant_docs( @@ -386,7 +385,7 @@ def extract_process(api_key, process, process_step, asset_content): break if not match and len(relevant_docs["documents"][0]) > 0: - sources["sources"][source_index] = relevant_docs["documents"][0][0] + reference.sources[source_index] = relevant_docs["documents"][0][0] if relevant_docs["documents"][0]: page_numbers.append( relevant_docs["metadatas"][0][most_relevant_index]["page_number"] @@ -400,11 +399,13 @@ def extract_process(api_key, process, process_step, asset_content): ) if page_numbers: - sources["page_numbers"] = page_numbers + reference.page_numbers = page_numbers + + data_dict = data.model_dump() return { - "fields": data["fields"], - "context": data["context"], + "fields": data_dict["fields"], + "context": data_dict["references"], } def find_best_match_for_short_reference(source, all_relevant_docs, asset_id, project_id, threshold=0.8): diff --git a/backend/app/requests.py b/backend/app/requests/__init__.py similarity index 64% rename from backend/app/requests.py rename to backend/app/requests/__init__.py index a0f4236..a5cf74e 100644 --- a/backend/app/requests.py +++ b/backend/app/requests/__init__.py @@ -1,7 +1,7 @@ import json import os -from typing import List from app.exceptions import CreditLimitExceededException +from .schemas import ExtractFieldsResponse, TextExtractionResponse import requests from app.config import settings from app.logger import Logger @@ -31,7 +31,7 @@ def request_api_key(email: str): return data.get("message", "No message in response") -def extract_text_from_file(api_token: str, file_path: str, type: str): +def extract_text_from_file(api_token: str, file_path: str, metadata: bool=True) -> TextExtractionResponse: # Prepare the headers with the Bearer token headers = {"x-authorization": f"Bearer {api_token}"} files = {} @@ -39,15 +39,17 @@ def extract_text_from_file(api_token: str, file_path: str, type: str): files["file"] = (os.path.basename(file_path), file) response = requests.post( - f"{settings.pandaetl_server_url}/v1/extract/file/content", + f"{settings.pandaetl_server_url}/v1/parse", files=files, headers=headers, timeout=360, + params={"metadata": metadata} ) # Check the response status code if response.status_code == 201 or response.status_code == 200: - return response.json() + data = response.json() + return TextExtractionResponse(**data) else: logger.error( f"Unable to process file ${file_path} during text extraction. 
It returned {response.status_code} code: {response.text}" @@ -55,7 +57,7 @@ def extract_text_from_file(api_token: str, file_path: str, type: str): raise Exception("Unable to process file!") -def extract_data(api_token, fields, file_path=None, pdf_content=None): +def extract_data(api_token, fields, file_path=None, pdf_content=None) -> ExtractFieldsResponse: fields_data = fields if isinstance(fields, str) else json.dumps(fields) # Prepare the headers with the Bearer token @@ -82,11 +84,17 @@ def extract_data(api_token, fields, file_path=None, pdf_content=None): data=data, headers=headers, timeout=360, + params={"references": True} ) # Check the response status code if response.status_code == 201 or response.status_code == 200: - return response.json() + + data = response.json() + + return ExtractFieldsResponse( + **data + ) elif response.status_code == 402: raise CreditLimitExceededException( @@ -125,72 +133,6 @@ def extract_field_descriptions(api_token, fields): raise Exception("Unable to process file!") -def extract_summary(api_token, config, file_path=None, pdf_content=None): - config_data = config if isinstance(config, str) else json.dumps(config) - pdf_content_data = ( - config if isinstance(pdf_content, str) else json.dumps(pdf_content) - ) - - # Prepare the headers with the Bearer token - headers = {"x-authorization": f"Bearer {api_token}"} - - # Prepare the data and files dictionaries - data = {"config": config_data} - files = {} - - if file_path: - if not os.path.isfile(file_path): - raise FileNotFoundError(f"The file at {file_path} does not exist.") - - file = open(file_path, "rb") - files["file"] = (os.path.basename(file_path), file) - - elif pdf_content: - data["content"] = pdf_content_data - - # Send the request - response = requests.post( - f"{settings.pandaetl_server_url}/v1/extract/summary", - files=files if files else None, - data=data, - headers=headers, - timeout=360, - ) - # Check the response status code - if response.status_code == 201 or response.status_code == 200: - return response.json() - else: - logger.error( - f"Unable to process file ${file_path} during summary generation. It returned {response.status_code} code: {response.text}" - ) - raise Exception("Unable to process file!") - - -def extract_summary_of_summaries(api_token: str, summaries: List[str], prompt: str): - - # Prepare the headers with the Bearer token - headers = {"x-authorization": f"Bearer {api_token}"} - - # Prepare the data and files dictionaries - data = {"summaries": summaries, "prompt": prompt} - - # Send the request - response = requests.post( - f"{settings.pandaetl_server_url}/v1/extract/summary-of-summaries", - json=data, - headers=headers, - timeout=360, - ) - # Check the response status code - if response.status_code == 201 or response.status_code == 200: - return response.json() - else: - logger.error( - f"Unable to process files during summary of summaries generation. 
It returned {response.status_code} code: {response.text}" - ) - raise Exception("Unable to process file!") - - def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path): # Prepare the headers with the Bearer token headers = {"x-authorization": f"Bearer {api_token}"} @@ -228,43 +170,6 @@ def highlight_sentences_in_pdf(api_token, sentences, file_path, output_path): raise Exception("Unable to process file!") -def extract_file_segmentation(api_token, file_path=None, pdf_content=None): - - # Prepare the headers with the Bearer token - headers = {"x-authorization": f"Bearer {api_token}"} - - # Prepare the data and files dictionaries - data = {} - files = {} - - if file_path: - if not os.path.isfile(file_path): - raise FileNotFoundError(f"The file at {file_path} does not exist.") - - file = open(file_path, "rb") - files["file"] = (os.path.basename(file_path), file) - - elif pdf_content: - data["pdf_content"] = json.dumps(pdf_content) - - # Send the request - response = requests.post( - f"{settings.pandaetl_server_url}/v1/extract/file/segment", - files=files if files else None, - data=data, - headers=headers, - timeout=360, - ) - # Check the response status code - if response.status_code == 201 or response.status_code == 200: - return response.json() - else: - logger.error( - f"Unable to process file ${file_path} during file segmentation. It returned {response.status_code} code: {response.text}" - ) - raise Exception("Unable to process file!") - - def chat_query(api_token, query, docs): # Prepare the headers with the Bearer token diff --git a/backend/app/requests/schemas.py b/backend/app/requests/schemas.py new file mode 100644 index 0000000..f5243d3 --- /dev/null +++ b/backend/app/requests/schemas.py @@ -0,0 +1,26 @@ +from typing import Dict, List, Optional +from pydantic import BaseModel + + +class SentenceMetadata(BaseModel): + page_number: Optional[int] = None + +class StructuredSentence(BaseModel): + text: str + metadata: Optional[SentenceMetadata] = None + +class TextExtractionResponse(BaseModel): + content: List[StructuredSentence] + word_count: int + lang: str + + +class ReferenceData(BaseModel): + name: str + sources: List[str] + page_numbers: Optional[List[int]] = [] + + +class ExtractFieldsResponse(BaseModel): + fields: List[Dict] + references: Optional[List[List[ReferenceData]]] From c810bc0f8110067cae8578f026ce5fd3d9f147a1 Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 17:43:17 +0200 Subject: [PATCH 2/8] fix leftovers --- backend/app/api/v1/chat.py | 2 -- backend/app/utils.py | 1 - 2 files changed, 3 deletions(-) diff --git a/backend/app/api/v1/chat.py b/backend/app/api/v1/chat.py index 25c7c01..f6854e0 100644 --- a/backend/app/api/v1/chat.py +++ b/backend/app/api/v1/chat.py @@ -118,8 +118,6 @@ def chat(project_id: int, chat_request: ChatRequest, db: Session = Depends(get_d if clean_text(original_sentence) in clean_text(sent): best_match_index = index - print(best_match_index) - print(len(doc_metadata)) metadata = doc_metadata[best_match_index] sent = doc_sent[best_match_index] diff --git a/backend/app/utils.py b/backend/app/utils.py index 48b3509..80a8a7e 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -51,7 +51,6 @@ def fetch_html_and_save(url, file_path): ), "Referer": url, "Accept-Language": "en-US,en;q=0.9", - # "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", } From 6809df8324f89c0151b333ec276fa482c36c441d Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 17:45:17 +0200 Subject: [PATCH 
3/8] fix leftovers --- backend/app/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/app/utils.py b/backend/app/utils.py index 80a8a7e..32c0454 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -58,8 +58,6 @@ def fetch_html_and_save(url, file_path): response = session.get(url, headers=headers) response.raise_for_status() - print(response.content) - # Save the content to a file with open(file_path, "wb") as file: file.write(response.content) From d79f8bb3100072e9cf2c4eb45e033f8ff0c6e9b5 Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 17:47:15 +0200 Subject: [PATCH 4/8] fix leftovers --- backend/app/processing/process_queue.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py index dfea54d..012d79e 100644 --- a/backend/app/processing/process_queue.py +++ b/backend/app/processing/process_queue.py @@ -263,18 +263,15 @@ def extract_process(api_key, process, process_step, asset_content): for source_index, source in enumerate(reference.sources): if len(source) < 30: - print("Here...") best_match = find_best_match_for_short_reference( source, all_relevant_docs, process_step.asset.id, process.project_id ) - print("Done!~") if best_match: reference.sources[source_index] = best_match["text"] page_numbers.append(best_match["page_number"]) - print("Done!~", best_match["page_number"]) else: relevant_docs = vectorstore.get_relevant_docs( From d966fb5de67404339946c2001e392837c4f9aea3 Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 22:23:29 +0200 Subject: [PATCH 5/8] refactor: code improvements --- backend/app/processing/file_preprocessing.py | 3 ++- backend/app/processing/process_queue.py | 2 +- backend/app/requests/__init__.py | 4 ++-- backend/tests/processing/test_process_queue.py | 2 ++ 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/backend/app/processing/file_preprocessing.py b/backend/app/processing/file_preprocessing.py index c4ac821..0da0ee2 100644 --- a/backend/app/processing/file_preprocessing.py +++ b/backend/app/processing/file_preprocessing.py @@ -1,4 +1,5 @@ from concurrent.futures import ThreadPoolExecutor +from app.requests.schemas import TextExtractionResponse from sqlalchemy.orm.exc import ObjectDeletedError from app.models.asset_content import AssetProcessingStatus from app.database import SessionLocal @@ -88,7 +89,7 @@ def preprocess_file(asset_id: int): while retries < settings.max_retries and not success: try: # Perform the expensive operation here, without holding the DB connection - pdf_content = extract_text_from_file(api_key, asset.path) + pdf_content: TextExtractionResponse = extract_text_from_file(api_key, asset.path) success = True diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py index 012d79e..4d96471 100644 --- a/backend/app/processing/process_queue.py +++ b/backend/app/processing/process_queue.py @@ -242,7 +242,7 @@ def extract_process(api_key, process, process_step, asset_content): if not pdf_content: pdf_content = ( - "\n".join(item["text"] for item in asset_content.content["content"] if "text" in item) + "\n".join(item["text"] for item in asset_content.content.get("content", []) if "text" in item) if asset_content.content else None ) diff --git a/backend/app/requests/__init__.py b/backend/app/requests/__init__.py index a5cf74e..9a1bc91 100644 --- a/backend/app/requests/__init__.py +++ b/backend/app/requests/__init__.py @@ -71,8 +71,8 @@ def 
extract_data(api_token, fields, file_path=None, pdf_content=None) -> Extract if not os.path.isfile(file_path): raise FileNotFoundError(f"The file at {file_path} does not exist.") - file = open(file_path, "rb") - files["file"] = (os.path.basename(file_path), file) + with open(file_path, "rb") as file: + files["file"] = (os.path.basename(file_path), file) elif pdf_content: data["pdf_content"] = pdf_content diff --git a/backend/tests/processing/test_process_queue.py b/backend/tests/processing/test_process_queue.py index d898789..081c833 100644 --- a/backend/tests/processing/test_process_queue.py +++ b/backend/tests/processing/test_process_queue.py @@ -63,9 +63,11 @@ def test_extract_process(mock_chroma, mock_extract_data): result = extract_process("api_key", process, process_step, asset_content) + print(result["context"] ) assert "fields" in result assert "context" in result assert result["fields"] == [{"field1": "value1"}] + assert result["context"] == [[{'name': 'ESG_Reporting_Assurance', 'sources': ['Assurance'], 'page_numbers': []}]] mock_extract_data.assert_called_once() mock_chroma_instance.get_relevant_docs.assert_called() From 3e1d563d8de3fdb5ac69e4ba42e2be15b115a755 Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 22:38:27 +0200 Subject: [PATCH 6/8] remove extra print statement --- backend/tests/processing/test_process_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/tests/processing/test_process_queue.py b/backend/tests/processing/test_process_queue.py index 081c833..5002220 100644 --- a/backend/tests/processing/test_process_queue.py +++ b/backend/tests/processing/test_process_queue.py @@ -63,7 +63,6 @@ def test_extract_process(mock_chroma, mock_extract_data): result = extract_process("api_key", process, process_step, asset_content) - print(result["context"] ) assert "fields" in result assert "context" in result assert result["fields"] == [{"field1": "value1"}] From 83a0ee0f1593871d6aafca8a4a35485edfcbb5ab Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Wed, 23 Oct 2024 23:06:08 +0200 Subject: [PATCH 7/8] refactor: cleanup some potential bug --- backend/app/requests/__init__.py | 19 ++++++++++--------- backend/app/requests/schemas.py | 2 +- .../tests/processing/test_process_queue.py | 3 ++- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/backend/app/requests/__init__.py b/backend/app/requests/__init__.py index 9a1bc91..3c2f833 100644 --- a/backend/app/requests/__init__.py +++ b/backend/app/requests/__init__.py @@ -35,16 +35,17 @@ def extract_text_from_file(api_token: str, file_path: str, metadata: bool=True) # Prepare the headers with the Bearer token headers = {"x-authorization": f"Bearer {api_token}"} files = {} - file = open(file_path, "rb") - files["file"] = (os.path.basename(file_path), file) - response = requests.post( - f"{settings.pandaetl_server_url}/v1/parse", - files=files, - headers=headers, - timeout=360, - params={"metadata": metadata} - ) + with open(file_path, "rb") as file: + files["file"] = (os.path.basename(file_path), file) + + response = requests.post( + f"{settings.pandaetl_server_url}/v1/parse", + files=files, + headers=headers, + timeout=360, + params={"metadata": metadata} + ) # Check the response status code if response.status_code == 201 or response.status_code == 200: diff --git a/backend/app/requests/schemas.py b/backend/app/requests/schemas.py index f5243d3..237f11e 100644 --- a/backend/app/requests/schemas.py +++ b/backend/app/requests/schemas.py @@ -18,7 +18,7 @@ class 
TextExtractionResponse(BaseModel): class ReferenceData(BaseModel): name: str sources: List[str] - page_numbers: Optional[List[int]] = [] + page_numbers: Optional[List[int]] = None class ExtractFieldsResponse(BaseModel): diff --git a/backend/tests/processing/test_process_queue.py b/backend/tests/processing/test_process_queue.py index 5002220..043fb8b 100644 --- a/backend/tests/processing/test_process_queue.py +++ b/backend/tests/processing/test_process_queue.py @@ -66,7 +66,8 @@ def test_extract_process(mock_chroma, mock_extract_data): assert "fields" in result assert "context" in result assert result["fields"] == [{"field1": "value1"}] - assert result["context"] == [[{'name': 'ESG_Reporting_Assurance', 'sources': ['Assurance'], 'page_numbers': []}]] + print(result["context"]) + assert result["context"] == [[{'name': 'ESG_Reporting_Assurance', 'sources': ['Assurance'], 'page_numbers': None}]] mock_extract_data.assert_called_once() mock_chroma_instance.get_relevant_docs.assert_called() From c595ba39c47ca8c054a93dd03c39663620bcd10c Mon Sep 17 00:00:00 2001 From: ArslanSaleem Date: Thu, 24 Oct 2024 08:02:59 +0200 Subject: [PATCH 8/8] fix: remove extra print statement --- backend/tests/processing/test_process_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/tests/processing/test_process_queue.py b/backend/tests/processing/test_process_queue.py index 043fb8b..07f269b 100644 --- a/backend/tests/processing/test_process_queue.py +++ b/backend/tests/processing/test_process_queue.py @@ -66,7 +66,6 @@ def test_extract_process(mock_chroma, mock_extract_data): assert "fields" in result assert "context" in result assert result["fields"] == [{"field1": "value1"}] - print(result["context"]) assert result["context"] == [[{'name': 'ESG_Reporting_Assurance', 'sources': ['Assurance'], 'page_numbers': None}]] mock_extract_data.assert_called_once() mock_chroma_instance.get_relevant_docs.assert_called()
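
Taken together, the series replaces untyped JSON dicts with the Pydantic models added in backend/app/requests/schemas.py. Below is a minimal sketch of how a /v1/parse payload round-trips through TextExtractionResponse and feeds the vectorstore the way the reworked process_segmentation does; the payload shape and the IDs are illustrative, not the server's documented contract:

    from app.requests.schemas import TextExtractionResponse

    # Illustrative /v1/parse payload; field names follow schemas.py from PATCH 1/8.
    payload = {
        "content": [
            {"text": "First sentence.", "metadata": {"page_number": 3}},
            {"text": "Sentence without metadata."},  # metadata omitted on purpose
        ],
        "word_count": 6,
        "lang": "en",
    }

    parsed = TextExtractionResponse(**payload)
    stored = parsed.model_dump()  # the dict that update_or_add_asset_content persists after PATCH 1/8

    docs, metadatas = [], []
    for item in stored["content"]:
        docs.append(item["text"])
        metadatas.append({
            "asset_id": 1,            # illustrative IDs
            "filename": "report.pdf",
            "project_id": 42,
            # Same fallback as process_segmentation: default to page 1 when
            # the server returned no per-sentence metadata.
            **(item["metadata"] if item.get("metadata") else {"page_number": 1}),
        })

The second sentence ends up with {"page_number": 1}, which is exactly the default branch described by the unpacking comment in PATCH 1/8.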
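PATCH 7/8 also fixes a file-handle lifetime issue in extract_text_from_file: the requests.post call now sits inside the with open(...) block, so the handle is still open while the multipart body is read. A condensed sketch of that pattern, with a hypothetical wrapper name and a base_url parameter standing in for settings.pandaetl_server_url (status handling simplified to raise_for_status):

    import os
    import requests

    def upload_for_parsing(api_token: str, file_path: str, base_url: str, metadata: bool = True) -> dict:
        headers = {"x-authorization": f"Bearer {api_token}"}
        with open(file_path, "rb") as file:
            # The handle must stay open until requests has streamed the body;
            # posting outside the with-block would read from a closed file.
            response = requests.post(
                f"{base_url}/v1/parse",
                files={"file": (os.path.basename(file_path), file)},
                headers=headers,
                timeout=360,
                params={"metadata": metadata},
            )
        response.raise_for_status()
        return response.json()

Worth flagging for review: the extract_data hunk in PATCH 5/8 appears to assign files["file"] inside its with-block but issue the request after it, so that call path does not get the same guarantee.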
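Finally, the shape the reworked extract_process returns can be reproduced directly from ExtractFieldsResponse. A small sketch with made-up field values; the expected output mirrors the assertion added to test_extract_process, where page_numbers defaults to None after PATCH 7/8 instead of []:

    from app.requests.schemas import ExtractFieldsResponse

    data = ExtractFieldsResponse(
        fields=[{"field1": "value1"}],
        references=[[{"name": "ESG_Reporting_Assurance", "sources": ["Assurance"]}]],
    )

    data_dict = data.model_dump()
    result = {
        "fields": data_dict["fields"],       # [{'field1': 'value1'}]
        "context": data_dict["references"],  # [[{'name': ..., 'sources': ['Assurance'], 'page_numbers': None}]]
    }

The default change from [] to None in PATCH 7/8 is what the updated test assertion exercises.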