-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore[processes]: wait for process preprocessing before comple
tely executing
- Loading branch information
1 parent
2d0254f
commit e08fc37
Showing
7 changed files
with
271 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
from typing import Callable | ||
from app.logger import Logger | ||
import schedule | ||
import threading | ||
import time | ||
|
||
class ProcessScheduler: | ||
""" | ||
A scheduler that manages a queue of processes to be executed. | ||
It periodically checks the queue and executes processes if any are present. | ||
If the queue becomes empty, the scheduler automatically stops to conserve resources. | ||
The scheduler can be started and stopped as needed, and new processes can be added to the queue at any time. | ||
""" | ||
def __init__(self, secs: int, executor: Callable[[int], None], logger: Logger): | ||
self.waiting_processes = [] | ||
self.scheduler_thread = None | ||
self.scheduler_running = False | ||
self.executor = executor | ||
self.logger = logger | ||
schedule.every(secs).seconds.do(self._reprocess_holding_processes) | ||
|
||
def _reprocess_holding_processes(self): | ||
"""Internal method to process tasks in the queue.""" | ||
if self.waiting_processes: | ||
id = self.waiting_processes.pop(0) | ||
self.logger.info(f"[ProcessScheduler]: Executing process from queue [{id}]") | ||
self.executor(id) | ||
else: | ||
self.stop_scheduler() | ||
|
||
def _run_scheduler(self): | ||
"""Internal method to continuously run the scheduler.""" | ||
while self.scheduler_running: | ||
schedule.run_pending() | ||
time.sleep(1) | ||
|
||
def start_scheduler(self): | ||
"""Start the scheduler thread if it's not already running.""" | ||
if not self.scheduler_running: | ||
self.logger.info("[ProcessScheduler]: Starting scheduler") | ||
self.scheduler_running = True | ||
self.scheduler_thread = threading.Thread(target=self._run_scheduler) | ||
self.scheduler_thread.daemon = True | ||
self.scheduler_thread.start() | ||
else: | ||
self.logger.info("[ProcessScheduler]: Scheduler is already running") | ||
|
||
def stop_scheduler(self): | ||
"""Stop the scheduler thread.""" | ||
self.scheduler_running = False | ||
self.logger.info("[ProcessScheduler]: Scheduler stopped") | ||
|
||
def add_process_to_queue(self, process_id: int) -> None: | ||
"""Add a process to the queue and start the scheduler if needed.""" | ||
self.waiting_processes.append(process_id) | ||
self.logger.info(f"[ProcessScheduler]: Scheduler adding process [{process_id}]") | ||
self.start_scheduler() | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
def print_process(process): | ||
print(process) | ||
|
||
scheduler = ProcessScheduler(15, print_process) | ||
|
||
# Add processes to the queue and start processing | ||
scheduler.add_process_to_queue("Process 1") | ||
time.sleep(3) # Simulate some time for processing | ||
|
||
# Add more processes to the queue | ||
scheduler.add_process_to_queue("Process 2") | ||
time.sleep(3) | ||
|
||
# Simulate waiting for some time to see the scheduler behavior | ||
time.sleep(300) # Wait 2 minutes to observe scheduler stopping when empty |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
import unittest | ||
from unittest.mock import MagicMock, patch | ||
from app.processing.process_scheduler import ProcessScheduler | ||
|
||
class TestProcessScheduler(unittest.TestCase): | ||
def setUp(self): | ||
self.executor_mock = MagicMock() # Mock the executor function | ||
self.logger_mock = MagicMock() # Mock the logger | ||
self.scheduler = ProcessScheduler(1, self.executor_mock, self.logger_mock) | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_add_process_to_queue(self, _): | ||
# Test adding a process to the queue and ensure it's added correctly | ||
process_id = 1 | ||
self.scheduler.add_process_to_queue(process_id) | ||
|
||
# Check if the process is added to the waiting_processes list | ||
self.assertIn(process_id, self.scheduler.waiting_processes) | ||
|
||
# Check if the scheduler was started | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_reprocess_holding_processes(self, _): | ||
# Test processing when queue is not empty | ||
process_id = 2 | ||
self.scheduler.waiting_processes = [process_id] | ||
|
||
self.scheduler._reprocess_holding_processes() | ||
|
||
# Ensure the executor was called with the process_id | ||
self.executor_mock.assert_called_once_with(process_id) | ||
|
||
# Ensure the logger recorded the correct process execution | ||
self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Executing process from queue [{process_id}]") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_stop_scheduler_when_empty(self, _): | ||
# Test if the scheduler stops when there are no processes left | ||
self.scheduler.waiting_processes = [] # Empty the queue | ||
self.scheduler._reprocess_holding_processes() | ||
|
||
# Ensure the scheduler stops | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped") | ||
self.assertFalse(self.scheduler.scheduler_running) | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_scheduler_starts_when_needed(self, _): | ||
# Test if the scheduler starts when a new process is added | ||
process_id = 3 | ||
self.scheduler.start_scheduler() | ||
|
||
self.scheduler.add_process_to_queue(process_id) | ||
|
||
# Ensure the process was added and scheduler started | ||
self.assertIn(process_id, self.scheduler.waiting_processes) | ||
self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id}]") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_scheduler_does_not_double_start(self, _): | ||
# Test that scheduler doesn't start twice if it's already running | ||
self.scheduler.scheduler_running = True # Simulate running scheduler | ||
|
||
self.scheduler.start_scheduler() | ||
|
||
# Ensure that a log is made saying it's already running | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler is already running") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_stop_scheduler(self, _): | ||
# Test stopping the scheduler manually | ||
self.scheduler.start_scheduler() | ||
self.scheduler.stop_scheduler() | ||
|
||
# Check if scheduler is stopped | ||
self.assertFalse(self.scheduler.scheduler_running) | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_reprocess_multiple_processes(self, _): | ||
# Test reprocessing when there are multiple processes in the queue | ||
process_ids = [1, 2, 3] | ||
self.scheduler.waiting_processes = process_ids.copy() | ||
|
||
# Simulate running the scheduler multiple times | ||
self.scheduler._reprocess_holding_processes() | ||
self.scheduler._reprocess_holding_processes() | ||
self.scheduler._reprocess_holding_processes() | ||
|
||
# Ensure each process was executed in the correct order | ||
self.executor_mock.assert_has_calls([ | ||
unittest.mock.call(1), | ||
unittest.mock.call(2), | ||
unittest.mock.call(3), | ||
]) | ||
self.assertEqual(self.scheduler.waiting_processes, []) | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_no_processes_added(self, _): | ||
# Test the behavior when no processes are added | ||
self.scheduler.start_scheduler() | ||
self.scheduler._reprocess_holding_processes() | ||
|
||
# Scheduler should stop since no processes were added | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped") | ||
self.assertFalse(self.scheduler.scheduler_running) | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_scheduler_stops_when_empty_after_processing(self, _): | ||
# Test that the scheduler stops after processing all tasks | ||
process_id = 1 | ||
self.scheduler.waiting_processes = [process_id] | ||
|
||
# Process the task | ||
self.scheduler._reprocess_holding_processes() | ||
|
||
# Explicitly stop the scheduler after processing | ||
self.scheduler.stop_scheduler() | ||
|
||
# Ensure the scheduler stopped after processing the last task | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Scheduler stopped") | ||
|
||
@patch('time.sleep', return_value=None) # Mocking sleep to avoid delays | ||
def test_scheduler_restart_after_adding_new_process(self, _): | ||
# Test if the scheduler restarts when a new process is added after stopping | ||
process_id_1 = 1 | ||
process_id_2 = 2 | ||
self.scheduler.waiting_processes = [process_id_1] | ||
|
||
# Process the first task and stop the scheduler | ||
self.scheduler._reprocess_holding_processes() | ||
self.assertFalse(self.scheduler.scheduler_running) | ||
|
||
# Add a new process and ensure the scheduler restarts | ||
self.scheduler.add_process_to_queue(process_id_2) | ||
self.assertTrue(self.scheduler.scheduler_running) | ||
self.logger_mock.info.assert_any_call(f"[ProcessScheduler]: Scheduler adding process [{process_id_2}]") | ||
self.logger_mock.info.assert_any_call("[ProcessScheduler]: Starting scheduler") | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |