diff --git a/services/permits/.env-example b/services/permits/.env-example index e1e00dd19e..9af404ca2d 100644 --- a/services/permits/.env-example +++ b/services/permits/.env-example @@ -7,10 +7,9 @@ OBJECT_STORE_ACCESS_KEY_ID= OBJECT_STORE_ACCESS_KEY= OBJECT_STORE_HOST=nrs.objectstore.gov.bc.ca OBJECT_STORE_BUCKET= -PERMIT_SERVICE_ENDPOINT=http://localhost:8004/haystack/file-upload +PERMIT_SERVICE_ENDPOINT=http://localhost:8004 PERMITS_CLIENT_ID=mds-core-api-internal-5194 PERMITS_CLIENT_SECRET= -PERMIT_CSV_ENDPOINT=http://localhost:8004/permit_conditions/csv ELASTICSEARCH_CA_CERT= ELASTICSEARCH_HOST= ELASTICSEARCH_USERNAME= diff --git a/services/permits/.gitignore b/services/permits/.gitignore index 1f7d40f507..d0da42e541 100644 --- a/services/permits/.gitignore +++ b/services/permits/.gitignore @@ -1,2 +1,3 @@ app/cache -app/extract \ No newline at end of file +app/extract +permit_cache \ No newline at end of file diff --git a/services/permits/app/celery.py b/services/permits/app/celery.py new file mode 100644 index 0000000000..17b65c6aa7 --- /dev/null +++ b/services/permits/app/celery.py @@ -0,0 +1,28 @@ +import os + +from celery.app import Celery + +CACHE_REDIS_HOST = os.environ.get('CACHE_REDIS_HOST', 'redis') +CACHE_REDIS_PORT = os.environ.get('CACHE_REDIS_PORT', 6379) +CACHE_REDIS_PASS = os.environ.get('CACHE_REDIS_PASS', 'redis-password') +CACHE_REDIS_URL = 'redis://:{0}@{1}:{2}'.format(CACHE_REDIS_PASS, CACHE_REDIS_HOST, + CACHE_REDIS_PORT) + + +celery_app = Celery(__name__, broker=CACHE_REDIS_URL) + +ca_cert = os.environ.get("ELASTICSEARCH_CA_CERT", None) +host = os.environ.get("ELASTICSEARCH_HOST", None) or "http://elasticsearch:9200" +username = os.environ.get("ELASTICSEARCH_USERNAME", "") +password = os.environ.get("ELASTICSEARCH_PASSWORD", "") + + +scheme, hostname = host.split('://') + +backend_url = f'elasticsearch+{scheme}://{username}:{password}@{hostname}/celery' +celery_app = Celery(__name__, broker=CACHE_REDIS_URL, backend=backend_url) 
+celery_app.backend.doc_type = None +celery_app.conf.task_default_queue = 'permits' +celery_app.autodiscover_tasks([ + 'app.permit_conditions.tasks', +]) diff --git a/services/permits/app/compare_extraction_results.py b/services/permits/app/compare_extraction_results.py index 19f7192b30..132c69a825 100644 --- a/services/permits/app/compare_extraction_results.py +++ b/services/permits/app/compare_extraction_results.py @@ -4,29 +4,39 @@ # CSV files should have the following columns: section_title, section_paragraph, paragraph_title, subparagraph, clause, subclause, page_number, condition_text ### import argparse +import logging import os +import numpy as np import pandas as pd from app.permit_conditions.validator.permit_condition_model import PermitCondition from diff_match_patch import diff_match_patch from fuzzywuzzy import fuzz from jinja2 import Environment, FileSystemLoader +from pydantic import ValidationError +logger = logging.getLogger(__name__) # Function to create Content instances from a DataFrame def create_content_instances(df): content_list = [] for _, row in df.iterrows(): - content = PermitCondition( - section_title=row["section_title"], - section_paragraph=row["section_paragraph"], - paragraph_title=row["paragraph_title"], - subparagraph=row["subparagraph"], - clause=row["clause"], - subclause=row["subclause"], - page_number=int(row["page_number"]) if row["page_number"] else None, - condition_text=row["condition_text"], - ) + try: + content = PermitCondition( + section_title=row["section_title"], + section_paragraph=row["section_paragraph"], + paragraph_title=row["paragraph_title"], + subparagraph=row["subparagraph"], + clause=row["clause"], + subclause=row["subclause"], + page_number=int(row["page_number"]) if (row.get("page_number") and row['page_number'] != '') else 0, + condition_text=row["condition_text"], + original_condition_text=row["condition_text"], + ) + except ValidationError as e: + logger.error(f"Failed parsing of permit condition: 
{e}") + logger.error(row) + raise # This will be used as the text for comparison purposes text = f""" @@ -78,11 +88,15 @@ def write_html_report(context, report_prefix): def write_csv_report(comparison_results, report_prefix): + np.set_printoptions(linewidth=1000000) + # Create a DataFrame from the comparison results comparison_df = pd.DataFrame(comparison_results) - comparison_csv_filename = f"{report_prefix}_comparison_report.csv" - comparison_df.to_csv(comparison_csv_filename, index=False) + comparison_csv_filename = f"{report_prefix}_comparison_report.xlsx" + + + comparison_df.to_excel(comparison_csv_filename) return comparison_csv_filename @@ -103,6 +117,8 @@ def validate_condition(csv_pairs): auto_extracted_content = create_content_instances(auto_extracted_df) manual_extracted_content = create_content_instances(manual_extracted_df) + print(f'Found {len(auto_extracted_content)} conditions in {auto_extracted_csv} and {len(manual_extracted_content)} conditions in {manual_extracted_csv}') + # 3. 
Find missing and added conditions auto_content_dict = { create_comparison_key(content): content @@ -143,10 +159,12 @@ def validate_condition(csv_pairs): comparison_results.append( { "Key": key, - "auto_extracted_condition": "N/A", - "manual_extracted_condition": manual_content_dict[ - key - ].condition_text, + "auto_section_title": "", + "auto_paragraph_title": "", + "auto_extracted_condition": "", + "manual_section_title": manual_content_dict[key].section_title, + "manual_paragraph_title": manual_content_dict[key].paragraph_title, + "manual_extracted_condition": manual_content_dict[key].original_condition_text, "match_percentage": 0, "is_match": False, } @@ -166,8 +184,13 @@ def validate_condition(csv_pairs): comparison_results.append( { "Key": key, - "auto_extracted_condition": auto_content_dict[key].condition_text, - "manual_extracted_condition": "N/A", + "auto_section_title": auto_content_dict[key].section_title, + "auto_paragraph_title": auto_content_dict[key].paragraph_title, + "auto_extracted_condition": auto_content_dict[key].original_condition_text, + "manual_section_title": "", + "manual_paragraph_title": "", + "manual_extracted_condition": "", + "manual_extracted_condition": "", "match_percentage": 0, "is_match": False, } @@ -177,8 +200,6 @@ def validate_condition(csv_pairs): match_results = compare_matching_conditions( auto_content_dict, manual_content_dict, - comparison_results, - context, ) context["comparison_results"] += match_results["context_comparison_results"] @@ -234,6 +255,7 @@ def compare_matching_conditions( total_comparable_conditions = 0 matching_score = 0 + # Compare how well the text matches for conditions that are present in both csvs # and gemerate a html diff for each pair of conditions for key in sorted(manual_content_dict.keys()): @@ -241,7 +263,7 @@ def compare_matching_conditions( total_comparable_conditions += 1 auto_condition_text = auto_content_dict[key].condition_text manual_condition_text = 
manual_content_dict[key].condition_text - match_percentage = fuzz.ratio(auto_condition_text, manual_condition_text) + match_percentage = fuzz.ratio(auto_condition_text.replace('\n', ''), manual_condition_text.replace('\n', '')) is_match = match_percentage >= 100 if is_match: @@ -250,8 +272,12 @@ def compare_matching_conditions( comparison_results.append( { "Key": key, - "auto_extracted_condition": auto_condition_text, - "manual_extracted_condition": manual_condition_text, + "auto_section_title": auto_content_dict[key].section_title, + "auto_paragraph_title": auto_content_dict[key].paragraph_title, + "auto_extracted_condition": auto_content_dict[key].original_condition_text, + "manual_section_title": manual_content_dict[key].section_title, + "manual_paragraph_title": manual_content_dict[key].paragraph_title, + "manual_extracted_condition": manual_content_dict[key].original_condition_text, "match_percentage": match_percentage, "is_match": is_match, } @@ -267,12 +293,12 @@ def compare_matching_conditions( } ) - return { - "comparison_results": comparison_results, - "matching_score": matching_score, - "total_comparable_conditions": total_comparable_conditions, - "context_comparison_results": context_comparison_results, - } + return { + "comparison_results": comparison_results, + "matching_score": matching_score, + "total_comparable_conditions": total_comparable_conditions, + "context_comparison_results": context_comparison_results, + } def write_reports( diff --git a/services/permits/app/extract_and_validate_pdf.py b/services/permits/app/extract_and_validate_pdf.py index 35e20c3978..75971281a3 100644 --- a/services/permits/app/extract_and_validate_pdf.py +++ b/services/permits/app/extract_and_validate_pdf.py @@ -4,7 +4,9 @@ ### import argparse +import json import os +from time import sleep from app.compare_extraction_results import validate_condition from dotenv import find_dotenv, load_dotenv @@ -19,13 +21,13 @@ PERMITS_CLIENT_SECRET = 
os.getenv("PERMITS_CLIENT_SECRET", None) TOKEN_URL = os.getenv("TOKEN_URL", None) AUTHORIZATION_URL = os.getenv("AUTHORIZATION_URL", None) -PERMIT_CSV_ENDPOINT = os.getenv("PERMIT_CSV_ENDPOINT", None) +PERMIT_SERVICE_ENDPOINT = os.getenv("PERMIT_SERVICE_ENDPOINT", None) assert PERMITS_CLIENT_ID, "PERMITS_CLIENT_ID is not set" assert PERMITS_CLIENT_SECRET, "PERMITS_CLIENT_SECRET is not set" assert TOKEN_URL, "TOKEN_URL is not set" assert AUTHORIZATION_URL, "AUTHORIZATION_URL is not set" -assert PERMIT_CSV_ENDPOINT, "PERMIT_CSV_ENDPOINT is not set" +assert PERMIT_SERVICE_ENDPOINT, "PERMIT_SERVICE_ENDPOINT is not set" def authenticate_with_oauth(): @@ -41,13 +43,37 @@ def authenticate_with_oauth(): return oauth_session -def extract_conditions_from_pdf(pdf_path, endpoint_url, oauth_session): - # Extract conditions from PDF +def extract_conditions_from_pdf(pdf_path, oauth_session): + # Kick off the permit conditions extraction process with open(pdf_path, "rb") as pdf_file: files = {"file": (os.path.basename(pdf_path), pdf_file, "application/pdf")} - response = oauth_session.post(endpoint_url, files=files) + response = oauth_session.post(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions", files=files) response.raise_for_status() - return response.content.decode("utf-8") + + task_id = response.json().get('id') + + if not task_id: + raise Exception("Failed to extract conditions from PDF. No task ID returned from permit extractions endpoint.") + + status = None + + # Poll the status endpoint until the task is complete + while status not in ("SUCCESS", "FAILURE"): + sleep(3) + status_response = oauth_session.get(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions/status", params={"task_id": task_id}) + status_response.raise_for_status() + + status = status_response.json().get('status') + + print(json.dumps(status_response.json(), indent=2)) + + if status != "SUCCESS": + raise Exception(f"Failed to extract conditions from PDF. 
Task status: {status}") + + success_response = oauth_session.get(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions/results/csv", params={"task_id": task_id}) + success_response.raise_for_status() + + return success_response.content.decode("utf-8") # Process each pair of PDF and expected CSV @@ -62,7 +88,7 @@ def extract_and_validate_conditions(pdf_csv_pairs): # 1. Extract conditions from PDF extracted_csv_content = extract_conditions_from_pdf( - pdf_path, PERMIT_CSV_ENDPOINT, oauth_session + pdf_path, oauth_session ) # 2. Save the extracted CSV content to a temporary file @@ -76,6 +102,8 @@ def extract_and_validate_conditions(pdf_csv_pairs): validate_condition(pairs) +print(__name__) + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Process PDF documents, extract permit conditions, and validate against expected CSV results." @@ -84,6 +112,7 @@ def extract_and_validate_conditions(pdf_csv_pairs): "--pdf_csv_pairs", nargs=2, action="append", + required=True, help=""" Pairs of PDF file and expected CSV file. Each pair should be specified as two consecutive file paths. Usage: python extract_and_validate_pdf.py --pdf_csv_pairs --pdf_csv_pairs ... 
diff --git a/services/permits/app/helpers/celery_task_status.py b/services/permits/app/helpers/celery_task_status.py new file mode 100644 index 0000000000..7686b3f352 --- /dev/null +++ b/services/permits/app/helpers/celery_task_status.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class CeleryTaskStatus(Enum): + PENDING = "PENDING" + STARTED = "STARTED" + RETRY = "RETRY" + FAILURE = "FAILURE" + SUCCESS = "SUCCESS" diff --git a/services/permits/app/helpers/temporary_file.py b/services/permits/app/helpers/temporary_file.py index 0c9ffe2c30..ec2ae6702d 100644 --- a/services/permits/app/helpers/temporary_file.py +++ b/services/permits/app/helpers/temporary_file.py @@ -1,10 +1,15 @@ +import os import tempfile from fastapi import UploadFile +FILE_UPLOAD_PATH = os.environ.get("FILE_UPLOAD_PATH", "/file-uploads") -def store_temporary(file: UploadFile): - tmp = tempfile.NamedTemporaryFile() +assert FILE_UPLOAD_PATH, "FILE_UPLOAD_PATH is not set in the environment" + +def store_temporary(file: UploadFile, suffix: str = "") -> tempfile.NamedTemporaryFile: + + tmp = tempfile.NamedTemporaryFile(suffix=suffix, dir=FILE_UPLOAD_PATH, delete=False) with open(tmp.name, "w") as f: while contents := file.file.read(1024 * 1024): diff --git a/services/permits/app/permit_conditions/context.py b/services/permits/app/permit_conditions/context.py new file mode 100644 index 0000000000..cd7c859622 --- /dev/null +++ b/services/permits/app/permit_conditions/context.py @@ -0,0 +1,3 @@ +from contextvars import ContextVar + +context = ContextVar("permit_conditions_context") diff --git a/services/permits/app/permit_conditions/converters/pdf_to_text_converter.py b/services/permits/app/permit_conditions/converters/pdf_to_text_converter.py index 94d628c947..8862891c17 100644 --- a/services/permits/app/permit_conditions/converters/pdf_to_text_converter.py +++ b/services/permits/app/permit_conditions/converters/pdf_to_text_converter.py @@ -1,9 +1,11 @@ import logging import os from pathlib import Path 
+from time import sleep from typing import Any, Dict, List, Optional import ocrmypdf +from app.permit_conditions.context import context from haystack import Document, component, logging from pypdf import PdfReader from pypdf.errors import PdfReadError @@ -35,6 +37,8 @@ def run( id_hash_keys: Optional[List[str]] = None, documents: Optional[List[Document]] = None, ) -> List[Document]: + context.get().update_state(state="PROGRESS", meta={"stage": "pdf_to_text_converter"}) + if not documents: pages = self._read_pdf( file_path, @@ -61,11 +65,7 @@ def _read_pdf(self, file_path: Path) -> List[str]: pdf_reader = PdfReader(doc) for idx, page in enumerate(pdf_reader.pages): try: - page_text = page.extract_text( - extraction_mode="layout", - layout_mode_space_vertically=False, - layout_mode_scale_weight=0.5, - ) + page_text = page.extract_text() pages.append(page_text) if DEBUG_MODE: fn = f"debug/pdfreader-{idx}.txt" diff --git a/services/permits/app/permit_conditions/pipelines/CachedAzureOpenAIChatGenerator.py b/services/permits/app/permit_conditions/pipelines/CachedAzureOpenAIChatGenerator.py index bba4059f10..288df46be1 100644 --- a/services/permits/app/permit_conditions/pipelines/CachedAzureOpenAIChatGenerator.py +++ b/services/permits/app/permit_conditions/pipelines/CachedAzureOpenAIChatGenerator.py @@ -35,6 +35,10 @@ def hash_messages(messages): @component class CachedAzureOpenAIChatGenerator(AzureOpenAIChatGenerator): + + def __init__(self, **kwargs): + super(CachedAzureOpenAIChatGenerator, self).__init__(**kwargs) + self.it = 0 """ A class that represents a cached version of the AzureOpenAIChatGenerator. @@ -86,6 +90,7 @@ def fetch_result(self, messages, generation_kwargs): @component.output_types(data=ChatData) def run(self, data: ChatData, generation_kwargs=None, iteration=0): + self.it+=1 """ Runs the chat generation process. 
@@ -128,4 +133,8 @@ def run(self, data: ChatData, generation_kwargs=None, iteration=0): reply.meta["usage"]["prompt_tokens"] = prompt_tokens reply.meta["usage"]["total_tokens"] = total_tokens + if DEBUG_MODE: + with open(f"debug/cached_azure_openai_chat_generator_output_{iteration}.txt", "w") as f: + f.write(reply.content) + return {"data": ChatData([reply], data.documents)} diff --git a/services/permits/app/permit_conditions/resources/permit_condition_resource.py b/services/permits/app/permit_conditions/resources/permit_condition_resource.py index ae1858a576..717e3adf98 100644 --- a/services/permits/app/permit_conditions/resources/permit_condition_resource.py +++ b/services/permits/app/permit_conditions/resources/permit_condition_resource.py @@ -1,17 +1,22 @@ import csv import json +import os import tempfile from io import StringIO +from typing import Optional, Union +from app.helpers.celery_task_status import CeleryTaskStatus from app.helpers.temporary_file import store_temporary from app.permit_conditions.pipelines.permit_condition_pipeline import ( permit_condition_pipeline, ) +from app.permit_conditions.tasks.tasks import run_permit_condition_pipeline from app.permit_conditions.validator.permit_condition_model import ( PermitCondition, PermitConditions, ) from fastapi import APIRouter, File, HTTPException, Response, UploadFile +from fastapi.responses import JSONResponse from pydantic import BaseModel router = APIRouter() @@ -20,17 +25,25 @@ logger = logging.getLogger(__name__) +class JobStatus(BaseModel): + id: str + status: str + meta: Optional[dict] = None + +class InProgressJobStatusResponse(BaseModel): + detail: str + @router.post("/permit_conditions") -async def extract_permit_conditions(file: UploadFile = File(...)) -> PermitConditions: +async def extract_permit_conditions(file: UploadFile = File(...)) -> JobStatus: """ - Extracts permit conditions from a file. + Asynchronously extracts permit conditions from the given PDF file. 
Args: file (UploadFile): The file to extract permit conditions from. Returns: - dict: A dictionary containing the extracted permit conditions. + dict: A dictionary containing the id of the job and its status. Raises: Any exceptions that occur during the extraction process. @@ -42,56 +55,86 @@ async def extract_permit_conditions(file: UploadFile = File(...)) -> PermitCondi # Write the uploaded file to a temporary file # so it can be processed by the pipeline. - tmp = store_temporary(file) + tmp = store_temporary(file, suffix=".pdf") try: - pipeline = permit_condition_pipeline() - - return pipeline.run( - { - "pdf_converter": {"file_path": tmp.name}, - "prompt_builder": { - "template_variables": { - "max_pages": 6, - } - }, - } - )["validator"] - finally: + res = run_permit_condition_pipeline.delay(tmp.name, {"original_file_name": file.filename, 'size': file.size, 'content_type': file.content_type}) + return JobStatus(id=res.task_id, status=res.status, meta=res.info) + except: tmp.close() + raise +@router.get("/permit_conditions/status") +def status(task_id: str) -> JobStatus: + """ + Get the status of a permit conditions extraction job. + Args: + The task ID of the job. + """ + res = run_permit_condition_pipeline.app.AsyncResult(task_id) + return JobStatus(id=res.task_id, status=res.status, meta=res.info) -@router.post("/permit_conditions/csv") -async def extract_permit_conditions_as_csv( - file: UploadFile = File(...), -) -> PermitConditions: - # Extract permit conditions as a dictionary - conditions = (await extract_permit_conditions(file))["conditions"] +@router.get("/permit_conditions/results", response_model=PermitConditions, responses={202: {"model": InProgressJobStatusResponse}}) +def results(task_id: str) -> PermitConditions: + """ + Get the results of a permit conditions extraction job. + Args: + The task ID of the job. 
+ Raises: + 500 Internal Server Error: If the task failed (a 202 response is returned while the task is still running) + """ + res = run_permit_condition_pipeline.app.AsyncResult(task_id) - # Create a StringIO object to write CSV data - csv_data = StringIO() - # Define the fieldnames for the CSV file - fieldnames = list(PermitCondition.schema()["properties"].keys()) + if res.status == CeleryTaskStatus.SUCCESS.name: + return res.get() + elif res.status == CeleryTaskStatus.FAILURE.name: + raise HTTPException(500, detail=f"Task failed to complete: {res.status}") + else: + return JSONResponse(status_code=202, content={"detail": f"Task has not completed yet. Current status: {res.status}"}) - # Create a CSV writer - csv_writer = csv.DictWriter(csv_data, fieldnames=fieldnames) +@router.get("/permit_conditions/results/csv", responses={202: {"model": InProgressJobStatusResponse}}) +def results(task_id: str) -> str: + """ + Get the results of a permit conditions extraction job in a csv format + Args: + The task ID of the job. + Raises: + 500 Internal Server Error: If the task failed (a 202 response is returned while the task is still running) + """ + res = run_permit_condition_pipeline.app.AsyncResult(task_id) + + if res.status == CeleryTaskStatus.SUCCESS.name: + conditions = PermitConditions(conditions=res.get()['conditions']) - # Write the header row - csv_writer.writeheader() + # Create a StringIO object to write CSV data + csv_data = StringIO() + # Define the fieldnames for the CSV file + fieldnames = list(PermitCondition.model_json_schema()["properties"].keys()) - # Write each condition as a row in the CSV file - for condition in conditions: - csv_writer.writerow(json.loads(condition.json())) + # Create a CSV writer + csv_writer = csv.DictWriter(csv_data, fieldnames=fieldnames) - # Reset the StringIO object to the beginning - csv_data.seek(0) + # Write the header row + csv_writer.writeheader() + + # Write each condition as a row in the CSV file + for condition in conditions.conditions: + csv_writer.writerow(json.loads(condition.model_dump_json())) + + # Reset 
the StringIO object to the beginning + csv_data.seek(0) + + # Return the CSV file as a response + return Response( + content=csv_data.getvalue(), + media_type="text/csv", + headers={"Content-Disposition": 'attachment; filename="permit_conditions.csv"'}, + ) - # Return the CSV file as a response - return Response( - content=csv_data.getvalue(), - media_type="text/csv", - headers={"Content-Disposition": 'attachment; filename="permit_conditions.csv"'}, - ) + elif res.status == CeleryTaskStatus.FAILURE.name: + raise HTTPException(500, detail=f"Task failed to complete: {res.status}") + else: + return JSONResponse(status_code=202, content={"detail": f"Task has not completed yet. Current status: {res.status}"}) @router.get("/permit_conditions/flow") diff --git a/services/permits/app/permit_conditions/tasks/__init__.py b/services/permits/app/permit_conditions/tasks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/permits/app/permit_conditions/tasks/tasks.py b/services/permits/app/permit_conditions/tasks/tasks.py new file mode 100644 index 0000000000..bb6f72c234 --- /dev/null +++ b/services/permits/app/permit_conditions/tasks/tasks.py @@ -0,0 +1,49 @@ +from contextlib import contextmanager +from contextvars import ContextVar +from time import sleep + +from app.celery import celery_app +from app.permit_conditions.context import context +from app.permit_conditions.pipelines.permit_condition_pipeline import ( + permit_condition_pipeline, +) + + +@contextmanager +def task_context(task): + # Creates a context that is bound to the given task so the task can be accessed + # by each step of the pipeline (for example to update the task state / meta (e.g. for a progress indicator)) + # usage: + # with task_context(self): + # .... 
+ # from app.permit_conditions.context import context + # context.get().update_state(state="PROGRESS", meta={"stage": "pdf_to_text_converter"}) + + t = context.set(task) + + try: + yield + finally: + context.reset(t) + +@celery_app.task(bind=True) +def run_permit_condition_pipeline(self, file_name: str, meta: dict): + with task_context(self): + pipeline = permit_condition_pipeline() + + self.update_state(state="PROGRESS", meta={"stage": "start", file_name: file_name, **meta}) + + result = pipeline.run( + { + "pdf_converter": {"file_path": file_name}, + "prompt_builder": { + "template_variables": { + "max_pages": 6, + } + }, + } + )["validator"] + + conditions = result["conditions"] + + return conditions.model_dump() diff --git a/services/permits/app/permit_conditions/validator/permit_condition_model.py b/services/permits/app/permit_conditions/validator/permit_condition_model.py index bda5e35276..610b3a8f1c 100644 --- a/services/permits/app/permit_conditions/validator/permit_condition_model.py +++ b/services/permits/app/permit_conditions/validator/permit_condition_model.py @@ -23,8 +23,9 @@ class PermitCondition(BaseModel): subparagraph: str = None clause: str = None subclause: str = None - page_number: int = None + page_number: Optional[int] = None condition_text: str = None + original_condition_text:Optional[str] = None class PermitConditions(BaseModel): diff --git a/services/permits/app/permit_conditions/validator/permit_condition_validator.py b/services/permits/app/permit_conditions/validator/permit_condition_validator.py index 5e363e6050..1bccfc234a 100644 --- a/services/permits/app/permit_conditions/validator/permit_condition_validator.py +++ b/services/permits/app/permit_conditions/validator/permit_condition_validator.py @@ -101,7 +101,7 @@ def run(self, data: ChatData): # If there are no more pages to process, return the conditions found all_replies = self.documents + conditions - return {"conditions": all_replies} + return {"conditions": 
PermitConditions(conditions=all_replies)} def _parse_reply(self, reply) -> List[PromptResponse]: try: @@ -118,7 +118,11 @@ def _parse_reply(self, reply) -> List[PromptResponse]: else: conditions.append(condition) - response = PermitConditions.parse_obj({"conditions": conditions}) + for c in conditions: + if c.get('page_number') == '': + c['page_number'] = None + + response = PermitConditions.model_validate({"conditions": conditions}) return response.conditions except Exception as e: diff --git a/services/permits/app/tests/test_permit_condition_resource.py b/services/permits/app/tests/test_permit_condition_resource.py new file mode 100644 index 0000000000..9518e4f5e8 --- /dev/null +++ b/services/permits/app/tests/test_permit_condition_resource.py @@ -0,0 +1,130 @@ +from unittest import mock + +import httpx +import pytest +from app.app import mds # Import your FastAPI app here +from app.helpers.celery_task_status import CeleryTaskStatus +from app.permit_conditions.tasks.tasks import run_permit_condition_pipeline +from fastapi.testclient import TestClient + +# Create a TestClient using the FastAPI app +client = TestClient(mds) + +@pytest.fixture +def mock_run_permit_condition_pipeline(mocker): + pending_task = mock.MagicMock(task_id='123', status='PENDING', info={}) + return mocker.patch.object(run_permit_condition_pipeline, 'delay', return_value=pending_task) + +@pytest.fixture +def mock_store_temporary(mocker): + mocked_file = mock.MagicMock(name='abc.123.pdf', close=mock.MagicMock()) + return mocker.patch('app.permit_conditions.resources.permit_condition_resource.store_temporary', return_value=mocked_file) + +@pytest.fixture +def mock_async_result(mocker): + mock_result = mocker.patch( + 'app.permit_conditions.tasks.tasks.run_permit_condition_pipeline.app.AsyncResult' + ) + return mock_result + + +@pytest.mark.asyncio +async def test_extract_permit_conditions_success(mock_store_temporary, mock_run_permit_condition_pipeline): + file_content = b'%PDF-1.4...' 
+ files = {'file': ('test.pdf', file_content, 'application/pdf')} + + response = client.post("/permit_conditions", files=files) + + assert response.status_code == 200 + + assert response.json()['status'] == CeleryTaskStatus.PENDING.name + +@pytest.mark.asyncio +async def test_extract_permit_conditions_invalid_file_type(): + file_content = b'Not a PDF content' + files = {'file': ('test.txt', file_content, 'text/plain')} + + response = client.post("/permit_conditions", files=files) + + assert response.status_code == 400 + + assert response.json()['detail'] == 'Invalid file type. Only PDF files are supported.' + +def test_results_success(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.status = CeleryTaskStatus.SUCCESS.name + mock_async_result.return_value.get.return_value = {'conditions': []} + + response = client.get(f"/permit_conditions/results?task_id={task_id}") + + assert response.status_code == 200 + + assert response.json() == {'conditions': []} + +def test_results_failure(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.status = CeleryTaskStatus.FAILURE.name + + response = client.get(f"/permit_conditions/results?task_id={task_id}") + + assert response.status_code == 500 + + assert response.json()['detail'] == ("Task failed to complete: FAILURE") + +def test_results_incomplete(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.status = CeleryTaskStatus.PENDING.name + + response = client.get(f"/permit_conditions/results?task_id={task_id}") + + assert response.status_code == 202 + + assert response.json()['detail'] == ("Task has not completed yet. 
Current status: PENDING") + +def test_status_success(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.task_id = task_id + mock_async_result.return_value.status = CeleryTaskStatus.SUCCESS.name + mock_async_result.return_value.info = {'some': 'info'} + + response = client.get(f"/permit_conditions/status?task_id={task_id}") + + assert response.status_code == 200 + + assert response.json() == { + 'id': task_id, + 'status': CeleryTaskStatus.SUCCESS.name, + 'meta': {'some': 'info'} + } + +def test_status_pending(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.task_id = task_id + mock_async_result.return_value.status = "PENDING" + mock_async_result.return_value.info = None + + response = client.get(f"/permit_conditions/status?task_id={task_id}") + + assert response.status_code == 200 + + assert response.json() == { + 'id': task_id, + 'status': "PENDING", + 'meta': None + } + +def test_status_failure(mock_async_result): + task_id = "some-task-id" + mock_async_result.return_value.task_id = task_id + mock_async_result.return_value.status = CeleryTaskStatus.FAILURE.name + mock_async_result.return_value.info = {'error': 'some error'} + + response = client.get(f"/permit_conditions/status?task_id={task_id}") + + assert response.status_code == 200 + + assert response.json() == { + 'id': task_id, + 'status': CeleryTaskStatus.FAILURE.name, + 'meta': {'error': 'some error'} + } diff --git a/services/permits/celery.sh b/services/permits/celery.sh new file mode 100755 index 0000000000..8cdba35cb0 --- /dev/null +++ b/services/permits/celery.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# -n is the node name (hostname) of the worker +# -A is the name of the app to run +# -Q is the name of the queue to consume from +# --concurrency is the number of child processes processing the queue +# -B is the Beat +# --scheduler is the scheduler class to use +# -s Path to the schedule database. 
+# -E Enable sending task-related events that can be captured by monitors +# --pidfile is the location of the pid file +celery -A app.celery worker --loglevel=info -n permit_service@%h --concurrency=1 -Q permits diff --git a/services/permits/docker-compose.yaml b/services/permits/docker-compose.yaml index db82a933a1..11c529245f 100644 --- a/services/permits/docker-compose.yaml +++ b/services/permits/docker-compose.yaml @@ -1,5 +1,12 @@ services: - haystack-api: + + nlm-ingestor: + container_name: nlm_ingestor + image: ghcr.io/nlmatics/nlm-ingestor:latest + ports: + - 5010:5001 + + haystack: container_name: haystack build: context: './' @@ -17,12 +24,37 @@ services: - ROOT_PATH=/haystack - FILE_UPLOAD_PATH=/file-uploads - HAYSTACK_TELEMETRY_ENABLED=False + haystack_celery: + container_name: haystack_celery + build: + context: './' + dockerfile: './Dockerfile' + volumes: + - ./:/opt/pipelines + - ./:/code + - fileuploads:/file-uploads + command: >- + python -m watchdog.watchmedo auto-restart + -d app/ -p '*.py' --recursive -- + celery -A app.celery worker --loglevel=info -n permit_service@%h --concurrency=1 -Q permits + restart: on-failure + environment: + - DOCUMENTSTORE_PARAMS_HOST=elasticsearch + - TOKENIZERS_PARALLELISM=false + - ROOT_PATH=/haystack + - FILE_UPLOAD_PATH=/file-uploads + - HAYSTACK_TELEMETRY_ENABLED=False depends_on: kibana: condition: service_healthy elasticsearch: condition: service_healthy - + # nlm-ingestor: + # condition: service_started + haystack: + condition: service_started + redis: + condition: service_started elasticsearch: image: "docker.elastic.co/elasticsearch/elasticsearch:8.12.1" container_name: elasticsearch @@ -33,6 +65,7 @@ services: - discovery.type=single-node - xpack.security.enabled=false - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m" + - cluster.routing.allocation.disk.threshold_enabled=false volumes: - esdata01:/usr/share/elasticsearch/data diff --git a/services/permits/requirements.txt b/services/permits/requirements.txt index 
e0b44c7237..a0bd7d1ca5 100644 --- a/services/permits/requirements.txt +++ b/services/permits/requirements.txt @@ -18,4 +18,11 @@ pypdf==4.2.0 json-repair==0.25.3 pytest==8.0.0 fuzzywuzzy==0.18.0 +oauthlib==3.2.2 +requests_oauthlib==2.0.0 diff-match-patch==20230430 +celery[redis,elasticsearch]==5.4.0 +openpyxl==3.1.5 +redis==4.6.0 +watchdog==4.0.1 +\ \ No newline at end of file