Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor[process_queue]: wait for process preprocessing before completely executing #26

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ generate-migration:

test:
poetry run pytest

# Report ruff lint violations without modifying any files.
lint:
	poetry run ruff check .

# Apply ruff's safe automatic fixes in place.
lint-fix:
	poetry run ruff check . --fix
gventuri marked this conversation as resolved.
Show resolved Hide resolved
22 changes: 17 additions & 5 deletions backend/app/processing/process_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import List
from app.database import SessionLocal
from app.exceptions import CreditLimitExceededException
from app.models.asset_content import AssetProcessingStatus
from app.processing.process_scheduler import ProcessScheduler
from app.repositories import process_repository
from app.repositories import project_repository
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -24,16 +26,17 @@
from app.utils import clean_text
from app.vectorstore.chroma import ChromaDB


executor = ThreadPoolExecutor(max_workers=5)

logger = Logger()


def submit_process(process_id: int):
def submit_process(process_id: int) -> None:
    """Schedule process_task(process_id) on the module-level thread pool (max 5 workers)."""
    executor.submit(process_task, process_id)


process_execution_scheduler = ProcessScheduler(60, submit_process, logger)

# Background task processing function
def process_step_task(
process_id: int,
Expand All @@ -52,7 +55,7 @@ def process_step_task(
return False # Stop processing if the process is stopped

logger.log(f"Processing file: {process_step.asset.path}")
if process_step.status == ProcessStepStatus.COMPLETED:
if process_step.status == ProcessStepStatus.COMPLETED and process.type!="extract":
summaries.append(process_step.output.get("summary", ""))
return True

Expand Down Expand Up @@ -133,7 +136,7 @@ def process_task(process_id: int):
process.started_at = datetime.utcnow()
db.commit()

process_steps = process_repository.get_process_steps(db, process.id)
process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, ["PENDING", "IN_PROGRESS"])
gventuri marked this conversation as resolved.
Show resolved Hide resolved
if not process_steps:
raise Exception("No process found!")

Expand All @@ -144,6 +147,11 @@ def process_task(process_id: int):
# Step 2: Process each step in parallel outside the database connection
failed_docs = []
summaries = []

ready_process_steps = [process_step for process_step in process_steps if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED]

all_process_step_ready = len(ready_process_steps) == len(process_steps) # Asset preprocessing is pending
gventuri marked this conversation as resolved.
Show resolved Hide resolved

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(
Expand All @@ -154,7 +162,7 @@ def process_task(process_id: int):
failed_docs,
api_key,
)
for process_step in process_steps
for process_step in ready_process_steps
]
# Wait for all submitted tasks to complete
concurrent.futures.wait(futures)
Expand Down Expand Up @@ -186,6 +194,10 @@ def process_task(process_id: int):
if summary_of_summaries:
process.output = {"summary": summary_of_summaries}

if not all_process_step_ready:
logger.info(f"Process id: [{process.id}] some steps preprocessing is missing moving to waiting queue")
process_execution_scheduler.add_process_to_queue(process.id)

gventuri marked this conversation as resolved.
Show resolved Hide resolved
process.status = (
ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
)
Expand Down
77 changes: 77 additions & 0 deletions backend/app/processing/process_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import Callable
from app.logger import Logger
import schedule
import threading
import time

class ProcessScheduler:
    """
    A scheduler that manages a queue of processes to be executed.
    It periodically checks the queue and executes processes if any are present.
    If the queue becomes empty, the scheduler automatically stops to conserve resources.
    The scheduler can be started and stopped as needed, and new processes can be added
    to the queue at any time.
    """

    def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger) -> None:
        """
        Args:
            secs: interval in seconds between queue checks.
            executor: callable invoked with a process id to (re)execute it.
            logger: application logger used for scheduler diagnostics.
        """
        self.waiting_processes = []
        self.scheduler_thread = None
        self.scheduler_running = False
        self.executor = executor
        self.logger = logger
        # Guards waiting_processes: it is mutated both by callers
        # (add_process_to_queue) and by the scheduler thread.
        self._lock = threading.Lock()
        # Use a private Scheduler instance instead of the module-global
        # `schedule` registry so that multiple ProcessScheduler instances
        # never trigger each other's jobs.
        self._schedule = schedule.Scheduler()
        self._schedule.every(secs).seconds.do(self._reprocess_holding_processes)

    def _reprocess_holding_processes(self) -> None:
        """Pop one queued process and execute it; stop the scheduler when the queue is empty."""
        with self._lock:
            # `id` renamed to process_id (the original shadowed the builtin).
            process_id = self.waiting_processes.pop(0) if self.waiting_processes else None
        if process_id is not None:
            self.logger.info(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
            self.executor(process_id)
        else:
            self.stop_scheduler()

    def _run_scheduler(self) -> None:
        """Thread target: run pending jobs until the scheduler is stopped."""
        while self.scheduler_running:
            self._schedule.run_pending()
            time.sleep(1)

    def start_scheduler(self) -> None:
        """Start the scheduler thread if it's not already running."""
        if not self.scheduler_running:
            self.logger.info("[ProcessScheduler]: Starting scheduler")
            self.scheduler_running = True
            self.scheduler_thread = threading.Thread(target=self._run_scheduler)
            self.scheduler_thread.daemon = True  # never block interpreter exit
            self.scheduler_thread.start()
        else:
            self.logger.info("[ProcessScheduler]: Scheduler is already running")

    def stop_scheduler(self) -> None:
        """Stop the scheduler thread (it exits after its current 1-second sleep)."""
        self.scheduler_running = False
        self.logger.info("[ProcessScheduler]: Scheduler stopped")

    def add_process_to_queue(self, process_id: int) -> None:
        """Add a process to the queue and start the scheduler if needed."""
        with self._lock:
            self.waiting_processes.append(process_id)
        self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
        self.start_scheduler()


if __name__ == "__main__":
    # Minimal manual smoke test: print queued process ids as they execute.

    def print_process(process):
        print(process)

    # BUG FIX: ProcessScheduler requires a logger argument; the demo
    # previously called ProcessScheduler(15, print_process) and crashed
    # with a TypeError before the scheduler ever started.
    scheduler = ProcessScheduler(15, print_process, Logger())

    # Add processes to the queue and start processing
    scheduler.add_process_to_queue("Process 1")
    time.sleep(3)  # Simulate some time for processing

    # Add more processes to the queue
    scheduler.add_process_to_queue("Process 2")
    time.sleep(3)

    # Keep the main thread alive long enough (5 minutes) to observe the
    # scheduler stopping on its own once the queue drains.
    time.sleep(300)
13 changes: 13 additions & 0 deletions backend/app/repositories/process_repository.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List
from app.models.process import ProcessStatus
from sqlalchemy import func, or_
from sqlalchemy.orm import Session, joinedload, defer, aliased
Expand Down Expand Up @@ -91,6 +92,18 @@ def get_process_steps(db: Session, process_id: int):
)


def get_process_steps_with_asset_content(db: Session, process_id: int, status: List[ProcessStatus]):
    """Return a process's steps filtered by step status, with related rows eagerly loaded.

    Eager-loads each step's process->project and asset->content relationships via
    joinedload so callers can inspect asset preprocessing state without issuing
    extra per-row queries.

    NOTE(review): the annotation says List[ProcessStatus], but the visible caller
    passes plain strings ("PENDING", "IN_PROGRESS") — confirm the intended
    element type for the `status` filter.
    """
    return (
        db.query(models.ProcessStep)
        .filter(models.ProcessStep.process_id == process_id)
        .filter(models.ProcessStep.status.in_(status))
        .options(
            joinedload(models.ProcessStep.process).joinedload(models.Process.project),
            joinedload(models.ProcessStep.asset).joinedload(models.Asset.content),
        )
        .all()
    )

def get_process_step(db: Session, step_id: int):
return (
db.query(models.ProcessStep)
Expand Down
18 changes: 16 additions & 2 deletions backend/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dateparser = "^1.2.0"
requests = "^2.32.3"
chromadb = "^0.5.5"
openai = "^1.51.2"
schedule = "^1.2.2"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.2"
Expand Down
141 changes: 141 additions & 0 deletions backend/tests/processing/test_process_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import unittest
from unittest.mock import MagicMock, patch
from app.processing.process_scheduler import ProcessScheduler

class TestProcessScheduler(unittest.TestCase):
    """Unit tests for ProcessScheduler.

    The executor and logger are mocked, and tests drive the internal
    queue-draining method (_reprocess_holding_processes) directly instead of
    waiting on the real timer, so no test actually sleeps.

    NOTE(review): @patch('time.sleep') patches this test module's binding of
    time.sleep, not app.processing.process_scheduler.time.sleep. It is
    harmless here because the scheduler's _run_scheduler loop is never
    exercised, but it does not actually stub the scheduler's own sleep —
    confirm whether the module-qualified target was intended.
    """

    def setUp(self):
        # Fresh mocks and a 1-second scheduler for every test.
        self.executor_mock = MagicMock() # Mock the executor function
        self.logger_mock = MagicMock() # Mock the logger
        self.scheduler = ProcessScheduler(1, self.executor_mock, self.logger_mock)

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_add_process_to_queue(self, _):
        """Adding a process enqueues it and starts the scheduler."""
        # Test adding a process to the queue and ensure it's added correctly
        process_id = 1
        self.scheduler.add_process_to_queue(process_id)

        # Check if the process is added to the waiting_processes list
        self.assertIn(process_id, self.scheduler.waiting_processes)

        # Check if the scheduler was started
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_reprocess_holding_processes(self, _):
        """Draining a non-empty queue invokes the executor with the popped id."""
        # Test processing when queue is not empty
        process_id = 2
        self.scheduler.waiting_processes = [process_id]

        self.scheduler._reprocess_holding_processes()

        # Ensure the executor was called with the process_id
        self.executor_mock.assert_called_once_with(process_id)

        # Ensure the logger recorded the correct process execution
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Executing process from queue [{process_id}]")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_stop_scheduler_when_empty(self, _):
        """An empty queue causes the scheduler to stop itself."""
        # Test if the scheduler stops when there are no processes left
        self.scheduler.waiting_processes = [] # Empty the queue
        self.scheduler._reprocess_holding_processes()

        # Ensure the scheduler stops
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
        self.assertFalse(self.scheduler.scheduler_running)

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_scheduler_starts_when_needed(self, _):
        """Enqueuing logs the add and keeps the process in the queue."""
        # Test if the scheduler starts when a new process is added
        process_id = 3
        self.scheduler.start_scheduler()

        self.scheduler.add_process_to_queue(process_id)

        # Ensure the process was added and scheduler started
        self.assertIn(process_id, self.scheduler.waiting_processes)
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_scheduler_does_not_double_start(self, _):
        """start_scheduler is a no-op (plus log) when already running."""
        # Test that scheduler doesn't start twice if it's already running
        self.scheduler.scheduler_running = True # Simulate running scheduler

        self.scheduler.start_scheduler()

        # Ensure that a log is made saying it's already running
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler is already running")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_stop_scheduler(self, _):
        """Manual stop clears the running flag and logs the stop."""
        # Test stopping the scheduler manually
        self.scheduler.start_scheduler()
        self.scheduler.stop_scheduler()

        # Check if scheduler is stopped
        self.assertFalse(self.scheduler.scheduler_running)
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_reprocess_multiple_processes(self, _):
        """Queued processes are drained one per tick, in FIFO order."""
        # Test reprocessing when there are multiple processes in the queue
        process_ids = [1, 2, 3]
        self.scheduler.waiting_processes = process_ids.copy()

        # Simulate running the scheduler multiple times
        self.scheduler._reprocess_holding_processes()
        self.scheduler._reprocess_holding_processes()
        self.scheduler._reprocess_holding_processes()

        # Ensure each process was executed in the correct order
        self.executor_mock.assert_has_calls([
            unittest.mock.call(1),
            unittest.mock.call(2),
            unittest.mock.call(3),
        ])
        self.assertEqual(self.scheduler.waiting_processes, [])

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_no_processes_added(self, _):
        """A started scheduler with nothing queued stops on the first tick."""
        # Test the behavior when no processes are added
        self.scheduler.start_scheduler()
        self.scheduler._reprocess_holding_processes()

        # Scheduler should stop since no processes were added
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")
        self.assertFalse(self.scheduler.scheduler_running)

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_scheduler_stops_when_empty_after_processing(self, _):
        """After the last task is processed, stopping logs the stop message."""
        # Test that the scheduler stops after processing all tasks
        process_id = 1
        self.scheduler.waiting_processes = [process_id]

        # Process the task
        self.scheduler._reprocess_holding_processes()

        # Explicitly stop the scheduler after processing
        self.scheduler.stop_scheduler()

        # Ensure the scheduler stopped after processing the last task
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped")

    @patch('time.sleep', return_value=None) # Mocking sleep to avoid delays
    def test_scheduler_restart_after_adding_new_process(self, _):
        """Adding a process after an auto-stop restarts the scheduler."""
        # Test if the scheduler restarts when a new process is added after stopping
        process_id_1 = 1
        process_id_2 = 2
        self.scheduler.waiting_processes = [process_id_1]

        # Process the first task and stop the scheduler
        self.scheduler._reprocess_holding_processes()
        self.assertFalse(self.scheduler.scheduler_running)

        # Add a new process and ensure the scheduler restarts
        self.scheduler.add_process_to_queue(process_id_2)
        self.assertTrue(self.scheduler.scheduler_running)
        self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id_2}]")
        self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler")

if __name__ == '__main__':
    unittest.main()
Loading