Commit

fix[process_queue]: code improvements
ArslanSaleem committed Oct 16, 2024
1 parent e08fc37 commit 13f0b93
Showing 3 changed files with 27 additions and 34 deletions.
5 changes: 4 additions & 1 deletion backend/Makefile
@@ -13,8 +13,11 @@ generate-migration:
test:
	poetry run pytest

+
+# Check code for linting issues
lint:
	poetry run ruff check .

+# Automatically fix linting issues where possible
lint-fix:
-	poetry run ruff check . --fix
+	poetry run ruff check . --fix --show-fixes
8 changes: 5 additions & 3 deletions backend/app/processing/process_queue.py
@@ -136,7 +136,7 @@ def process_task(process_id: int):
    process.started_at = datetime.utcnow()
    db.commit()

-    process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, ["PENDING", "IN_PROGRESS"])
+    process_steps = process_repository.get_process_steps_with_asset_content(db, process.id, [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name])
    if not process_steps:
        raise Exception("No process found!")

@@ -150,7 +150,7 @@ def process_task(process_id: int):

    ready_process_steps = [process_step for process_step in process_steps if process_step.asset.content.processing == AssetProcessingStatus.COMPLETED]

-    all_process_step_ready = len(ready_process_steps) == len(process_steps)  # Asset preprocessing is pending
+    all_process_steps_ready = len(ready_process_steps) == len(process_steps)  # Check if all process steps are ready

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
@@ -194,9 +194,11 @@ def process_task(process_id: int):
    if summary_of_summaries:
        process.output = {"summary": summary_of_summaries}

-    if not all_process_step_ready:
+    if not all_process_steps_ready:
        logger.info(f"Process id: [{process.id}] some steps preprocessing is missing moving to waiting queue")
        process_execution_scheduler.add_process_to_queue(process.id)
+        # Skip status update since not all steps are ready
+        return

    process.status = (
        ProcessStatus.COMPLETED if not failed_docs else ProcessStatus.FAILED
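Note on the change above: the hard-coded strings "PENDING" and "IN_PROGRESS" are replaced by names taken from the ProcessStepStatus enum. The enum's definition is not part of this diff, so the members below are assumptions; this is only a minimal sketch of why referencing the enum is safer than repeating string literals:

from enum import Enum

class ProcessStepStatus(Enum):
    # Hypothetical members for illustration; the real enum lives elsewhere in the codebase
    PENDING = "PENDING"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

# .name always matches the member identifier, so a typo such as IN_PROGESS
# fails loudly with an AttributeError instead of producing a silently empty query.
statuses = [ProcessStepStatus.PENDING.name, ProcessStepStatus.IN_PROGRESS.name]
assert statuses == ["PENDING", "IN_PROGRESS"]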
48 changes: 18 additions & 30 deletions backend/app/processing/process_scheduler.py
@@ -12,22 +12,28 @@ class ProcessScheduler:
    If the queue becomes empty, the scheduler automatically stops to conserve resources.
    The scheduler can be started and stopped as needed, and new processes can be added to the queue at any time.
    """
-    def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger):
+    def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger = None):
        self.waiting_processes = []
        self.scheduler_thread = None
        self.scheduler_running = False
        self.lock = threading.Lock()
        self.executor = executor
-        self.logger = logger
+        if logger is None:
+            self.logger = Logger()
+        else:
+            self.logger = logger

        schedule.every(secs).seconds.do(self._reprocess_holding_processes)

    def _reprocess_holding_processes(self):
        """Internal method to process tasks in the queue."""
-        if self.waiting_processes:
-            id = self.waiting_processes.pop(0)
-            self.logger.info(f"[ProcessScheduler]: Executing process from queue [{id}]")
-            self.executor(id)
-        else:
-            self.stop_scheduler()
+        with self.lock:
+            if self.waiting_processes:
+                process_id = self.waiting_processes.pop(0)
+                self.logger.info(f"[ProcessScheduler]: Executing process from queue [{process_id}]")
+                self.executor(process_id)
+            else:
+                self.stop_scheduler()

    def _run_scheduler(self):
        """Internal method to continuously run the scheduler."""
@@ -53,25 +59,7 @@ def stop_scheduler(self):

    def add_process_to_queue(self, process_id: int) -> None:
        """Add a process to the queue and start the scheduler if needed."""
-        self.waiting_processes.append(process_id)
-        self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
-        self.start_scheduler()
-
-
-if __name__ == "__main__":
-
-    def print_process(process):
-        print(process)
-
-    scheduler = ProcessScheduler(15, print_process)
-
-    # Add processes to the queue and start processing
-    scheduler.add_process_to_queue("Process 1")
-    time.sleep(3)  # Simulate some time for processing
-
-    # Add more processes to the queue
-    scheduler.add_process_to_queue("Process 2")
-    time.sleep(3)
-
-    # Simulate waiting for some time to see the scheduler behavior
-    time.sleep(300)  # Wait 2 minutes to observe scheduler stopping when empty
+        with self.lock:
+            self.waiting_processes.append(process_id)
+            self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]")
+            self.start_scheduler()
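
Taken together, the scheduler changes make the waiting queue thread-safe: both the drain path (_reprocess_holding_processes) and the enqueue path (add_process_to_queue) now hold self.lock while touching waiting_processes, and the logger argument becomes optional. A minimal usage sketch in the spirit of the removed __main__ demo; the import path is inferred from the file location, the 15-second interval is an arbitrary choice, and a standard library logger stands in for the project's own Logger type:

import logging
import time

# Import path assumed from backend/app/processing/process_scheduler.py
from app.processing.process_scheduler import ProcessScheduler

def execute_process(process_id: int) -> None:
    # Stand-in executor; in the real service this is the process_queue task runner
    print(f"executing process {process_id}")

# Re-check the waiting queue every 15 seconds
scheduler = ProcessScheduler(15, execute_process, logging.getLogger(__name__))

scheduler.add_process_to_queue(1)  # enqueues and starts the scheduler thread
scheduler.add_process_to_queue(2)  # queued behind process 1

# Give the scheduler time to drain the queue; once it is empty it stops itself.
time.sleep(60)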
