Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
added entity ids for submission, fixed notifier tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Toan Quach authored and Toan Quach committed Nov 15, 2023
1 parent 70aa020 commit b7bae70
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 31 deletions.
2 changes: 2 additions & 0 deletions src/taipy/core/_entity/_entity_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self):
self.sequence_ids = set()
self.job_ids = set()
self.cycle_ids = set()
self.submission_ids = set()

def __add__(self, other: _EntityIds):
self.data_node_ids.update(other.data_node_ids)
Expand All @@ -28,6 +29,7 @@ def __add__(self, other: _EntityIds):
self.sequence_ids.update(other.sequence_ids)
self.job_ids.update(other.job_ids)
self.cycle_ids.update(other.cycle_ids)
self.submission_ids.update(other.submission_ids)
return self

def __iadd__(self, other: _EntityIds):
Expand Down
2 changes: 2 additions & 0 deletions src/taipy/core/_manager/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _delete_entities_of_multiple_types(cls, _entity_ids: _EntityIds):
from ..job._job_manager_factory import _JobManagerFactory
from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
from ..sequence._sequence_manager_factory import _SequenceManagerFactory
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory

_CycleManagerFactory._build_manager()._delete_many(_entity_ids.cycle_ids)
Expand All @@ -125,6 +126,7 @@ def _delete_entities_of_multiple_types(cls, _entity_ids: _EntityIds):
_TaskManagerFactory._build_manager()._delete_many(_entity_ids.task_ids)
_JobManagerFactory._build_manager()._delete_many(_entity_ids.job_ids)
_DataManagerFactory._build_manager()._delete_many(_entity_ids.data_node_ids)
_SubmissionManagerFactory._build_manager()._delete_many(_entity_ids.submission_ids)

@classmethod
def _export(cls, id: str, folder_path: Union[str, pathlib.Path]):
Expand Down
7 changes: 7 additions & 0 deletions src/taipy/core/cycle/_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .._repository._abstract_repository import _AbstractRepository
from ..job._job_manager_factory import _JobManagerFactory
from ..notification import EventEntityType, EventOperation, _publish_event
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from .cycle import Cycle
from .cycle_id import CycleId

Expand Down Expand Up @@ -118,6 +119,12 @@ def _get_children_entity_ids(cls, cycle: Cycle) -> _EntityIds:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.scenario_ids.union(entity_ids.sequence_ids, entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
9 changes: 9 additions & 0 deletions src/taipy/core/scenario/_scenario_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ..job._job_manager_factory import _JobManagerFactory
from ..job.job import Job
from ..notification import EventEntityType, EventOperation, _publish_event
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory
from .scenario import Scenario
from .scenario_id import ScenarioId
Expand Down Expand Up @@ -398,10 +399,18 @@ def _get_children_entity_ids(cls, scenario: Scenario) -> _EntityIds:
for data_node in scenario.data_nodes.values():
if data_node.owner_id == scenario.id:
entity_ids.data_node_ids.add(data_node.id)

jobs = _JobManagerFactory._build_manager()._get_all()
for job in jobs:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.scenario_ids.union(entity_ids.sequence_ids, entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
9 changes: 9 additions & 0 deletions src/taipy/core/sequence/_sequence_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
from ..scenario.scenario import Scenario
from ..scenario.scenario_id import ScenarioId
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory
from ..task.task import Task, TaskId
from .sequence import Sequence
Expand Down Expand Up @@ -241,10 +242,18 @@ def _get_children_entity_ids(cls, sequence: Sequence) -> _EntityIds:
for data_node in task.data_nodes.values():
if data_node.owner_id == sequence.id:
entity_ids.data_node_ids.add(data_node.id)

jobs = _JobManagerFactory._build_manager()._get_all()
for job in jobs:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.sequence_ids.union(entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion src/taipy/core/submission/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ def _update_submission_status(self, plop: Job):
elif completed:
submission_status = SubmissionStatus.COMPLETED

self.submission_status = submission_status
self.submission_status = submission_status # type: ignore
8 changes: 8 additions & 0 deletions src/taipy/core/task/_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,20 @@ def _get_children_entity_ids(cls, task: Task):
entity_ids = _EntityIds()

from ..job._job_manager_factory import _JobManagerFactory
from ..submission._submission_manager_factory import _SubmissionManagerFactory

jobs = _JobManagerFactory._build_manager()._get_all()

for job in jobs:
if job.task.id == task.id:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.task_ids)
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
15 changes: 7 additions & 8 deletions tests/core/notification/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ def test_publish_event():

job = scenario.submit()[0]

assert registration_queue.qsize() == 6
published_events = []
while registration_queue.qsize() != 0:
published_events.append(registration_queue.get())
Expand All @@ -600,21 +601,18 @@ def test_publish_event():
EventOperation.UPDATE,
EventOperation.UPDATE,
EventOperation.UPDATE,
EventOperation.UPDATE,
EventOperation.SUBMISSION,
]
expected_attribute_names = [None, None, "jobs", "submission_status", "submission_status", "status", None]
expected_attribute_names = [None, None, "jobs", "status", "submission_status", None]
expected_event_types = [
EventEntityType.SUBMISSION,
EventEntityType.JOB,
EventEntityType.SUBMISSION,
EventEntityType.SUBMISSION,
EventEntityType.SUBMISSION,
EventEntityType.JOB,
EventEntityType.SUBMISSION,
EventEntityType.SCENARIO,
]
expected_event_entity_id = [job.submit_id, job.id, job.submit_id, job.submit_id, job.submit_id, job.id, scenario.id]

expected_event_entity_id = [job.submit_id, job.id, job.submit_id, job.id, job.submit_id, scenario.id]
assert all(
[
event.entity_type == expected_event_types[i]
Expand All @@ -628,7 +626,7 @@ def test_publish_event():
# Test DELETION Event

tp.delete(scenario.id)
assert registration_queue.qsize() == 6
assert registration_queue.qsize() == 7

published_events = []
while registration_queue.qsize() != 0:
Expand All @@ -641,9 +639,10 @@ def test_publish_event():
EventEntityType.TASK,
EventEntityType.JOB,
EventEntityType.DATA_NODE,
EventEntityType.SUBMISSION,
]

expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id]
expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id, job.submit_id]
expected_event_operation_type = [EventOperation.DELETION] * len(expected_event_types)

assert all(
Expand Down
41 changes: 19 additions & 22 deletions tests/core/submission/test_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pytest

from src.taipy.core import TaskId, JobId
from src.taipy.core import JobId, TaskId
from src.taipy.core.job._job_manager_factory import _JobManagerFactory
from src.taipy.core.job.job import Job
from src.taipy.core.job.status import Status
Expand Down Expand Up @@ -99,9 +99,13 @@ def mock_get_jobs(job_ids):


def __test_update_submission_status(job_ids, expected_submission_status):
with (patch("src.taipy.core.submission.submission.Submission.jobs",
new_callable=mock.PropertyMock,
return_value=(mock_get_jobs(job_ids)))):
with (
patch(
"src.taipy.core.submission.submission.Submission.jobs",
new_callable=mock.PropertyMock,
return_value=(mock_get_jobs(job_ids)),
)
):
submission = Submission("submission_id")
submission._update_submission_status(None)
assert submission.submission_status == expected_submission_status
Expand All @@ -124,7 +128,6 @@ def test_update_single_submission_status(job_ids, expected_submission_status):
__test_update_submission_status(job_ids, expected_submission_status)



@pytest.mark.parametrize(
"job_ids, expected_submission_status",
[
Expand Down Expand Up @@ -167,9 +170,7 @@ def test_update_submission_status_with_one_failed_job_in_jobs(job_ids, expected_
(["job8_abandoned", "job2_canceled"], SubmissionStatus.CANCELED),
],
)
def test_update_submission_status_with_one_canceled_job_in_jobs(
job_ids, expected_submission_status
):
def test_update_submission_status_with_one_canceled_job_in_jobs(job_ids, expected_submission_status):
__test_update_submission_status(job_ids, expected_submission_status)


Expand All @@ -185,24 +186,22 @@ def test_update_submission_status_with_one_canceled_job_in_jobs(
(["job7_skipped", "job4_pending"], SubmissionStatus.PENDING),
],
)
def test_update_submission_status_with_no_failed_or_cancel_one_pending_in_jobs(
job_ids, expected_submission_status
):
def test_update_submission_status_with_no_failed_or_cancel_one_pending_in_jobs(job_ids, expected_submission_status):
__test_update_submission_status(job_ids, expected_submission_status)


@pytest.mark.parametrize(
"job_ids, expected_submission_status",
[
(["job5_running", "job3_blocked"], SubmissionStatus.RUNNING),
# (["job5_running", "job4_pending"], SubmissionStatus.RUNNING),
# (["job5_running", "job5_running"], SubmissionStatus.RUNNING),
# (["job5_running", "job6_completed"], SubmissionStatus.RUNNING),
# (["job5_running", "job7_skipped"], SubmissionStatus.RUNNING),
# (["job3_blocked", "job5_running"], SubmissionStatus.RUNNING),
# (["job4_pending", "job5_running"], SubmissionStatus.RUNNING),
# (["job6_completed", "job5_running"], SubmissionStatus.RUNNING),
# (["job7_skipped", "job5_running"], SubmissionStatus.RUNNING),
(["job5_running", "job4_pending"], SubmissionStatus.RUNNING),
(["job5_running", "job5_running"], SubmissionStatus.RUNNING),
(["job5_running", "job6_completed"], SubmissionStatus.RUNNING),
(["job5_running", "job7_skipped"], SubmissionStatus.RUNNING),
(["job3_blocked", "job5_running"], SubmissionStatus.RUNNING),
(["job4_pending", "job5_running"], SubmissionStatus.RUNNING),
(["job6_completed", "job5_running"], SubmissionStatus.RUNNING),
(["job7_skipped", "job5_running"], SubmissionStatus.RUNNING),
],
)
def test_update_submission_status_with_no_failed_cancel_nor_pending_one_running_in_jobs(
Expand Down Expand Up @@ -236,9 +235,7 @@ def test_update_submission_status_with_no_failed_cancel_pending_nor_running_one_
(["job7_skipped", "job7_skipped"], SubmissionStatus.COMPLETED),
],
)
def test_update_submission_status_with_only_completed_or_skipped_in_jobs(
job_ids, expected_submission_status
):
def test_update_submission_status_with_only_completed_or_skipped_in_jobs(job_ids, expected_submission_status):
__test_update_submission_status(job_ids, expected_submission_status)


Expand Down

0 comments on commit b7bae70

Please sign in to comment.