Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

feature/#764 added submission entity, manager and repo #822

Merged
merged 22 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/taipy/core/_entity/_entity_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self):
self.sequence_ids = set()
self.job_ids = set()
self.cycle_ids = set()
self.submission_ids = set()

def __add__(self, other: _EntityIds):
self.data_node_ids.update(other.data_node_ids)
Expand All @@ -28,6 +29,7 @@ def __add__(self, other: _EntityIds):
self.sequence_ids.update(other.sequence_ids)
self.job_ids.update(other.job_ids)
self.cycle_ids.update(other.cycle_ids)
self.submission_ids.update(other.submission_ids)
return self

def __iadd__(self, other: _EntityIds):
Expand Down
2 changes: 2 additions & 0 deletions src/taipy/core/_entity/_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def _get_manager(manager: str):
from ..job._job_manager_factory import _JobManagerFactory
from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
from ..sequence._sequence_manager_factory import _SequenceManagerFactory
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory

return {
Expand All @@ -100,4 +101,5 @@ def _get_manager(manager: str):
"cycle": _CycleManagerFactory._build_manager(),
"job": _JobManagerFactory._build_manager(),
"task": _TaskManagerFactory._build_manager(),
"submission": _SubmissionManagerFactory._build_manager(),
}[manager]
2 changes: 2 additions & 0 deletions src/taipy/core/_manager/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _delete_entities_of_multiple_types(cls, _entity_ids: _EntityIds):
from ..job._job_manager_factory import _JobManagerFactory
from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
from ..sequence._sequence_manager_factory import _SequenceManagerFactory
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory

_CycleManagerFactory._build_manager()._delete_many(_entity_ids.cycle_ids)
Expand All @@ -125,6 +126,7 @@ def _delete_entities_of_multiple_types(cls, _entity_ids: _EntityIds):
_TaskManagerFactory._build_manager()._delete_many(_entity_ids.task_ids)
_JobManagerFactory._build_manager()._delete_many(_entity_ids.job_ids)
_DataManagerFactory._build_manager()._delete_many(_entity_ids.data_node_ids)
_SubmissionManagerFactory._build_manager()._delete_many(_entity_ids.submission_ids)

@classmethod
def _export(cls, id: str, folder_path: Union[str, pathlib.Path]):
Expand Down
2 changes: 0 additions & 2 deletions src/taipy/core/_orchestrator/_abstract_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def submit(
def submit_task(
cls,
task: Task,
submit_id: Optional[str] = None,
submit_entity_id: Optional[str] = None,
callbacks: Optional[Iterable[Callable]] = None,
force: bool = False,
wait: bool = False,
Expand Down
86 changes: 55 additions & 31 deletions src/taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..job._job_manager_factory import _JobManagerFactory
from ..job.job import Job
from ..job.job_id import JobId
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task.task import Task
from ._abstract_orchestrator import _AbstractOrchestrator

Expand Down Expand Up @@ -66,31 +67,38 @@ def submit(
Returns:
The created Jobs.
"""
submit_id = cls.__generate_submit_id()
res = []
submission = _SubmissionManagerFactory._build_manager()._create(submittable.id) # type: ignore
jobs = []
tasks = submittable._get_sorted_tasks()
with cls.lock:
for ts in tasks:
for task in ts:
res.append(
cls._submit_task(
task, submit_id, submittable.id, callbacks=callbacks, force=force # type: ignore
jobs.append(
cls._lock_dn_output_and_create_job(
task,
submission.id,
submission.entity_id,
callbacks=itertools.chain([submission._update_submission_status], callbacks or []),
force=force, # type: ignore
)
)

submission.jobs = jobs # type: ignore

cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
else:
if wait:
cls.__wait_until_job_finished(res, timeout=timeout)
return res
cls.__wait_until_job_finished(jobs, timeout=timeout)

return jobs

@classmethod
def submit_task(
cls,
task: Task,
submit_id: Optional[str] = None,
submit_entity_id: Optional[str] = None,
callbacks: Optional[Iterable[Callable]] = None,
force: bool = False,
wait: bool = False,
Expand All @@ -103,55 +111,69 @@ def submit_task(
submit_id (str): The optional id to differentiate each submission.
callbacks: The optional list of functions that should be executed on job status change.
force (bool): Enforce execution of the task even if its output data nodes are cached.
wait (bool): Wait for the orchestrated job created from the task submission to be finished in asynchronous
mode.
timeout (Union[float, int]): The optional maximum number of seconds to wait for the job to be finished
before returning.
wait (bool): Wait for the orchestrated job created from the task submission to be finished
in asynchronous mode.
timeout (Union[float, int]): The optional maximum number of seconds to wait for the job
to be finished before returning.
Returns:
The created `Job^`.
"""
submission = _SubmissionManagerFactory._build_manager()._create(task.id)
submit_id = submission.id
with cls.lock:
job = cls._submit_task(task, submit_id, submit_entity_id, callbacks, force)
job = cls._lock_dn_output_and_create_job(
task,
submit_id,
submission.entity_id,
itertools.chain([submission._update_submission_status], callbacks or []),
force,
)

jobs = [job]
submission.jobs = jobs # type: ignore

cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
else:
if wait:
cls.__wait_until_job_finished(job, timeout=timeout)

return job

@classmethod
def _submit_task(
def _lock_dn_output_and_create_job(
cls,
task: Task,
submit_id: Optional[str] = None,
submit_entity_id: Optional[str] = None,
submit_id: str,
submit_entity_id: str,
callbacks: Optional[Iterable[Callable]] = None,
force: bool = False,
) -> Job:
submit_id = submit_id if submit_id else cls.__generate_submit_id()
submit_entity_id = submit_entity_id if submit_entity_id else task.id

for dn in task.output.values():
dn.lock_edit()
job = _JobManagerFactory._build_manager()._create(
task, itertools.chain([cls._on_status_change], callbacks or []), submit_id, submit_entity_id, force=force
)
cls._orchestrate_job_to_run_or_block(job)

return job

@staticmethod
def __generate_submit_id():
return f"SUBMISSION_{str(uuid.uuid4())}"

@classmethod
def _orchestrate_job_to_run_or_block(cls, job: Job):
if cls._is_blocked(job):
job.blocked()
cls.blocked_jobs.append(job)
else:
job.pending()
def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]):
blocked_jobs = []
pending_jobs = []

for job in jobs:
if cls._is_blocked(job):
job.blocked()
blocked_jobs.append(job)
else:
job.pending()
pending_jobs.append(job)

cls.blocked_jobs.extend(blocked_jobs)
for job in pending_jobs:
cls.jobs_to_run.put(job)

@classmethod
Expand Down Expand Up @@ -200,6 +222,7 @@ def _on_status_change(cls, job: Job):
if job.is_completed() or job.is_skipped():
cls.__unblock_jobs()
elif job.is_failed():
print(f"\nJob {job.id} failed, abandoning subsequent jobs.\n")
cls._fail_subsequent_jobs(job)

@classmethod
Expand Down Expand Up @@ -272,6 +295,7 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
)
for job in to_fail_or_abandon_jobs:
print(f"Abandoning job: {job.id}")
job.abandoned()
to_fail_or_abandon_jobs.update([failed_job])
cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
Expand Down
4 changes: 4 additions & 0 deletions src/taipy/core/_repository/db/_sql_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def init_db(cls):
from ...data._data_model import _DataNodeModel
from ...job._job_model import _JobModel
from ...scenario._scenario_model import _ScenarioModel
from ...submission._submission_model import _SubmissionModel
from ...task._task_model import _TaskModel

cls._connection.execute(
Expand All @@ -64,6 +65,9 @@ def init_db(cls):
cls._connection.execute(
str(CreateTable(_VersionModel.__table__, if_not_exists=True).compile(dialect=sqlite.dialect()))
)
cls._connection.execute(
str(CreateTable(_SubmissionModel.__table__, if_not_exists=True).compile(dialect=sqlite.dialect()))
)

return cls._connection

Expand Down
2 changes: 1 addition & 1 deletion src/taipy/core/_version/_version_sql_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _get_latest_version(self):
str(self.table.select().filter_by(is_latest=True).compile(dialect=sqlite.dialect()))
).fetchone():
return latest["id"]
return ""
raise ModelNotFound(self.model_type, "")

def _set_development_version(self, version_number):
if old_development := self.db.execute(str(self.table.select().filter_by(is_development=True))).fetchone():
Expand Down
7 changes: 7 additions & 0 deletions src/taipy/core/cycle/_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .._repository._abstract_repository import _AbstractRepository
from ..job._job_manager_factory import _JobManagerFactory
from ..notification import EventEntityType, EventOperation, _publish_event
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from .cycle import Cycle
from .cycle_id import CycleId

Expand Down Expand Up @@ -118,6 +119,12 @@ def _get_children_entity_ids(cls, cycle: Cycle) -> _EntityIds:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.scenario_ids.union(entity_ids.sequence_ids, entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
3 changes: 1 addition & 2 deletions src/taipy/core/job/_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class _JobManager(_Manager[Job], _VersionMixin):
_ENTITY_NAME = Job.__name__
_ID_PREFIX = "JOB_"
_repository: _AbstractRepository
_EVENT_ENTITY_TYPE = EventEntityType.JOB

@classmethod
def _get_all(cls, version_number: Optional[str] = None) -> List[Job]:
Expand All @@ -37,8 +38,6 @@ def _get_all(cls, version_number: Optional[str] = None) -> List[Job]:
filters = cls._build_filters_with_version(version_number)
return cls._repository._load_all(filters)

_EVENT_ENTITY_TYPE = EventEntityType.JOB

@classmethod
def _create(
cls, task: Task, callbacks: Iterable[Callable], submit_id: str, submit_entity_id: str, force=False
Expand Down
4 changes: 2 additions & 2 deletions src/taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ def __lt__(self, other):
return self.creation_date.timestamp() < other.creation_date.timestamp()

def __le__(self, other):
return self.creation_date.timestamp() == other.creation_date.timestamp() or self < other
return self.creation_date.timestamp() <= other.creation_date.timestamp()

def __gt__(self, other):
return self.creation_date.timestamp() > other.creation_date.timestamp()

def __ge__(self, other):
return self.creation_date.timestamp() == other.creation_date.timestamp() or self > other
return self.creation_date.timestamp() >= other.creation_date.timestamp()

def __eq__(self, other):
return self.id == other.id
Expand Down
4 changes: 3 additions & 1 deletion src/taipy/core/notification/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class EventEntityType(_ReprEnum):

`EventEntityType` is used as an attribute of the `Event^` object to describe
an entity that was changed.<br>
The possible operations are `CYCLE`, `SCENARIO`, `SEQUENCE`, `TASK`, `DATA_NODE`, or `JOB`.
The possible operations are `CYCLE`, `SCENARIO`, `SEQUENCE`, `TASK`, `DATA_NODE`, `JOB` or `SUBMISSION`.
"""

CYCLE = 1
Expand All @@ -44,6 +44,7 @@ class EventEntityType(_ReprEnum):
TASK = 4
DATA_NODE = 5
JOB = 6
SUBMISSION = 7


_NO_ATTRIBUTE_NAME_OPERATIONS = set([EventOperation.CREATION, EventOperation.DELETION, EventOperation.SUBMISSION])
Expand All @@ -55,6 +56,7 @@ class EventEntityType(_ReprEnum):
"data": EventEntityType.DATA_NODE,
"job": EventEntityType.JOB,
"cycle": EventEntityType.CYCLE,
"submission": EventEntityType.SUBMISSION,
}


Expand Down
9 changes: 9 additions & 0 deletions src/taipy/core/scenario/_scenario_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ..job._job_manager_factory import _JobManagerFactory
from ..job.job import Job
from ..notification import EventEntityType, EventOperation, _publish_event
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory
from .scenario import Scenario
from .scenario_id import ScenarioId
Expand Down Expand Up @@ -398,10 +399,18 @@ def _get_children_entity_ids(cls, scenario: Scenario) -> _EntityIds:
for data_node in scenario.data_nodes.values():
if data_node.owner_id == scenario.id:
entity_ids.data_node_ids.add(data_node.id)

jobs = _JobManagerFactory._build_manager()._get_all()
for job in jobs:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.scenario_ids.union(entity_ids.sequence_ids, entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
9 changes: 9 additions & 0 deletions src/taipy/core/sequence/_sequence_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
from ..scenario.scenario import Scenario
from ..scenario.scenario_id import ScenarioId
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task._task_manager_factory import _TaskManagerFactory
from ..task.task import Task, TaskId
from .sequence import Sequence
Expand Down Expand Up @@ -241,10 +242,18 @@ def _get_children_entity_ids(cls, sequence: Sequence) -> _EntityIds:
for data_node in task.data_nodes.values():
if data_node.owner_id == sequence.id:
entity_ids.data_node_ids.add(data_node.id)

jobs = _JobManagerFactory._build_manager()._get_all()
for job in jobs:
if job.task.id in entity_ids.task_ids:
entity_ids.job_ids.add(job.id)

submissions = _SubmissionManagerFactory._build_manager()._get_all()
submitted_entity_ids = list(entity_ids.sequence_ids.union(entity_ids.task_ids))
for submission in submissions:
if submission.entity_id in submitted_entity_ids:
entity_ids.submission_ids.add(submission.id)

return entity_ids

@classmethod
Expand Down
10 changes: 10 additions & 0 deletions src/taipy/core/submission/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright 2023 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
Loading
Loading