Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue about not publishing data node updates #2316

Merged
merged 10 commits into from
Dec 13, 2024

Conversation

jrobinAV
Copy link
Member

@jrobinAV jrobinAV commented Dec 10, 2024

What type of PR is this? (check all applicable)

  • Refactor
  • Feature
  • Bug Fix
  • Optimization
  • Documentation Update

Description

Data node update events were not properly published when the update came from a task execution in standalone mode.

When a data node is written, 4 update events should be published:

  • last_edit_date attribute update
  • editor_id attribute update
  • edit_in_progress attribute update
  • editor_expiration_date attribute update
    When the writing was made by a task execution, the events were published into a Notifier specific to the process, while it MUST be published to the main application Notifier.

Related Tickets & Documents

How to reproduce the issue

from taipy.core import Status
from taipy.core.notification import (
    CoreEventConsumerBase,
    Event,
    EventEntityType,
    EventOperation,
    Notifier,
)
import taipy as tp
import taipy.gui.builder as tgb
from taipy.gui import Gui, State, notify
from taipy.core import Scenario, DataNode
from taipy import Config

import time
import pandas as pd


def some_task_1(input_data):
    time.sleep(2)
    return "hello world"


def some_task_2(intermediate_data):
    time.sleep(2)
    return f"{intermediate_data} from Taipy!"


Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)

input_data_cfg = Config.configure_data_node(
    id="input_data", storage_type="csv", default_data={"x": [1, 2, 3], "y": [4, 5, 6]}
)
intermediate_data_cfg = Config.configure_data_node(
    id="intermediate_data",
)
output_data_cfg = Config.configure_data_node(
    id="output_data",
)

task_1_cfg = Config.configure_task(
    id="task_1",
    function=some_task_1,
    input=input_data_cfg,
    output=intermediate_data_cfg,
    skippable=True,
)

task_2_cfg = Config.configure_task(
    id="task_2",
    function=some_task_2,
    input=intermediate_data_cfg,
    output=output_data_cfg,
    skippable=True,
)

scenario_cfg = Config.configure_scenario(
    id="scenario",
    task_configs=[task_1_cfg, task_2_cfg],
)


def notify_user(state, data_node, scenarios):
    if [state.scenario.id](http://state.scenario.id/) in scenarios:
        if data_node.config_id == "input_data":
            state.input_data = data_node.read()
        if data_node.config_id == "intermediate_data":
            notify(state, "success", "Task 1 is finished!")
        if data_node.config_id == "output_data":
            state.output_data = data_node.read()
            notify(state, "success", "Submission is finished!")


class SpecificCoreConsumer(CoreEventConsumerBase):
    def __init__(self, gui: Gui):
        self.gui = gui

        reg_id, queue = Notifier.register(
            # attribute_name="last_edit_date",
        )  # Adapt the registration to the events you want to listen to
        super().__init__(reg_id, queue)

    def process_event(self, event: Event):
        if (
            event.entity_type == EventEntityType.DATA_NODE
            and event.operation == EventOperation.UPDATE
            and event.attribute_name == "last_edit_date"):
        ):
            print(
                f"Received event: {event.operation} - {event.entity_type} - {event.entity_id}\n"
                f"                {event.attribute_name} - {event.attribute_value}\n"
            )
            data_node: DataNode = tp.get(event.entity_id)
            scenarios = data_node.get_parents().get("scenario")
            self.gui.broadcast_callback(
                notify_user,
                [data_node, scenarios],
            )


def change_scenario(state: State):
    state.input_data = state.scenario.input_data.read()
    state.output_data = state.scenario.output_data.read()


def run_scenario(state: State):
    print("Running scenario...")
    state.scenario.submit()
    print("Done!")


def on_exception(state, function_name: str, ex: Exception):
    print(ex, "in ", function_name)


if __name__ == "__main__":
    tp.Orchestrator().run()
    scenario: Scenario = tp.create_scenario(scenario_cfg)

    input_data: pd.DataFrame = scenario.input_data.read()
    output_data: int = scenario.output_data.read()

    with [tgb.Page](http://tgb.page/)() as page:
        tgb.scenario_selector(scenario, on_change=change_scenario)
        tgb.button("Run Scenario", on_action=run_scenario)
        tgb.chart("{input_data}")
        tgb.input("{output_data}")

    gui = Gui(page)
    SpecificCoreConsumer(gui).start()
    gui.run(debug=True)

branches or releases

  • develop
  • release/4.0

Check list

  • Does this solution meet the acceptance criteria of the related issue?
  • Is the related issue checklist completed?
  • Does this PR add unit tests for the developed code?
  • End-to-End tests have been added or updated?
  • Have the release notes been updated?

Copy link
Contributor

github-actions bot commented Dec 11, 2024

☂️ Python Coverage

current status: ✅

Overall Coverage

Lines Covered Coverage Threshold Status
19488 16967 87% 0% 🟢

New Files

No new covered files...

Modified Files

File Coverage Status
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py 93% 🟢
taipy/core/_orchestrator/_dispatcher/_task_function_wrapper.py 95% 🟢
taipy/core/data/data_node.py 98% 🟢
TOTAL 95% 🟢

updated for commit: a856b37 by action🐍

@jrobinAV jrobinAV marked this pull request as ready for review December 11, 2024 11:45
@jrobinAV jrobinAV self-assigned this Dec 11, 2024
@jrobinAV jrobinAV added Core Related to Taipy Core 🟥 Priority: Critical Must be addressed as soon as possible Core: Job & Orchestrator Core: Data node 💥Malfunction Addresses an identified problem. labels Dec 11, 2024
@jrobinAV jrobinAV mentioned this pull request Dec 11, 2024
5 tasks
toan-quach
toan-quach previously approved these changes Dec 12, 2024
Copy link
Member

@toan-quach toan-quach left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm!

Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>
@jrobinAV jrobinAV dismissed stale reviews from toan-quach and joaoandre-avaiga via 4ed94c7 December 12, 2024 15:15
@jrobinAV jrobinAV requested a review from trgiangdo December 12, 2024 16:08
Copy link
Member

@trgiangdo trgiangdo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jrobinAV jrobinAV merged commit 31a6b96 into develop Dec 13, 2024
125 of 126 checks passed
@jrobinAV jrobinAV deleted the feature/fix-data-node-update-notification branch December 13, 2024 15:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Core: Data node Core: Job & Orchestrator Core Related to Taipy Core 💥Malfunction Addresses an identified problem. 🟥 Priority: Critical Must be addressed as soon as possible
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[🐛 BUG] Never receiving Data Node notifications/update with Core event
4 participants