refactor[process_queue]: wait for process preprocessing before completely executing #26

Merged 2 commits on Oct 16, 2024
9 changes: 9 additions & 0 deletions backend/Makefile
@@ -12,3 +12,12 @@ generate-migration:

test:
	poetry run pytest
+
+
+# Check code for linting issues
+lint:
+	poetry run ruff check .
+
+# Automatically fix linting issues where possible
+lint-fix:
+	poetry run ruff check . --fix --show-fixes
24 changes: 19 additions & 5 deletions backend/app/processing/process_queue.py
@@ -3,6 +3,8 @@
from typing import List
from app.database import SessionLocal
from app.exceptions import CreditLimitExceededException
+from app.models.asset_content import AssetProcessingStatus
+from app.processing.process_scheduler import ProcessScheduler
from app.repositories import process_repository
from app.repositories import project_repository
from concurrent.futures import ThreadPoolExecutor
@@ -24,16 +26,17 @@
from app.utils import clean_text
from app.vectorstore.chroma import ChromaDB


executor = ThreadPoolExecutor(max_workers=5)

logger = Logger()


-def submit_process(process_id: int):
+def submit_process(process_id: int) -> None:
    executor.submit(process_task, process_id)


+process_execution_scheduler = ProcessScheduler(60, submit_process, logger)

# Background task processing function
def process_step_task(
    process_id: int,
@@ -52,7 +55,7 @@ def process_step_task(
        return False  # Stop processing if the process is stopped

    logger.log(f"Processing file: {process_step.asset.path}")
-    if process_step.status == ProcessStepStatus.COMPLETED:
+    if process_step.status == ProcessStepStatus.COMPLETED and process.type != "extract":
        summaries.append(process_step.output.get("summary", ""))
        return True

@@ -133,7 +136,7 @@ def process_task(process_id: int):
    process.started_at = datetime.utcnow()
    db.commit()

-    process_steps = process_repository.get_process_steps(db, process.id)
+    process_steps = process_repository.get_process_steps_with_asset_content(
+        db,
+        process.id,
+        [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name],
+    )
    if not process_steps:
        raise Exception("No process found!")

@@ -144,6 +147,11 @@ def process_task(process_id: int):
    # Step 2: Process each step in parallel outside the database connection
    failed_docs = []
    summaries = []

+    ready_process_steps = [
+        process_step
+        for process_step in process_steps
+        if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED
+    ]
+
+    # Check if all process steps are ready
+    all_process_steps_ready = len(ready_process_steps) == len(process_steps)

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            executor.submit(
@@ -154,7 +162,7 @@ def process_task(process_id: int):
                failed_docs,
                api_key,
            )
-            for process_step in process_steps
+            for process_step in ready_process_steps
        ]
        # Wait for all submitted tasks to complete
        concurrent.futures.wait(futures)
@@ -186,6 +194,12 @@ def process_task(process_id: int):
    if summary_of_summaries:
        process.output = {"summary": summary_of_summaries}

+    if not all_process_steps_ready:
+        logger.info(
+            f"Process id: [{process.id}]: some steps are still preprocessing; moving to the waiting queue"
+        )
+        process_execution_scheduler.add_process_to_queue(process.id)
+        # Skip the status update since not all steps are ready
+        return

    process.status = (
        ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
    )
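
Taken together, the queue changes fetch only pending or in-progress steps with their asset content eagerly loaded, execute just the steps whose preprocessing has completed, and requeue the whole process instead of finalizing its status when some steps are not ready. Below is a minimal sketch of that control flow, assuming hypothetical is_ready, execute, and requeue callables in place of the real asset-content check, step execution, and process_execution_scheduler.add_process_to_queue:

from typing import Callable, List


def run_ready_steps(
    steps: List[object],
    is_ready: Callable[[object], bool],  # hypothetical: has asset preprocessing completed?
    execute: Callable[[object], None],  # hypothetical: runs a single process step
    requeue: Callable[[], None],  # hypothetical: re-enqueue the parent process
) -> bool:
    """Run only the ready steps; requeue instead of finalizing if any step is not ready."""
    ready = [step for step in steps if is_ready(step)]
    for step in ready:
        execute(step)
    if len(ready) < len(steps):
        requeue()  # some asset content is still preprocessing; retry on the next tick
        return False  # caller must skip the final status update
    return True
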
65 changes: 65 additions & 0 deletions backend/app/processing/process_scheduler.py
@@ -0,0 +1,65 @@
from typing import Callable, Optional
from app.logger import Logger
import schedule
import threading
import time

class ProcessScheduler:
    """
    A scheduler that manages a queue of processes to be executed.

    It periodically checks the queue and executes processes if any are present.
    If the queue becomes empty, the scheduler automatically stops to conserve
    resources. The scheduler can be started and stopped as needed, and new
    processes can be added to the queue at any time.
    """

    def __init__(self, secs: int, executor: Callable[[int], None], logger: Optional[Logger] = None):
        self.waiting_processes = []
        self.scheduler_thread = None
        self.scheduler_running = False
        self.lock = threading.Lock()
        self.executor = executor
        if logger is None:
            self.logger = Logger()
        else:
            self.logger = logger

        schedule.every(secs).seconds.do(self._reprocess_holding_processes)

    def _reprocess_holding_processes(self):
        """Internal method to process tasks in the queue."""
        with self.lock:
            if self.waiting_processes:
                process_id = self.waiting_processes.pop(0)
                self.logger.info(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
                self.executor(process_id)
            else:
                self.stop_scheduler()

    def _run_scheduler(self):
        """Internal method to continuously run the scheduler."""
        while self.scheduler_running:
            schedule.run_pending()
            time.sleep(1)

    def start_scheduler(self):
        """Start the scheduler thread if it's not already running."""
        if not self.scheduler_running:
            self.logger.info("[ProcessScheduler]: Starting scheduler")
            self.scheduler_running = True
            self.scheduler_thread = threading.Thread(target=self._run_scheduler)
            self.scheduler_thread.daemon = True
            self.scheduler_thread.start()
        else:
            self.logger.info("[ProcessScheduler]: Scheduler is already running")

    def stop_scheduler(self):
        """Stop the scheduler thread."""
        self.scheduler_running = False
        self.logger.info("[ProcessScheduler]: Scheduler stopped")

    def add_process_to_queue(self, process_id: int) -> None:
"""Add a process to the queue and start the scheduler if needed."""
with self.lock:
self.waiting_processes.append(process_id)
self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
self.start_scheduler()
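
For reference, a minimal usage sketch of the class above; the handle callback is a stand-in for submit_process, and the 5-second interval is arbitrary:

import time

from app.processing.process_scheduler import ProcessScheduler


def handle(process_id: int) -> None:
    print(f"executing process {process_id}")  # stand-in for submit_process


scheduler = ProcessScheduler(5, handle)  # logger defaults to a fresh Logger()
scheduler.add_process_to_queue(42)  # first enqueue also starts the daemon thread
time.sleep(6)  # let the background thread tick once; an empty queue stops it

One design note: schedule.every(...) in __init__ registers the job on the schedule module's global default scheduler, so constructing a second ProcessScheduler instance would add a second job to that same shared state.
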
13 changes: 13 additions & 0 deletions backend/app/repositories/process_repository.py
@@ -1,3 +1,4 @@
+from typing import List
from app.models.process import ProcessStatus
from sqlalchemy import func, or_
from sqlalchemy.orm import Session, joinedload, defer, aliased
@@ -91,6 +92,18 @@ def get_process_steps(db: Session, process_id: int):
    )


+def get_process_steps_with_asset_content(db: Session, process_id: int, status: List[str]):
+    return (
+        db.query(models.ProcessStep)
+        .filter(models.ProcessStep.process_id == process_id)
+        .filter(models.ProcessStep.status.in_(status))
+        .options(
+            joinedload(models.ProcessStep.process).joinedload(models.Process.project),
+            joinedload(models.ProcessStep.asset).joinedload(models.Asset.content),
+        )
+        .all()
+    )

def get_process_step(db: Session, step_id: int):
    return (
        db.query(models.ProcessStep)
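
A short consumption sketch for the new query, mirroring the call in process_task (session handling simplified; the ProcessStepStatus import path is an assumption). Because the statuses are matched with .in_(...) against enum names, the function expects strings such as ProcessStepStatus.PENDING.name:

from app.database import SessionLocal
from app.models.process_step import ProcessStepStatus  # import path assumed
from app.repositories import process_repository

db = SessionLocal()
try:
    steps = process_repository.get_process_steps_with_asset_content(
        db, process_id=1, status=[ProcessStepStatus.PENDING.name]
    )
    # the joinedload options mean these traversals trigger no extra queries
    for step in steps:
        print(step.process.project.id, step.asset.content.processing)
finally:
    db.close()
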
18 changes: 16 additions & 2 deletions backend/poetry.lock


1 change: 1 addition & 0 deletions backend/pyproject.toml
@@ -17,6 +17,7 @@
dateparser = "^1.2.0"
requests = "^2.32.3"
chromadb = "^0.5.5"
openai = "^1.51.2"
+schedule = "^1.2.2"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.2"
141 changes: 141 additions & 0 deletions backend/tests/processing/test_process_scheduler.py
@@ -0,0 +1,141 @@
import unittest
from unittest.mock import MagicMock, patch
from app.processing.process_scheduler import ProcessScheduler


class TestProcessScheduler(unittest.TestCase):
    def setUp(self):
        self.executor_mock = MagicMock()  # Mock the executor function
        self.logger_mock = MagicMock()  # Mock the logger
        self.scheduler = ProcessScheduler(1, self.executor_mock, self.logger_mock)

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_add_process_to_queue(self, _):
        # Test adding a process to the queue and ensure it's added correctly
        process_id = 1
        self.scheduler.add_process_to_queue(process_id)

        # Check if the process is added to the waiting_processes list
        self.assertIn(process_id, self.scheduler.waiting_processes)

        # Check if the scheduler was started
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_reprocess_holding_processes(self, _):
        # Test processing when the queue is not empty
        process_id = 2
        self.scheduler.waiting_processes = [process_id]

        self.scheduler._reprocess_holding_processes()

        # Ensure the executor was called with the process_id
        self.executor_mock.assert_called_once_with(process_id)

        # Ensure the logger recorded the correct process execution
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Executing process from queue [{process_id}]")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_stop_scheduler_when_empty(self, _):
        # Test if the scheduler stops when there are no processes left
        self.scheduler.waiting_processes = []  # Empty the queue
        self.scheduler._reprocess_holding_processes()

        # Ensure the scheduler stops
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
        self.assertFalse(self.scheduler.scheduler_running)

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_scheduler_starts_when_needed(self, _):
        # Test if the scheduler starts when a new process is added
        process_id = 3
        self.scheduler.start_scheduler()

        self.scheduler.add_process_to_queue(process_id)

        # Ensure the process was added and the scheduler started
        self.assertIn(process_id, self.scheduler.waiting_processes)
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_scheduler_does_not_double_start(self, _):
        # Test that the scheduler doesn't start twice if it's already running
        self.scheduler.scheduler_running = True  # Simulate a running scheduler

        self.scheduler.start_scheduler()

        # Ensure that a log is made saying it's already running
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler is already running")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_stop_scheduler(self, _):
        # Test stopping the scheduler manually
        self.scheduler.start_scheduler()
        self.scheduler.stop_scheduler()

        # Check if the scheduler is stopped
        self.assertFalse(self.scheduler.scheduler_running)
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_reprocess_multiple_processes(self, _):
        # Test reprocessing when there are multiple processes in the queue
        process_ids = [1, 2, 3]
        self.scheduler.waiting_processes = process_ids.copy()

        # Simulate running the scheduler multiple times
        self.scheduler._reprocess_holding_processes()
        self.scheduler._reprocess_holding_processes()
        self.scheduler._reprocess_holding_processes()

        # Ensure each process was executed in the correct order
        self.executor_mock.assert_has_calls([
            unittest.mock.call(1),
            unittest.mock.call(2),
            unittest.mock.call(3),
        ])
        self.assertEqual(self.scheduler.waiting_processes, [])

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_no_processes_added(self, _):
        # Test the behavior when no processes are added
        self.scheduler.start_scheduler()
        self.scheduler._reprocess_holding_processes()

        # The scheduler should stop since no processes were added
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
        self.assertFalse(self.scheduler.scheduler_running)

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_scheduler_stops_when_empty_after_processing(self, _):
        # Test that the scheduler stops after processing all tasks
        process_id = 1
        self.scheduler.waiting_processes = [process_id]

        # Process the task
        self.scheduler._reprocess_holding_processes()

        # Explicitly stop the scheduler after processing
        self.scheduler.stop_scheduler()

        # Ensure the scheduler stopped after processing the last task
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")

    @patch('time.sleep', return_value=None)  # Mock sleep to avoid delays
    def test_scheduler_restart_after_adding_new_process(self, _):
        # Test if the scheduler restarts when a new process is added after stopping
        process_id_1 = 1
        process_id_2 = 2
        self.scheduler.waiting_processes = [process_id_1]

        # Process the first task and stop the scheduler
        self.scheduler._reprocess_holding_processes()
        self.assertFalse(self.scheduler.scheduler_running)

        # Add a new process and ensure the scheduler restarts
        self.scheduler.add_process_to_queue(process_id_2)
        self.assertTrue(self.scheduler.scheduler_running)
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id_2}]")
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")


if __name__ == '__main__':
    unittest.main()