From 8dc1a0cbd6933d0108ff6d0f04fbe90260211b7a Mon Sep 17 00:00:00 2001 From: Toan Quach Date: Wed, 15 Nov 2023 16:46:02 +0700 Subject: [PATCH] fixd update submission status function --- src/taipy/core/submission/submission.py | 59 +++++++++++-------- .../core/submission/submission_status.py | 6 +- tests/core/submission/test_submission.py | 10 ++-- .../submission/test_submission_manager.py | 2 +- .../test_submission_manager_with_sql_repo.py | 18 +----- 5 files changed, 48 insertions(+), 47 deletions(-) diff --git a/src/taipy/core/submission/submission.py b/src/taipy/core/submission/submission.py index 62006d40..531583d8 100644 --- a/src/taipy/core/submission/submission.py +++ b/src/taipy/core/submission/submission.py @@ -53,7 +53,7 @@ def __init__( self.id = id or self.__new_id() self._jobs: Union[List[Job], List[JobId], List] = jobs or [] self._creation_date = creation_date or datetime.now() - self._submission_status = submission_status or SubmissionStatus.UNDEFINED + self._submission_status = submission_status or SubmissionStatus.SUBMITTED self._version = version or _VersionManagerFactory._build_manager()._get_latest_version() @staticmethod @@ -129,8 +129,9 @@ def __gt__(self, other): def __ge__(self, other): return self.creation_date.timestamp() == other.creation_date.timestamp() or self > other - def _update_submission_status(self, plop: Job): - submission_status = SubmissionStatus.UNDEFINED + def _update_submission_status(self, _: Job): + abandoned = False + canceled = False blocked = False pending = False running = False @@ -140,28 +141,40 @@ def _update_submission_status(self, plop: Job): if not job: continue if job.is_failed(): - submission_status = SubmissionStatus.FAILED - break + self.submission_status = SubmissionStatus.FAILED # type: ignore + return if job.is_canceled(): - submission_status = SubmissionStatus.CANCELED - break - if not blocked and job.is_blocked(): + canceled = True + if job.is_blocked(): blocked = True - if not pending and job.is_pending(): + continue + if job.is_pending() or job.is_submitted(): pending = True - if not running and job.is_running(): + continue + if job.is_running(): running = True - if not completed and (job.is_completed() or job.is_skipped()): + continue + if job.is_completed() or job.is_skipped(): completed = True - - if submission_status == SubmissionStatus.UNDEFINED: - if pending: - submission_status = SubmissionStatus.PENDING - elif blocked: - submission_status = SubmissionStatus.BLOCKED - elif running: - submission_status = SubmissionStatus.RUNNING - elif completed: - submission_status = SubmissionStatus.COMPLETED - - self.submission_status = submission_status # type: ignore + continue + if job.is_abandoned(): + abandoned = True + if canceled: + self.submission_status = SubmissionStatus.CANCELED # type: ignore + return + if abandoned: + self.submission_status = SubmissionStatus.UNDEFINED # type: ignore + return + if running: + self.submission_status = SubmissionStatus.RUNNING # type: ignore + return + if pending: + self.submission_status = SubmissionStatus.PENDING # type: ignore + return + if blocked: + self.submission_status = SubmissionStatus.BLOCKED # type: ignore + return + if completed: + self.submission_status = SubmissionStatus.COMPLETED # type: ignore + return + self.submission_status = SubmissionStatus.UNDEFINED # type: ignore diff --git a/src/taipy/core/submission/submission_status.py b/src/taipy/core/submission/submission_status.py index 8780c14e..32a15973 100644 --- a/src/taipy/core/submission/submission_status.py +++ b/src/taipy/core/submission/submission_status.py @@ -19,9 +19,12 @@ class SubmissionStatus(_ReprEnum): The possible values are: - - `UNDEFINED`: AN `UNDEFINED` submission has been created for execution but not processed yet by + - `SUBMITTED`: A `SUBMITTED` submission has been submitted for execution but not processed yet by the orchestrator. + - `UNDEFINED`: AN `UNDEFINED` submission's jobs have been submitted for execution but got some undefined + status changes. + - `PENDING`: A `PENDING` submission has been enqueued by the orchestrator. It is waiting for an executor to be available for its execution. @@ -36,6 +39,7 @@ class SubmissionStatus(_ReprEnum): - `COMPLETED`: A `COMPLETED` submission has successfully been executed. """ + SUBMITTED = 0 UNDEFINED = 1 BLOCKED = 2 PENDING = 3 diff --git a/tests/core/submission/test_submission.py b/tests/core/submission/test_submission.py index 1e8b4590..9ec70a6e 100644 --- a/tests/core/submission/test_submission.py +++ b/tests/core/submission/test_submission.py @@ -17,7 +17,7 @@ import pytest -from src.taipy.core import JobId, TaskId +from src.taipy.core import TaskId from src.taipy.core.job._job_manager_factory import _JobManagerFactory from src.taipy.core.job.job import Job from src.taipy.core.job.status import Status @@ -35,7 +35,7 @@ def test_create_submission(scenario, job, current_datetime): assert submission_1.entity_id == scenario.id assert submission_1.jobs == [] assert isinstance(submission_1.creation_date, datetime) - assert submission_1._submission_status == SubmissionStatus.UNDEFINED + assert submission_1._submission_status == SubmissionStatus.SUBMITTED assert submission_1._version is not None submission_2 = Submission( @@ -139,6 +139,7 @@ def test_update_single_submission_status(job_ids, expected_submission_status): (["job1_failed", "job6_completed"], SubmissionStatus.FAILED), (["job1_failed", "job7_skipped"], SubmissionStatus.FAILED), (["job1_failed", "job8_abandoned"], SubmissionStatus.FAILED), + (["job2_canceled", "job1_failed"], SubmissionStatus.FAILED), (["job3_blocked", "job1_failed"], SubmissionStatus.FAILED), (["job4_pending", "job1_failed"], SubmissionStatus.FAILED), (["job5_running", "job1_failed"], SubmissionStatus.FAILED), @@ -161,7 +162,6 @@ def test_update_submission_status_with_one_failed_job_in_jobs(job_ids, expected_ (["job2_canceled", "job6_completed"], SubmissionStatus.CANCELED), (["job2_canceled", "job7_skipped"], SubmissionStatus.CANCELED), (["job2_canceled", "job8_abandoned"], SubmissionStatus.CANCELED), - (["job2_canceled", "job1_failed"], SubmissionStatus.CANCELED), (["job3_blocked", "job2_canceled"], SubmissionStatus.CANCELED), (["job4_pending", "job2_canceled"], SubmissionStatus.CANCELED), (["job5_running", "job2_canceled"], SubmissionStatus.CANCELED), @@ -296,8 +296,8 @@ def test_auto_set_and_reload(): assert submission_2.jobs == [job_2, job_1] # auto set & reload on submission_status attribute - assert submission_1.submission_status == SubmissionStatus.UNDEFINED - assert submission_2.submission_status == SubmissionStatus.UNDEFINED + assert submission_1.submission_status == SubmissionStatus.SUBMITTED + assert submission_2.submission_status == SubmissionStatus.SUBMITTED submission_1.submission_status = SubmissionStatus.BLOCKED assert submission_1.submission_status == SubmissionStatus.BLOCKED assert submission_2.submission_status == SubmissionStatus.BLOCKED diff --git a/tests/core/submission/test_submission_manager.py b/tests/core/submission/test_submission_manager.py index 8b497473..ff50e3a8 100644 --- a/tests/core/submission/test_submission_manager.py +++ b/tests/core/submission/test_submission_manager.py @@ -26,7 +26,7 @@ def test_create_submission(scenario): assert submission_1.entity_id == scenario.id assert submission_1.jobs == [] assert isinstance(submission_1.creation_date, datetime) - assert submission_1._submission_status == SubmissionStatus.UNDEFINED + assert submission_1._submission_status == SubmissionStatus.SUBMITTED def test_get_submission(): diff --git a/tests/core/submission/test_submission_manager_with_sql_repo.py b/tests/core/submission/test_submission_manager_with_sql_repo.py index 788f5f3c..d1bfa211 100644 --- a/tests/core/submission/test_submission_manager_with_sql_repo.py +++ b/tests/core/submission/test_submission_manager_with_sql_repo.py @@ -12,28 +12,12 @@ from datetime import datetime from time import sleep -import pytest - from src.taipy.core import Task -from src.taipy.core._orchestrator._dispatcher._job_dispatcher import _JobDispatcher -from src.taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory from src.taipy.core._repository.db._sql_session import _build_engine, _SQLSession from src.taipy.core._version._version_manager_factory import _VersionManagerFactory -from src.taipy.core.config.job_config import JobConfig -from src.taipy.core.data import InMemoryDataNode -from src.taipy.core.data._data_manager import _DataManager -from src.taipy.core.data._data_manager_factory import _DataManagerFactory -from src.taipy.core.exceptions.exceptions import JobNotDeletedException -from src.taipy.core.job._job_manager import _JobManager -from src.taipy.core.job._job_manager_factory import _JobManagerFactory -from src.taipy.core.job.job_id import JobId -from src.taipy.core.job.status import Status from src.taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory from src.taipy.core.submission.submission import Submission from src.taipy.core.submission.submission_status import SubmissionStatus -from src.taipy.core.task._task_manager import _TaskManager -from src.taipy.core.task._task_manager_factory import _TaskManagerFactory -from src.taipy.core.task.task import Task def init_managers(): @@ -56,7 +40,7 @@ def test_create_submission(scenario, init_sql_repo): assert submission_1.entity_id == scenario.id assert submission_1.jobs == [] assert isinstance(submission_1.creation_date, datetime) - assert submission_1._submission_status == SubmissionStatus.UNDEFINED + assert submission_1._submission_status == SubmissionStatus.SUBMITTED def test_get_submission(init_sql_repo):