Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MDS-6086] Updated permit condition extraction to happen asynchronously using Celery #3205

Merged
merged 5 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions services/permits/.env-example
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ OBJECT_STORE_ACCESS_KEY_ID=
OBJECT_STORE_ACCESS_KEY=
OBJECT_STORE_HOST=nrs.objectstore.gov.bc.ca
OBJECT_STORE_BUCKET=
PERMIT_SERVICE_ENDPOINT=http://localhost:8004/haystack/file-upload
PERMIT_SERVICE_ENDPOINT=http://localhost:8004
PERMITS_CLIENT_ID=mds-core-api-internal-5194
PERMITS_CLIENT_SECRET=
PERMIT_CSV_ENDPOINT=http://localhost:8004/permit_conditions/csv
ELASTICSEARCH_CA_CERT=
ELASTICSEARCH_HOST=
ELASTICSEARCH_USERNAME=
Expand Down
3 changes: 2 additions & 1 deletion services/permits/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
app/cache
app/extract
app/extract
permit_cache
28 changes: 28 additions & 0 deletions services/permits/app/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os

from celery.app import Celery

# Redis connection settings used as the Celery message broker.
CACHE_REDIS_HOST = os.environ.get('CACHE_REDIS_HOST', 'redis')
CACHE_REDIS_PORT = os.environ.get('CACHE_REDIS_PORT', 6379)
CACHE_REDIS_PASS = os.environ.get('CACHE_REDIS_PASS', 'redis-password')
CACHE_REDIS_URL = 'redis://:{0}@{1}:{2}'.format(CACHE_REDIS_PASS, CACHE_REDIS_HOST,
                                                CACHE_REDIS_PORT)

# Elasticsearch connection settings used as the Celery result backend.
# NOTE(review): ca_cert is read but never used below — presumably intended
# for TLS verification of the Elasticsearch connection; confirm.
ca_cert = os.environ.get("ELASTICSEARCH_CA_CERT", None)
host = os.environ.get("ELASTICSEARCH_HOST", None) or "http://elasticsearch:9200"
username = os.environ.get("ELASTICSEARCH_USERNAME", "")
password = os.environ.get("ELASTICSEARCH_PASSWORD", "")

scheme, hostname = host.split('://')

backend_url = f'elasticsearch+{scheme}://{username}:{password}@{hostname}/celery'

# Single Celery application: Redis broker, Elasticsearch result backend.
# (A broker-only Celery instance was previously created here and immediately
# rebound by this one; that dead instantiation has been removed.)
celery_app = Celery(__name__, broker=CACHE_REDIS_URL, backend=backend_url)
# Elasticsearch 7+/8 removed mapping types; clear doc_type so the result
# backend does not send one with its requests.
celery_app.backend.doc_type = None
celery_app.conf.task_default_queue = 'permits'
celery_app.autodiscover_tasks([
    'app.permit_conditions.tasks',
])
84 changes: 55 additions & 29 deletions services/permits/app/compare_extraction_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@
# CSV files should have the following columns: section_title, section_paragraph, paragraph_title, subparagraph, clause, subclause, page_number, condition_text
###
import argparse
import logging
import os

import numpy as np
import pandas as pd
from app.permit_conditions.validator.permit_condition_model import PermitCondition
from diff_match_patch import diff_match_patch
from fuzzywuzzy import fuzz
from jinja2 import Environment, FileSystemLoader
from pydantic import ValidationError

logger = logging.getLogger(__name__)

# Function to create Content instances from a DataFrame
def create_content_instances(df):
content_list = []
for _, row in df.iterrows():
content = PermitCondition(
section_title=row["section_title"],
section_paragraph=row["section_paragraph"],
paragraph_title=row["paragraph_title"],
subparagraph=row["subparagraph"],
clause=row["clause"],
subclause=row["subclause"],
page_number=int(row["page_number"]) if row["page_number"] else None,
condition_text=row["condition_text"],
)
try:
content = PermitCondition(
section_title=row["section_title"],
section_paragraph=row["section_paragraph"],
paragraph_title=row["paragraph_title"],
subparagraph=row["subparagraph"],
clause=row["clause"],
subclause=row["subclause"],
page_number=int(row["page_number"]) if (row.get("page_number") and row['page_number'] != '') else 0,
condition_text=row["condition_text"],
original_condition_text=row["condition_text"],
)
except ValidationError as e:
logger.error(f"Failed parsing of permit condition: {e}")
logger.error(row)
raise

# This will be used as the text for comparison purposes
text = f"""
Expand Down Expand Up @@ -78,11 +88,15 @@ def write_html_report(context, report_prefix):


def write_csv_report(comparison_results, report_prefix):
np.set_printoptions(linewidth=1000000)

# Create a DataFrame from the comparison results
comparison_df = pd.DataFrame(comparison_results)

comparison_csv_filename = f"{report_prefix}_comparison_report.csv"
comparison_df.to_csv(comparison_csv_filename, index=False)
comparison_csv_filename = f"{report_prefix}_comparison_report.xlsx"


comparison_df.to_excel(comparison_csv_filename)

return comparison_csv_filename

Expand All @@ -103,6 +117,8 @@ def validate_condition(csv_pairs):
auto_extracted_content = create_content_instances(auto_extracted_df)
manual_extracted_content = create_content_instances(manual_extracted_df)

print(f'Found {len(auto_extracted_content)} conditions in {auto_extracted_csv} and {len(manual_extracted_content)} conditions in {manual_extracted_csv}')
simensma-fresh marked this conversation as resolved.
Show resolved Hide resolved

# 3. Find missing and added conditions
auto_content_dict = {
create_comparison_key(content): content
Expand Down Expand Up @@ -143,10 +159,12 @@ def validate_condition(csv_pairs):
comparison_results.append(
{
"Key": key,
"auto_extracted_condition": "N/A",
"manual_extracted_condition": manual_content_dict[
key
].condition_text,
"auto_section_title": "",
"auto_paragraph_title": "",
"auto_extracted_condition": "",
"manual_section_title": manual_content_dict[key].section_title,
"manual_paragraph_title": manual_content_dict[key].paragraph_title,
"manual_extracted_condition": manual_content_dict[key].original_condition_text,
"match_percentage": 0,
"is_match": False,
}
Expand All @@ -166,8 +184,13 @@ def validate_condition(csv_pairs):
comparison_results.append(
{
"Key": key,
"auto_extracted_condition": auto_content_dict[key].condition_text,
"manual_extracted_condition": "N/A",
"auto_section_title": auto_content_dict[key].section_title,
"auto_paragraph_title": auto_content_dict[key].paragraph_title,
"auto_extracted_condition": auto_content_dict[key].original_condition_text,
"manual_section_title": "",
"manual_paragraph_title": "",
"manual_extracted_condition": "",
"manual_extracted_condition": "",
"match_percentage": 0,
"is_match": False,
}
Expand All @@ -177,8 +200,6 @@ def validate_condition(csv_pairs):
match_results = compare_matching_conditions(
auto_content_dict,
manual_content_dict,
comparison_results,
context,
)

context["comparison_results"] += match_results["context_comparison_results"]
Expand Down Expand Up @@ -234,14 +255,15 @@ def compare_matching_conditions(
total_comparable_conditions = 0
matching_score = 0


# Compare how well the text matches for conditions that are present in both csvs
# and generate an HTML diff for each pair of conditions
for key in sorted(manual_content_dict.keys()):
if key in auto_content_dict:
total_comparable_conditions += 1
auto_condition_text = auto_content_dict[key].condition_text
manual_condition_text = manual_content_dict[key].condition_text
match_percentage = fuzz.ratio(auto_condition_text, manual_condition_text)
match_percentage = fuzz.ratio(auto_condition_text.replace('\n', ''), manual_condition_text.replace('\n', ''))
is_match = match_percentage >= 100

if is_match:
Expand All @@ -250,8 +272,12 @@ def compare_matching_conditions(
comparison_results.append(
{
"Key": key,
"auto_extracted_condition": auto_condition_text,
"manual_extracted_condition": manual_condition_text,
"auto_section_title": auto_content_dict[key].section_title,
"auto_paragraph_title": auto_content_dict[key].paragraph_title,
"auto_extracted_condition": auto_content_dict[key].original_condition_text,
"manual_section_title": manual_content_dict[key].section_title,
"manual_paragraph_title": manual_content_dict[key].paragraph_title,
"manual_extracted_condition": manual_content_dict[key].original_condition_text,
"match_percentage": match_percentage,
"is_match": is_match,
}
Expand All @@ -267,12 +293,12 @@ def compare_matching_conditions(
}
)

return {
"comparison_results": comparison_results,
"matching_score": matching_score,
"total_comparable_conditions": total_comparable_conditions,
"context_comparison_results": context_comparison_results,
}
return {
"comparison_results": comparison_results,
"matching_score": matching_score,
"total_comparable_conditions": total_comparable_conditions,
"context_comparison_results": context_comparison_results,
}


def write_reports(
Expand Down
43 changes: 36 additions & 7 deletions services/permits/app/extract_and_validate_pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
###

import argparse
import json
import os
from time import sleep

from app.compare_extraction_results import validate_condition
from dotenv import find_dotenv, load_dotenv
Expand All @@ -19,13 +21,13 @@
PERMITS_CLIENT_SECRET = os.getenv("PERMITS_CLIENT_SECRET", None)
TOKEN_URL = os.getenv("TOKEN_URL", None)
AUTHORIZATION_URL = os.getenv("AUTHORIZATION_URL", None)
PERMIT_CSV_ENDPOINT = os.getenv("PERMIT_CSV_ENDPOINT", None)
PERMIT_SERVICE_ENDPOINT = os.getenv("PERMIT_SERVICE_ENDPOINT", None)

assert PERMITS_CLIENT_ID, "PERMITS_CLIENT_ID is not set"
assert PERMITS_CLIENT_SECRET, "PERMITS_CLIENT_SECRET is not set"
assert TOKEN_URL, "TOKEN_URL is not set"
assert AUTHORIZATION_URL, "AUTHORIZATION_URL is not set"
assert PERMIT_CSV_ENDPOINT, "PERMIT_CSV_ENDPOINT is not set"
assert PERMIT_SERVICE_ENDPOINT, "PERMIT_SERVICE_ENDPOINT is not set"


def authenticate_with_oauth():
Expand All @@ -41,13 +43,37 @@ def authenticate_with_oauth():
return oauth_session


def extract_conditions_from_pdf(pdf_path, oauth_session):
    """Run permit-condition extraction for one PDF and return the result CSV.

    Uploads the PDF to the permit service to kick off an asynchronous
    (Celery-backed) extraction task, polls the status endpoint until the
    task reaches a terminal state, then downloads and returns the CSV
    results as a string.

    :param pdf_path: path of the PDF file to upload
    :param oauth_session: authenticated requests-compatible session
    :return: CSV content of the extracted permit conditions (str)
    :raises Exception: if no task id is returned, or the task ends in any
        state other than SUCCESS
    """
    # Kick off the permit conditions extraction process
    with open(pdf_path, "rb") as pdf_file:
        files = {"file": (os.path.basename(pdf_path), pdf_file, "application/pdf")}
        response = oauth_session.post(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions", files=files)
        response.raise_for_status()

    task_id = response.json().get('id')

    if not task_id:
        raise Exception("Failed to extract conditions from PDF. No task ID returned from permit extractions endpoint.")

    status = None

    # Poll the status endpoint until the task is complete.
    # NOTE(review): this loops indefinitely if the task never reaches a
    # terminal state — consider a max-wait cutoff.
    while status not in ("SUCCESS", "FAILURE"):
        sleep(3)
        status_response = oauth_session.get(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions/status", params={"task_id": task_id})
        status_response.raise_for_status()

        status = status_response.json().get('status')

        print(json.dumps(status_response.json(), indent=2))

    if status != "SUCCESS":
        raise Exception(f"Failed to extract conditions from PDF. Task status: {status}")

    success_response = oauth_session.get(f"{PERMIT_SERVICE_ENDPOINT}/permit_conditions/results/csv", params={"task_id": task_id})
    success_response.raise_for_status()

    return success_response.content.decode("utf-8")


# Process each pair of PDF and expected CSV
Expand All @@ -62,7 +88,7 @@ def extract_and_validate_conditions(pdf_csv_pairs):

# 1. Extract conditions from PDF
extracted_csv_content = extract_conditions_from_pdf(
pdf_path, PERMIT_CSV_ENDPOINT, oauth_session
pdf_path, oauth_session
)

# 2. Save the extracted CSV content to a temporary file
Expand All @@ -76,6 +102,8 @@ def extract_and_validate_conditions(pdf_csv_pairs):
validate_condition(pairs)


print(__name__)

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Process PDF documents, extract permit conditions, and validate against expected CSV results."
Expand All @@ -84,6 +112,7 @@ def extract_and_validate_conditions(pdf_csv_pairs):
"--pdf_csv_pairs",
nargs=2,
action="append",
required=True,
help="""
Pairs of PDF file and expected CSV file. Each pair should be specified as two consecutive file paths.
Usage: python extract_and_validate_pdf.py --pdf_csv_pairs <pdf_path> <expected_csv_path> --pdf_csv_pairs <pdf_path> <expected_csv_path> ...
Expand Down
9 changes: 9 additions & 0 deletions services/permits/app/helpers/celery_task_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import Enum


class CeleryTaskStatus(Enum):
    """States a Celery task can report, mirroring celery.states.

    Values match the strings Celery returns from AsyncResult.state /
    the task status endpoint, so they can be compared directly.
    """
    PENDING = "PENDING"
    STARTED = "STARTED"
    RETRY = "RETRY"
    FAILURE = "FAILURE"
    SUCCESS = "SUCCESS"
    # Celery also reports REVOKED for cancelled tasks; included so a
    # revoked task is representable (backward-compatible addition).
    REVOKED = "REVOKED"
9 changes: 7 additions & 2 deletions services/permits/app/helpers/temporary_file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import os
import tempfile

from fastapi import UploadFile

FILE_UPLOAD_PATH = os.environ.get("FILE_UPLOAD_PATH", "/file-uploads")

def store_temporary(file: UploadFile):
tmp = tempfile.NamedTemporaryFile()
assert FILE_UPLOAD_PATH, "FILE_UPLOAD_PATH is not set in the environment"

def store_temporary(file: UploadFile, suffix: str = "") -> tempfile.NamedTemporaryFile:

tmp = tempfile.NamedTemporaryFile(suffix=suffix, dir=FILE_UPLOAD_PATH, delete=False)

with open(tmp.name, "w") as f:
while contents := file.file.read(1024 * 1024):
Expand Down
3 changes: 3 additions & 0 deletions services/permits/app/permit_conditions/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from contextvars import ContextVar

# Context-local slot for the permit-conditions pipeline. Pipeline
# components (e.g. the PDF-to-text converter) call
# context.get().update_state(...) to report task progress without the
# task object being threaded through every call.
# NOTE(review): no default is set, so context.get() raises LookupError
# if nothing has been stored for the current context — confirm the
# Celery task sets it before the pipeline runs.
context = ContextVar("permit_conditions_context")
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
from pathlib import Path
from time import sleep
from typing import Any, Dict, List, Optional

import ocrmypdf
from app.permit_conditions.context import context
from haystack import Document, component, logging
from pypdf import PdfReader
from pypdf.errors import PdfReadError
Expand Down Expand Up @@ -35,6 +37,8 @@ def run(
id_hash_keys: Optional[List[str]] = None,
documents: Optional[List[Document]] = None,
) -> List[Document]:
context.get().update_state(state="PROGRESS", meta={"stage": "pdf_to_text_converter"})

if not documents:
pages = self._read_pdf(
file_path,
Expand All @@ -61,11 +65,7 @@ def _read_pdf(self, file_path: Path) -> List[str]:
pdf_reader = PdfReader(doc)
for idx, page in enumerate(pdf_reader.pages):
try:
page_text = page.extract_text(
extraction_mode="layout",
layout_mode_space_vertically=False,
layout_mode_scale_weight=0.5,
)
page_text = page.extract_text()
pages.append(page_text)
if DEBUG_MODE:
fn = f"debug/pdfreader-{idx}.txt"
Expand Down
Loading
Loading