diff --git a/backend/Makefile b/backend/Makefile
index ed82a1e..45e6f02 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -12,3 +12,12 @@ generate-migration:
 
 test:
 	poetry run pytest
+
+
+# Check code for linting issues
+lint:
+	poetry run ruff check .
+
+# Automatically fix linting issues where possible
+lint-fix:
+	poetry run ruff check . --fix --show-fixes
diff --git a/backend/app/processing/process_queue.py b/backend/app/processing/process_queue.py
index 36d3671..53d9924 100644
--- a/backend/app/processing/process_queue.py
+++ b/backend/app/processing/process_queue.py
@@ -3,6 +3,8 @@ from typing import List
 
 from app.database import SessionLocal
 from app.exceptions import CreditLimitExceededException
+from app.models.asset_content import AssetProcessingStatus
+from app.processing.process_scheduler import ProcessScheduler
 from app.repositories import process_repository
 from app.repositories import project_repository
 from concurrent.futures import ThreadPoolExecutor
@@ -24,16 +26,17 @@ from app.utils import clean_text
 from app.vectorstore.chroma import ChromaDB
 
 
-
 executor = ThreadPoolExecutor(max_workers=5)
 
 logger = Logger()
 
 
-def submit_process(process_id: int):
+def submit_process(process_id: int) -> None:
     executor.submit(process_task, process_id)
 
 
+process_execution_scheduler = ProcessScheduler(60, submit_process, logger)
+
 # Background task processing function
 def process_step_task(
     process_id: int,
@@ -52,7 +55,7 @@ def process_step_task(
             return False  # Stop processing if the process is stopped
 
         logger.log(f"Processing file: {process_step.asset.path}")
-        if process_step.status == ProcessStepStatus.COMPLETED:
+        if process_step.status == ProcessStepStatus.COMPLETED and process.type != "extract":
             summaries.append(process_step.output.get("summary", ""))
             return True
 
@@ -133,7 +136,7 @@ def process_task(process_id: int):
             process.started_at = datetime.utcnow()
             db.commit()
 
-            process_steps = process_repository.get_process_steps(db, process.id)
+            process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name])
             if not process_steps:
                 raise Exception("No process found!")
 
@@ -144,6 +147,11 @@ def process_task(process_id: int):
         # Step 2: Process each step in parallel outside the database connection
         failed_docs = []
         summaries = []
+
+        ready_process_steps = [process_step for process_step in process_steps if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED]
+
+        all_process_steps_ready = len(ready_process_steps) == len(process_steps)  # Check if all process steps are ready
+
         with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
             futures = [
                 executor.submit(
@@ -154,7 +162,7 @@ def process_task(process_id: int):
                     failed_docs,
                     api_key,
                 )
-                for process_step in process_steps
+                for process_step in ready_process_steps
             ]
             # Wait for all submitted tasks to complete
             concurrent.futures.wait(futures)
@@ -186,6 +194,12 @@ def process_task(process_id: int):
             if summary_of_summaries:
                 process.output = {"summary": summary_of_summaries}
 
+        if not all_process_steps_ready:
+            logger.info(f"Process id: [{process.id}] some steps' preprocessing is incomplete; moving to waiting queue")
+            process_execution_scheduler.add_process_to_queue(process.id)
+            # Skip the status update since not all steps are ready
+            return
+
         process.status = (
             ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
         )
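Note: taken in isolation, the readiness gate added to `process_task` reduces to the following sketch (names match the diff; session handling, summary generation, and error paths are elided, and `split_ready_steps` is a hypothetical helper, not part of the PR):

```python
from app.models.asset_content import AssetProcessingStatus


def split_ready_steps(process_steps):
    # Only steps whose asset content has finished preprocessing run now;
    # any shortfall sends the whole process back to the waiting queue.
    ready = [
        step for step in process_steps
        if step.asset.content.processing == AssetProcessingStatus.COMPLETED
    ]
    return ready, len(ready) == len(process_steps)
```

When the second value is false, the diff skips the final status update, returns early, and hands the process id to `process_execution_scheduler`, which re-invokes `submit_process` on a 60-second cadence until every asset has finished preprocessing.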
diff --git a/backend/app/processing/process_scheduler.py b/backend/app/processing/process_scheduler.py
new file mode 100644
index 0000000..8ece83b
--- /dev/null
+++ b/backend/app/processing/process_scheduler.py
@@ -0,0 +1,65 @@
+from typing import Callable
+from app.logger import Logger
+import schedule
+import threading
+import time
+
+class ProcessScheduler:
+    """
+    A scheduler that manages a queue of processes to be executed.
+
+    It periodically checks the queue and executes processes if any are present.
+    If the queue becomes empty, the scheduler automatically stops to conserve resources.
+    The scheduler can be started and stopped as needed, and new processes can be added to the queue at any time.
+    """
+    def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger | None = None):
+        self.waiting_processes = []
+        self.scheduler_thread = None
+        self.scheduler_running = False
+        self.lock = threading.Lock()
+        self.executor = executor
+        if logger is None:
+            self.logger = Logger()
+        else:
+            self.logger = logger
+
+        schedule.every(secs).seconds.do(self._reprocess_holding_processes)
+
+    def _reprocess_holding_processes(self):
+        """Internal method to process tasks in the queue."""
+        with self.lock:
+            if self.waiting_processes:
+                process_id = self.waiting_processes.pop(0)
+                self.logger.info(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
+                self.executor(process_id)
+            else:
+                self.stop_scheduler()
+
+    def _run_scheduler(self):
+        """Internal method to continuously run the scheduler."""
+        while self.scheduler_running:
+            schedule.run_pending()
+            time.sleep(1)
+
+    def start_scheduler(self):
+        """Start the scheduler thread if it's not already running."""
+        if not self.scheduler_running:
+            self.logger.info("[ProcessScheduler]: Starting scheduler")
+            self.scheduler_running = True
+            self.scheduler_thread = threading.Thread(target=self._run_scheduler)
+            self.scheduler_thread.daemon = True
+            self.scheduler_thread.start()
+        else:
+            self.logger.info("[ProcessScheduler]: Scheduler is already running")
+
+    def stop_scheduler(self):
+        """Stop the scheduler thread."""
+        self.scheduler_running = False
+        self.logger.info("[ProcessScheduler]: Scheduler stopped")
+
+    def add_process_to_queue(self, process_id: int) -> None:
+        """Add a process to the queue and start the scheduler if needed."""
+        with self.lock:
+            self.waiting_processes.append(process_id)
+            self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
+            self.start_scheduler()
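Note: a minimal usage sketch of `ProcessScheduler` (standalone wiring assumed here; in the PR the real wiring lives in `process_queue.py`, with `submit_process` as the executor and a 60-second interval):

```python
from app.processing.process_scheduler import ProcessScheduler


def resubmit(process_id: int) -> None:
    # Stand-in executor; process_queue.py passes submit_process here.
    print(f"re-submitting process [{process_id}]")


scheduler = ProcessScheduler(60, resubmit)
scheduler.add_process_to_queue(42)  # spawns the daemon thread on first use
# The thread stops itself once the queue drains; adding another process
# restarts it via start_scheduler().
```

One design point worth noting: `schedule.every(secs).seconds.do(...)` registers the job on the `schedule` library's module-level default scheduler, so the class is effectively intended to be instantiated once per interpreter, which is how `process_queue.py` uses it.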
diff --git a/backend/app/repositories/process_repository.py b/backend/app/repositories/process_repository.py
index 8560e68..0c5c7e0 100644
--- a/backend/app/repositories/process_repository.py
+++ b/backend/app/repositories/process_repository.py
@@ -1,3 +1,4 @@
+from typing import List
 from app.models.process import ProcessStatus
 from sqlalchemy import func, or_
 from sqlalchemy.orm import Session, joinedload, defer, aliased
@@ -91,6 +92,18 @@ def get_process_steps(db: Session, process_id: int):
     )
 
 
+def get_process_steps_with_asset_content(db: Session, process_id: int, status: List[str]):
+    return (
+        db.query(models.ProcessStep)
+        .filter(models.ProcessStep.process_id == process_id)
+        .filter(models.ProcessStep.status.in_(status))
+        .options(
+            joinedload(models.ProcessStep.process).joinedload(models.Process.project),
+            joinedload(models.ProcessStep.asset).joinedload(models.Asset.content),
+        )
+        .all()
+    )
+
 def get_process_step(db: Session, step_id: int):
     return (
         db.query(models.ProcessStep)
diff --git a/backend/poetry.lock b/backend/poetry.lock
index bca8e84..6bb168a 100644
--- a/backend/poetry.lock
+++ b/backend/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "alembic"
@@ -2431,6 +2431,20 @@ files = [
     {file = "ruff-0.3.7.tar.gz", hash = "sha256:d5c1aebee5162c2226784800ae031f660c350e7a3402c4d1f8ea4e97e232e3ba"},
 ]
 
+[[package]]
+name = "schedule"
+version = "1.2.2"
+description = "Job scheduling for humans."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "schedule-1.2.2-py3-none-any.whl", hash = "sha256:5bef4a2a0183abf44046ae0d164cadcac21b1db011bdd8102e4a0c1e91e06a7d"},
+    {file = "schedule-1.2.2.tar.gz", hash = "sha256:15fe9c75fe5fd9b9627f3f19cc0ef1420508f9f9a46f45cd0769ef75ede5f0b7"},
+]
+
+[package.extras]
+timezone = ["pytz"]
+
 [[package]]
 name = "setuptools"
 version = "74.0.0"
@@ -3271,4 +3285,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.11,<3.13"
-content-hash = "f93ac443a905910a7a021a4163b8f067845c178bc03eaa1f68de01014f5e25c4"
+content-hash = "7d88091442473516457b1bec3ae46d31579f996a6dc8d62da334db11907a75b4"
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 438ed4c..82cf460 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -17,6 +17,7 @@ dateparser = "^1.2.0"
 requests = "^2.32.3"
 chromadb = "^0.5.5"
 openai = "^1.51.2"
+schedule = "^1.2.2"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.3.2"
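Note: a hypothetical call site for the new repository helper (the import path for `ProcessStepStatus` is an assumption; in the PR the real caller is `process_task`):

```python
from app.database import SessionLocal
from app.models.process import ProcessStepStatus  # assumed location
from app.repositories import process_repository

with SessionLocal() as db:
    steps = process_repository.get_process_steps_with_asset_content(
        db,
        process_id=1,
        status=[ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name],
    )
    # The joinedload chains mean step.asset.content is already populated,
    # so the readiness check in process_task triggers no extra queries.
    for step in steps:
        print(step.id, step.asset.content.processing)
```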
diff --git a/backend/tests/processing/test_process_scheduler.py b/backend/tests/processing/test_process_scheduler.py
new file mode 100644
index 0000000..22334d1
--- /dev/null
+++ b/backend/tests/processing/test_process_scheduler.py
@@ -0,0 +1,141 @@
+import unittest
+from unittest.mock import MagicMock, patch
+from app.processing.process_scheduler import ProcessScheduler
+
+class TestProcessScheduler(unittest.TestCase):
+    def setUp(self):
+        self.executor_mock = MagicMock()  # Mock the executor function
+        self.logger_mock = MagicMock()  # Mock the logger
+        self.scheduler = ProcessScheduler(1, self.executor_mock, self.logger_mock)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_add_process_to_queue(self, _):
+        # Test adding a process to the queue and ensure it's added correctly
+        process_id = 1
+        self.scheduler.add_process_to_queue(process_id)
+
+        # Check if the process is added to the waiting_processes list
+        self.assertIn(process_id, self.scheduler.waiting_processes)
+
+        # Check if the scheduler was started
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_reprocess_holding_processes(self, _):
+        # Test processing when the queue is not empty
+        process_id = 2
+        self.scheduler.waiting_processes = [process_id]
+
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure the executor was called with the process_id
+        self.executor_mock.assert_called_once_with(process_id)
+
+        # Ensure the logger recorded the correct process execution
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_stop_scheduler_when_empty(self, _):
+        # Test if the scheduler stops when there are no processes left
+        self.scheduler.waiting_processes = []  # Empty the queue
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure the scheduler stops
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+        self.assertFalse(self.scheduler.scheduler_running)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_starts_when_needed(self, _):
+        # Test if the scheduler starts when a new process is added
+        process_id = 3
+        self.scheduler.start_scheduler()
+
+        self.scheduler.add_process_to_queue(process_id)
+
+        # Ensure the process was added and the scheduler started
+        self.assertIn(process_id, self.scheduler.waiting_processes)
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_does_not_double_start(self, _):
+        # Test that the scheduler doesn't start twice if it's already running
+        self.scheduler.scheduler_running = True  # Simulate a running scheduler
+
+        self.scheduler.start_scheduler()
+
+        # Ensure that a log is made saying it's already running
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler is already running")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_stop_scheduler(self, _):
+        # Test stopping the scheduler manually
+        self.scheduler.start_scheduler()
+        self.scheduler.stop_scheduler()
+
+        # Check if the scheduler is stopped
+        self.assertFalse(self.scheduler.scheduler_running)
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_reprocess_multiple_processes(self, _):
+        # Test reprocessing when there are multiple processes in the queue
+        process_ids = [1, 2, 3]
+        self.scheduler.waiting_processes = process_ids.copy()
+
+        # Simulate running the scheduler multiple times
+        self.scheduler._reprocess_holding_processes()
+        self.scheduler._reprocess_holding_processes()
+        self.scheduler._reprocess_holding_processes()
+
+        # Ensure each process was executed in the correct order
+        self.executor_mock.assert_has_calls([
+            unittest.mock.call(1),
+            unittest.mock.call(2),
+            unittest.mock.call(3),
+        ])
+        self.assertEqual(self.scheduler.waiting_processes, [])
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_no_processes_added(self, _):
+        # Test the behavior when no processes are added
+        self.scheduler.start_scheduler()
+        self.scheduler._reprocess_holding_processes()
+
+        # The scheduler should stop since no processes were added
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+        self.assertFalse(self.scheduler.scheduler_running)
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_stops_when_empty_after_processing(self, _):
+        # Test that the scheduler stops after processing all tasks
+        process_id = 1
+        self.scheduler.waiting_processes = [process_id]
+
+        # Process the task
+        self.scheduler._reprocess_holding_processes()
+
+        # Explicitly stop the scheduler after processing
+        self.scheduler.stop_scheduler()
+
+        # Ensure the scheduler stopped after processing the last task
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
+
+    @patch('time.sleep', return_value=None)  # Mocking sleep to avoid delays
+    def test_scheduler_restart_after_adding_new_process(self, _):
+        # Test if the scheduler restarts when a new process is added after stopping
+        process_id_1 = 1
+        process_id_2 = 2
+        self.scheduler.waiting_processes = [process_id_1]
+
+        # Process the first task and stop the scheduler
+        self.scheduler._reprocess_holding_processes()
+        self.assertFalse(self.scheduler.scheduler_running)
+
+        # Add a new process and ensure the scheduler restarts
+        self.scheduler.add_process_to_queue(process_id_2)
+        self.assertTrue(self.scheduler.scheduler_running)
+        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id_2}]")
+        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")
+
+if __name__ == '__main__':
+    unittest.main()
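Note: the unit tests above stub out the executor and logger; an end-to-end smoke test with a real background thread might look like this (illustrative only, using a 1-second interval instead of the production 60 seconds):

```python
import time

from app.processing.process_scheduler import ProcessScheduler

executed = []
scheduler = ProcessScheduler(1, executed.append)
scheduler.add_process_to_queue(7)

time.sleep(3)            # give the daemon thread a chance to fire
assert executed == [7]   # the queued process was re-submitted exactly once
assert not scheduler.scheduler_running  # stopped itself once the queue drained
```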