From e08fc3759ceacc0928393a4c20e3bb57b0535349 Mon Sep 17 00:00:00 2001
From: ArslanSaleem
Date: Wed, 16 Oct 2024 15:58:18 +0200
Subject: [PATCH 1/2] chore[processes]: wait for process preprocessing before
 completely executing

---
 backend/Makefile                             |   6 +
 backend/app/processing/process_queue.py      |  22 ++-
 backend/app/processing/process_scheduler.py  |  77 ++++++++++
 .../app/repositories/process_repository.py   |  13 ++
 backend/poetry.lock                          |  18 ++-
 backend/pyproject.toml                       |   1 +
 .../processing/test_process_scheduler.py     | 141 ++++++++++++++++++
 7 files changed, 271 insertions(+), 7 deletions(-)
 create mode 100644 backend/app/processing/process_scheduler.py
 create mode 100644 backend/tests/processing/test_process_scheduler.py

diff --git a/backend/Makefile b/backend/Makefile
index ed82a1e..6cca837 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -12,3 +12,9 @@ generate-migration:
 
 test:
 	poetry run pytest
+
+lint:
+	poetry run ruff check .
+
+lint-fix:
+	poetry run ruff check . --fix
diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py
index 36d3671..c83791c 100644
--- a/backend/app/processing/process_queue.py
+++ b/backend/app/processing/process_queue.py
@@ -3,6 +3,8 @@ from typing import List
 
 from app.database import SessionLocal
 from app.exceptions import CreditLimitExceededException
+from app.models.asset_content import AssetProcessingStatus
+from app.processing.process_scheduler import ProcessScheduler
 from app.repositories import process_repository
 from app.repositories import project_repository
 from concurrent.futures import ThreadPoolExecutor
@@ -24,16 +26,17 @@ from app.utils import clean_text
 from app.vectorstore.chroma import ChromaDB
 
-
 executor = ThreadPoolExecutor(max_workers=5)
 
 logger = Logger()
 
 
-def submit_process(process_id: int):
+def submit_process(process_id: int) -> None:
     executor.submit(process_task, process_id)
 
 
+process_execution_scheduler = ProcessScheduler(60, submit_process, logger)
+
+
 # Background task processing function
 def process_step_task(
     process_id: int,
@@ -52,7 +55,7 @@ def process_step_task(
             return False  # Stop processing if the process is stopped
 
         logger.log(f"Processing file: {process_step.asset.path}")
-        if process_step.status == ProcessStepStatus.COMPLETED:
+        if process_step.status == ProcessStepStatus.COMPLETED and process.type != "extract":
             summaries.append(process_step.output.get("summary", ""))
             return True
 
@@ -133,7 +136,7 @@ def process_task(process_id: int):
             process.started_at = datetime.utcnow()
             db.commit()
 
-            process_steps = process_repository.get_process_steps(db, process.id)
+            process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, ["PENDING", "IN_PROGRESS"])
 
             if not process_steps:
                 raise Exception("No process found!")
@@ -144,6 +147,11 @@ def process_task(process_id: int):
         # Step 2: Process each step in parallel outside the database connection
         failed_docs = []
         summaries = []
+
+        ready_process_steps = [process_step for process_step in process_steps if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED]
+
+        all_process_step_ready = len(ready_process_steps) == len(process_steps)  # Asset preprocessing is pending
+
         with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
             futures = [
                 executor.submit(
@@ -154,7 +162,7 @@ def process_task(process_id: int):
                     failed_docs,
                     api_key,
                 )
-                for process_step in process_steps
+                for process_step in ready_process_steps
             ]
             # Wait for all submitted tasks to complete
             concurrent.futures.wait(futures)
@@ -186,6 +194,10 @@ def process_task(process_id: int):
             if summary_of_summaries:
                 process.output = {"summary": summary_of_summaries}
 
+        if not all_process_step_ready:
+            logger.info(f"Process id: [{process.id}] some steps' preprocessing is incomplete; moving to waiting queue")
+            process_execution_scheduler.add_process_to_queue(process.id)
+
         process.status = (
             ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
        )
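The heart of the hunk above is a readiness gate: a step only executes once its asset's preprocessing has completed, and a process with any unready steps is parked on a waiting queue to be retried later. A minimal sketch of that gate, using simplified stand-in types (Step and AssetProcessingStatus below are illustrative, not the app's real models):

    from dataclasses import dataclass
    from enum import Enum, auto
    from typing import Callable, List


    class AssetProcessingStatus(Enum):  # stand-in for app.models.asset_content
        PENDING = auto()
        COMPLETED = auto()


    @dataclass
    class Step:  # stand-in for a ProcessStep joined with its asset content
        id: int
        asset_status: AssetProcessingStatus


    def run_or_requeue(
        process_id: int,
        steps: List[Step],
        execute: Callable[[Step], None],
        requeue: Callable[[int], None],
    ) -> bool:
        """Execute only the ready steps; requeue the process if any were skipped."""
        ready = [s for s in steps if s.asset_status == AssetProcessingStatus.COMPLETED]
        for step in ready:
            execute(step)
        if len(ready) != len(steps):
            requeue(process_id)  # the scheduler retries the whole process later
            return False  # mirrors the second commit below: skip the status update
        return True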
diff --git a/backend/app/processing/process_scheduler.py b/backend/app/processing/process_scheduler.py
new file mode 100644
index 0000000..badf45f
--- /dev/null
+++ b/backend/app/processing/process_scheduler.py
@@ -0,0 +1,77 @@
+from typing import Callable
+from app.logger import Logger
+import schedule
+import threading
+import time
+
+
+class ProcessScheduler:
+    """
+    A scheduler that manages a queue of processes to be executed.
+
+    It periodically checks the queue and executes processes if any are present.
+    If the queue becomes empty, the scheduler automatically stops to conserve resources.
+    The scheduler can be started and stopped as needed, and new processes can be added to the queue at any time.
+    """
+
+    def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger):
+        self.waiting_processes = []
+        self.scheduler_thread = None
+        self.scheduler_running = False
+        self.executor = executor
+        self.logger = logger
+        schedule.every(secs).seconds.do(self._reprocess_holding_processes)
+
+    def _reprocess_holding_processes(self):
+        """Internal method to process tasks in the queue."""
+        if self.waiting_processes:
+            id = self.waiting_processes.pop(0)
+            self.logger.info(f"[ProcessScheduler]: Executing process from queue [{id}]")
+            self.executor(id)
+        else:
+            self.stop_scheduler()
+
+    def _run_scheduler(self):
+        """Internal method to continuously run the scheduler."""
+        while self.scheduler_running:
+            schedule.run_pending()
+            time.sleep(1)
+
+    def start_scheduler(self):
+        """Start the scheduler thread if it's not already running."""
+        if not self.scheduler_running:
+            self.logger.info("[ProcessScheduler]: Starting scheduler")
+            self.scheduler_running = True
+            self.scheduler_thread = threading.Thread(target=self._run_scheduler)
+            self.scheduler_thread.daemon = True
+            self.scheduler_thread.start()
+        else:
+            self.logger.info("[ProcessScheduler]: Scheduler is already running")
+
+    def stop_scheduler(self):
+        """Stop the scheduler thread."""
+        self.scheduler_running = False
+        self.logger.info("[ProcessScheduler]: Scheduler stopped")
+
+    def add_process_to_queue(self, process_id: int) -> None:
+        """Add a process to the queue and start the scheduler if needed."""
+        self.waiting_processes.append(process_id)
+        self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
+        self.start_scheduler()
+
+
+if __name__ == "__main__":
+
+    def print_process(process):
+        print(process)
+
+    scheduler = ProcessScheduler(15, print_process, Logger())
+
+    # Add processes to the queue and start processing
+    scheduler.add_process_to_queue("Process 1")
+    time.sleep(3)  # Simulate some time for processing
+
+    # Add more processes to the queue
+    scheduler.add_process_to_queue("Process 2")
+    time.sleep(3)
+
+    # Simulate waiting for some time to see the scheduler behavior
+    time.sleep(300)  # Wait 5 minutes to observe the scheduler stopping when empty
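Taken together with the wiring in process_queue.py (ProcessScheduler(60, submit_process, logger)), the class needs only a polling interval in seconds, an executor callback, and a logger; it spins up its daemon thread on the first enqueue and stops itself once the queue drains. A minimal usage sketch, assuming app.logger.Logger can be constructed without arguments:

    import time

    from app.logger import Logger  # assumed constructible with no arguments
    from app.processing.process_scheduler import ProcessScheduler


    def execute(process_id: int) -> None:
        print(f"executing process {process_id}")


    # Poll the waiting queue every 5 seconds (process_queue.py uses 60).
    scheduler = ProcessScheduler(5, execute, Logger())

    scheduler.add_process_to_queue(42)  # first enqueue starts the daemon thread
    time.sleep(6)  # one tick: 42 is popped and handed to execute()
    time.sleep(6)  # the next tick finds the queue empty and stops the scheduler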
diff --git a/backend/app/repositories/process_repository.py b/backend/app/repositories/process_repository.py
index 8560e68..0c5c7e0 100644
--- a/backend/app/repositories/process_repository.py
+++ b/backend/app/repositories/process_repository.py
@@ -1,3 +1,4 @@
+from typing import List
 from app.models.process import ProcessStatus
 from sqlalchemy import func, or_
 from sqlalchemy.orm import Session, joinedload, defer, aliased
@@ -91,6 +92,18 @@ def get_process_steps(db: Session, process_id: int):
     )
 
 
+def get_process_steps_with_asset_content(db: Session, process_id: int, status: List[ProcessStatus]):
+    return (
+        db.query(models.ProcessStep)
+        .filter(models.ProcessStep.process_id == process_id)
+        .filter(models.ProcessStep.status.in_(status))
+        .options(
+            joinedload(models.ProcessStep.process).joinedload(models.Process.project),
+            joinedload(models.ProcessStep.asset).joinedload(models.Asset.content),
+        )
+        .all()
+    )
+
+
 def get_process_step(db: Session, step_id: int):
     return (
         db.query(models.ProcessStep)
diff --git a/backend/poetry.lock b/backend/poetry.lock
index bca8e84..6bb168a 100644
--- a/backend/poetry.lock
+++ b/backend/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "alembic"
@@ -2431,6 +2431,20 @@ files = [
     {file = "ruff-0.3.7.tar.gz", hash = "sha256:d5c1aebee5162c2226784800ae031f660c350e7a3402c4d1f8ea4e97e232e3ba"},
 ]
 
+[[package]]
+name = "schedule"
+version = "1.2.2"
+description = "Job scheduling for humans."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "schedule-1.2.2-py3-none-any.whl", hash = "sha256:5bef4a2a0183abf44046ae0d164cadcac21b1db011bdd8102e4a0c1e91e06a7d"},
+    {file = "schedule-1.2.2.tar.gz", hash = "sha256:15fe9c75fe5fd9b9627f3f19cc0ef1420508f9f9a46f45cd0769ef75ede5f0b7"},
+]
+
+[package.extras]
+timezone = ["pytz"]
+
 [[package]]
 name = "setuptools"
 version = "74.0.0"
@@ -3271,4 +3285,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.11,<3.13"
-content-hash = "f93ac443a905910a7a021a4163b8f067845c178bc03eaa1f68de01014f5e25c4"
+content-hash = "7d88091442473516457b1bec3ae46d31579f996a6dc8d62da334db11907a75b4"
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 438ed4c..82cf460 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -17,6 +17,7 @@ dateparser = "^1.2.0"
 requests = "^2.32.3"
 chromadb = "^0.5.5"
 openai = "^1.51.2"
+schedule = "^1.2.2"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.3.2"
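The new repository helper returns only the steps still awaiting work and eagerly loads each step's process/project and asset/content relations in one query, which is what later lets process_task inspect asset.content.processing without per-step lazy loads. A call-site sketch (the process_id is hypothetical, and context-manager use of SessionLocal is assumed to match the app's session factory):

    from app.database import SessionLocal
    from app.repositories import process_repository

    with SessionLocal() as db:  # assumes the factory supports context-manager use
        steps = process_repository.get_process_steps_with_asset_content(
            db, 7, ["PENDING", "IN_PROGRESS"]  # process_id 7 is illustrative
        )
        for step in steps:
            # asset and content were joined-loaded: no extra queries here
            print(step.id, step.asset.content.processing)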
diff --git a/backend/tests/processing/test_process_scheduler.py b/backend/tests/processing/test_process_scheduler.py
new file mode 100644
index 0000000..22334d1
--- /dev/null
+++ b/backend/tests/processing/test_process_scheduler.py
@@ -0,0 +1,141 @@
+import unittest
+from unittest.mock import MagicMock, patch
+
+from app.processing.process_scheduler import ProcessScheduler
+
+
+class TestProcessScheduler(unittest.TestCase):
+    def setUp(self):
+        self.executor_mock = MagicMock()  # Mock the executor function
+        self.logger_mock = MagicMock()  # Mock the logger
+        self.scheduler = ProcessScheduler(1, self.executor_mock, self.logger_mock)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_add_process_to_queue(self, _):
+        # Test adding a process to the queue and ensure it's added correctly
+        process_id = 1
+        self.scheduler.add_process_to_queue(process_id)
+
+        # Check if the process is added to the waiting_processes list
+        self.assertIn(process_id, self.scheduler.waiting_processes)
+
+        # Check if the scheduler was started
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_reprocess_holding_processes(self, _):
+        # Test processing when the queue is not empty
+        process_id = 2
+        self.scheduler.waiting_processes = [process_id]
+
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure the executor was called with the process_id
+        self.executor_mock.assert_called_once_with(process_id)
+
+        # Ensure the logger recorded the correct process execution
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_stop_scheduler_when_empty(self, _):
+        # Test if the scheduler stops when there are no processes left
+        self.scheduler.waiting_processes = []  # Empty the queue
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure the scheduler stops
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+        self.assertFalse(self.scheduler.scheduler_running)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_starts_when_needed(self, _):
+        # Test if the scheduler starts when a new process is added
+        process_id = 3
+        self.scheduler.start_scheduler()
+
+        self.scheduler.add_process_to_queue(process_id)
+
+        # Ensure the process was added and the scheduler started
+        self.assertIn(process_id, self.scheduler.waiting_processes)
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_does_not_double_start(self, _):
+        # Test that the scheduler doesn't start twice if it's already running
+        self.scheduler.scheduler_running = True  # Simulate a running scheduler
+
+        self.scheduler.start_scheduler()
+
+        # Ensure that a log is made saying it's already running
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler is already running")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_stop_scheduler(self, _):
+        # Test stopping the scheduler manually
+        self.scheduler.start_scheduler()
+        self.scheduler.stop_scheduler()
+
+        # Check if the scheduler is stopped
+        self.assertFalse(self.scheduler.scheduler_running)
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_reprocess_multiple_processes(self, _):
+        # Test reprocessing when there are multiple processes in the queue
+        process_ids = [1, 2, 3]
+        self.scheduler.waiting_processes = process_ids.copy()
+
+        # Simulate running the scheduler multiple times
+        self.scheduler._reprocess_holding_processes()
+        self.scheduler._reprocess_holding_processes()
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure each process was executed in the correct order
+        self.executor_mock.assert_has_calls([
+            unittest.mock.call(1),
+            unittest.mock.call(2),
+            unittest.mock.call(3),
+        ])
+        self.assertEqual(self.scheduler.waiting_processes, [])
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_no_processes_added(self, _):
+        # Test the behavior when no processes are added
+        self.scheduler.start_scheduler()
+        self.scheduler._reprocess_holding_processes()
+
+        # The scheduler should stop since no processes were added
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+        self.assertFalse(self.scheduler.scheduler_running)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_stops_when_empty_after_processing(self, _):
+        # Test that the scheduler stops after processing all tasks
+        process_id = 1
+        self.scheduler.waiting_processes = [process_id]
+
+        # Process the task
+        self.scheduler._reprocess_holding_processes()
+
+        # Explicitly stop the scheduler after processing
+        self.scheduler.stop_scheduler()
+
+        # Ensure the scheduler stopped after processing the last task
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_restart_after_adding_new_process(self, _):
+        # Test if the scheduler restarts when a new process is added after stopping
+        process_id_1 = 1
+        process_id_2 = 2
+        self.scheduler.waiting_processes = [process_id_1]
+
+        # Process the first task and stop the scheduler
+        self.scheduler._reprocess_holding_processes()
+        self.assertFalse(self.scheduler.scheduler_running)
+
+        # Add a new process and ensure the scheduler restarts
+        self.scheduler.add_process_to_queue(process_id_2)
+        self.assertTrue(self.scheduler.scheduler_running)
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id_2}]")
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")
+
+
+if __name__ == '__main__':
+    unittest.main()
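These tests drive _reprocess_holding_processes synchronously, so no real timers fire. Beyond `make test`, the file can be run on its own with unittest's loader; a sketch assuming the working directory is backend/ so that both the app and tests packages resolve:

    import unittest

    # Load and run only the scheduler tests; discovery starts at tests/processing.
    suite = unittest.defaultTestLoader.discover(
        "tests/processing", pattern="test_process_scheduler.py"
    )
    unittest.TextTestRunner(verbosity=2).run(suite)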
From 13f0b936b4e4fb3793401248bc5707f40ea5e61c Mon Sep 17 00:00:00 2001
From: ArslanSaleem
Date: Wed, 16 Oct 2024 17:06:48 +0200
Subject: [PATCH 2/2] fix[process_queue]: code improvements

---
 backend/Makefile                            |  5 ++-
 backend/app/processing/process_queue.py     |  8 ++--
 backend/app/processing/process_scheduler.py | 48 ++++++++-------------
 3 files changed, 27 insertions(+), 34 deletions(-)

diff --git a/backend/Makefile b/backend/Makefile
index 6cca837..45e6f02 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -13,8 +13,11 @@ generate-migration:
 
 test:
 	poetry run pytest
+
+# Check code for linting issues
 lint:
 	poetry run ruff check .
 
+# Automatically fix linting issues where possible
 lint-fix:
-	poetry run ruff check . --fix
+	poetry run ruff check . --fix --show-fixes
diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py
index c83791c..53d9924 100644
--- a/backend/app/processing/process_queue.py
+++ b/backend/app/processing/process_queue.py
@@ -136,7 +136,7 @@ def process_task(process_id: int):
             process.started_at = datetime.utcnow()
             db.commit()
 
-            process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, ["PENDING", "IN_PROGRESS"])
+            process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name])
 
             if not process_steps:
                 raise Exception("No process found!")
@@ -150,7 +150,7 @@ def process_task(process_id: int):
 
         ready_process_steps = [process_step for process_step in process_steps if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED]
 
-        all_process_step_ready = len(ready_process_steps) == len(process_steps)  # Asset preprocessing is pending
+        all_process_steps_ready = len(ready_process_steps) == len(process_steps)  # Check if all process steps are ready
 
         with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
             futures = [
@@ -194,9 +194,11 @@ def process_task(process_id: int):
         if summary_of_summaries:
             process.output = {"summary": summary_of_summaries}
 
-        if not all_process_step_ready:
+        if not all_process_steps_ready:
             logger.info(f"Process id: [{process.id}] some steps' preprocessing is incomplete; moving to waiting queue")
             process_execution_scheduler.add_process_to_queue(process.id)
+            # Skip the status update since not all steps are ready
+            return
 
         process.status = (
             ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
         )
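The first hunk swaps the hard-coded status strings for enum members. Enum.name is the member's symbolic name, so ProcessStepStatus.PENDING.name still yields the exact string "PENDING" the repository filter compared against before, while keeping the status vocabulary in one place. A quick illustration with a stand-in enum (the real ProcessStepStatus lives in the app's models and may define different values):

    from enum import Enum


    class ProcessStepStatus(Enum):  # stand-in; the values here are illustrative
        PENDING = 1
        IN_PROGRESS = 2
        COMPLETED = 3


    # .name recovers the symbolic name, matching the former string literals:
    statuses = [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name]
    assert statuses == ["PENDING", "IN_PROGRESS"]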
""" - def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger): + def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger = None): self.waiting_processes = [] self.scheduler_thread = None self.scheduler_running = False + self.lock = threading.Lock() self.executor = executor - self.logger = logger + if logger is None: + self.logger = Logger() + else: + self.logger = logger + schedule.every(secs).seconds.do(self._reprocess_holding_processes) def _reprocess_holding_processes(self): """Internal method to process tasks in the queue.""" - if self.waiting_processes: - id = self.waiting_processes.pop(0) - self.logger.info(f"[ProcessScheduler]: Executing process from queue [{id}]") - self.executor(id) - else: - self.stop_scheduler() + with self.lock: + if self.waiting_processes: + process_id = self.waiting_processes.pop(0) + self.logger.info(f"[ProcessScheduler]: Executing process from queue [{process_id}]") + self.executor(process_id) + else: + self.stop_scheduler() def _run_scheduler(self): """Internal method to continuously run the scheduler.""" @@ -53,25 +59,7 @@ def stop_scheduler(self): def add_process_to_queue(self, process_id: int) -> None: """Add a process to the queue and start the scheduler if needed.""" - self.waiting_processes.append(process_id) - self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]") - self.start_scheduler() - - -if __name__ == "__main__": - - def print_process(process): - print(process) - - scheduler = ProcessScheduler(15, print_process) - - # Add processes to the queue and start processing - scheduler.add_process_to_queue("Process 1") - time.sleep(3) # Simulate some time for processing - - # Add more processes to the queue - scheduler.add_process_to_queue("Process 2") - time.sleep(3) - - # Simulate waiting for some time to see the scheduler behavior - time.sleep(300) # Wait 2 minutes to observe scheduler stopping when empty + with self.lock: + self.waiting_processes.append(process_id) + self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]") + self.start_scheduler()