Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
Improve Core Events to support additional metadata (#829)
Browse files Browse the repository at this point in the history
* Change event API and introduce the make_event function to dynamically create event from an entity

* test: Improve unit tests

* test: order of events is not guaranteed

* Remove config_id from the Event class, use more Notifier.publish
  • Loading branch information
gmarabout authored Nov 22, 2023
1 parent a9c256d commit 652fd79
Show file tree
Hide file tree
Showing 24 changed files with 843 additions and 224 deletions.
6 changes: 3 additions & 3 deletions src/taipy/core/_entity/_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import List

from .._entity._reload import _get_manager
from ..notification import _publish_event
from ..notification import Notifier


class _Entity:
Expand All @@ -34,6 +34,6 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
self._properties.data.update(self._properties._pending_changes)
_get_manager(self._MANAGER_NAME)._set(self)

while self._in_context_attributes_changed_collector:
_publish_event(*self._in_context_attributes_changed_collector.pop(0))
for event in self._in_context_attributes_changed_collector:
Notifier.publish(event)
_get_manager(self._MANAGER_NAME)._set(self)
31 changes: 15 additions & 16 deletions src/taipy/core/_entity/_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

from collections import UserDict

from ..notification import _ENTITY_TO_EVENT_ENTITY_TYPE, EventOperation, _publish_event
from ..notification import _ENTITY_TO_EVENT_ENTITY_TYPE, EventOperation, Notifier, _make_event


class _Properties(UserDict):

__PROPERTIES_ATTRIBUTE_NAME = "properties"

def __init__(self, entity_owner, **kwargs):
Expand All @@ -29,20 +28,20 @@ def __setitem__(self, key, value):
from ... import core as tp

if hasattr(self, "_entity_owner"):
to_publish_event_parameters = [
_ENTITY_TO_EVENT_ENTITY_TYPE[self._entity_owner._MANAGER_NAME],
self._entity_owner.id,
event = _make_event(
self._entity_owner,
EventOperation.UPDATE,
self.__PROPERTIES_ATTRIBUTE_NAME,
]
attribute_name=self.__PROPERTIES_ATTRIBUTE_NAME,
attribute_value=value,
)
if not self._entity_owner._is_in_context:
tp.set(self._entity_owner)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
if key in self._pending_deletions:
self._pending_deletions.remove(key)
self._pending_changes[key] = value
self._entity_owner._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._entity_owner._in_context_attributes_changed_collector.append(event)

def __getitem__(self, key):
from taipy.config.common._template_handler import _TemplateHandler as _tpl
Expand All @@ -54,16 +53,16 @@ def __delitem__(self, key):
from ... import core as tp

if hasattr(self, "_entity_owner"):
to_publish_event_parameters = [
_ENTITY_TO_EVENT_ENTITY_TYPE[self._entity_owner._MANAGER_NAME],
self._entity_owner.id,
event = _make_event(
self._entity_owner,
EventOperation.UPDATE,
self.__PROPERTIES_ATTRIBUTE_NAME,
]
attribute_name=self.__PROPERTIES_ATTRIBUTE_NAME,
attribute_value=None,
)
if not self._entity_owner._is_in_context:
tp.set(self._entity_owner)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
self._pending_changes.pop(key, None)
self._pending_deletions.add(key)
self._entity_owner._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._entity_owner._in_context_attributes_changed_collector.append(event)
20 changes: 12 additions & 8 deletions src/taipy/core/_entity/_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import functools

from ..notification import EventOperation, _publish_event
from ..notification import EventOperation, Notifier, _make_event


class _Reloader:
Expand Down Expand Up @@ -65,19 +65,23 @@ def __set_entity(fct):
def _do_set_entity(self, *args, **kwargs):
fct(self, *args, **kwargs)
entity_manager = _get_manager(manager)
to_publish_event_parameters = [
entity_manager._EVENT_ENTITY_TYPE,
self.id,
if len(args) == 1:
value = args[0]
else:
value = args
event = _make_event(
self,
EventOperation.UPDATE,
fct.__name__,
]
attribute_name=fct.__name__,
attribute_value=value,
)
if not self._is_in_context:
entity = _Reloader()._reload(manager, self)
fct(entity, *args, **kwargs)
entity_manager._set(entity)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
self._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._in_context_attributes_changed_collector.append(event)

return _do_set_entity

Expand Down
36 changes: 31 additions & 5 deletions src/taipy/core/_manager/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
# specific language governing permissions and limitations under the License.

import pathlib
from importlib import metadata
from typing import Dict, Generic, Iterable, List, Optional, TypeVar, Union

from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity_ids import _EntityIds
from .._repository._abstract_repository import _AbstractRepository
from ..exceptions.exceptions import ModelNotFound
from ..notification import EventOperation, _publish_event
from ..notification import Event, EventOperation, Notifier

EntityType = TypeVar("EntityType")

Expand All @@ -34,7 +35,13 @@ def _delete_all(cls):
"""
cls._repository._delete_all()
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, "all", EventOperation.DELETION, None)
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE,
EventOperation.DELETION,
metadata={"delete_all": True},
)
)

@classmethod
def _delete_many(cls, ids: Iterable):
Expand All @@ -44,7 +51,14 @@ def _delete_many(cls, ids: Iterable):
cls._repository._delete_many(ids)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
for entity_id in ids:
_publish_event(cls._EVENT_ENTITY_TYPE, entity_id, EventOperation.DELETION, None) # type: ignore
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE, # type: ignore
EventOperation.DELETION,
entity_id=entity_id,
metadata={"delete_all": True},
)
)

@classmethod
def _delete_by_version(cls, version_number: str):
Expand All @@ -53,7 +67,13 @@ def _delete_by_version(cls, version_number: str):
"""
cls._repository._delete_by(attribute="version", value=version_number)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, None, EventOperation.DELETION, None) # type: ignore
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE, # type: ignore
EventOperation.DELETION,
metadata={"delete_by_version": version_number},
)
)

@classmethod
def _delete(cls, id):
Expand All @@ -62,7 +82,13 @@ def _delete(cls, id):
"""
cls._repository._delete(id)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, id, EventOperation.DELETION, None)
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE,
EventOperation.DELETION,
entity_id=id,
)
)

@classmethod
def _set(cls, entity: EntityType):
Expand Down
6 changes: 5 additions & 1 deletion src/taipy/core/cycle/_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ def _create(
frequency, properties, creation_date=creation_date, start_date=start_date, end_date=end_date, name=name
)
cls._set(cycle)
_publish_event(cls._EVENT_ENTITY_TYPE, cycle.id, EventOperation.CREATION, None)
_publish_event(
cls._EVENT_ENTITY_TYPE,
EventOperation.CREATION,
entity_id=cycle.id,
)
return cycle

@classmethod
Expand Down
21 changes: 21 additions & 0 deletions src/taipy/core/cycle/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .._entity._properties import _Properties
from .._entity._reload import _Reloader, _self_reload, _self_setter
from ..exceptions.exceptions import _SuspiciousFileOperation
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from .cycle_id import CycleId


Expand Down Expand Up @@ -176,3 +177,23 @@ def get_simple_label(self) -> str:
The simple label of the cycle as a string.
"""
return self._get_simple_label()


@_make_event.register(Cycle)
def _make_event_for_cycle(
cycle: Cycle,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {**kwargs}
return Event(
entity_type=EventEntityType.CYCLE,
entity_id=cycle.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata=metadata,
)
9 changes: 5 additions & 4 deletions src/taipy/core/data/_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..config.data_node_config import DataNodeConfig
from ..cycle.cycle_id import CycleId
from ..exceptions.exceptions import InvalidDataNodeType
from ..notification import EventEntityType, EventOperation, _publish_event
from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
from ..scenario.scenario_id import ScenarioId
from ..sequence.sequence_id import SequenceId
from ._abstract_file import _AbstractFileDataNode
Expand All @@ -33,7 +33,6 @@


class _DataManager(_Manager[DataNode], _VersionMixin):

__DATA_NODE_CLASS_MAP = DataNode._class_map() # type: ignore
_ENTITY_NAME = DataNode.__name__
_EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
Expand Down Expand Up @@ -77,7 +76,7 @@ def _create_and_set(
cls._set(data_node)
if isinstance(data_node, _AbstractFileDataNode):
_append_to_backup_file(new_file_path=data_node._path)
_publish_event(cls._EVENT_ENTITY_TYPE, data_node.id, EventOperation.CREATION, None)
Notifier.publish(_make_event(data_node, EventOperation.CREATION))
return data_node

@classmethod
Expand Down Expand Up @@ -166,7 +165,9 @@ def _delete_by_version(cls, version_number: str):
cls._clean_pickle_files(data_nodes)
cls._remove_dn_file_paths_in_backup_file(data_nodes)
cls._repository._delete_by(attribute="version", value=version_number)
_publish_event(cls._EVENT_ENTITY_TYPE, None, EventOperation.DELETION, None)
Notifier.publish(
Event(EventEntityType.DATA_NODE, EventOperation.DELETION, metadata={"delete_by_version": version_number})
)

@classmethod
def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[DataNode]:
Expand Down
21 changes: 21 additions & 0 deletions src/taipy/core/data/data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..common._warnings import _warn_deprecated
from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
from ..job.job_id import JobId
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ._filter import _FilterDataNode
from .data_node_id import DataNodeId, Edit
from .operator import JoinOperator
Expand Down Expand Up @@ -555,3 +556,23 @@ def get_simple_label(self) -> str:
The simple label of the data node as a string.
"""
return self._get_simple_label()


@_make_event.register(DataNode)
def make_event_for_datanode(
data_node: DataNode,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {"config_id": data_node.config_id, **kwargs}
return Event(
entity_type=EventEntityType.DATA_NODE,
entity_id=data_node.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata=metadata,
)
5 changes: 2 additions & 3 deletions src/taipy/core/job/_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
from .._version._version_manager_factory import _VersionManagerFactory
from .._version._version_mixin import _VersionMixin
from ..exceptions.exceptions import JobNotDeletedException
from ..notification import EventEntityType, EventOperation, _publish_event
from ..notification import EventEntityType, EventOperation, Notifier, _make_event
from ..task.task import Task
from .job import Job
from .job_id import JobId


class _JobManager(_Manager[Job], _VersionMixin):

_ENTITY_NAME = Job.__name__
_ID_PREFIX = "JOB_"
_repository: _AbstractRepository
Expand Down Expand Up @@ -52,7 +51,7 @@ def _create(
version=version,
)
cls._set(job)
_publish_event(cls._EVENT_ENTITY_TYPE, job.id, EventOperation.CREATION, None)
Notifier.publish(_make_event(job, EventOperation.CREATION))
job._on_status_change(*callbacks)
return job

Expand Down
26 changes: 25 additions & 1 deletion src/taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import traceback
from datetime import datetime
from typing import Callable, List
from typing import Any, Callable, List, Optional

from taipy.logger._taipy_logger import _TaipyLogger

Expand All @@ -22,6 +22,7 @@
from .._entity._reload import _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..common._utils import _fcts_to_dict
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..task.task import Task
from .job_id import JobId
from .status import Status
Expand Down Expand Up @@ -70,6 +71,9 @@ def __init__(self, id: JobId, task: Task, submit_id: str, submit_entity_id: str,
self.__logger = _TaipyLogger._get_logger()
self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()

def get_event_context(self):
return {"task_config_id": self._task.config_id}

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def task(self):
Expand Down Expand Up @@ -351,3 +355,23 @@ def is_deletable(self) -> bool:
from ... import core as tp

return tp.is_deletable(self)


@_make_event.register(Job)
def _make_event_for_job(
job: Job,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {"creation_date": job.creation_date, "task_config_id": job._task.config_id}
return Event(
entity_type=EventEntityType.JOB,
entity_id=job.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata={**metadata, **kwargs},
)
2 changes: 1 addition & 1 deletion src/taipy/core/notification/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@
from ._registration import _Registration
from ._topic import _Topic
from .core_event_consumer import CoreEventConsumerBase
from .event import _ENTITY_TO_EVENT_ENTITY_TYPE, Event, EventEntityType, EventOperation
from .event import _ENTITY_TO_EVENT_ENTITY_TYPE, Event, EventEntityType, EventOperation, _make_event
from .notifier import Notifier, _publish_event
from .registration_id import RegistrationId
Loading

0 comments on commit 652fd79

Please sign in to comment.